Revert "Make pipelines properly propagate their targets to child pipelines"
This reverts commit 443e3112b7f532ae70537c625c48e8b04cb326b1.
diff --git a/python/src/pipeline/pipeline.py b/python/src/pipeline/pipeline.py
index 573623a..018342f 100755
--- a/python/src/pipeline/pipeline.py
+++ b/python/src/pipeline/pipeline.py
@@ -1587,7 +1587,8 @@
continue
blocking_slot_dict[slot_record.key()] = slot_record
- barriers_to_trigger = []
+ task_list = []
+ updated_barriers = []
for barrier in results:
ready_slots = []
for blocking_slot_key in barrier.blocking_slots:
@@ -1606,44 +1607,32 @@
# the task name tombstones.
pending_slots = set(barrier.blocking_slots) - set(ready_slots)
if not pending_slots:
- barriers_to_trigger.append(barrier)
+ if barrier.status != _BarrierRecord.FIRED:
+ barrier.status = _BarrierRecord.FIRED
+ barrier.trigger_time = self._gettime()
+ updated_barriers.append(barrier)
+
+ purpose = barrier.key().name()
+ if purpose == _BarrierRecord.START:
+ path = self.pipeline_handler_path
+ countdown = None
+ else:
+ path = self.finalized_handler_path
+ # NOTE: Wait one second before finalization to prevent
+ # contention on the _PipelineRecord entity.
+ countdown = 1
+ pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
+ logging.debug('Firing barrier %r', barrier.key())
+ task_list.append(taskqueue.Task(
+ url=path,
+ countdown=countdown,
+ name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
+ params=dict(pipeline_key=pipeline_key, purpose=purpose),
+ headers={'X-Ae-Pipeline-Key': pipeline_key}))
else:
logging.debug('Not firing barrier %r, Waiting for slots: %r',
barrier.key(), pending_slots)
- pipeline_keys_to_trigger = [
- _BarrierRecord.target.get_value_for_datastore(barrier)
- for barrier in barriers_to_trigger]
- pipelines_to_trigger = dict(zip(
- pipeline_keys_to_trigger, db.get(pipeline_keys_to_trigger)))
- task_list = []
- updated_barriers = []
- for barrier in barriers_to_trigger:
- if barrier.status != _BarrierRecord.FIRED:
- barrier.status = _BarrierRecord.FIRED
- barrier.trigger_time = self._gettime()
- updated_barriers.append(barrier)
-
- purpose = barrier.key().name()
- if purpose == _BarrierRecord.START:
- path = self.pipeline_handler_path
- countdown = None
- else:
- path = self.finalized_handler_path
- # NOTE: Wait one second before finalization to prevent
- # contention on the _PipelineRecord entity.
- countdown = 1
- pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
- target = pipelines_to_trigger[pipeline_key].params.get('target')
- logging.debug('Firing barrier %r', barrier.key())
- task_list.append(taskqueue.Task(
- url=path,
- countdown=countdown,
- name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
- params=dict(pipeline_key=pipeline_key, purpose=purpose),
- headers={'X-Ae-Pipeline-Key': pipeline_key},
- target=target))
-
# Blindly overwrite _BarrierRecords that have an updated status. This is
# acceptable because by this point all finalization barriers for
# generator children should have already had their final outputs assigned.
@@ -2608,8 +2597,7 @@
params=dict(pipeline_key=pipeline_key,
purpose=_BarrierRecord.START,
attempt=pipeline_record.current_attempt),
- headers={'X-Ae-Pipeline-Key': pipeline_key},
- target=pipeline_record.params.get('target'))
+ headers={'X-Ae-Pipeline-Key': pipeline_key})
task.add(queue_name=self.queue_name, transactional=True)
pipeline_record.put()
@@ -2727,7 +2715,7 @@
all_tasks.append(taskqueue.Task(
url=context.pipeline_handler_path,
params=dict(pipeline_key=pipeline_key),
- target=child_pipeline.params.get('target'),
+ target=child_pipeline.params['target'],
headers={'X-Ae-Pipeline-Key': pipeline_key},
name='ae-pipeline-fan-out-' + child_pipeline.key().name()))