Support large input collections for Multi-instance #12692

Merged · 5 commits, May 11, 2023
@@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.processing.message.ProcessMessageSubscriptionCreateProcessor;
import io.camunda.zeebe.engine.processing.message.ProcessMessageSubscriptionDeleteProcessor;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.processinstance.ActivateProcessInstanceBatchProcessor;
import io.camunda.zeebe.engine.processing.processinstance.CreateProcessInstanceProcessor;
import io.camunda.zeebe.engine.processing.processinstance.CreateProcessInstanceWithResultProcessor;
import io.camunda.zeebe.engine.processing.processinstance.ProcessInstanceCommandProcessor;
@@ -225,12 +226,21 @@ private static void addProcessInstanceBatchStreamProcessors(
final TypedRecordProcessors typedRecordProcessors,
final MutableProcessingState processingState,
final Writers writers) {
final TerminateProcessInstanceBatchProcessor terminateBatchProcessor =
new TerminateProcessInstanceBatchProcessor(
writers, processingState.getKeyGenerator(), processingState.getElementInstanceState());
typedRecordProcessors.onCommand(
ValueType.PROCESS_INSTANCE_BATCH,
ProcessInstanceBatchIntent.TERMINATE,
terminateBatchProcessor);
typedRecordProcessors
.onCommand(
ValueType.PROCESS_INSTANCE_BATCH,
ProcessInstanceBatchIntent.TERMINATE,
new TerminateProcessInstanceBatchProcessor(
writers,
processingState.getKeyGenerator(),
processingState.getElementInstanceState()))
.onCommand(
ValueType.PROCESS_INSTANCE_BATCH,
ProcessInstanceBatchIntent.ACTIVATE,
new ActivateProcessInstanceBatchProcessor(
writers,
processingState.getKeyGenerator(),
processingState.getElementInstanceState(),
processingState.getProcessState()));
}
}
@@ -304,6 +304,23 @@ public long activateChildInstanceWithKey(
return childInstanceKey;
}

/**
* Activate a given amount of children of a multi-instance element.
*
* @param context the context of the multi-instance element
* @param amount the amount of children for which we will write an activate command
*/
public void activateChildInstancesInBatches(final BpmnElementContext context, final int amount) {
final var record =
new ProcessInstanceBatchRecord()
.setProcessInstanceKey(context.getProcessInstanceKey())
.setBatchElementInstanceKey(context.getElementInstanceKey())
.setIndex(amount);

final var key = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.ACTIVATE, record);
}

public void activateElementInstanceInFlowScope(
final BpmnElementContext context, final ExecutableFlowElement element) {

@@ -274,7 +274,7 @@ private void activate(
if (loopCharacteristics.isSequential()) {
createInnerInstance(element, activated);
} else {
inputCollection.forEach(item -> createInnerInstance(element, activated));
stateTransitionBehavior.activateChildInstancesInBatches(context, inputCollection.size());
}
}

@@ -0,0 +1,104 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.processinstance;

import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;

public final class ActivateProcessInstanceBatchProcessor
implements TypedRecordProcessor<ProcessInstanceBatchRecord> {
private final TypedCommandWriter commandWriter;
private final KeyGenerator keyGenerator;
private final ElementInstanceState elementInstanceState;
private final ProcessState processState;

public ActivateProcessInstanceBatchProcessor(
final Writers writers,
final KeyGenerator keyGenerator,
final ElementInstanceState elementInstanceState,
final ProcessState processState) {
commandWriter = writers.command();
this.keyGenerator = keyGenerator;
this.elementInstanceState = elementInstanceState;
this.processState = processState;
}

@Override
public void processRecord(final TypedRecord<ProcessInstanceBatchRecord> record) {
final var recordValue = record.getValue();

final ProcessInstanceRecord childInstanceRecord = createChildInstanceRecord(recordValue);
var amountOfChildInstancesToActivate = recordValue.getIndex();
while (amountOfChildInstancesToActivate > 0) {
if (canWriteCommands(record, childInstanceRecord)) {
final long childInstanceKey = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(
childInstanceKey, ProcessInstanceIntent.ACTIVATE_ELEMENT, childInstanceRecord);
amountOfChildInstancesToActivate--;
} else {
writeFollowupBatchCommand(recordValue, amountOfChildInstancesToActivate);
break;
}
}
}

private ProcessInstanceRecord createChildInstanceRecord(
final ProcessInstanceBatchRecord recordValue) {
final var parentElementInstance =
elementInstanceState.getInstance(recordValue.getBatchElementInstanceKey());
final var processDefinition =
processState
.getProcessByKey(parentElementInstance.getValue().getProcessDefinitionKey())
.getProcess();

final var parentElement =
processDefinition.getElementById(parentElementInstance.getValue().getElementId());
final var childElement = ((ExecutableMultiInstanceBody) parentElement).getInnerActivity();

final var childInstanceRecord = new ProcessInstanceRecord();
childInstanceRecord.wrap(parentElementInstance.getValue());
childInstanceRecord
.setFlowScopeKey(parentElementInstance.getKey())
.setElementId(childElement.getId())
.setBpmnElementType(childElement.getElementType())
.setBpmnEventType(childElement.getEventType());
return childInstanceRecord;
}

private void writeFollowupBatchCommand(
final ProcessInstanceBatchRecord recordValue, final long index) {
final var nextBatchRecord =
new ProcessInstanceBatchRecord()
.setProcessInstanceKey(recordValue.getProcessInstanceKey())
.setBatchElementInstanceKey(recordValue.getBatchElementInstanceKey())
.setIndex(index);
final long key = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.ACTIVATE, nextBatchRecord);
}

private boolean canWriteCommands(
final TypedRecord<ProcessInstanceBatchRecord> record,
final ProcessInstanceRecord childInstanceRecord) {
// We must have space in the batch to write both the ACTIVATE command and the potential
// follow-up batch command. An excessive 8KB is added to account for metadata. This is far
// more than will be necessary.
final var expectedCommandLength =
record.getLength() + childInstanceRecord.getLength() + (1024 * 8);
return commandWriter.canWriteCommandOfLength(expectedCommandLength);
}
}
Member commented on lines +83 to +104:
💭 I wonder if it makes sense to extract these two methods into a *Behavior class, since there are two somewhat similar methods in the TerminateProcessInstanceBatchProcessor. I assume we might make further use of the ProcessInstanceBatch record in the future, so we could end up duplicating these methods even more.

Contributor Author replied:
I was thinking about this as well. The thing is, in the end it's only two lines of code, one of which determines the expectedCommandLength. That length is different for the two processors we have now, so moving this to a behavior class would mean adding a parameter for the expectedCommandLength. In the end, all the behavior method would do is make a call to the command writer.

💭 I guess we could argue that the + 8KB is duplicated and could be extracted from the calculation. To be honest, at this time I don't really care. Let's see whether we start using this command more often and how similar the other checks become.
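
For reference, a minimal sketch of the extraction being discussed might look like the code below. This is only an illustration of the trade-off: the class name ProcessInstanceBatchBehavior and the explicit expectedCommandLength parameter are assumptions, not part of this PR.

// Hypothetical sketch only; ProcessInstanceBatchBehavior does not exist in this PR.
package io.camunda.zeebe.engine.processing.processinstance;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent;
import io.camunda.zeebe.stream.api.state.KeyGenerator;

final class ProcessInstanceBatchBehavior {

  private final TypedCommandWriter commandWriter;
  private final KeyGenerator keyGenerator;

  ProcessInstanceBatchBehavior(final Writers writers, final KeyGenerator keyGenerator) {
    commandWriter = writers.command();
    this.keyGenerator = keyGenerator;
  }

  /** Appends a follow-up batch command so the remaining children are handled in a new batch. */
  void writeFollowupBatchCommand(
      final ProcessInstanceBatchIntent intent,
      final ProcessInstanceBatchRecord recordValue,
      final long index) {
    final var nextBatchRecord =
        new ProcessInstanceBatchRecord()
            .setProcessInstanceKey(recordValue.getProcessInstanceKey())
            .setBatchElementInstanceKey(recordValue.getBatchElementInstanceKey())
            .setIndex(index);
    commandWriter.appendFollowUpCommand(keyGenerator.nextKey(), intent, nextBatchRecord);
  }

  /**
   * The expected command length differs per processor, so each caller would still have to
   * compute it (including the extra 8KB for metadata) and pass it in.
   */
  boolean canWriteCommandOfLength(final int expectedCommandLength) {
    return commandWriter.canWriteCommandOfLength(expectedCommandLength);
  }
}

As the reply above notes, the only shared logic is the follow-up batch command itself, so the extraction would mostly relocate a single call to the command writer.
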

@@ -19,9 +19,17 @@ public final class ProcessInstanceBatchRecord extends UnifiedRecordValue
new LongProperty("batchElementInstanceKey");

/**
* The index is used to determine the beginning of the next batch. If the index equals -1 it means
* there won't be another batch. For the TERMINATE intent this index will be the element instance
* key of the first child instance of the next batch.
* The index is used to keep track of the position in the batch. When the index is -1, there won't
* be another batch.
*
* <p>Depending on the Intent the index is used differently:
*
* <ul>
* <li>TERMINATE - The index is the element instance key of the first child instance of the next
* batch.
* <li>ACTIVATE - The index is a counter, indicating how many more child instances need to be
* activated.
* </ul>
*/
private final LongProperty indexProperty = new LongProperty("index", -1L);

@@ -16,7 +16,8 @@
package io.camunda.zeebe.protocol.record.intent;

public enum ProcessInstanceBatchIntent implements ProcessInstanceRelatedIntent {
TERMINATE(0);
TERMINATE(0),
ACTIVATE(1);

private final short value;
private final boolean shouldBlacklist;
@@ -34,6 +35,8 @@ public static Intent from(final short value) {
switch (value) {
case 0:
return TERMINATE;
case 1:
return ACTIVATE;
default:
return UNKNOWN;
}
@@ -179,7 +179,9 @@ private void hasTerminatedInMultipleBatches(
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withBatchElementInstanceKey(batchElementInstanceKey)
.limit(2))
.describedAs("Has terminated in multiple batches")
.describedAs(
"Has terminated in multiple batches. If this assertion fails please decrease "
+ "the message size, or increase the amount of element instances.")
.hasSize(2);
}
}
@@ -0,0 +1,151 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.processing;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.impl.ZeebeObjectMapper;
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.springframework.util.unit.DataSize;

public final class MultiInstanceLargeInputCollectionTest {
private static final int INPUT_COLLECTION_SIZE = 100;
private static final String INPUT_ELEMENT = "inputElement";
private static final int MAX_MESSAGE_SIZE_KB = 16;
private static final ZeebeObjectMapper OBJECT_MAPPER = new ZeebeObjectMapper();
private static final EmbeddedBrokerRule BROKER_RULE =
new EmbeddedBrokerRule(
cfg -> {
cfg.getNetwork().setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB));
cfg.getProcessing().setMaxCommandsInBatch(1);
});
private static final GrpcClientRule CLIENT_RULE = new GrpcClientRule(BROKER_RULE);

@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(BROKER_RULE).around(CLIENT_RULE);

@Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();

@Test
public void shouldCompleteParallelMultiInstanceWithLargeInputCollection() {
// given
final long processKey =
CLIENT_RULE.deployProcess(
Bpmn.createExecutableProcess("PROCESS")
.startEvent()
.serviceTask(
"task",
t ->
t.zeebeJobType("task")
.multiInstance(
mi ->
mi.parallel()
.zeebeInputCollectionExpression(
createInputCollection(INPUT_COLLECTION_SIZE))
.zeebeInputElement(INPUT_ELEMENT)))
.endEvent()
.done());

// when
final var processInstanceKey = CLIENT_RULE.createProcessInstance(processKey);
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("task")
.limit(INPUT_COLLECTION_SIZE)
.collect(Collectors.toList());
final var actualInputElements = completeJobs();

// then
hasCreatedJobForEachInputElement(actualInputElements);
hasCompletedElementsAndProcessInCorrectSequence(processInstanceKey);
hasActivatedElementInBatches(processInstanceKey);
}

private String createInputCollection(final int size) {
final var inputCollection = new ArrayList<Integer>();
for (int i = 0; i < size; i++) {
inputCollection.add(i);
}
return OBJECT_MAPPER.toJson(inputCollection);
}

private Set<Integer> completeJobs() {
final ActivateJobsResponse activatedJobs =
CLIENT_RULE
.getClient()
.newActivateJobsCommand()
.jobType("task")
.maxJobsToActivate(INPUT_COLLECTION_SIZE)
.fetchVariables(INPUT_ELEMENT)
.send()
.join();

final var inputElements = new HashSet<Integer>();
activatedJobs
.getJobs()
.forEach(
job -> {
inputElements.add((Integer) job.getVariablesAsMap().get(INPUT_ELEMENT));
CLIENT_RULE.getClient().newCompleteCommand(job.getKey()).send().join();
});
return inputElements;
}

private void hasCreatedJobForEachInputElement(final Set<Integer> actualInputElements) {
final var expectedInputElements = new HashSet<Integer>();
for (int i = 0; i < INPUT_COLLECTION_SIZE; i++) {
expectedInputElements.add(i);
}
assertThat(actualInputElements).containsAll(expectedInputElements);
}

private void hasCompletedElementsAndProcessInCorrectSequence(final long processInstanceKey) {
assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted())
.map(record -> record.getValue().getBpmnElementType())
.describedAs("Has completed process and all jobs")
// Plus 4 for the start event, multi instance, end event and process
.hasSize(INPUT_COLLECTION_SIZE + 4)
.describedAs("Completed in correct sequence")
.startsWith(BpmnElementType.START_EVENT, BpmnElementType.SERVICE_TASK)
.endsWith(
BpmnElementType.SERVICE_TASK,
BpmnElementType.MULTI_INSTANCE_BODY,
BpmnElementType.END_EVENT,
BpmnElementType.PROCESS);
}

private void hasActivatedElementInBatches(final long processInstanceKey) {
assertThat(
RecordingExporter.processInstanceBatchRecords()
.withProcessInstanceKey(processInstanceKey)
.limit(2))
.describedAs(
"Has activated in multiple batches. If this assertion fails please decrease "
+ "the message size, or increase the input collection.")
.hasSize(2);
}
}