This proposal was previously written down in a Google Doc, but I want to post it here for better visibility.
Engine Abstraction
Based on the discussion in our split team document, and to support splitting the Zeebe team into a Distributed Platform team and a Process Automation team, we need clear boundaries that allow us to work together without disrupting each other.
Glossary:
- ZDP - Zeebe Distributed Platform, often used to refer to the specific team
- ZPA - Zeebe Process Automation, often used to refer to the specific team
- Platform - Often used to refer to the distributed platform, which is built by the ZDP Team
- Engine - The application which runs inside the distributed platform and is built by the ZPA Team
- Stream Processor - The component which is part of the platform and is in direct contact with the engine
Proposal
The Zeebe Distributed Platform (ZDP) Team provides an abstraction layer around how records are stored/persisted, read, and written. The Zeebe Process Automation (ZPA) Team is in charge of processing the records and updating their state. The idea is that both teams can fully focus on their own topics, e.g. ZPA on the real business logic.
Abstract visualization
todo: add image
In the abstract: the Distributed Platform Team builds a distributed streaming platform, and the Process Automation Team creates a process automation engine that consumes records from the platform and produces new records. These new records are later consumed again by the engine.
Interaction / Lifecycle
The engine and stream processor interaction can be visualized via the following model.
todo: add model
Starting Phase
When starting the StreamProcessor, we first need to set up certain resources and restore from the most recent snapshot. Afterward, we can initialize the Engine with the current context. The context contains, for example, implementation classes for communication with other partitions (for deployments or messaging), as well as the ZeebeDb instance, which is used to instantiate the ZeebeState.
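A rough sketch of what such a context could look like (the names below are illustrative, not an existing API):

// Illustrative sketch of the context handed to the engine on init.
public interface ProcessingContext {
  ZeebeDb zeebeDb();                           // used to instantiate the ZeebeState
  InterPartitionCommunication communication(); // e.g. for deployments or messaging
  SchedulingService schedulingService();       // see "Execution Model" below
}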
Replay Phase
After the initial starting phase, the StreamProcessor goes into the replay phase. Replaying (currently done by the ReplayStateMachine) is rather simple: the stream processor (or platform) reads records (to be specific, events) from the stream and lets the Engine apply them in order to rebuild the state. Events are applied in batches; all events corresponding to the same source position, i.e. resulting from the same command, are part of one batch. We do this until we reach the last event on the log. For each event batch, the stream processor creates a ZeebeDb transaction and commits it after applying.
The replay phase ends when the end of the log is reached, but only if the StreamProcessor runs on a leader; the StreamProcessor then transitions to the actual processing phase. On a follower, the StreamProcessor stays in the replay phase indefinitely.
Potential improvement: on replay, we could batch more events together and commit a transaction only once per larger batch. This could reduce the replay time, but it only works on leaders, where replay happens only at startup.
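A sketch of that improvement (Transaction and EventBatch are stand-ins, not the actual types):

// Commit once per N event batches instead of once per batch.
void replay(Iterator<EventBatch> batches, int commitInterval) {
  Transaction tx = db.newTransaction();
  int applied = 0;
  while (batches.hasNext()) {
    engine.replay(batches.next()); // all events of one source command
    if (++applied % commitInterval == 0) {
      tx.commit();
      tx = db.newTransaction();
    }
  }
  tx.commit(); // commit the remaining batches
}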
Processing Phase
In the processing phase, only commands are given to the Engine. Before the StreamProcessor submits a command to the Engine, it starts a new transaction. This transaction allows applying all state changes at once if the processing was successful, or rolling them back if an error occurred.
When processing succeeds, the engine returns a processing result to the stream processor, containing follow-up records (commands and events) and side effects. The stream processor is in charge of writing the follow-up records, committing the related transaction, and executing the side effects (like sending responses).
In order to recover from the right position after a restart, the stream processor stores some metadata in the ZeebeDb, like the lastProcessedPosition. This position is updated after the engine has processed the given records; the state change is part of the same transaction.
The stream processor executes side effects only after the records have been written and the transaction has been committed. Such side effects are added by the engine to a corresponding queue (part of the processing result object). Part of the side effect execution is currently the sending of responses to the clients.
Note: The side-effects feature is something we want to get rid of at some point.
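Put together, the processing of a single command could look like this sketch (stand-in names, not the current implementation):

// Sketch: engine state changes, follow-up records, and the position metadata
// all become visible atomically with one commit; side effects run afterwards.
Transaction tx = db.newTransaction();
ProcessingResult result = engine.process(command); // state changes join the transaction
logWriter.write(result.followUpRecords());
metadataState.setLastProcessedPosition(command.position()); // same transaction
tx.commit();
result.sideEffects().forEach(Runnable::run); // e.g. send client responses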
Error Handling
Any error which occurs during processing, or while writing the records, causes the transaction to be rolled back, e.g. if the engine throws a processing exception or writing the follow-up events fails (too-big records).
On error, the engine is called again to handle the error gracefully. This means the engine is allowed to make further state changes (in a separate transaction) and/or return other follow-up records. The stream processor will write the returned records, no matter which type. For example, for a processing error this could include blacklisting the process instance and returning an error record (so the instance is blacklisted again on replay), plus a rejection if a user command was processed.
Note: the error record part might be changed sooner than later.
If the processing error occurred on a process instance-related command, then the process instance needs to be blacklisted. This is a state change the engine performs in a new transaction, after the original transaction has been rolled back.
This means that the engine needs to handle blacklisted instances as well: upcoming commands relating to blacklisted process instances are ignored on the engine side. The stream processor will still read such commands and hand them over to the engine.
Interface
Engine
The interface for the engine could look like the following.
public interface Engine {
  void init(ProcessingContext context);
  void replay(Record record);
  ProcessingResult process(Record record);
  ErrorHandlingResult handleError(Error error, Record record);
}

public interface ProcessingResult {
  CommandResponse commandResponse();
  List<Record> followUpRecords();
  List<Runnable> sideEffects(); // do we need this?
}

public interface ErrorHandlingResult {
  CommandResponse commandResponse(); // potentially rejections
  List<Record> followUpRecords();
  List<Runnable> sideEffects(); // do we need this?
}
Pseudocode for the engine logic:
Engine.process(Record r): ProcessingResult {
  if r.isBlacklisted():
    return EmptyResult
  return process(r)
}

Engine.handleError(Error e, Record r): ErrorHandlingResult {
  state.blacklistInstance(r)
  return {
    commandResponse: CommandRejection, // if a user command was processed
    followUpRecords: [ErrorRecord],    // blacklists the instance again on replay
    sideEffects: []                    // no side effects, potentially?
  }
}
Inside the engine, we might have a helper class like the following to create the corresponding processing result.
public interface ProcessingResultBuilder {
  // maybe we also use a step builder pattern here
  ProcessingResultBuilder setCommandResponse(Record record);
  ProcessingResultBuilder addFollowUpRecord(Record record);
  ProcessingResultBuilder addSideEffect(Runnable runnable);
  ProcessingResult build(); // terminal step, added here for completeness
}
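A processor inside the engine might then use it like this (illustrative only; the concrete records and helpers are made up):

// Illustrative usage of the builder inside an engine processor.
ProcessingResult processJobComplete(Record command, ProcessingResultBuilder builder) {
  // ... update ZeebeState within the current transaction ...
  return builder
      .setCommandResponse(jobCompletedResponse)  // reply to the client
      .addFollowUpRecord(jobCompletedEvent)      // written by the stream processor
      .addSideEffect(() -> notifyJobAvailable()) // executed after the commit
      .build();
}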
StreamProcessor
Pseudocode:
ReplayStateMachine:
  replay():
    for eventBatch eb on the log:
      create transaction t
      engine.replay(eb)
      t.commit()

ProcessingStateMachine:
  processing():
    for newCommand command on the log:
      create transaction t
      try {
        result = engine.process(command)
        write result.followUpRecords
        t.commit()
        execute result.sideEffects
      } catch (error) {
        t.rollback()
        handleError(error, command)
      }

  handleError(error, command):
    create transaction t
    try {
      result = engine.handleError(error, command)
      write result.followUpRecords
      t.commit()
      execute result.sideEffects // e.g. rejection responses
    } catch (error) {
      t.rollback()
      handleError(error, command) // retried; probably needs a limit
    }
Execution Model
Ideally, the Process Automation Team doesn't need to care about the platform's execution model (whether it uses actors, simple thread executors, or whatever). They only care about their business logic.
This means the engine only runs if it is called by the stream processor because a record has been passed in, or if the engine has self-scheduled a task on the SchedulingService the platform provides.
If the engine is called via the SchedulingService (whatever this means internally), it is not allowed to make any state changes, since it cannot be guaranteed that the task does not run concurrently with processing. It is, however, allowed to read from the state and create new commands, which are returned by the engine. These commands are written by the StreamProcessor, similar to the ones produced during processing.
Returning records on such executions allows removing the knowledge and usage of separate writers from engine classes like the following (see the sketch after the SchedulingService interface below):
- DueTimerChecker
- JobTimeoutChecker/JobDeadlines
- MessageDeadlines
- DeploymentPendingChecker
Abstracting away the execution model from the engine would simplify our boundaries and test strategies. If the ZDP Team decides to change something in the execution model, the ZPA Team doesn't need to worry about it. The platform just needs to guarantee certain things, like that one record is processed at a time and that replay is done before processing.
The interface of the SchedulingService is rather simple; it just contains one method to schedule a task at a later point in time. The scheduled task can return records (e.g. new commands), which the platform then writes to the log.
public interface SchedulingService {
  // The task may return records (e.g. new commands), which the platform
  // then writes to the log, similar to processing results.
  void schedule(Duration delay, Supplier<List<Record>> task);
}
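For example, a due-timer check could then look roughly like this (DueTimerChecker's internals and TimerState are illustrative stand-ins, not the existing implementation):

// Hypothetical sketch of a self-scheduling due-timer check.
final class DueTimerChecker {
  private final SchedulingService scheduler;
  private final TimerState timerState; // read-only view of the engine state

  DueTimerChecker(SchedulingService scheduler, TimerState timerState) {
    this.scheduler = scheduler;
    this.timerState = timerState;
  }

  void scheduleNextRun() {
    scheduler.schedule(Duration.ofSeconds(1), () -> {
      // Only reads state; state changes are not allowed here.
      List<Record> commands = timerState.triggerCommandsForDueTimers();
      scheduleNextRun(); // keep the periodic check alive
      return commands;   // written to the log by the StreamProcessor
    });
  }
}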
Snapshotting
The snapshotting is done asynchronously from the processing of commands. This is completely handled by the platform, meaning the ZDP Team is in charge of it.
Snapshots are taken by a class called AsyncSnapshotDirector. This director needs certain details about the processing progress, like the last written position or the last processed position. This information can be requested from the stream processor.
Since ZDP is responsible for the ZeebeDb, it should be fairly easy to take a snapshot, as this is already part of the interface.
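The information exchange could be as small as this sketch (names are illustrative; per the above, only the snapshot capability itself is already part of the ZeebeDb interface):

// Sketch: the director polls the processing progress from the stream
// processor and triggers a snapshot of the platform-owned ZeebeDb.
interface StreamProcessorProgress {
  long lastProcessedPosition();
  long lastWrittenPosition();
}

void takeSnapshot(StreamProcessorProgress progress, ZeebeDb db, File snapshotDir) {
  long processed = progress.lastProcessedPosition(); // bounds the snapshot's validity
  db.createSnapshot(snapshotDir);
}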
Exporters
This topic is currently a bit uncertain.
It would be possible to treat the engine and the exporters the same, meaning we use the same infrastructure, the stream processor, to run them.
We could run a separate stream processor for the engine and for each exporter. This would make it possible for exporters to no longer block each other.
One problem here is that the stream processor hands events over to the engine on replay and commands on processing, while the exporters need to consume both.
It might make sense to keep the current implementation for now and iterate over it later. Still, we need to clarify which team is responsible for it.
Current state: ZDP is in charge of the ExporterDirector, and ZPA is in charge of exporter implementations like the ElasticsearchExporter.
Metrics
The engine has to export certain Prometheus metrics. Since this is rather simple when using the Prometheus client directly, we need no additional abstraction here; it would make things more complicated than useful.
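For example, with the plain Prometheus Java client (the metric itself is made up):

import io.prometheus.client.Counter;

// Illustrative metric registered by the engine directly, without any
// platform-provided abstraction in between.
static final Counter PROCESSED_COMMANDS =
    Counter.build()
        .namespace("zeebe")
        .name("engine_processed_commands_total")
        .help("Number of commands processed by the engine.")
        .register();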
Admin API + Healthiness
The engine is not aware of the admin API, which means it is not aware of pausing and resuming. This is because it only runs when it consumes a record or when a scheduled job runs. The same applies to healthiness: it is currently not necessary to have a separate health check, since the StreamProcessor drives the engine and already has a health check of its own.
Let me know if you have any comments/additions. I might update the proposal every once in a while.
scope/broker kind/research