Skip to content

Commit

Permalink
Add support for update admitted event (#2041)
Browse files Browse the repository at this point in the history
Add support for update admitted event
  • Loading branch information
Quinn-With-Two-Ns committed May 8, 2024
1 parent 0013675 commit a41c64e
Show file tree
Hide file tree
Showing 5 changed files with 605 additions and 42 deletions.
Expand Up @@ -20,13 +20,13 @@

package io.temporal.internal.statemachines;

import com.google.common.base.Preconditions;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.internal.common.WorkflowExecutionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* This class buffers events between WorkflowTaskStarted events and return them in one chunk so any
Expand All @@ -42,9 +42,29 @@ private enum WFTState {
Closed,
}

public static class EventBatch {
private final List<HistoryEvent> events;
private final Optional<HistoryEvent> workflowTaskCompletedEvent;

public EventBatch(
Optional<HistoryEvent> workflowTaskCompletedEvent, List<HistoryEvent> events) {
this.workflowTaskCompletedEvent = workflowTaskCompletedEvent;
this.events = events;
}

public List<HistoryEvent> getEvents() {
return events;
}

public Optional<HistoryEvent> getWorkflowTaskCompletedEvent() {
return workflowTaskCompletedEvent;
}
}

private WFTState wftSequenceState = WFTState.None;

private final List<HistoryEvent> wftBuffer = new ArrayList<>();
private Optional<HistoryEvent> workflowTaskCompletedEvent = Optional.empty();
private final List<HistoryEvent> readyToFetch = new ArrayList<>();

/**
Expand Down Expand Up @@ -73,17 +93,20 @@ private void handleEvent(HistoryEvent event, boolean hasNextEvent) {
// This is the only way to enter into the WFT sequence -
// any WorkflowTaskStarted event that it not the last in the history
if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(event.getEventType())) {
// if there is something in wftBuffer, let's flush it
flushBuffer();
// and init a new sequence
// Init a new sequence
wftSequenceState = WFTState.Started;
addToBuffer(event);
return;
}

if (WFTState.Started.equals(wftSequenceState)
&& WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event)) {
wftSequenceState = WFTState.Closed;
if (event.getEventType().equals(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)) {
workflowTaskCompletedEvent = Optional.of(event);
wftSequenceState = WFTState.Closed;
} else {
wftSequenceState = WFTState.None;
}
addToBuffer(event);
return;
}
Expand All @@ -94,14 +117,19 @@ private void handleEvent(HistoryEvent event, boolean hasNextEvent) {
// If event is WFT_STARTED or any of the Closing events, it's handled by if statements
// earlier, so it's safe to switch to None here, we are not inside WFT sequence
wftSequenceState = WFTState.None;
// no open WFT sequence, can't add to buffer, it's ok to add directly to readyToFetch, this
// event can't be EVENT_TYPE_WORKFLOW_TASK_STARTED because we checked it above.
readyToFetch.add(event);

addToBuffer(event);
return;
}
if (WFTState.Closed.equals(wftSequenceState) && WorkflowExecutionUtils.isCommandEvent(event)) {
// we are inside a closed WFT sequence, we can add to buffer
addToBuffer(event);
return;
}

if (WFTState.None.equals(wftSequenceState)) {
// we should be returning the events one by one, we are not inside a WFT sequence
if (WorkflowExecutionUtils.isCommandEvent(event)
|| WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event)) {
flushBuffer();
readyToFetch.add(event);
} else {
addToBuffer(event);
Expand All @@ -114,21 +142,22 @@ private void flushBuffer() {
}

private void addToBuffer(HistoryEvent event) {
Preconditions.checkState(
!WFTState.None.equals(wftSequenceState),
"We should be inside an open WFT sequence to add to the buffer");
wftBuffer.add(event);
}

public List<HistoryEvent> fetch() {
public EventBatch fetch() {
if (readyToFetch.size() == 1) {
HistoryEvent event = readyToFetch.get(0);
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
workflowTaskCompletedEvent = Optional.empty();
readyToFetch.clear();
return Collections.singletonList(event);
return new EventBatch(wftStarted, Collections.singletonList(event));
} else {
List<HistoryEvent> result = new ArrayList<>(readyToFetch);
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
workflowTaskCompletedEvent = Optional.empty();
readyToFetch.clear();
return result;
return new EventBatch(wftStarted, result);
}
}
}
Expand Up @@ -113,6 +113,7 @@ enum HandleEventStatus {

private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();

/** Key is the protocol instance id */
private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();

private final Queue<Message> messageOutbox = new ArrayDeque<>();
Expand All @@ -126,9 +127,19 @@ enum HandleEventStatus {
private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();

/**
* Is workflow executing new code or replaying from the history. Note that this flag ALWAYS flips
* to true for the time when we apply events from the server even if the commands were created by
* an actual execution with replaying=false.
* Is workflow executing new code or replaying from the history. The definition of replaying here
* is that we are no longer replaying as soon as we see new events that have never been seen or
* produced by the SDK.
*
* <p>Specifically, replay ends once we have seen any non-command event (IE: events that aren't a
* result of something we produced in the SDK) on a WFT which has the final event in history
* (meaning we are processing the most recent WFT and there are no more subsequent WFTs). WFT
* Completed in this case does not count as a non-command event, because that will typically show
* up as the first event in an incremental history, and we want to ignore it and its associated
* commands since we "produced" them.
*
* <p>Note: that this flag ALWAYS flips to true for the time when we apply events from the server
* even if the commands were created by an actual execution with replaying=false.
*/
private boolean replaying;

Expand Down Expand Up @@ -160,6 +171,12 @@ enum HandleEventStatus {

private List<Message> messages = new ArrayList<>();

/**
* Set of accepted durably admitted updates by update id a "durably admitted" update is one with
* an UPDATE_ADMITTED event.
*/
private final Set<String> acceptedUpdates = new HashSet<>();

private final SdkFlags flags;

public WorkflowStateMachines(
Expand Down Expand Up @@ -277,18 +294,19 @@ public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
* Handle an events batch for one workflow task. Events that are related to one workflow task
* during replay should be prefetched and supplied in one batch.
*
* @param events events belong to one workflow task
* @param hasNextEvent true if there are more events in the history follow this batch, false if
* @param eventBatch events belong to one workflow task
* @param hasNextBatch true if there are more events in the history follow this batch, false if
* this batch contains the last events of the history
*/
private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent) {
private void handleEventsBatch(WFTBuffer.EventBatch eventBatch, boolean hasNextBatch) {
List<HistoryEvent> events = eventBatch.getEvents();
if (EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.equals(events.get(0).getEventType())) {
for (SdkFlag flag : initialFlags) {
flags.tryUseSdkFlag(flag);
}
}

if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
if (eventBatch.getWorkflowTaskCompletedEvent().isPresent()) {
for (HistoryEvent event : events) {
handleSingleEventLookahead(event);
}
Expand All @@ -304,7 +322,10 @@ private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent)
}

try {
handleSingleEvent(event, iterator.hasNext() || hasNextEvent);
boolean isLastTask =
!hasNextBatch && !eventBatch.getWorkflowTaskCompletedEvent().isPresent();
boolean hasNextEvent = iterator.hasNext() || hasNextBatch;
handleSingleEvent(event, isLastTask, hasNextEvent);
} catch (RuntimeException e) {
throw createEventProcessingException(e, event);
}
Expand All @@ -330,13 +351,20 @@ private void handleSingleEventLookahead(HistoryEvent event) {
// Look ahead to infer protocol messages
WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
event.getWorkflowExecutionUpdateAcceptedEventAttributes();
this.messages.add(
Message.newBuilder()
.setId(updateEvent.getAcceptedRequestMessageId())
.setProtocolInstanceId(updateEvent.getProtocolInstanceId())
.setEventId(updateEvent.getAcceptedRequestSequencingEventId())
.setBody(Any.pack(updateEvent.getAcceptedRequest()))
.build());
// If an EXECUTION_UPDATE_ACCEPTED event does not have an accepted request, then it
// must be from an admitted update. This is the only way to infer an admitted update was
// accepted.
if (!updateEvent.hasAcceptedRequest()) {
acceptedUpdates.add(updateEvent.getProtocolInstanceId());
} else {
messages.add(
Message.newBuilder()
.setId(updateEvent.getAcceptedRequestMessageId())
.setProtocolInstanceId(updateEvent.getProtocolInstanceId())
.setEventId(updateEvent.getAcceptedRequestSequencingEventId())
.setBody(Any.pack(updateEvent.getAcceptedRequest()))
.build());
}
break;
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
WorkflowTaskCompletedEventAttributes completedEvent =
Expand All @@ -352,6 +380,9 @@ private void handleSingleEventLookahead(HistoryEvent event) {
}
flags.setSdkFlag(sdkFlag);
}
// Remove any finished update protocol state machines. We can't remove them on an event like
// other state machines because a rejected update produces no event in history.
protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
break;
}
}
Expand Down Expand Up @@ -416,16 +447,17 @@ private void handleSingleMessage(Message message) {
stateMachine.handleMessage(message);
}

private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
private void handleSingleEvent(HistoryEvent event, boolean lastTask, boolean hasNextEvent) {
if (isCommandEvent(event)) {
handleCommandEvent(event);
return;
}

// We don't explicitly check if the event is a command event here because it's already handled
// above.
if (replaying
&& !hasNextEvent
&& (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED
|| WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event))) {
&& lastTask
&& event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
replaying = false;
}

Expand Down Expand Up @@ -705,6 +737,20 @@ private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
callbacks.cancel(event);
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
WorkflowExecutionUpdateAdmittedEventAttributes admittedEvent =
event.getWorkflowExecutionUpdateAdmittedEventAttributes();
Message msg =
Message.newBuilder()
.setId(admittedEvent.getRequest().getMeta().getUpdateId() + "/request")
.setProtocolInstanceId(admittedEvent.getRequest().getMeta().getUpdateId())
.setEventId(event.getEventId())
.setBody(Any.pack(admittedEvent.getRequest()))
.build();
if (replaying && acceptedUpdates.remove(msg.getProtocolInstanceId()) || !replaying) {
messages.add(msg);
}
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
case UNRECOGNIZED:
break;
Expand Down Expand Up @@ -1028,9 +1074,6 @@ public Functions.Proc scheduleLocalActivityTask(

/** Validates that command matches the event during replay. */
private void validateCommand(Command command, HistoryEvent event) {
// TODO(maxim): Add more thorough validation logic. For example check if activity IDs are
// matching.

// ProtocolMessageCommand is different from other commands because it can be associated with
// multiple types of events
// TODO(#1781) Validate protocol message is expected type.
Expand Down Expand Up @@ -1291,6 +1334,7 @@ private OptionalLong getInitialCommandEventId(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
return OptionalLong.of(event.getEventId());

default:
Expand Down
Expand Up @@ -537,6 +537,10 @@ private HistoryEvent newAttributes(EventType type, Object attributes) {
result.setChildWorkflowExecutionTerminatedEventAttributes(
(ChildWorkflowExecutionTerminatedEventAttributes) attributes);
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
result.setWorkflowExecutionUpdateAdmittedEventAttributes(
(WorkflowExecutionUpdateAdmittedEventAttributes) attributes);
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
result.setWorkflowExecutionUpdateAcceptedEventAttributes(
(WorkflowExecutionUpdateAcceptedEventAttributes) attributes);
Expand Down

0 comments on commit a41c64e

Please sign in to comment.