merge: #12742
12742: [Backport stable/8.1] Support large input collections for Multi-instance r=remcowesterhoud a=backport-action

# Description
Backport of #12692 to `stable/8.1`.

relates to #2890
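
In short (a simplified reading of the diff below): for a parallel multi-instance element, instead of eagerly writing one `ACTIVATE_ELEMENT` command per item of the input collection, the multi-instance body now writes a single `ProcessInstanceBatch` `ACTIVATE` command whose index is the number of children left to activate. The new batch processor writes as many child activate commands as fit in the write budget and, if children remain, appends a follow-up `ACTIVATE` batch command carrying the remaining count. A minimal, self-contained sketch of that splitting loop (plain Java; the `Writer` type and all byte sizes here are made up for illustration and are not the Zeebe API):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the batch-splitting idea behind
// ActivateProcessInstanceBatchProcessor. Writer and all sizes are hypothetical.
public class BatchActivationSketch {

  /** Hypothetical writer with a per-batch byte budget, mimicking canWriteCommandOfLength. */
  static final class Writer {
    private final int budget;
    private int written;
    final List<String> commands = new ArrayList<>();

    Writer(final int budget) {
      this.budget = budget;
    }

    boolean canWrite(final int length) {
      return written + length <= budget;
    }

    void write(final String command, final int length) {
      commands.add(command);
      written += length;
    }
  }

  /** Writes child activate commands until the budget is hit, then defers the rest. */
  static long processBatch(final Writer writer, long remaining, final int childCommandLength) {
    while (remaining > 0) {
      if (writer.canWrite(childCommandLength)) {
        writer.write("ACTIVATE_ELEMENT", childCommandLength);
        remaining--;
      } else {
        // Not enough room left: hand the remainder to a follow-up batch command.
        writer.write("PROCESS_INSTANCE_BATCH.ACTIVATE(index=" + remaining + ")", 0);
        break;
      }
    }
    return remaining;
  }

  public static void main(final String[] args) {
    long remaining = 100; // e.g. an input collection with 100 items
    int batch = 0;
    while (remaining > 0) {
      final Writer writer = new Writer(4096); // 40 child commands of 100 bytes fit per batch
      remaining = processBatch(writer, remaining, 100);
      batch++;
      System.out.println("batch " + batch + ": wrote " + writer.commands.size() + " commands");
    }
  }
}
```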

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
zeebe-bors-camunda[bot] and remcowesterhoud committed May 11, 2023
2 parents d028690 + 1556c3e commit 05926d9
Showing 8 changed files with 308 additions and 13 deletions.
@@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.processing.message.ProcessMessageSubscriptionCreateProcessor;
import io.camunda.zeebe.engine.processing.message.ProcessMessageSubscriptionDeleteProcessor;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.processinstance.ActivateProcessInstanceBatchProcessor;
import io.camunda.zeebe.engine.processing.processinstance.CreateProcessInstanceProcessor;
import io.camunda.zeebe.engine.processing.processinstance.CreateProcessInstanceWithResultProcessor;
import io.camunda.zeebe.engine.processing.processinstance.ProcessInstanceCommandProcessor;
@@ -225,12 +226,21 @@ private static void addProcessInstanceBatchStreamProcessors(
final TypedRecordProcessors typedRecordProcessors,
final MutableProcessingState processingState,
final Writers writers) {
final TerminateProcessInstanceBatchProcessor terminateBatchProcessor =
new TerminateProcessInstanceBatchProcessor(
writers, processingState.getKeyGenerator(), processingState.getElementInstanceState());
typedRecordProcessors.onCommand(
ValueType.PROCESS_INSTANCE_BATCH,
ProcessInstanceBatchIntent.TERMINATE,
terminateBatchProcessor);
typedRecordProcessors
.onCommand(
ValueType.PROCESS_INSTANCE_BATCH,
ProcessInstanceBatchIntent.TERMINATE,
new TerminateProcessInstanceBatchProcessor(
writers,
processingState.getKeyGenerator(),
processingState.getElementInstanceState()))
.onCommand(
ValueType.PROCESS_INSTANCE_BATCH,
ProcessInstanceBatchIntent.ACTIVATE,
new ActivateProcessInstanceBatchProcessor(
writers,
processingState.getKeyGenerator(),
processingState.getElementInstanceState(),
processingState.getProcessState()));
}
}
@@ -293,6 +293,23 @@ public long activateChildInstanceWithKey(
return childInstanceKey;
}

/**
* Activates a given number of children of a multi-instance element.
*
* @param context the context of the multi-instance element
* @param amount the number of children for which an activate command will be written
*/
public void activateChildInstancesInBatches(final BpmnElementContext context, final int amount) {
final var record =
new ProcessInstanceBatchRecord()
.setProcessInstanceKey(context.getProcessInstanceKey())
.setBatchElementInstanceKey(context.getElementInstanceKey())
.setIndex(amount);

final var key = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.ACTIVATE, record);
}

public void activateElementInstanceInFlowScope(
final BpmnElementContext context, final ExecutableFlowElement element) {

@@ -274,7 +274,7 @@ private void activate(
if (loopCharacteristics.isSequential()) {
createInnerInstance(element, activated);
} else {
inputCollection.forEach(item -> createInnerInstance(element, activated));
stateTransitionBehavior.activateChildInstancesInBatches(context, inputCollection.size());
}
}

@@ -0,0 +1,104 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.processinstance;

import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;

public final class ActivateProcessInstanceBatchProcessor
implements TypedRecordProcessor<ProcessInstanceBatchRecord> {
private final TypedCommandWriter commandWriter;
private final KeyGenerator keyGenerator;
private final ElementInstanceState elementInstanceState;
private final ProcessState processState;

public ActivateProcessInstanceBatchProcessor(
final Writers writers,
final KeyGenerator keyGenerator,
final ElementInstanceState elementInstanceState,
final ProcessState processState) {
commandWriter = writers.command();
this.keyGenerator = keyGenerator;
this.elementInstanceState = elementInstanceState;
this.processState = processState;
}

@Override
public void processRecord(final TypedRecord<ProcessInstanceBatchRecord> record) {
final var recordValue = record.getValue();

final ProcessInstanceRecord childInstanceRecord = createChildInstanceRecord(recordValue);
var amountOfChildInstancesToActivate = recordValue.getIndex();
while (amountOfChildInstancesToActivate > 0) {
if (canWriteCommands(record, childInstanceRecord)) {
final long childInstanceKey = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(
childInstanceKey, ProcessInstanceIntent.ACTIVATE_ELEMENT, childInstanceRecord);
amountOfChildInstancesToActivate--;
} else {
writeFollowupBatchCommand(recordValue, amountOfChildInstancesToActivate);
break;
}
}
}

private ProcessInstanceRecord createChildInstanceRecord(
final ProcessInstanceBatchRecord recordValue) {
final var parentElementInstance =
elementInstanceState.getInstance(recordValue.getBatchElementInstanceKey());
final var processDefinition =
processState
.getProcessByKey(parentElementInstance.getValue().getProcessDefinitionKey())
.getProcess();

final var parentElement =
processDefinition.getElementById(parentElementInstance.getValue().getElementId());
final var childElement = ((ExecutableMultiInstanceBody) parentElement).getInnerActivity();

final var childInstanceRecord = new ProcessInstanceRecord();
childInstanceRecord.wrap(parentElementInstance.getValue());
childInstanceRecord
.setFlowScopeKey(parentElementInstance.getKey())
.setElementId(childElement.getId())
.setBpmnElementType(childElement.getElementType())
.setBpmnEventType(childElement.getEventType());
return childInstanceRecord;
}

private void writeFollowupBatchCommand(
final ProcessInstanceBatchRecord recordValue, final long index) {
final var nextBatchRecord =
new ProcessInstanceBatchRecord()
.setProcessInstanceKey(recordValue.getProcessInstanceKey())
.setBatchElementInstanceKey(recordValue.getBatchElementInstanceKey())
.setIndex(index);
final long key = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.ACTIVATE, nextBatchRecord);
}

private boolean canWriteCommands(
final TypedRecord<ProcessInstanceBatchRecord> record,
final ProcessInstanceRecord childInstanceRecord) {
// We must have space in the batch to write both the ACTIVATE command and the potential
// follow-up batch command. A generous 8 KiB is added to account for metadata; this is far
// more than will ever be necessary.
final var expectedCommandLength =
record.getLength() + childInstanceRecord.getLength() + (1024 * 8);
return commandWriter.canWriteCommandOfLength(expectedCommandLength);
}
}
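
A note on the size check in `canWriteCommands` above: each iteration reserves room for the incoming batch record, one child record, and a fixed 8 KiB of metadata headroom. Under the 16 KiB max message size used by the new integration test further down, only part of the 100 children fits per batch, so activation necessarily spans several `ACTIVATE` batch commands. A hedged back-of-the-envelope with assumed (not measured) record lengths:

```java
// Illustrative arithmetic only; batchRecordLength and childRecordLength are assumptions.
public class CommandBudgetSketch {
  public static void main(final String[] args) {
    final int maxMessageSize = 16 * 1024;  // the integration test lowers the broker limit to 16 KiB
    final int batchRecordLength = 256;     // assumed size of the incoming ACTIVATE batch command
    final int childRecordLength = 512;     // assumed size of one child ProcessInstanceRecord
    final int metadataHeadroom = 8 * 1024; // the fixed reserve from canWriteCommands

    // Every write must leave room for the batch record, one child record and the headroom.
    final int expectedCommandLength = batchRecordLength + childRecordLength + metadataHeadroom;
    final int childrenPerBatch = (maxMessageSize - expectedCommandLength) / childRecordLength;
    System.out.println("approx. child commands per batch: " + childrenPerBatch); // ~14 here
  }
}
```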
@@ -19,9 +19,17 @@ public final class ProcessInstanceBatchRecord extends UnifiedRecordValue
new LongProperty("batchElementInstanceKey");

/**
* The index is used to determine the beginning of the next batch. If the index equals -1 it means
* there won't be another batch. For the TERMINATE intent this index will be the element instance
* key of the first child instance of the next batch.
* The index is used to keep track of the position in the batch. When the index is -1, there won't
* be another batch.
*
* <p>Depending on the Intent the index is used differently:
*
* <ul>
* <li>TERMINATE - The index is the element instance key of the first child instance of the next
* batch.
* <li>ACTIVATE - The index is a counter, indicating how many more child instances need to be
* activated.
* </ul>
*/
private final LongProperty indexProperty = new LongProperty("index", -1L);
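
To make the two interpretations concrete, a hedged sketch of how each intent would populate the record, using only the setters visible elsewhere in this diff (all keys and counts are made-up values):

```java
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord;

// Hypothetical example values; this only illustrates the two index interpretations.
public class BatchIndexExamples {
  public static void main(final String[] args) {
    // ACTIVATE: the index counts how many child instances still need to be activated.
    final var activateBatch =
        new ProcessInstanceBatchRecord()
            .setProcessInstanceKey(2251799813685249L)
            .setBatchElementInstanceKey(2251799813685251L) // the multi-instance body
            .setIndex(57L);

    // TERMINATE: the index is the element instance key of the first child of the next batch.
    final var terminateBatch =
        new ProcessInstanceBatchRecord()
            .setProcessInstanceKey(2251799813685249L)
            .setBatchElementInstanceKey(2251799813685251L)
            .setIndex(2251799813685312L);

    System.out.println(activateBatch + System.lineSeparator() + terminateBatch);
  }
}
```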

@@ -16,7 +16,8 @@
package io.camunda.zeebe.protocol.record.intent;

public enum ProcessInstanceBatchIntent implements ProcessInstanceRelatedIntent {
TERMINATE(0);
TERMINATE(0),
ACTIVATE(1);

private final short value;
private final boolean shouldBlacklist;
@@ -34,6 +35,8 @@ public static Intent from(final short value) {
switch (value) {
case 0:
return TERMINATE;
case 1:
return ACTIVATE;
default:
return UNKNOWN;
}
@@ -171,7 +171,9 @@ private void hasTerminatedInMultipleBatches(
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withBatchElementInstanceKey(batchElementInstanceKey)
.limit(2))
.describedAs("Has terminated in multiple batches")
.describedAs(
"Has terminated in multiple batches. If this assertion fails, please decrease "
+ "the message size or increase the number of element instances.")
.hasSize(2);
}
}
@@ -0,0 +1,151 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.processing;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.impl.ZeebeObjectMapper;
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.springframework.util.unit.DataSize;

public final class MultiInstanceLargeInputCollectionTest {
private static final int INPUT_COLLECTION_SIZE = 100;
private static final String INPUT_ELEMENT = "inputElement";
private static final int MAX_MESSAGE_SIZE_KB = 16;
private static final ZeebeObjectMapper OBJECT_MAPPER = new ZeebeObjectMapper();
private static final EmbeddedBrokerRule BROKER_RULE =
new EmbeddedBrokerRule(
cfg -> {
cfg.getNetwork().setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB));
cfg.getProcessing().setMaxCommandsInBatch(1);
});
private static final GrpcClientRule CLIENT_RULE = new GrpcClientRule(BROKER_RULE);

@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(BROKER_RULE).around(CLIENT_RULE);

@Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();

@Test
public void shouldCompleteParallelMultiInstanceWithLargeInputCollection() {
// given
final long processKey =
CLIENT_RULE.deployProcess(
Bpmn.createExecutableProcess("PROCESS")
.startEvent()
.serviceTask(
"task",
t ->
t.zeebeJobType("task")
.multiInstance(
mi ->
mi.parallel()
.zeebeInputCollectionExpression(
createInputCollection(INPUT_COLLECTION_SIZE))
.zeebeInputElement(INPUT_ELEMENT)))
.endEvent()
.done());

// when
final var processInstanceKey = CLIENT_RULE.createProcessInstance(processKey);
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("task")
.limit(INPUT_COLLECTION_SIZE)
.collect(Collectors.toList());
final var actualInputElements = completeJobs();

// then
hasCreatedJobForEachInputElement(actualInputElements);
hasCompletedElementsAndProcessInCorrectSequence(processInstanceKey);
hasActivatedElementInBatches(processInstanceKey);
}

private String createInputCollection(final int size) {
final var inputCollection = new ArrayList<Integer>();
for (int i = 0; i < size; i++) {
inputCollection.add(i);
}
return OBJECT_MAPPER.toJson(inputCollection);
}

private Set<Integer> completeJobs() {
final ActivateJobsResponse activatedJobs =
CLIENT_RULE
.getClient()
.newActivateJobsCommand()
.jobType("task")
.maxJobsToActivate(INPUT_COLLECTION_SIZE)
.fetchVariables(INPUT_ELEMENT)
.send()
.join();

final var inputElements = new HashSet<Integer>();
activatedJobs
.getJobs()
.forEach(
job -> {
inputElements.add((Integer) job.getVariablesAsMap().get(INPUT_ELEMENT));
CLIENT_RULE.getClient().newCompleteCommand(job.getKey()).send().join();
});
return inputElements;
}

private void hasCreatedJobForEachInputElement(final Set<Integer> actualInputElements) {
final var expectedInputElements = new HashSet<Integer>();
for (int i = 0; i < INPUT_COLLECTION_SIZE; i++) {
expectedInputElements.add(i);
}
assertThat(actualInputElements).containsAll(expectedInputElements);
}

private void hasCompletedElementsAndProcessInCorrectSequence(final long processInstanceKey) {
assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted())
.map(record -> record.getValue().getBpmnElementType())
.describedAs("Has completed process and all jobs")
// Plus 4 for the start event, multi instance, end event and process
.hasSize(INPUT_COLLECTION_SIZE + 4)
.describedAs("Completed in correct sequence")
.startsWith(BpmnElementType.START_EVENT, BpmnElementType.SERVICE_TASK)
.endsWith(
BpmnElementType.SERVICE_TASK,
BpmnElementType.MULTI_INSTANCE_BODY,
BpmnElementType.END_EVENT,
BpmnElementType.PROCESS);
}

private void hasActivatedElementInBatches(final long processInstanceKey) {
assertThat(
RecordingExporter.processInstanceBatchRecords()
.withProcessInstanceKey(processInstanceKey)
.limit(2))
.describedAs(
"Has activated in multiple batches. If this assertion fails, please decrease "
+ "the message size or increase the size of the input collection.")
.hasSize(2);
}
}
