
Embed task metadata in record header #80

Draft · wants to merge 1 commit into master

Conversation

ocadaruma (Member)

Motivation

  • Currently, DecatonClient serializes tasks in the DecatonTaskRequest protobuf format because Kafka didn't have record headers yet when Decaton started
    • Since Kafka has supported record headers for quite a long time now, it's natural to use them to embed task metadata

Summary of changes

protocol

  • Rename the DecatonTaskRequest protobuf message to LegacyDecatonTaskRequest and mark it as @Deprecated
  • Add TaskMetadataHeaders, which is responsible for reading/writing TaskMetadata from/to Headers (a rough sketch of the idea follows below)
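
For illustration, the read/write helper could look roughly like this; the header key name and the method signatures are only assumptions for the sketch, not necessarily what this PR implements:

    // Hypothetical sketch only: header key and method names are assumptions.
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;

    import com.google.protobuf.InvalidProtocolBufferException;
    import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

    public final class TaskMetadataHeaders {
        // Assumed header key name, chosen just for this sketch.
        static final String METADATA_HEADER_KEY = "dt_meta";

        private TaskMetadataHeaders() {}

        // Embed serialized task metadata into the record headers.
        public static void write(TaskMetadataProto metadata, Headers headers) {
            headers.remove(METADATA_HEADER_KEY);
            headers.add(METADATA_HEADER_KEY, metadata.toByteArray());
        }

        // Return the embedded metadata, or null for records without it (e.g. legacy format).
        public static TaskMetadataProto read(Headers headers) {
            Header header = headers.lastHeader(METADATA_HEADER_KEY);
            if (header == null || header.value() == null) {
                return null;
            }
            try {
                return TaskMetadataProto.parseFrom(header.value());
            } catch (InvalidProtocolBufferException e) {
                throw new IllegalArgumentException("corrupted task metadata header", e);
            }
        }
    }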

client

  • Add a new DecatonClient implementation which produces tasks directly as the record value and embeds metadata in headers, and make it the default
  • Add a new method, DecatonClientBuilder#buildLegacyClient, which instantiates the old Decaton client impl that produces tasks in DecatonTaskRequest format (see the usage sketch below)
    • for backward compatibility
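
For illustration, usage of the builder could look like this; MyTask is a placeholder protobuf task type, and every call except buildLegacyClient() follows the existing DecatonClient builder API as I understand it (details may differ in the final implementation):

    // Sketch only: MyTask is a placeholder protobuf task type; calls other than
    // buildLegacyClient() follow the existing DecatonClient builder API as I understand it.
    Properties producerConfig = new Properties();
    producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // New default: raw task bytes as the record value, metadata embedded in record headers.
    DecatonClient<MyTask> client =
            DecatonClient.producing("my-decaton-topic", new ProtocolBuffersSerializer<MyTask>())
                         .applicationId("MyApp")
                         .producerConfig(producerConfig)
                         .build();

    // Backward-compatible path: keeps wrapping tasks in the DecatonTaskRequest envelope.
    DecatonClient<MyTask> legacyClient =
            DecatonClient.producing("my-decaton-topic", new ProtocolBuffersSerializer<MyTask>())
                         .applicationId("MyApp")
                         .producerConfig(producerConfig)
                         .buildLegacyClient();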

processor

  • Change the signature of TaskExtractor#extract to receive a ConsumerRecord instead of byte[], so that users can access record headers when extracting a task
  • Make DefaultTaskExtractor able to parse both task formats, the DecatonTaskRequest protobuf and header-based tasks
    • for compatibility when rolling-upgrading a Decaton client application
  • Introduce the CONFIG_UPGRADE_FROM processor property, which denotes the previous version when upgrading the Decaton processor (analogous to Kafka Streams's upgrade.from property, ref)
  • DecatonTaskRetryQueueingProcessor switches the Decaton task request format based on CONFIG_UPGRADE_FROM (see the sketch after this list)
    • If it is V0_X_X => produce in DecatonTaskRequest format, same as now
    • otherwise => produce in the header-based format
    • This switch is necessary to avoid producing retry tasks in the new header-based format while rolling-upgrading a Decaton processor application, because old processor versions (0.x.x) cannot parse it
  • Thus, the upgrade process will look like this:
    • upgrade the Decaton processor with CONFIG_UPGRADE_FROM set to V0_X_X
    • do one more restart with CONFIG_UPGRADE_FROM unset
      • the Decaton processor starts producing retry tasks in the header-based format
    • upgrade the Decaton client
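
As a rough sketch, the switch in DecatonTaskRetryQueueingProcessor could look like the following; the producer method names and the UpgradeFrom enum shape are assumptions for illustration, not the final code:

    // Sketch only: sendLegacyRequest/sendRequest and the UpgradeFrom enum are assumed names,
    // just to illustrate the format switch described above.
    private CompletableFuture<PutTaskResult> produceRetryTask(String key,
                                                              byte[] taskBytes,
                                                              TaskMetadataProto retryMetadata) {
        switch (upgradeFrom) {
            case V0_X_X:
                // Old (0.x.x) instances may still be running during the rolling upgrade,
                // so keep producing retries in the legacy DecatonTaskRequest envelope.
                DecatonTaskRequest request = DecatonTaskRequest.newBuilder()
                        .setMetadata(retryMetadata)
                        .setSerializedTask(ByteString.copyFrom(taskBytes))
                        .build();
                return producer.sendLegacyRequest(key, request);
            default:
                // Every instance already understands headers: raw task bytes become the
                // record value and the metadata goes into record headers.
                return producer.sendRequest(key, taskBytes, retryMetadata);
        }
    }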

Points for discussion

  • As you may have noticed, the current design forces users to restart the processor twice and to be careful not to upgrade decaton-client first, which could produce unparseable tasks
    • Besides, introducing CONFIG_UPGRADE_FROM looks a bit awkward
  • Isn't the upgrade procedure too complicated and error-prone?

This PR is still WIP. I'd like to complete it once we agree on the overall design.

@kawamuray (Member)

Thanks for starting this 👍 I read your proposal and can agree with most parts of it.

Some comments inline:

Rename the DecatonTaskRequest protobuf message to LegacyDecatonTaskRequest and mark it as @Deprecated

While I agree that we'll eventually deprecate it, do we need to rename it as a step before removing it?

Add new method DecatonClientBuilder#buildLegacyClient, that instantiates old decaton client impl which produces tasks in DecatonTaskRequest format

Also, regarding the rename to LegacyDecatonClient, I'm not sure how meaningful it is to keep the legacy implementation if we're going to break backward API compatibility anyway.
I think we should discuss and take one of the following approaches rather than attempting to do both:

  • Move to the new way immediately, breaking any necessary API compatibility and release 2.0
  • Preserve all backward compatibility and introduce some interfaces for using the new way, persuading users to migrate gracefully

Introduce CONFIG_UPGRADE_FROM processor property, which denotes the previous version when upgrading decaton processor (analogous to Kafka Streams's upgrade.from property ref)

Can't we distinguish it based on the presence of the metadata header field in a given record?

  1. Rolling-restart processors to the new version, which supports both formats
  2. During the rolling restart, no new tasks will be produced in the new protocol because all the source tasks are in the older format (DecatonTaskRequest)
  3. After all processors are upgraded, restart producers with the newer DecatonClient so it starts producing tasks in the newer format; all processor instances are already capable of handling it by then.
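
For instance, the detection could be as simple as checking for the metadata header key; a sketch, using the hypothetical key constant from the TaskMetadataHeaders sketch above:

    // Sketch: a record produced by the new client/processor carries the metadata header,
    // so its presence alone identifies the format.
    static boolean isHeaderFormat(ConsumerRecord<String, byte[]> record) {
        return record.headers().lastHeader(TaskMetadataHeaders.METADATA_HEADER_KEY) != null;
    }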

@ocadaruma (Member, Author)

While I agree that we'll eventually deprecate it, do we need to rename it as a step before removing it?

Good point. It seems we don't have to rename it; just adding a deprecation annotation (to let users know the protocol will eventually be deprecated) would be fine.

Also, regarding the rename to LegacyDecatonClient, I'm not sure how meaningful it is to keep the legacy implementation

I thought we had to keep the legacy impl because the processor needs both implementations for a graceful upgrade.
... but I realized that DecatonRetryQueueingProcessor uses the raw DecatonTaskProducer rather than DecatonClient, so it seems we don't need to keep both impls.

Then "Move to the new way immediately" sounds better for simplicity. (If we preserve all API compatibility and providing different method to use new way (like DecatonClientBuilder#enableHeaderMeta() or any), users have to change the code twice (1. #enableHeaderMeta when upgrade to 1.2.0 and 2. delete #enableHeaderMeta because it's now default when upgrade to 2.0), which sounds bothersome)

By the way, maybe a bit off-topic: will adding a method to TaskExtractor be a breaking change?

For now, TaskExtractor is effectively a FunctionalInterface because it has only one method, so we often use lambdas.
If we add a method to TaskExtractor, we'd have to rewrite such code.

Can't we distinguish it based on the presence of the metadata header field in a given record?

The problem is when we use a TaskExtractor together with the retry feature.
When we consume a non-Decaton topic, the source metadata header is always absent, so there's no trigger to switch to the new format.

Or should the upgrade path be like the following?

  1. Publish Decaton 2.0, which can understand both formats but produces retry tasks in the new format only when the metadata header is not null (or always uses the old format)
  2. Publish Decaton 2.1, which always produces retry tasks in the new format
    • Decaton < 2.0 users must upgrade to 2.0 first
  3. Publish Decaton 3.0, which removes the old-format-related code completely

@kawamuray (Member)

By the way, maybe a bit off-topic: will adding a method to TaskExtractor be a breaking change?

Not sure if we should really avoid that, but we could possibly work around it with:

    // Default method delegating to the existing byte[]-based extract(),
    // so existing lambda implementations keep compiling.
    default DecatonTask<T> extract(ConsumerRecord<String, byte[]> record) {
        return extract(record.value());
    }

The problem is when we use a TaskExtractor together with the retry feature.

That's true, good point.

Or should the upgrade path be like the following?

No, I think the current approach suits best. Maybe we can document two ways: one is to shut down all processors at once, and the other is a graceful rolling restart. Some users might be okay with stopping their processors for a short duration and prefer the simpler way.

@kawamuray (Member)

Can you rebase onto the latest master branch? I'll start looking into the details then.

@ocadaruma (Member, Author)

Rebased and reworked based on the discussions so far. (I haven't fixed the tests yet, since I think there are still points to argue about in the overall direction.)

Also, I came up with two more points:

  • It's necessary to "synthesize" a ConsumerRecord when a record was produced by a retry-queueing processor < 2.x (a small sketch of what I mean follows below)
  • Once we start embedding task metadata in record headers, decaton-common's Serializer/Deserializer will be effectively the same as kafka-clients's Serializer/Deserializer
    • Shouldn't we deprecate those as well? Or is it better to do that in another PR?
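
What I mean by "synthesize" is roughly the following; a sketch only, and the DecatonTask accessor name is an assumption from memory:

    // Sketch: rebuild a ConsumerRecord whose value is the original task bytes extracted
    // from the legacy retry envelope, keeping the source record's coordinates.
    ConsumerRecord<String, byte[]> restoredRecord =
            new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
                                 record.key(), retryTask.taskDataBytes());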

@kawamuray (Member) left a comment


Mostly looking good.

/**
* Version that the Decaton processor is upgraded from.
* This will be used to provide a graceful way to upgrade processor
* when Decaton introduces backward-incompatible change.

Can you prepare a dedicated migration guide and put a link here, please?

*
* Reloadable: no
*/
public static final PropertyDefinition<UpgradeFrom> CONFIG_UPGRADE_FROM =

I think we should stick with primitive types + List and Map for the property's value type. We can't make any assumption about how a PropertySupplier implementation builds a property, and I think it's a bit optimistic to expect all suppliers to instantiate an enum without any per-case effort.
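
For example, something along these lines; a sketch that assumes PropertyDefinition.define and Property#value behave the same way as for the existing Decaton properties:

    // Sketch: keep the raw value a String so any PropertySupplier can build it,
    // and map it to the enum only where the processor consumes it.
    public static final PropertyDefinition<String> CONFIG_UPGRADE_FROM =
            PropertyDefinition.define("decaton.upgrade.from", String.class, "",
                                      v -> v instanceof String);

    // At the consuming site; an empty value means "no upgrade in progress".
    String raw = props.get(CONFIG_UPGRADE_FROM).value();
    UpgradeFrom upgradeFrom = raw.isEmpty() ? null : UpgradeFrom.valueOf(raw);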


CompletableFuture<PutTaskResult> future = producer.sendRequest(context.key(), request);
final CompletableFuture<PutTaskResult> future;
switch (upgradeFrom) {

Can you extract this part out as a method, please?

* This class will live until the task process has been completed.
* To lessen heap pressure, rawRequestBytes should be purged by calling this once the task is extracted.
*/
public void purgeRawRequestBytes() {

Hm, not sure how much impact this was actually producing, but we want to rip out unnecessary (and potentially large) chunks as much as we can?

public void purgeRawRequestBytes() {
rawRequestBytes = null;
public TopicPartition topicPartition() {
return new TopicPartition(record.topic(), record.partition());

Shall we cache this TopicPartition instance to avoid instantiating it multiple times, given that this method is called relatively often?
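
Since the record's topic and partition never change, a lazily cached instance along these lines would do (a sketch, not a demand for this exact code):

    // Sketch: topic/partition are fixed for a given record, so the instance can be reused.
    private TopicPartition topicPartition;

    public TopicPartition topicPartition() {
        if (topicPartition == null) {
            topicPartition = new TopicPartition(record.topic(), record.partition());
        }
        return topicPartition;
    }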


import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class TaskMetadataUtil {

What's the point of placing this in the protocol module? I think the common module is the right place for it? (The protocol module is intended to contain only generated protocol files.)

message DecatonTaskRequest {
option deprecated = true;

Would we still want to expose this class as a public interface? Otherwise, shall we move it to a package with internal added?

DecatonTask<byte[]> retryTask = innerExtractor.extract(record);

// Restore ConsumerRecord which holds original task bytes as its value
ConsumerRecord<String, byte[]> restoredRecord =

Hm, this implementation is getting bigger now. Shall we extract it out as a method or a class?
