New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support large input collections for Multi-instance #12692
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
4c7f25a
test(qa): verify activating multi instance with large input collection
remcowesterhoud de94cff
test(qa): improve assertion message
remcowesterhoud ddd34ad
feat(engine): create ActivateProcessInstanceBatchProcessor
remcowesterhoud 307c083
docs(protocol): add ACTIVATE explanation to index
remcowesterhoud 65988a2
feat(engine): activate multi instance children in batches
remcowesterhoud File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
104 changes: 104 additions & 0 deletions
104
...amunda/zeebe/engine/processing/processinstance/ActivateProcessInstanceBatchProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
/* | ||
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under | ||
* one or more contributor license agreements. See the NOTICE file distributed | ||
* with this work for additional information regarding copyright ownership. | ||
* Licensed under the Zeebe Community License 1.1. You may not use this file | ||
* except in compliance with the Zeebe Community License 1.1. | ||
*/ | ||
package io.camunda.zeebe.engine.processing.processinstance; | ||
|
||
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; | ||
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; | ||
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; | ||
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; | ||
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState; | ||
import io.camunda.zeebe.engine.state.immutable.ProcessState; | ||
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord; | ||
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord; | ||
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent; | ||
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; | ||
import io.camunda.zeebe.stream.api.records.TypedRecord; | ||
import io.camunda.zeebe.stream.api.state.KeyGenerator; | ||
|
||
public final class ActivateProcessInstanceBatchProcessor | ||
implements TypedRecordProcessor<ProcessInstanceBatchRecord> { | ||
private final TypedCommandWriter commandWriter; | ||
private final KeyGenerator keyGenerator; | ||
private final ElementInstanceState elementInstanceState; | ||
private final ProcessState processState; | ||
|
||
public ActivateProcessInstanceBatchProcessor( | ||
final Writers writers, | ||
final KeyGenerator keyGenerator, | ||
final ElementInstanceState elementInstanceState, | ||
final ProcessState processState) { | ||
commandWriter = writers.command(); | ||
this.keyGenerator = keyGenerator; | ||
this.elementInstanceState = elementInstanceState; | ||
this.processState = processState; | ||
} | ||
|
||
@Override | ||
public void processRecord(final TypedRecord<ProcessInstanceBatchRecord> record) { | ||
final var recordValue = record.getValue(); | ||
|
||
final ProcessInstanceRecord childInstanceRecord = createChildInstanceRecord(recordValue); | ||
var amountOfChildInstancesToActivate = recordValue.getIndex(); | ||
while (amountOfChildInstancesToActivate > 0) { | ||
if (canWriteCommands(record, childInstanceRecord)) { | ||
final long childInstanceKey = keyGenerator.nextKey(); | ||
commandWriter.appendFollowUpCommand( | ||
childInstanceKey, ProcessInstanceIntent.ACTIVATE_ELEMENT, childInstanceRecord); | ||
amountOfChildInstancesToActivate--; | ||
} else { | ||
writeFollowupBatchCommand(recordValue, amountOfChildInstancesToActivate); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
private ProcessInstanceRecord createChildInstanceRecord( | ||
final ProcessInstanceBatchRecord recordValue) { | ||
final var parentElementInstance = | ||
elementInstanceState.getInstance(recordValue.getBatchElementInstanceKey()); | ||
final var processDefinition = | ||
processState | ||
.getProcessByKey(parentElementInstance.getValue().getProcessDefinitionKey()) | ||
.getProcess(); | ||
|
||
final var parentElement = | ||
processDefinition.getElementById(parentElementInstance.getValue().getElementId()); | ||
final var childElement = ((ExecutableMultiInstanceBody) parentElement).getInnerActivity(); | ||
|
||
final var childInstanceRecord = new ProcessInstanceRecord(); | ||
childInstanceRecord.wrap(parentElementInstance.getValue()); | ||
childInstanceRecord | ||
.setFlowScopeKey(parentElementInstance.getKey()) | ||
.setElementId(childElement.getId()) | ||
.setBpmnElementType(childElement.getElementType()) | ||
.setBpmnEventType(childElement.getEventType()); | ||
return childInstanceRecord; | ||
} | ||
|
||
private void writeFollowupBatchCommand( | ||
final ProcessInstanceBatchRecord recordValue, final long index) { | ||
final var nextBatchRecord = | ||
new ProcessInstanceBatchRecord() | ||
.setProcessInstanceKey(recordValue.getProcessInstanceKey()) | ||
.setBatchElementInstanceKey(recordValue.getBatchElementInstanceKey()) | ||
.setIndex(index); | ||
final long key = keyGenerator.nextKey(); | ||
commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.ACTIVATE, nextBatchRecord); | ||
} | ||
|
||
private boolean canWriteCommands( | ||
final TypedRecord<ProcessInstanceBatchRecord> record, | ||
final ProcessInstanceRecord childInstanceRecord) { | ||
// We must have space in the batch to write both the ACTIVATE command as the potential | ||
// follow-up batch command. An excessive 8Kb is added to account for metadata. This is way | ||
// more than will be necessary. | ||
final var expectedCommandLength = | ||
record.getLength() + childInstanceRecord.getLength() + (1024 * 8); | ||
return commandWriter.canWriteCommandOfLength(expectedCommandLength); | ||
} | ||
} | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
151 changes: 151 additions & 0 deletions
151
...s/src/test/java/io/camunda/zeebe/it/processing/MultiInstanceLargeInputCollectionTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under | ||
* one or more contributor license agreements. See the NOTICE file distributed | ||
* with this work for additional information regarding copyright ownership. | ||
* Licensed under the Zeebe Community License 1.1. You may not use this file | ||
* except in compliance with the Zeebe Community License 1.1. | ||
*/ | ||
package io.camunda.zeebe.it.processing; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
import io.camunda.zeebe.broker.test.EmbeddedBrokerRule; | ||
import io.camunda.zeebe.client.api.response.ActivateJobsResponse; | ||
import io.camunda.zeebe.client.impl.ZeebeObjectMapper; | ||
import io.camunda.zeebe.it.util.GrpcClientRule; | ||
import io.camunda.zeebe.model.bpmn.Bpmn; | ||
import io.camunda.zeebe.protocol.record.intent.JobIntent; | ||
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; | ||
import io.camunda.zeebe.protocol.record.value.BpmnElementType; | ||
import io.camunda.zeebe.test.util.BrokerClassRuleHelper; | ||
import io.camunda.zeebe.test.util.record.RecordingExporter; | ||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import org.junit.ClassRule; | ||
import org.junit.Rule; | ||
import org.junit.Test; | ||
import org.junit.rules.RuleChain; | ||
import org.springframework.util.unit.DataSize; | ||
|
||
public final class MultiInstanceLargeInputCollectionTest { | ||
private static final int INPUT_COLLECTION_SIZE = 100; | ||
private static final String INPUT_ELEMENT = "inputElement"; | ||
private static final int MAX_MESSAGE_SIZE_KB = 16; | ||
private static final ZeebeObjectMapper OBJECT_MAPPER = new ZeebeObjectMapper(); | ||
private static final EmbeddedBrokerRule BROKER_RULE = | ||
new EmbeddedBrokerRule( | ||
cfg -> { | ||
cfg.getNetwork().setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB)); | ||
cfg.getProcessing().setMaxCommandsInBatch(1); | ||
}); | ||
private static final GrpcClientRule CLIENT_RULE = new GrpcClientRule(BROKER_RULE); | ||
|
||
@ClassRule | ||
public static RuleChain ruleChain = RuleChain.outerRule(BROKER_RULE).around(CLIENT_RULE); | ||
|
||
@Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper(); | ||
|
||
@Test | ||
public void shouldCompleteParallelMultiInstanceWithLargeInputCollection() { | ||
// given | ||
final long processKey = | ||
CLIENT_RULE.deployProcess( | ||
Bpmn.createExecutableProcess("PROCESS") | ||
.startEvent() | ||
.serviceTask( | ||
"task", | ||
t -> | ||
t.zeebeJobType("task") | ||
.multiInstance( | ||
mi -> | ||
mi.parallel() | ||
.zeebeInputCollectionExpression( | ||
createInputCollection(INPUT_COLLECTION_SIZE)) | ||
.zeebeInputElement(INPUT_ELEMENT))) | ||
.endEvent() | ||
.done()); | ||
|
||
// when | ||
final var processInstanceKey = CLIENT_RULE.createProcessInstance(processKey); | ||
RecordingExporter.jobRecords(JobIntent.CREATED) | ||
.withProcessInstanceKey(processInstanceKey) | ||
.withElementId("task") | ||
.limit(INPUT_COLLECTION_SIZE) | ||
.collect(Collectors.toList()); | ||
final var actualInputElements = completeJobs(); | ||
|
||
// then | ||
hasCreatedJobForEachInputElement(actualInputElements); | ||
hasCompletedElementsAndProcessInCorrectSequence(processInstanceKey); | ||
hasActivatedElementInBatches(processInstanceKey); | ||
} | ||
|
||
private String createInputCollection(final int size) { | ||
final var inputCollection = new ArrayList<Integer>(); | ||
for (int i = 0; i < size; i++) { | ||
inputCollection.add(i); | ||
} | ||
return OBJECT_MAPPER.toJson(inputCollection); | ||
} | ||
|
||
private Set<Integer> completeJobs() { | ||
final ActivateJobsResponse activatedJobs = | ||
CLIENT_RULE | ||
.getClient() | ||
.newActivateJobsCommand() | ||
.jobType("task") | ||
.maxJobsToActivate(INPUT_COLLECTION_SIZE) | ||
.fetchVariables(INPUT_ELEMENT) | ||
.send() | ||
.join(); | ||
|
||
final var inputElements = new HashSet<Integer>(); | ||
activatedJobs | ||
.getJobs() | ||
.forEach( | ||
job -> { | ||
inputElements.add((Integer) job.getVariablesAsMap().get(INPUT_ELEMENT)); | ||
CLIENT_RULE.getClient().newCompleteCommand(job.getKey()).send().join(); | ||
}); | ||
return inputElements; | ||
} | ||
|
||
private void hasCreatedJobForEachInputElement(final Set<Integer> actualInputElements) { | ||
final var expectedInputElements = new HashSet<Integer>(); | ||
for (int i = 0; i < INPUT_COLLECTION_SIZE; i++) { | ||
expectedInputElements.add(i); | ||
} | ||
assertThat(actualInputElements).containsAll(expectedInputElements); | ||
} | ||
|
||
private void hasCompletedElementsAndProcessInCorrectSequence(final long processInstanceKey) { | ||
assertThat( | ||
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED) | ||
.withProcessInstanceKey(processInstanceKey) | ||
.limitToProcessInstanceCompleted()) | ||
.map(record -> record.getValue().getBpmnElementType()) | ||
.describedAs("Has completed process and all jobs") | ||
// Plus 4 for the start event, multi instance, end event and process | ||
.hasSize(INPUT_COLLECTION_SIZE + 4) | ||
.describedAs("Completed in correct sequence") | ||
.startsWith(BpmnElementType.START_EVENT, BpmnElementType.SERVICE_TASK) | ||
.endsWith( | ||
BpmnElementType.SERVICE_TASK, | ||
BpmnElementType.MULTI_INSTANCE_BODY, | ||
BpmnElementType.END_EVENT, | ||
BpmnElementType.PROCESS); | ||
} | ||
|
||
private void hasActivatedElementInBatches(final long processInstanceKey) { | ||
assertThat( | ||
RecordingExporter.processInstanceBatchRecords() | ||
.withProcessInstanceKey(processInstanceKey) | ||
.limit(2)) | ||
.describedAs( | ||
"Has activated in multiple batches. If this assertion fails please decrease " | ||
+ "the message size, or increase the input collection.") | ||
.hasSize(2); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💭 I wonder if it makes sense to extract these two methods in a
*Behavior
class since there are two somewhat similar methods in theTerminateProcessInstanceBatchProcessor
, and I assume we might have further use of theProcessInstanceBatch
record in the future, so we might duplicate these methods even more.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about this as well. Thing is, in the end it's only 2 lines of code. One of which is used to determine the
expectedCommandLength
. This is different for both of the processors we have now. Moving this to a behavior class would mean that we require a parameter getting theexpectedCommandLength
. So in the end all the behavior method will do is make a call to the command writer.💭 I guess we could argue that the + 8KB is duplicated and could be extracted from the calculation. To be honest, at this time I don't really care. Let's see if we start using this command more often and how similar the other checks will become.