blob: 4e80f57533747369fc95aef2cdb9235077b78198 [file] [log] [blame]
// Copyright 2011 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package com.google.appengine.tools.pipeline.impl;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.appengine.api.taskqueue.TaskAlreadyExistsException;
import com.google.appengine.tools.pipeline.FutureList;
import com.google.appengine.tools.pipeline.ImmediateValue;
import com.google.appengine.tools.pipeline.Job;
import com.google.appengine.tools.pipeline.Job0;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.OrphanedObjectException;
import com.google.appengine.tools.pipeline.Value;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd;
import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd;
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec;
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec.Group;
import com.google.appengine.tools.pipeline.impl.model.Barrier;
import com.google.appengine.tools.pipeline.impl.model.ExceptionRecord;
import com.google.appengine.tools.pipeline.impl.model.JobInstanceRecord;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.impl.model.JobRecord.InflationType;
import com.google.appengine.tools.pipeline.impl.model.JobRecord.State;
import com.google.appengine.tools.pipeline.impl.model.PipelineObjects;
import com.google.appengine.tools.pipeline.impl.model.Slot;
import com.google.appengine.tools.pipeline.impl.model.SlotDescriptor;
import com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet;
import com.google.appengine.tools.pipeline.impl.tasks.CancelJobTask;
import com.google.appengine.tools.pipeline.impl.tasks.DelayedSlotFillTask;
import com.google.appengine.tools.pipeline.impl.tasks.DeletePipelineTask;
import com.google.appengine.tools.pipeline.impl.tasks.FanoutTask;
import com.google.appengine.tools.pipeline.impl.tasks.FinalizeJobTask;
import com.google.appengine.tools.pipeline.impl.tasks.HandleChildExceptionTask;
import com.google.appengine.tools.pipeline.impl.tasks.HandleSlotFilledTask;
import com.google.appengine.tools.pipeline.impl.tasks.RunJobTask;
import com.google.appengine.tools.pipeline.impl.tasks.Task;
import com.google.appengine.tools.pipeline.impl.util.GUIDGenerator;
import com.google.appengine.tools.pipeline.impl.util.StringUtils;
import com.google.appengine.tools.pipeline.util.Pair;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The central hub of the Pipeline implementation.
*
* @author rudominer@google.com (Mitch Rudominer)
*
*/
public class PipelineManager {
private static final Logger logger = Logger.getLogger(PipelineManager.class.getName());
private static PipelineBackEnd backEnd = new AppEngineBackEnd();
/**
* Creates and launches a new Pipeline
* <p>
* Creates the root Job with its associated Barriers and Slots and saves them
* to the data store. All slots in the root job are immediately filled with
* the values of the parameters to this method, and
* {@link HandleSlotFilledTask HandleSlotFilledTasks} are enqueued for each of
* the filled slots.
*
* @param settings JobSetting array used to control details of the Pipeline
* @param jobInstance A user-supplied instance of {@link Job} that will serve
* as the root job of the Pipeline.
* @param params Arguments to the root job's run() method
* @return The pipelineID of the newly created pipeline, also known as the
* rootJobID.
*/
public static String startNewPipeline(
JobSetting[] settings, Job<?> jobInstance, Object... params) {
UpdateSpec updateSpec = new UpdateSpec(null);
Job<?> rootJobInstance = jobInstance;
// If rootJobInstance has exceptionHandler it has to be wrapped to ensure that root job
// ends up in finalized state in case of exception of run method and
// exceptionHandler returning a result.
if (JobRecord.isExceptionHandlerSpecified(jobInstance)) {
rootJobInstance = new RootJobInstance(jobInstance, settings, params);
params = new Object[0];
}
// Create the root Job and its associated Barriers and Slots
// Passing null for parent JobRecord and graphGUID
// Create HandleSlotFilledTasks for the input parameters.
JobRecord jobRecord = registerNewJobRecord(
updateSpec, settings, null, null, rootJobInstance, params);
updateSpec.setRootJobKey(jobRecord.getRootJobKey());
// Save the Pipeline model objects and enqueue the tasks that start the Pipeline executing.
backEnd.save(updateSpec, jobRecord.getQueueSettings());
return jobRecord.getKey().getName();
}
/**
* Creates a new JobRecord with its associated Barriers and Slots. Also
* creates new {@link HandleSlotFilledTask} for any inputs to the Job that are
* immediately specified. Registers all newly created objects with the
* provided {@code UpdateSpec} for later saving.
* <p>
* This method is called when starting a new Pipeline, in which case it is
* used to create the root job, and it is called from within the run() method
* of a generator job in order to create a child job.
*
* @param updateSpec The {@code UpdateSpec} with which to register all newly
* created objects. All objects will be added to the
* {@link UpdateSpec#getNonTransactionalGroup() non-transaction group}
* of the {@code UpdateSpec}.
* @param settings Array of {@code JobSetting} to apply to the newly created
* JobRecord.
* @param generatorJob The generator job or {@code null} if we are creating
* the root job.
* @param graphGUID The GUID of the child graph to which the new Job belongs
* or {@code null} if we are creating the root job.
* @param jobInstance The user-supplied instance of {@code Job} that
* implements the Job that the newly created JobRecord represents.
* @param params The arguments to be passed to the run() method of the newly
* created Job. Each argument may be an actual value or it may be an
* object of type {@link Value} representing either an
* {@link ImmediateValue} or a
* {@link com.google.appengine.tools.pipeline.FutureValue FutureValue}.
* For each element of the array, if the Object is not of type
* {@link Value} then it is interpreted as an {@link ImmediateValue}
* with the given Object as its value.
* @return The newly constructed JobRecord.
*/
public static JobRecord registerNewJobRecord(UpdateSpec updateSpec, JobSetting[] settings,
JobRecord generatorJob, String graphGUID, Job<?> jobInstance, Object[] params) {
JobRecord jobRecord;
if (generatorJob == null) {
// Root Job
if (graphGUID != null) {
throw new IllegalArgumentException("graphGUID must be null for root jobs");
}
jobRecord = JobRecord.createRootJobRecord(jobInstance, settings);
} else {
jobRecord = new JobRecord(generatorJob, graphGUID, jobInstance, false, settings);
}
return registerNewJobRecord(updateSpec, jobRecord, params);
}
public static JobRecord registerNewJobRecord(UpdateSpec updateSpec, JobRecord jobRecord,
Object[] params) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("registerNewJobRecord job(\"" + jobRecord + "\")");
}
updateSpec.setRootJobKey(jobRecord.getRootJobKey());
Key generatorKey = jobRecord.getGeneratorJobKey();
// Add slots to the RunBarrier corresponding to the input parameters
String graphGuid = jobRecord.getGraphGuid();
for (Object param : params) {
Value<?> value;
if (null != param && param instanceof Value<?>) {
value = (Value<?>) param;
} else {
value = new ImmediateValue<>(param);
}
registerSlotsWithBarrier(updateSpec, value, jobRecord.getRootJobKey(), generatorKey,
jobRecord.getQueueSettings(), graphGuid, jobRecord.getRunBarrierInflated());
}
if (0 == jobRecord.getRunBarrierInflated().getWaitingOnKeys().size()) {
// If the run barrier is not waiting on anything, add a phantom filled
// slot in order to trigger a HandleSlotFilledTask in order to trigger
// a RunJobTask.
Slot slot = new Slot(jobRecord.getRootJobKey(), generatorKey, graphGuid);
jobRecord.getRunBarrierInflated().addPhantomArgumentSlot(slot);
registerSlotFilled(updateSpec, jobRecord.getQueueSettings(), slot, null);
}
// Register the newly created objects with the UpdateSpec.
// The slots in the run Barrier have already been registered
// and the finalize Barrier doesn't have any slots yet.
// Any HandleSlotFilledTasks have also been registered already.
Group updateGroup = updateSpec.getNonTransactionalGroup();
updateGroup.includeBarrier(jobRecord.getRunBarrierInflated());
updateGroup.includeBarrier(jobRecord.getFinalizeBarrierInflated());
updateGroup.includeSlot(jobRecord.getOutputSlotInflated());
updateGroup.includeJob(jobRecord);
updateGroup.includeJobInstanceRecord(jobRecord.getJobInstanceInflated());
return jobRecord;
}
/**
* Given a {@code Value} and a {@code Barrier}, we add one or more slots to
* the waitingFor list of the barrier corresponding to the {@code Value}. We
* also create {@link HandleSlotFilledTask HandleSlotFilledTasks} for all
* filled slots. We register all newly created Slots and Tasks with the given
* {@code UpdateSpec}.
* <p>
* If the value is an {@code ImmediateValue} we make a new slot, register it
* as filled and add it. If the value is a {@code FutureValue} we add the slot
* wrapped by the {@code FutreValueImpl}. If the value is a {@code FutureList}
* then we add multiple slots, one for each {@code Value} in the List wrapped
* by the {@code FutureList}. This process is not recursive because we do not
* currently support {@code FutureLists} of {@code FutureLists}.
*
* @param updateSpec All newly created Slots will be added to the
* {@link UpdateSpec#getNonTransactionalGroup() non-transactional
* group} of the updateSpec. All {@link HandleSlotFilledTask
* HandleSlotFilledTasks} created will be added to the
* {@link UpdateSpec#getFinalTransaction() final transaction}. Note
* that {@code barrier} will not be added to updateSpec. That must be
* done by the caller.
* @param value A {@code Value}. {@code Null} is interpreted as an
* {@code ImmediateValue} with a value of {@code Null}.
* @param rootJobKey The rootJobKey of the Pipeline in which the given Barrier
* lives.
* @param generatorJobKey The key of the generator Job of the local graph in
* which the given barrier lives, or {@code null} if the barrier lives
* in the root Job graph.
* @param queueSettings The QueueSettings for tasks created by this method
* @param graphGUID The GUID of the local graph in which the barrier lives, or
* {@code null} if the barrier lives in the root Job graph.
* @param barrier The barrier to which we will add the slots
*/
private static void registerSlotsWithBarrier(UpdateSpec updateSpec, Value<?> value,
Key rootJobKey, Key generatorJobKey, QueueSettings queueSettings, String graphGUID,
Barrier barrier) {
if (null == value || value instanceof ImmediateValue<?>) {
Object concreteValue = null;
if (null != value) {
ImmediateValue<?> iv = (ImmediateValue<?>) value;
concreteValue = iv.getValue();
}
Slot slot = new Slot(rootJobKey, generatorJobKey, graphGUID);
registerSlotFilled(updateSpec, queueSettings, slot, concreteValue);
barrier.addRegularArgumentSlot(slot);
} else if (value instanceof FutureValueImpl<?>) {
FutureValueImpl<?> futureValue = (FutureValueImpl<?>) value;
Slot slot = futureValue.getSlot();
barrier.addRegularArgumentSlot(slot);
updateSpec.getNonTransactionalGroup().includeSlot(slot);
} else if (value instanceof FutureList<?>) {
FutureList<?> futureList = (FutureList<?>) value;
List<Slot> slotList = new ArrayList<>(futureList.getListOfValues().size());
// The dummyListSlot is a marker slot that indicates that the
// next group of slots forms a single list argument.
Slot dummyListSlot = new Slot(rootJobKey, generatorJobKey, graphGUID);
registerSlotFilled(updateSpec, queueSettings, dummyListSlot, null);
for (Value<?> valFromList : futureList.getListOfValues()) {
Slot slot = null;
if (valFromList instanceof ImmediateValue<?>) {
ImmediateValue<?> ivFromList = (ImmediateValue<?>) valFromList;
slot = new Slot(rootJobKey, generatorJobKey, graphGUID);
registerSlotFilled(updateSpec, queueSettings, slot, ivFromList.getValue());
} else if (valFromList instanceof FutureValueImpl<?>) {
FutureValueImpl<?> futureValFromList = (FutureValueImpl<?>) valFromList;
slot = futureValFromList.getSlot();
} else if (value instanceof FutureList<?>) {
throw new IllegalArgumentException(
"The Pipeline framework does not currently support FutureLists of FutureLists");
} else {
throwUnrecognizedValueException(valFromList);
}
slotList.add(slot);
updateSpec.getNonTransactionalGroup().includeSlot(slot);
}
barrier.addListArgumentSlots(dummyListSlot, slotList);
} else {
throwUnrecognizedValueException(value);
}
}
private static void throwUnrecognizedValueException(Value<?> value) {
throw new RuntimeException(
"Internal logic error: Unrecognized implementation of Value interface: "
+ value.getClass().getName());
}
/**
* Given a Slot and a concrete value with which to fill the slot, we fill the
* value with the slot and create a new {@link HandleSlotFilledTask} for the
* newly filled Slot. We register the Slot and the Task with the given
* UpdateSpec for later saving.
*
* @param updateSpec The Slot will be added to the
* {@link UpdateSpec#getNonTransactionalGroup() non-transactional
* group} of the updateSpec. The new {@link HandleSlotFilledTask} will
* be added to the {@link UpdateSpec#getFinalTransaction() final
* transaction}.
* @param queueSettings queue settings for the created task
* @param slot the Slot to fill
* @param value the value with which to fill it
*/
private static void registerSlotFilled(
UpdateSpec updateSpec, QueueSettings queueSettings, Slot slot, Object value) {
slot.fill(value);
updateSpec.getNonTransactionalGroup().includeSlot(slot);
Task task = new HandleSlotFilledTask(slot.getKey(), queueSettings);
updateSpec.getFinalTransaction().registerTask(task);
}
/**
* Returns all the associated PipelineModelObject for a root pipeline.
*
* @throws IllegalArgumentException if root pipeline was not found.
*/
public static PipelineObjects queryFullPipeline(String rootJobHandle) {
Key rootJobKey = KeyFactory.createKey(JobRecord.DATA_STORE_KIND, rootJobHandle);
return backEnd.queryFullPipeline(rootJobKey);
}
public static Pair<? extends Iterable<JobRecord>, String> queryRootPipelines(
String classFilter, String cursor, int limit) {
return backEnd.queryRootPipelines(classFilter, cursor, limit);
}
public static Set<String> getRootPipelinesDisplayName() {
return backEnd.getRootPipelinesDisplayName();
}
private static void checkNonEmpty(String s, String name) {
if (null == s || s.trim().length() == 0) {
throw new IllegalArgumentException(name + " is empty.");
}
}
/**
* Retrieves a JobRecord for the specified job handle. The returned instance
* will be only partially inflated. The run and finalize barriers will not be
* available but the output slot will be.
*
* @param jobHandle The handle of a job.
* @return The corresponding JobRecord
* @throws NoSuchObjectException If a JobRecord with the given handle cannot
* be found in the data store.
*/
public static JobRecord getJob(String jobHandle) throws NoSuchObjectException {
checkNonEmpty(jobHandle, "jobHandle");
Key key = KeyFactory.createKey(JobRecord.DATA_STORE_KIND, jobHandle);
logger.finest("getJob: " + key.getName());
return backEnd.queryJob(key, JobRecord.InflationType.FOR_OUTPUT);
}
/**
* Changes the state of the specified job to STOPPED.
*
* @param jobHandle The handle of a job
* @throws NoSuchObjectException If a JobRecord with the given handle cannot
* be found in the data store.
*/
public static void stopJob(String jobHandle) throws NoSuchObjectException {
checkNonEmpty(jobHandle, "jobHandle");
Key key = KeyFactory.createKey(JobRecord.DATA_STORE_KIND, jobHandle);
JobRecord jobRecord = backEnd.queryJob(key, JobRecord.InflationType.NONE);
jobRecord.setState(State.STOPPED);
UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
updateSpec.getOrCreateTransaction("stopJob").includeJob(jobRecord);
backEnd.save(updateSpec, jobRecord.getQueueSettings());
}
/**
* Sends cancellation request to the root job.
*
* @param jobHandle The handle of a job
* @throws NoSuchObjectException If a JobRecord with the given handle cannot
* be found in the data store.
*/
public static void cancelJob(String jobHandle) throws NoSuchObjectException {
checkNonEmpty(jobHandle, "jobHandle");
Key key = KeyFactory.createKey(JobRecord.DATA_STORE_KIND, jobHandle);
JobRecord jobRecord = backEnd.queryJob(key, InflationType.NONE);
CancelJobTask cancelJobTask = new CancelJobTask(key, jobRecord.getQueueSettings());
try {
backEnd.enqueue(cancelJobTask);
} catch (TaskAlreadyExistsException e) {
// OK. Some other thread has already enqueued this task.
}
}
/**
* Delete all data store entities corresponding to the given pipeline.
*
* @param pipelineHandle The handle of the pipeline to be deleted
* @param force If this parameter is not {@code true} then this method will
* throw an {@link IllegalStateException} if the specified pipeline is
* not in the {@link State#FINALIZED} or {@link State#STOPPED} state.
* @param async If this parameter is {@code true} then instead of performing
* the delete operation synchronously, this method will enqueue a task
* to perform the operation.
* @throws NoSuchObjectException If there is no Job with the given key.
* @throws IllegalStateException If {@code force = false} and the specified
* pipeline is not in the {@link State#FINALIZED} or
* {@link State#STOPPED} state.
*/
public static void deletePipelineRecords(String pipelineHandle, boolean force, boolean async)
throws NoSuchObjectException, IllegalStateException {
checkNonEmpty(pipelineHandle, "pipelineHandle");
Key key = KeyFactory.createKey(JobRecord.DATA_STORE_KIND, pipelineHandle);
backEnd.deletePipeline(key, force, async);
}
public static void acceptPromisedValue(String promiseHandle, Object value)
throws NoSuchObjectException, OrphanedObjectException {
checkNonEmpty(promiseHandle, "promiseHandle");
Key key = KeyFactory.stringToKey(promiseHandle);
Slot slot = null;
// It is possible, though unlikely, that we might be asked to accept a
// promise before the slot to hold the promise has been saved. We will try 5
// times, sleeping 1, 2, 4, 8 seconds between attempts.
int attempts = 0;
boolean interrupted = false;
try {
while (slot == null) {
attempts++;
try {
slot = backEnd.querySlot(key, false);
} catch (NoSuchObjectException e) {
if (attempts >= 5) {
throw new NoSuchObjectException("There is no promise with handle " + promiseHandle);
}
try {
Thread.sleep((long) Math.pow(2.0, attempts - 1) * 1000L);
} catch (InterruptedException f) {
interrupted = true;
}
}
}
} finally {
// TODO(user): replace with Uninterruptibles#sleepUninterruptibly once we use guava
if (interrupted) {
Thread.currentThread().interrupt();
}
}
Key generatorJobKey = slot.getGeneratorJobKey();
if (null == generatorJobKey) {
throw new RuntimeException(
"Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
}
JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
if (null == generatorJob) {
throw new RuntimeException("Pipeline is fatally corrupted. "
+ "The generator job for a promised value slot was not found: " + generatorJobKey);
}
String childGraphGuid = generatorJob.getChildGraphGuid();
if (null == childGraphGuid) {
// The generator job has not been saved with a childGraphGuid yet. This can happen if the
// promise handle leaked out to an external thread before the job that generated it
// had finished.
throw new NoSuchObjectException(
"The framework is not ready to accept the promised value yet. "
+ "Please try again after the job that generated the promis handle has completed.");
}
if (!childGraphGuid.equals(slot.getGraphGuid())) {
// The slot has been orphaned
throw new OrphanedObjectException(promiseHandle);
}
UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
backEnd.save(updateSpec, generatorJob.getQueueSettings());
}
/**
* The root job instance used to wrap a user provided root if the user
* provided root job has exceptionHandler specified.
*/
private static final class RootJobInstance extends Job0<Object> {
private static final long serialVersionUID = -2162670129577469245L;
private final Job<?> jobInstance;
private final JobSetting[] settings;
private final Object[] params;
public RootJobInstance(Job<?> jobInstance, JobSetting[] settings, Object[] params) {
this.jobInstance = jobInstance;
this.settings = settings;
this.params = params;
}
@Override
public Value<Object> run() throws Exception {
return futureCallUnchecked(settings, jobInstance, params);
}
@Override
public String getJobDisplayName() {
return jobInstance.getJobDisplayName();
}
}
/**
* A RuntimeException which, when thrown, causes us to abandon the current
* task, by returning a 200.
*/
private static class AbandonTaskException extends RuntimeException {
private static final long serialVersionUID = 358437646006972459L;
}
/**
* Process an incoming task received from the App Engine task queue.
*
* @param task The task to be processed.
*/
public static void processTask(Task task) {
logger.finest("Processing task " + task);
try {
switch (task.getType()) {
case RUN_JOB:
runJob((RunJobTask) task);
break;
case HANDLE_SLOT_FILLED:
handleSlotFilled((HandleSlotFilledTask) task);
break;
case FINALIZE_JOB:
finalizeJob((FinalizeJobTask) task);
break;
case FAN_OUT:
handleFanoutTaskOrAbandonTask((FanoutTask) task);
break;
case CANCEL_JOB:
cancelJob((CancelJobTask) task);
break;
case DELETE_PIPELINE:
DeletePipelineTask deletePipelineTask = (DeletePipelineTask) task;
try {
backEnd.deletePipeline(
deletePipelineTask.getRootJobKey(), deletePipelineTask.shouldForce(), false);
} catch (Exception e) {
logger.log(Level.WARNING, "DeletePipeline operation failed.", e);
}
break;
case HANDLE_CHILD_EXCEPTION:
handleChildException((HandleChildExceptionTask) task);
break;
case DELAYED_SLOT_FILL:
handleDelayedSlotFill((DelayedSlotFillTask) task);
break;
default:
throw new IllegalArgumentException("Unrecognized task type: " + task.getType());
}
} catch (AbandonTaskException ate) {
// return 200;
}
}
public static PipelineBackEnd getBackEnd() {
return backEnd;
}
private static void invokePrivateJobMethod(String methodName, Job<?> job, Object... params) {
Class<?>[] signature = new Class<?>[params.length];
int i = 0;
for (Object param : params) {
signature[i++] = param.getClass();
}
invokePrivateJobMethod(methodName, job, signature, params);
}
private static void invokePrivateJobMethod(
String methodName, Job<?> job, Class<?>[] signature, Object... params) {
try {
Method method = Job.class.getDeclaredMethod(methodName, signature);
method.setAccessible(true);
method.invoke(job, params);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
/**
* Return handleException method (if callErrorHandler is <code>true</code>).
* Otherwise return first run method (order is defined by order of
* {@link Class#getMethods()} is return. <br>
* TODO(user) Consider actually looking for a method with matching
* signature<br>
* TODO(maximf) Consider getting rid of reflection based invocations
* completely.
*
* @param klass Class of the user provided job
* @param callErrorHandler should handleException method be returned instead
* of run
* @param params parameters to be passed to the method to invoke
* @return Either run or handleException method of {@code class}. <code>null
* </code> is returned if @{code callErrorHandler} is <code>true</code>
* and no handleException with matching signature is found.
*
*/
@SuppressWarnings("unchecked")
private static Method findJobMethodToInvoke(
Class<?> klass, boolean callErrorHandler, Object[] params) {
Method runMethod = null;
if (callErrorHandler) {
if (params.length != 1) {
throw new RuntimeException(
"Invalid number of parameters passed to handleException:" + params.length);
}
Object parameter = params[0];
if (parameter == null) {
throw new RuntimeException("Null parameters passed to handleException:" + params.length);
}
Class<? extends Object> parameterClass = parameter.getClass();
if (!Throwable.class.isAssignableFrom(parameterClass)) {
throw new RuntimeException("Parameter that is not an exception passed to handleException:"
+ parameterClass.getName());
}
Class<? extends Throwable> exceptionClass = (Class<? extends Throwable>) parameterClass;
do {
try {
runMethod = klass.getMethod(
JobRecord.EXCEPTION_HANDLER_METHOD_NAME, new Class<?>[] {exceptionClass});
break;
} catch (NoSuchMethodException e) {
// Ignore, try parent instead.
}
exceptionClass = (Class<? extends Throwable>) exceptionClass.getSuperclass();
} while (exceptionClass != null);
} else {
for (Method method : klass.getMethods()) {
if ("run".equals(method.getName())) {
runMethod = method;
break;
}
}
}
return runMethod;
}
private static void setJobRecord(Job<?> job, JobRecord jobRecord) {
invokePrivateJobMethod("setJobRecord", job, jobRecord);
}
private static void setCurrentRunGuid(Job<?> job, String guid) {
invokePrivateJobMethod("setCurrentRunGuid", job, guid);
}
private static void setUpdateSpec(Job<?> job, UpdateSpec updateSpec) {
invokePrivateJobMethod("setUpdateSpec", job, updateSpec);
}
/**
* Run the job with the given key.
* <p>
* We fetch the {@link JobRecord} from the data store and then fetch its run
* {@link Barrier} and all of the {@link Slot Slots} in the run
* {@code Barrier} (which should be filled.) We use the values of the filled
* {@code Slots} to populate the arguments of the run() method of the
* {@link Job} instance associated with the job and we invoke the
* {@code run()} method. We save any generated child job graph and we enqueue
* a {@link HandleSlotFilledTask} for any slots that are filled immediately.
*
* @see "http://goto/java-pipeline-model"
*/
private static void runJob(RunJobTask task) {
Key jobKey = task.getJobKey();
JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
jobRecord.getQueueSettings().merge(task.getQueueSettings());
Key rootJobKey = jobRecord.getRootJobKey();
logger.info("Running pipeline job " + jobKey.getName() + "; UI at "
+ PipelineServlet.makeViewerUrl(rootJobKey, jobKey));
JobRecord rootJobRecord;
if (rootJobKey.equals(jobKey)) {
rootJobRecord = jobRecord;
} else {
rootJobRecord = queryJobOrAbandonTask(rootJobKey, JobRecord.InflationType.NONE);
}
if (rootJobRecord.getState() == State.STOPPED) {
logger.warning("The pipeline has been stopped: " + rootJobRecord);
throw new AbandonTaskException();
}
// TODO(user): b/12301978, check if its a previous run and if so AbandonTaskException
Barrier runBarrier = jobRecord.getRunBarrierInflated();
if (null == runBarrier) {
throw new RuntimeException("Internal logic error: " + jobRecord + " has not been inflated.");
}
Barrier finalizeBarrier = jobRecord.getFinalizeBarrierInflated();
if (null == finalizeBarrier) {
throw new RuntimeException(
"Internal logic error: finalize barrier not inflated in " + jobRecord);
}
// Release the run barrier now so any concurrent HandleSlotFilled tasks
// will stop trying to release it.
runBarrier.setReleased();
UpdateSpec tempSpec = new UpdateSpec(rootJobKey);
tempSpec.getOrCreateTransaction("releaseRunBarrier").includeBarrier(runBarrier);
backEnd.save(tempSpec, jobRecord.getQueueSettings());
State jobState = jobRecord.getState();
switch (jobState) {
case WAITING_TO_RUN:
case RETRY:
// OK, proceed
break;
case WAITING_TO_FINALIZE:
logger.info("This job has already been run " + jobRecord);
return;
case STOPPED:
logger.info("This job has been stoped " + jobRecord);
return;
case CANCELED:
logger.info("This job has already been canceled " + jobRecord);
return;
case FINALIZED:
logger.info("This job has already been run " + jobRecord);
return;
}
// Deserialize the instance of Job and set some values on the instance
JobInstanceRecord record = jobRecord.getJobInstanceInflated();
if (null == record) {
throw new RuntimeException(
"Internal logic error:" + jobRecord + " does not have jobInstanceInflated.");
}
Job<?> job = record.getJobInstanceDeserialized();
UpdateSpec updateSpec = new UpdateSpec(rootJobKey);
setJobRecord(job, jobRecord);
String currentRunGUID = GUIDGenerator.nextGUID();
setCurrentRunGuid(job, currentRunGUID);
setUpdateSpec(job, updateSpec);
// Get the run() method we will invoke and its arguments
Object[] params = runBarrier.buildArgumentArray();
boolean callExceptionHandler = jobRecord.isCallExceptionHandler();
Method methodToExecute =
findJobMethodToInvoke(job.getClass(), callExceptionHandler, params);
if (callExceptionHandler && methodToExecute == null) {
// No matching exceptionHandler found. Propagate to the parent.
Throwable exceptionToHandle = (Throwable) params[0];
handleExceptionDuringRun(jobRecord, rootJobRecord, currentRunGUID, exceptionToHandle);
return;
}
if (logger.isLoggable(Level.FINEST)) {
StringBuilder builder = new StringBuilder(1024);
builder.append("Running " + jobRecord + " with params: ");
builder.append(StringUtils.toString(params));
logger.finest(builder.toString());
}
// Set the Job's start time and save the jobRecord now before we invoke
// run(). The start time will be displayed in the UI.
jobRecord.incrementAttemptNumber();
jobRecord.setStartTime(new Date());
tempSpec = new UpdateSpec(jobRecord.getRootJobKey());
tempSpec.getNonTransactionalGroup().includeJob(jobRecord);
if (!backEnd.saveWithJobStateCheck(
tempSpec, jobRecord.getQueueSettings(), jobKey, State.WAITING_TO_RUN, State.RETRY)) {
logger.info("Ignoring runJob request for job " + jobRecord + " which is not in a"
+ " WAITING_TO_RUN or a RETRY state");
return;
}
//TODO(user): Use the commented code to avoid resetting stack trace of rethrown exceptions
// List<Throwable> throwableParams = new ArrayList<Throwable>(params.length);
// for (Object param : params) {
// if (param instanceof Throwable) {
// throwableParams.add((Throwable) param);
// }
// }
// Invoke the run or handleException method. This has the side-effect of populating
// the UpdateSpec with any child job graph generated by the invoked method.
Value<?> returnValue = null;
Throwable caughtException = null;
try {
methodToExecute.setAccessible(true);
returnValue = (Value<?>) methodToExecute.invoke(job, params);
} catch (InvocationTargetException e) {
caughtException = e.getCause();
} catch (Throwable e) {
caughtException = e;
}
if (null != caughtException) {
//TODO(user): use the following condition to keep original exception trace
// if (!throwableParams.contains(caughtException)) {
// }
handleExceptionDuringRun(jobRecord, rootJobRecord, currentRunGUID, caughtException);
return;
}
// The run() method returned without error.
// We do all of the following in a transaction:
// (1) Check that the job is currently in the state WAITING_TO_RUN or RETRY
// (2) Change the state of the job to WAITING_TO_FINALIZE
// (3) Set the finalize slot to be the one generated by the run() method
// (4) Set the job's child graph GUID to be the currentRunGUID
// (5) Enqueue a FanoutTask that will fan-out to a set of
// HandleSlotFilledTasks for each of the slots that were immediately filled
// by the running of the job.
// See "http://goto/java-pipeline-model".
logger.finest("Job returned: " + returnValue);
registerSlotsWithBarrier(updateSpec, returnValue, rootJobKey, jobRecord.getKey(),
jobRecord.getQueueSettings(), currentRunGUID, finalizeBarrier);
jobRecord.setState(State.WAITING_TO_FINALIZE);
jobRecord.setChildGraphGuid(currentRunGUID);
updateSpec.getFinalTransaction().includeJob(jobRecord);
updateSpec.getFinalTransaction().includeBarrier(finalizeBarrier);
backEnd.saveWithJobStateCheck(
updateSpec, jobRecord.getQueueSettings(), jobKey, State.WAITING_TO_RUN, State.RETRY);
}
private static void cancelJob(CancelJobTask cancelJobTask) {
Key jobKey = cancelJobTask.getJobKey();
JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
jobRecord.getQueueSettings().merge(cancelJobTask.getQueueSettings());
Key rootJobKey = jobRecord.getRootJobKey();
logger.info("Cancelling pipeline job " + jobKey.getName());
JobRecord rootJobRecord;
if (rootJobKey.equals(jobKey)) {
rootJobRecord = jobRecord;
} else {
rootJobRecord = queryJobOrAbandonTask(rootJobKey, JobRecord.InflationType.NONE);
}
if (rootJobRecord.getState() == State.STOPPED) {
logger.warning("The pipeline has been stopped: " + rootJobRecord);
throw new AbandonTaskException();
}
switch (jobRecord.getState()) {
case WAITING_TO_RUN:
case RETRY:
case WAITING_TO_FINALIZE:
// OK, proceed
break;
case STOPPED:
logger.info("This job has been stoped " + jobRecord);
return;
case CANCELED:
logger.info("This job has already been canceled " + jobRecord);
return;
case FINALIZED:
logger.info("This job has already been run " + jobRecord);
return;
}
if (jobRecord.getChildKeys().size() > 0) {
cancelChildren(jobRecord, null);
}
// No error handler present. So just mark this job as CANCELED.
if (logger.isLoggable(Level.FINEST)) {
logger.finest("Marking " + jobRecord + " as CANCELED");
}
jobRecord.setState(State.CANCELED);
UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
if (jobRecord.isExceptionHandlerSpecified()) {
executeExceptionHandler(updateSpec, jobRecord, new CancellationException(), true);
}
backEnd.save(updateSpec, jobRecord.getQueueSettings());
}
private static void handleExceptionDuringRun(JobRecord jobRecord, JobRecord rootJobRecord,
String currentRunGUID, Throwable caughtException) {
int attemptNumber = jobRecord.getAttemptNumber();
int maxAttempts = jobRecord.getMaxAttempts();
if (jobRecord.isCallExceptionHandler()) {
logger.log(Level.INFO,
"An exception occurred when attempting to execute exception hander job " + jobRecord
+ ". ", caughtException);
} else {
logger.log(Level.INFO, "An exception occurred when attempting to run " + jobRecord + ". "
+ "This was attempt number " + attemptNumber + " of " + maxAttempts + ".",
caughtException);
}
if (jobRecord.isIgnoreException()) {
return;
}
UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
ExceptionRecord exceptionRecord = new ExceptionRecord(
jobRecord.getRootJobKey(), jobRecord.getKey(), currentRunGUID, caughtException);
updateSpec.getNonTransactionalGroup().includeException(exceptionRecord);
Key exceptionKey = exceptionRecord.getKey();
jobRecord.setExceptionKey(exceptionKey);
if (jobRecord.isCallExceptionHandler() || attemptNumber >= maxAttempts) {
jobRecord.setState(State.STOPPED);
updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
if (jobRecord.isExceptionHandlerSpecified()) {
cancelChildren(jobRecord, null);
executeExceptionHandler(updateSpec, jobRecord, caughtException, false);
} else {
if (null != jobRecord.getExceptionHandlingAncestorKey()) {
cancelChildren(jobRecord, null);
// current job doesn't have an error handler. So just delegate it to the
// nearest ancestor that has one.
Task handleChildExceptionTask = new HandleChildExceptionTask(
jobRecord.getExceptionHandlingAncestorKey(), jobRecord.getKey(),
jobRecord.getQueueSettings());
updateSpec.getFinalTransaction().registerTask(handleChildExceptionTask);
} else {
rootJobRecord.setState(State.STOPPED);
rootJobRecord.setExceptionKey(exceptionKey);
updateSpec.getNonTransactionalGroup().includeJob(rootJobRecord);
}
}
backEnd.save(updateSpec, jobRecord.getQueueSettings());
} else {
jobRecord.setState(State.RETRY);
int backoffFactor = jobRecord.getBackoffFactor();
int backoffSeconds = jobRecord.getBackoffSeconds();
RunJobTask task =
new RunJobTask(jobRecord.getKey(), attemptNumber, jobRecord.getQueueSettings());
task.getQueueSettings().setDelayInSeconds(
backoffSeconds * (long) Math.pow(backoffFactor, attemptNumber));
updateSpec.getFinalTransaction().includeJob(jobRecord);
updateSpec.getFinalTransaction().registerTask(task);
backEnd.saveWithJobStateCheck(updateSpec, jobRecord.getQueueSettings(), jobRecord.getKey(),
State.WAITING_TO_RUN, State.RETRY);
}
}
/**
* @param updateSpec UpdateSpec to use
* @param jobRecord record of the job with exception handler to execute
* @param caughtException failure cause
* @param ignoreException if failure should be ignored (used for cancellation)
*/
private static void executeExceptionHandler(UpdateSpec updateSpec, JobRecord jobRecord,
Throwable caughtException, boolean ignoreException) {
updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
String errorHandlingGraphGuid = GUIDGenerator.nextGUID();
Job<?> jobInstance = jobRecord.getJobInstanceInflated().getJobInstanceDeserialized();
JobRecord errorHandlingJobRecord =
new JobRecord(jobRecord, errorHandlingGraphGuid, jobInstance, true, new JobSetting[0]);
errorHandlingJobRecord.setOutputSlotInflated(jobRecord.getOutputSlotInflated());
errorHandlingJobRecord.setIgnoreException(ignoreException);
registerNewJobRecord(updateSpec, errorHandlingJobRecord,
new Object[] {new ImmediateValue<>(caughtException)});
}
private static void cancelChildren(JobRecord jobRecord, Key failedChildKey) {
for (Key childKey : jobRecord.getChildKeys()) {
if (!childKey.equals(failedChildKey)) {
CancelJobTask cancelJobTask = new CancelJobTask(childKey, jobRecord.getQueueSettings());
try {
backEnd.enqueue(cancelJobTask);
} catch (TaskAlreadyExistsException e) {
// OK. Some other thread has already enqueued this task.
}
}
}
}
private static void handleChildException(HandleChildExceptionTask handleChildExceptionTask) {
Key jobKey = handleChildExceptionTask.getKey();
Key failedChildKey = handleChildExceptionTask.getFailedChildKey();
JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
jobRecord.getQueueSettings().merge(handleChildExceptionTask.getQueueSettings());
Key rootJobKey = jobRecord.getRootJobKey();
logger.info("Running pipeline job " + jobKey.getName() + " exception handler; UI at "
+ PipelineServlet.makeViewerUrl(rootJobKey, jobKey));
JobRecord rootJobRecord;
if (rootJobKey.equals(jobKey)) {
rootJobRecord = jobRecord;
} else {
rootJobRecord = queryJobOrAbandonTask(rootJobKey, JobRecord.InflationType.NONE);
}
if (rootJobRecord.getState() == State.STOPPED) {
logger.warning("The pipeline has been stopped: " + rootJobRecord);
throw new AbandonTaskException();
}
// TODO(user): add jobState check
JobRecord failedJobRecord =
queryJobOrAbandonTask(failedChildKey, JobRecord.InflationType.FOR_OUTPUT);
UpdateSpec updateSpec = new UpdateSpec(rootJobKey);
cancelChildren(jobRecord, failedChildKey);
executeExceptionHandler(updateSpec, jobRecord, failedJobRecord.getException(), false);
backEnd.save(updateSpec, jobRecord.getQueueSettings());
}
/**
* Finalize the job with the given key.
* <p>
* We fetch the {@link JobRecord} from the data store and then fetch its
* finalize {@link Barrier} and all of the {@link Slot Slots} in the finalize
* {@code Barrier} (which should be filled.) We set the finalize Barrier to
* released and save it. We fetch the job's output Slot. We use the values of
* the filled finalize {@code Slots} to populate the output slot. We set the
* state of the Job to {@code FINALIZED} and save the {@link JobRecord} and
* the output slot. Finally we enqueue a {@link HandleSlotFilledTask} for the
* output slot.
*
* @see "http://goto/java-pipeline-model"
*/
private static void finalizeJob(FinalizeJobTask finalizeJobTask) {
Key jobKey = finalizeJobTask.getJobKey();
// Get the JobRecord, its finalize Barrier, all the slots in the
// finalize Barrier, and the job's output Slot.
JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_FINALIZE);
jobRecord.getQueueSettings().merge(finalizeJobTask.getQueueSettings());
switch (jobRecord.getState()) {
case WAITING_TO_FINALIZE:
// OK, proceed
break;
case WAITING_TO_RUN:
case RETRY:
throw new RuntimeException("" + jobRecord + " is in RETRY state");
case STOPPED:
logger.info("This job has been stoped " + jobRecord);
return;
case CANCELED:
logger.info("This job has already been canceled " + jobRecord);
return;
case FINALIZED:
logger.info("This job has already been run " + jobRecord);
return;
}
Barrier finalizeBarrier = jobRecord.getFinalizeBarrierInflated();
if (null == finalizeBarrier) {
throw new RuntimeException("" + jobRecord + " has not been inflated");
}
Slot outputSlot = jobRecord.getOutputSlotInflated();
if (null == outputSlot) {
throw new RuntimeException("" + jobRecord + " has not been inflated.");
}
// release the finalize barrier now so that any concurrent
// HandleSlotFilled tasks will stop trying
finalizeBarrier.setReleased();
UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
updateSpec.getOrCreateTransaction("releaseFinalizeBarrier").includeBarrier(finalizeBarrier);
backEnd.save(updateSpec, jobRecord.getQueueSettings());
updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
// Copy the finalize value to the output slot
List<Object> finalizeArguments = finalizeBarrier.buildArgumentList();
int numFinalizeArguments = finalizeArguments.size();
if (1 != numFinalizeArguments) {
throw new RuntimeException(
"Internal logic error: numFinalizeArguments=" + numFinalizeArguments);
}
Object finalizeValue = finalizeArguments.get(0);
logger.finest("Finalizing " + jobRecord + " with value=" + finalizeValue);
outputSlot.fill(finalizeValue);
// Change state of the job to FINALIZED and set the end time
jobRecord.setState(State.FINALIZED);
jobRecord.setEndTime(new Date());
// Propagate the filler of the finalize slot to also be the filler of the
// output slot. If there is no unique filler of the finalize slot then we
// resort to assigning the current job as the filler job.
Key fillerJobKey = getFinalizeSlotFiller(finalizeBarrier);
if (null == fillerJobKey) {
fillerJobKey = jobKey;
}
outputSlot.setSourceJobKey(fillerJobKey);
// Save the job and the output slot
updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
updateSpec.getNonTransactionalGroup().includeSlot(outputSlot);
backEnd.save(updateSpec, jobRecord.getQueueSettings());
// enqueue a HandleSlotFilled task
HandleSlotFilledTask task =
new HandleSlotFilledTask(outputSlot.getKey(), jobRecord.getQueueSettings());
backEnd.enqueue(task);
}
/**
* Return the unique value of {@link Slot#getSourceJobKey()} for all slots in
* the finalize Barrier, if there is a unique such value. Otherwise return
* {@code null}.
*
* @param finalizeBarrier A finalize Barrier
* @return The unique slot filler for the finalize slots, or {@code null}
*/
private static Key getFinalizeSlotFiller(Barrier finalizeBarrier) {
Key fillerJobKey = null;
for (SlotDescriptor slotDescriptor : finalizeBarrier.getWaitingOnInflated()) {
Key key = slotDescriptor.slot.getSourceJobKey();
if (null != key) {
if (null == fillerJobKey) {
fillerJobKey = key;
} else {
if (!fillerJobKey.toString().equals(key.toString())) {
// Found 2 non-equal values, return null
return null;
}
}
}
}
return fillerJobKey;
}
/**
* Handle the fact that the slot with the given key has been filled.
* <p>
* For each barrier that is waiting on the slot, if all of the slots that the
* barrier is waiting on are now filled then the barrier should be released.
* Release the barrier by enqueueing an appropriate task (either
* {@link RunJobTask} or {@link FinalizeJobTask}.
*/
private static void handleSlotFilled(HandleSlotFilledTask hsfTask) {
Key slotKey = hsfTask.getSlotKey();
Slot slot = querySlotOrAbandonTask(slotKey, true);
List<Barrier> waitingList = slot.getWaitingOnMeInflated();
if (null == waitingList) {
throw new RuntimeException("Internal logic error: " + slot + " is not inflated");
}
// For each barrier that is waiting on the slot ...
for (Barrier barrier : waitingList) {
logger.finest("Checking " + barrier);
// unless the barrier has already been released,
if (!barrier.isReleased()) {
// we check whether the barrier should be released.
boolean shouldBeReleased = true;
if (null == barrier.getWaitingOnInflated()) {
throw new RuntimeException("Internal logic error: " + barrier + " is not inflated.");
}
// For each slot that the barrier is waiting on...
for (SlotDescriptor sd : barrier.getWaitingOnInflated()) {
// see if it is full.
if (!sd.slot.isFilled()) {
logger.finest("Not filled: " + sd.slot);
shouldBeReleased = false;
break;
}
}
if (shouldBeReleased) {
Key jobKey = barrier.getJobKey();
JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.NONE);
jobRecord.getQueueSettings().merge(hsfTask.getQueueSettings());
Task task;
switch (barrier.getType()) {
case RUN:
task = new RunJobTask(jobKey, jobRecord.getQueueSettings());
break;
case FINALIZE:
task = new FinalizeJobTask(jobKey, jobRecord.getQueueSettings());
break;
default:
throw new RuntimeException("Unknown barrier type " + barrier.getType());
}
try {
backEnd.enqueue(task);
} catch (TaskAlreadyExistsException e) {
// OK. Some other thread has already enqueued this task.
}
}
}
}
}
/**
* Fills the slot with null value and calls handleSlotFilled
*/
private static void handleDelayedSlotFill(DelayedSlotFillTask task) {
Key slotKey = task.getSlotKey();
Slot slot = querySlotOrAbandonTask(slotKey, true);
Key rootJobKey = task.getRootJobKey();
UpdateSpec updateSpec = new UpdateSpec(rootJobKey);
slot.fill(null);
updateSpec.getNonTransactionalGroup().includeSlot(slot);
backEnd.save(updateSpec, task.getQueueSettings());
// re-reading Slot (in handleSlotFilled) is needed (to capture slot fill after this one)
handleSlotFilled(new HandleSlotFilledTask(slotKey, task.getQueueSettings()));
}
/**
* Queries for the job with the given key from the data store and if the job
* is not found then throws an {@link AbandonTaskException}.
*
* @param key The key of the JobRecord to be fetched
* @param inflationType Specifies the manner in which the returned JobRecord
* should be inflated.
* @return A {@code JobRecord}, possibly with a partially-inflated associated
* graph of objects.
* @throws AbandonTaskException If Either the JobRecord or any of the
* associated Slots or Barriers are not found in the data store.
*/
private static JobRecord queryJobOrAbandonTask(Key key, JobRecord.InflationType inflationType) {
try {
return backEnd.queryJob(key, inflationType);
} catch (NoSuchObjectException e) {
logger.log(
Level.WARNING, "Cannot find some part of the job: " + key + ". Ignoring the task.", e);
throw new AbandonTaskException();
}
}
/**
* Queries the Slot with the given Key from the data store and if the Slot is
* not found then throws an {@link AbandonTaskException}.
*
* @param key The Key of the slot to fetch.
* @param inflate If this is {@code true} then the Barriers that are waiting
* on the Slot and the other Slots that those Barriers are waiting on
* will also be fetched from the data store and used to partially
* populate the graph of objects attached to the returned Slot. In
* particular: {@link Slot#getWaitingOnMeInflated()} will not return
* {@code null} and also that for each of the {@link Barrier Barriers}
* returned from that method {@link Barrier#getWaitingOnInflated()}
* will not return {@code null}.
* @return A {@code Slot}, possibly with a partially-inflated associated graph
* of objects.
* @throws AbandonTaskException If either the Slot or the associated Barriers
* and slots are not found in the data store.
*/
private static Slot querySlotOrAbandonTask(Key key, boolean inflate) {
try {
return backEnd.querySlot(key, inflate);
} catch (NoSuchObjectException e) {
logger.log(Level.WARNING, "Cannot find the slot: " + key + ". Ignoring the task.", e);
throw new AbandonTaskException();
}
}
/**
* Handles the given FanoutTask and if the corresponding FanoutTaskRecord is
* not found then throws an {@link AbandonTaskException}.
*
* @param fanoutTask The FanoutTask to handle
*/
private static void handleFanoutTaskOrAbandonTask(FanoutTask fanoutTask) {
try {
backEnd.handleFanoutTask(fanoutTask);
} catch (NoSuchObjectException e) {
logger.log(Level.SEVERE, "Pipeline is fatally corrupted. Fanout task record not found", e);
throw new AbandonTaskException();
}
}
/**
* @param slot delayed value slot
*/
public static void registerDelayedValue(
UpdateSpec spec, JobRecord generatorJobRecord, long delaySec, Slot slot) {
Key rootKey = generatorJobRecord.getRootJobKey();
QueueSettings queueSettings = generatorJobRecord.getQueueSettings();
DelayedSlotFillTask task = new DelayedSlotFillTask(slot, delaySec, rootKey , queueSettings);
spec.getFinalTransaction().registerTask(task);
}
}