Class AbstractIterativeTaskPartExecution<I,TH extends AbstractTaskHandler<TH,TE>,TE extends AbstractTaskExecution<TH,TE>,PE extends AbstractIterativeTaskPartExecution<I,TH,TE,PE,IP>,IP extends AbstractIterativeItemProcessor<I,TH,TE,PE,IP>>
- java.lang.Object
-
- com.evolveum.midpoint.repo.common.task.AbstractIterativeTaskPartExecution<I,TH,TE,PE,IP>
-
- Direct Known Subclasses:
AbstractSearchIterativeTaskPartExecution, AsyncUpdateTaskHandler.PartExecution, LiveSyncTaskHandler.PartExecution
public abstract class AbstractIterativeTaskPartExecution<I,TH extends AbstractTaskHandler<TH,TE>,TE extends AbstractTaskExecution<TH,TE>,PE extends AbstractIterativeTaskPartExecution<I,TH,TE,PE,IP>,IP extends AbstractIterativeItemProcessor<I,TH,TE,PE,IP>> extends Object
Represents an execution of a generic iterative part of a task. Responsibilities:
1. Prepares the _item gathering_ operation, like the iterative search, live sync, or async update listening. For example, for the search, it determines the object type, the query, and the options. See prepareItemSource(OperationResult).
2. Initiates the item gathering operation. See the processItems(OperationResult) method.
3. Funnels all items received into the ProcessingCoordinator.
4. Holds all relevant state (including e.g. statistics) of the execution.

*TODO finish the cleanup*
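The responsibilities above have a template-method shape. The following is a minimal, self-contained sketch of that shape and is not midPoint code: `SketchPartExecution`, `SketchCoordinator`, and `ListPartExecution` are hypothetical stand-ins, the `OperationResult` parameter and the checked exceptions are omitted, and the order in which the real run(OperationResult) invokes its steps is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for ProcessingCoordinator: hands each item to a processor. */
class SketchCoordinator<I> {
    private final Consumer<I> processor;
    private int submitted;                       // simple statistic kept for illustration

    SketchCoordinator(Consumer<I> processor) { this.processor = processor; }

    void submit(I item) {
        submitted++;
        processor.accept(item);                  // single-threaded: process immediately
    }

    int getSubmitted() { return submitted; }
}

/** Hypothetical, simplified analogue of an iterative task part execution. */
abstract class SketchPartExecution<I> {
    protected SketchCoordinator<I> coordinator;
    protected final List<String> log = new ArrayList<>();

    /** Assumed ordering of the steps; the real run() may differ. */
    final int run() {
        log.add("prepareItemSource");
        prepareItemSource();
        coordinator = new SketchCoordinator<>(this::processItem);
        log.add("processItems");
        processItems();
        log.add("finish");
        return coordinator.getSubmitted();
    }

    /** Determine what to iterate over (e.g. type, query, options for a search). */
    protected void prepareItemSource() { }

    /** Start the item source and return when it is exhausted. */
    protected abstract void processItems();

    /** Handle a single item. */
    protected abstract void processItem(I item);
}

/** Example subclass whose item source is an in-memory list. */
class ListPartExecution extends SketchPartExecution<String> {
    private final List<String> source;
    final List<String> processed = new ArrayList<>();

    ListPartExecution(List<String> source) { this.source = source; }

    @Override protected void processItems() {
        for (String item : source) {
            coordinator.submit(item);            // funnel every item into the coordinator
        }
    }

    @Override protected void processItem(String item) {
        processed.add(item.toUpperCase());
    }
}
```

Calling `new ListPartExecution(List.of("a", "b")).run()` in this sketch walks through the three steps once and returns the number of items that were funnelled into the coordinator.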
-
-
Field Summary
Fields

| Modifier and Type | Field | Description |
| --- | --- | --- |
| protected @NotNull CurrentBucketStatistics | bucketStatistics | Maintains selected statistical information related to processing items during task part execution. |
| protected ProcessingCoordinator<I> | coordinator | Schedules individual items for processing by worker tasks (if running in multiple threads). |
| protected IP | itemProcessor | Processes individual items - in cooperation with ProcessingCoordinator and ItemProcessingGatekeeper. |
| protected @NotNull RunningTask | localCoordinatorTask | The persistent task that carries out the work. |
| protected @NotNull Trace | logger | Logger that is specific to the concrete task handler class. |
| protected @NotNull TaskReportingOptions | reportingOptions | Reporting options specific for this task part. |
| protected @NotNull TaskWorkBucketProcessingResult | runResult | Result of the processing of the current bucket in the current task part. |
| protected TE | taskExecution | Task execution. |
| protected TH | taskHandler | The task handler. |
-
Constructor Summary
Constructors

| Modifier | Constructor | Description |
| --- | --- | --- |
| protected | AbstractIterativeTaskPartExecution(TE taskExecution) | |
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| protected IP | createItemProcessor(OperationResult opResult) | Creates the item processor. |
| protected void | ensureNoWorkerThreads() | |
| protected void | finish(OperationResult opResult) | Ends the processing. |
| @NotNull String | getContextDescription() | |
| protected abstract ErrorHandlingStrategyExecutor.Action | getDefaultErrorAction() | |
| int | getExpectedParts() | |
| @NotNull Trace | getLogger() | |
| int | getPartNumber() | |
| long | getPartStartTimestamp() | When did current part start? We need this for execution record creation that is needed for wall clock average and throughput computation. |
| String | getPartUri() | |
| @NotNull String | getProcessShortName() | |
| @NotNull String | getProcessShortNameCapitalized() | |
| @NotNull TaskReportingOptions | getReportingOptions() | |
| protected @NotNull Task | getRootTask(OperationResult result) | |
| @NotNull String | getRootTaskOid() | |
| long | getStartTimeMillis() | |
| TE | getTaskExecution() | |
| Long | heartbeat() | |
| protected void | initialize(OperationResult opResult) | Initializes task part execution. |
| boolean | isMultithreaded() | |
| boolean | isSimulate() | |
| protected void | prepareItemSource(OperationResult opResult) | Prepares the item source. |
| protected abstract void | processItems(OperationResult opResult) | Starts the item source (e.g. searchObjectsIterative call or synchronize call) and begins processing items generated by it. |
| abstract boolean | providesTracingAndDynamicProfiling() | |
| @NotNull TaskWorkBucketProcessingResult | run(OperationResult opResult) | |
| void | setContextDescription(String value) | |
| void | setExpectedParts(int expectedParts) | |
| protected void | setExpectedTotal(OperationResult opResult) | Computes expected total and sets the value in the task. |
| void | setPartNumber(int partNumber) | |
| void | setPartUri(String partUri) | |
| void | setProcessShortNameCapitalized(String value) | |
-
-
-
Field Detail
-
taskExecution
@NotNull protected final TE extends AbstractTaskExecution<TH,TE> taskExecution
Task execution. Maintains objects (e.g. resource definition, scan timestamps, and so on) common to all task parts.
-
taskHandler
@NotNull protected final TH extends AbstractTaskHandler<TH,TE> taskHandler
The task handler. Used to access e.g. necessary Spring beans.
-
localCoordinatorTask
@NotNull protected final @NotNull RunningTask localCoordinatorTask
The persistent task that carries out the work. It can have lightweight asynchronous worker tasks (threads), hence the name. TODO better name
-
logger
@NotNull protected final @NotNull Trace logger
Logger that is specific to the concrete task handler class. This is to avoid logging everything under the common AbstractIterativeTaskPartExecution or some of its generic subclasses. Also, it allows grouping all processing related to the given task under a single logger. See AbstractTaskHandler.logger.
-
itemProcessor
protected IP extends AbstractIterativeItemProcessor<I,TH,TE,PE,IP> itemProcessor
Processes individual items - in cooperation with ProcessingCoordinator and ItemProcessingGatekeeper.
-
coordinator
protected ProcessingCoordinator<I> coordinator
Schedules individual items for processing by worker tasks (if running in multiple threads).
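As an illustration of what "scheduling items for worker tasks" can look like, here is a minimal sketch built on a plain `java.util.concurrent` thread pool. It is not the actual ProcessingCoordinator: the class `SketchProcessingCoordinator`, its constructor, and the `submit` and `finishProcessing` methods are hypothetical, and midPoint's lightweight asynchronous worker tasks are replaced here by ordinary pool threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical coordinator: processes inline when workerThreads == 0, otherwise uses a pool. */
class SketchProcessingCoordinator<I> {
    private final Consumer<I> itemProcessor;
    private final ExecutorService workers;       // null means single-threaded processing

    SketchProcessingCoordinator(int workerThreads, Consumer<I> itemProcessor) {
        this.itemProcessor = itemProcessor;
        this.workers = workerThreads > 0 ? Executors.newFixedThreadPool(workerThreads) : null;
    }

    boolean isMultithreaded() { return workers != null; }

    /** Processes the item in the calling thread, or hands it to a worker thread. */
    void submit(I item) {
        if (workers == null) {
            itemProcessor.accept(item);
        } else {
            workers.execute(() -> itemProcessor.accept(item));
        }
    }

    /** Waits until all submitted items have been processed; returns false on timeout or interrupt. */
    boolean finishProcessing(long timeoutMillis) {
        if (workers == null) {
            return true;
        }
        workers.shutdown();                      // no new items; queued ones still run
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt for the caller
            return false;
        }
    }
}
```

With zero worker threads the sketch degenerates to in-thread processing, which mirrors the "if running in multiple threads" qualification in the field description.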
-
runResult
@NotNull protected final @NotNull TaskWorkBucketProcessingResult runResult
Result of the processing of the current bucket in the current task part. TODO decide its fate
-
bucketStatistics
@NotNull protected final @NotNull CurrentBucketStatistics bucketStatistics
Maintains selected statistical information related to processing items during task part execution. (I.e. current bucket.)
-
reportingOptions
@NotNull protected final @NotNull TaskReportingOptions reportingOptions
Reporting options specific for this task part. They are copied from the main options present in the task handler - by default. Note this will change when we allow configuring them for individual tasks.
-
-
Constructor Detail
-
AbstractIterativeTaskPartExecution
protected AbstractIterativeTaskPartExecution(@NotNull TE taskExecution)
-
-
Method Detail
-
run
@NotNull public @NotNull TaskWorkBucketProcessingResult run(OperationResult opResult) throws SchemaException, ObjectNotFoundException, SecurityViolationException, CommunicationException, ConfigurationException, ExpressionEvaluationException, TaskException, ObjectAlreadyExistsException, PolicyViolationException, PreconditionViolationException
-
initialize
protected void initialize(OperationResult opResult) throws SchemaException, ConfigurationException, ObjectNotFoundException, CommunicationException, SecurityViolationException, ExpressionEvaluationException, TaskException
Initializes task part execution.
-
prepareItemSource
protected void prepareItemSource(OperationResult opResult) throws TaskException, CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException, ConfigurationException, ExpressionEvaluationException, ObjectAlreadyExistsException
Prepares the item source. E.g. for search-iterative tasks we prepare object type, query, and options here.
-
setExpectedTotal
protected void setExpectedTotal(OperationResult opResult) throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException, ConfigurationException, ExpressionEvaluationException, ObjectAlreadyExistsException
Computes expected total and sets the value in the task. E.g. for search-iterative tasks we count the objects here. TODO reconsider
-
createItemProcessor
@NotNull protected IP createItemProcessor(OperationResult opResult) throws SchemaException, SecurityViolationException, ObjectNotFoundException, ExpressionEvaluationException, CommunicationException, ConfigurationException
Creates the item processor. This method should not do anything more. For initialization there are other methods.
-
processItems
protected abstract void processItems(OperationResult opResult) throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException, ConfigurationException, ExpressionEvaluationException, PolicyViolationException, PreconditionViolationException
Starts the item source (e.g. searchObjectsIterative call or synchronize call) and begins processing items generated by it. Returns when the source finishes. For example:
- for search-based tasks, this call returns immediately after the search is over ... TODO ok?
- for live sync task, this returns after all changes were fetched and acknowledged, and the resulting token was written
- for async update task, this returns also after all changes were fetched and acknowledged and confirmed to the source
-
finish
protected void finish(OperationResult opResult) throws SchemaException
Ends the processing.
- Throws:
SchemaException
-
getLogger
@NotNull public @NotNull Trace getLogger()
-
providesTracingAndDynamicProfiling
public abstract boolean providesTracingAndDynamicProfiling()
- Returns:
- True if this task execution should periodically write traces, according to relevant task extension items. It is also used to drive dynamic profiling (which is going to be deprecated).
-
getTaskExecution
@NotNull public TE getTaskExecution()
-
getStartTimeMillis
public long getStartTimeMillis()
-
isMultithreaded
public boolean isMultithreaded()
-
heartbeat
public Long heartbeat()
-
ensureNoWorkerThreads
protected void ensureNoWorkerThreads()
-
getProcessShortNameCapitalized
@NotNull public @NotNull String getProcessShortNameCapitalized()
-
getProcessShortName
@NotNull public @NotNull String getProcessShortName()
-
setProcessShortNameCapitalized
public void setProcessShortNameCapitalized(String value)
-
getContextDescription
@NotNull public @NotNull String getContextDescription()
-
setContextDescription
public void setContextDescription(String value)
-
getDefaultErrorAction
@NotNull protected abstract ErrorHandlingStrategyExecutor.Action getDefaultErrorAction()
- Returns:
- Default error action if no policy is specified or if no policy entry matches.
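The fallback rule described here (use the default when no policy is specified or no entry matches) can be sketched as follows. This is an illustration only: `SketchAction` and `SketchErrorHandling` are hypothetical names, the two enum constants are assumed rather than taken from ErrorHandlingStrategyExecutor.Action, and matching policy entries by exception class is an assumption about how entries are selected.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical action set; the real ErrorHandlingStrategyExecutor.Action values may differ. */
enum SketchAction { CONTINUE, STOP }

/** Hypothetical policy holder that falls back to a default action. */
class SketchErrorHandling {
    // Insertion-ordered so that the first matching entry wins.
    private final Map<Class<? extends Throwable>, SketchAction> policy = new LinkedHashMap<>();
    private final SketchAction defaultAction;

    SketchErrorHandling(SketchAction defaultAction) { this.defaultAction = defaultAction; }

    /** Adds a policy entry; returns this for chaining. */
    SketchErrorHandling on(Class<? extends Throwable> type, SketchAction action) {
        policy.put(type, action);
        return this;
    }

    /** Returns the action of the first matching entry, or the default if none matches. */
    SketchAction determineAction(Throwable error) {
        for (Map.Entry<Class<? extends Throwable>, SketchAction> entry : policy.entrySet()) {
            if (entry.getKey().isInstance(error)) {
                return entry.getValue();
            }
        }
        return defaultAction;                    // empty policy or no matching entry
    }
}
```

In this sketch the default plays the role that getDefaultErrorAction() plays for a concrete part execution: each subclass decides what happens when the policy is silent.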
-
getReportingOptions
@NotNull public @NotNull TaskReportingOptions getReportingOptions()
-
isSimulate
public boolean isSimulate()
-
getRootTaskOid
@NotNull public @NotNull String getRootTaskOid()
-
getRootTask
@NotNull protected @NotNull Task getRootTask(OperationResult result) throws SchemaException
- Throws:
SchemaException
-
getPartUri
public String getPartUri()
-
setPartUri
public void setPartUri(String partUri)
-
getPartNumber
public int getPartNumber()
-
setPartNumber
public void setPartNumber(int partNumber)
-
getExpectedParts
public int getExpectedParts()
-
setExpectedParts
public void setExpectedParts(int expectedParts)
-
getPartStartTimestamp
public long getPartStartTimestamp()
When did current part start? We need this for execution record creation that is needed for wall clock average and throughput computation. However, answering this question is a bit tricky because of bucketed runs. Recording the part start when the bucket starts is precise, but 1. does not take bucket management overhead into account, 2. generates a lot of execution records - more than we can reasonably handle. So, for all except multi-part runs (that are never bucketed by definition) we use regular task start information. All of this will change when bucketing is (eventually) rewritten.
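The selection rule described above can be sketched in a few lines. This is a hedged illustration, not the method's actual body: `SketchPartTiming` and its fields are hypothetical, and detecting a multi-part run via `expectedParts > 1` is an assumption.

```java
/** Hypothetical holder of the timing data discussed above. */
class SketchPartTiming {
    private final long taskStartMillis;   // regular task start information
    private final long partStartMillis;   // start of the current part
    private final int expectedParts;      // assumed indicator of a multi-part run

    SketchPartTiming(long taskStartMillis, long partStartMillis, int expectedParts) {
        this.taskStartMillis = taskStartMillis;
        this.partStartMillis = partStartMillis;
        this.expectedParts = expectedParts;
    }

    /** Multi-part runs use the part start; all other runs fall back to the task start. */
    long getPartStartTimestamp() {
        return expectedParts > 1 ? partStartMillis : taskStartMillis;
    }

    /** Items per second of wall-clock time since the chosen start (0.0 if no time elapsed). */
    double throughputPerSecond(long itemsProcessed, long nowMillis) {
        long elapsed = nowMillis - getPartStartTimestamp();
        return elapsed > 0 ? itemsProcessed * 1000.0 / elapsed : 0.0;
    }
}
```

Because a single-part run measures from the task start, bucket management overhead is included in the elapsed wall-clock time, which is the trade-off the description points out.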
-
-