Fixes Issue #60
This was tried differently before in commit 443e311.
For barrier notifications, this was tried before by getting all barrier records' pipeline keys and doing a batch get, but that method did not account for situations in which barrier checks were fired but no pipes were ready yet. My method will potentially result in more RPC requests, but only pipes that are ready to be executed will be fetched.
For retries: we already have the `pipeline_record` fetched, just pass in the target parameter
diff --git a/python/src/pipeline/pipeline.py b/python/src/pipeline/pipeline.py
index b2ee16f..32381bc 100755
--- a/python/src/pipeline/pipeline.py
+++ b/python/src/pipeline/pipeline.py
@@ -1622,13 +1622,15 @@
# contention on the _PipelineRecord entity.
countdown = 1
pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
+ pipeline_record = db.get(pipeline_key)
logging.debug('Firing barrier %r', barrier.key())
task_list.append(taskqueue.Task(
url=path,
countdown=countdown,
name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
params=dict(pipeline_key=pipeline_key, purpose=purpose),
- headers={'X-Ae-Pipeline-Key': pipeline_key}))
+ headers={'X-Ae-Pipeline-Key': pipeline_key},
+ target=pipeline_record.params['target']))
else:
logging.debug('Not firing barrier %r, Waiting for slots: %r',
barrier.key(), pending_slots)
@@ -2604,7 +2606,8 @@
params=dict(pipeline_key=pipeline_key,
purpose=_BarrierRecord.START,
attempt=pipeline_record.current_attempt),
- headers={'X-Ae-Pipeline-Key': pipeline_key})
+ headers={'X-Ae-Pipeline-Key': pipeline_key},
+ target=pipeline_record.params['target'])
task.add(queue_name=self.queue_name, transactional=True)
pipeline_record.put()