-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Embed task metadata in record header #80
base: master
Are you sure you want to change the base?
Conversation
Thanks for starting this 👍 Read your proposal and can agree with most of the parts. Some comments inline:
While I agree with that we'll eventually deprecate it, do we need to rename it as a step before removing?
Also wrt rename to
Can't we distinguish it based on presence of the metadata header field in a given record?
|
Good point. Seems we don't have to rename it, just add deprecation annotation (to let users know the protocol will be deprecated eventually) would be fine.
I thought we have to keep legacy impl, because Then "Move to the new way immediately" sounds better for simplicity. (If we preserve all API compatibility and providing different method to use new way (like By the way maybe bit off-topic, will adding a method to For now,
The problem is when we use Or should the upgrade path be like below?
|
Not sure if we should really avoid that, but we might possibly workaround that by: default DecatonTask<T> extract(ConsumerRecord<String, byte[]> record) {
return extract(record.value());
}
That's true, good point.
No, I think the current approach suites the most I think. Maybe we can guide two ways - 1 is to shutdown all processrs at once and one is for graceful rolling restart. Some users might be okay to stop their processors for short duration and prefers the simpler way. |
Can you rebase onto latest master branch then? I'll start looking into detail then. |
81fd0d7
to
c62299d
Compare
Rebased and reworked based on discussions so far. (Still I didn't fix tests yet, since I think there's point of argue about overall direction) Then, I came up with thoughts about two more points:
|
c62299d
to
242685c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looking good.
/** | ||
* Version that the Decaton processor is upgraded from. | ||
* This will be used to provide a graceful way to upgrade processor | ||
* when Decaton introduces backward-incompatible change. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you prepare a dedicated migration guide and put link here please?
* | ||
* Reloadable: no | ||
*/ | ||
public static final PropertyDefinition<UpgradeFrom> CONFIG_UPGRADE_FROM = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should stick with primitive types + List
and Map
for the prpoperty's value type. We can't put any assumption on how a ProeprtySupplier
implementation builds a property, and I think it's bit optimistic to think all suppliers could instantiate an enum w/o any per-case effort.
|
||
CompletableFuture<PutTaskResult> future = producer.sendRequest(context.key(), request); | ||
final CompletableFuture<PutTaskResult> future; | ||
switch (upgradeFrom) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you extract this part out as a method please
* This class will live until the task process has been completed. | ||
* To lessen heap pressure, rawRequestBytes should be purged by calling this once the task is extracted. | ||
*/ | ||
public void purgeRawRequestBytes() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm. not sure how much impact was this actually producing, but we want to rip off unncessary (and potentially learge) chunk as possible as we can?
public void purgeRawRequestBytes() { | ||
rawRequestBytes = null; | ||
public TopicPartition topicPartition() { | ||
return new TopicPartition(record.topic(), record.partition()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we cache this topic partition instance to avoid instantiating multiple times knowing this method is called relatively often?
|
||
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; | ||
|
||
public class TaskMetadataUtil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the point to place this in protocol
module? I think the common
module is right place to go? (protocol module intends to contain only generated protocol files)
message DecatonTaskRequest { | ||
option deprecated = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would we still want to expose this class as a public interface? otherwise shall we move it to the package with intenral
added?
DecatonTask<byte[]> retryTask = innerExtractor.extract(record); | ||
|
||
// Restore ConsumerRecord which holds original task bytes as its value | ||
ConsumerRecord<String, byte[]> restoredRecord = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, now this implementation's getting bigger. shall we extract it out as a method or a class?
Motivation
DecatonTaskRequest
protobuf format because when Decaton had started, Kafka didn't have record header yetSummary of changes
protocol
DecatonTaskRequest
protobuf message toLegacyDecatonTaskRequest
and marked it as@Deprecated
TaskMetadataHeaders
which is responsible for read/write TaskMetadata from/to Headersclient
DecatonClient
which produces tasks as record value directly and embed metadata in headers and make it defaultDecatonClientBuilder#buildLegacyClient
, that instantiates old decaton client impl which produces tasks inDecatonTaskRequest
formatprocessor
TaskExtractor#extract
to receiveConsumerRecord
instead ofbyte[]
, to allow users to access record header when extracting a taskDefaultTaskExtractor
to be able to parse both formats of tasks,DecatonTaskRequest
protobuf and header-based tasksCONFIG_UPGRADE_FROM
processor property, which denotes the previous version when upgrading decaton processor (analogous to Kafka Streams'supgrade.from
property ref)DecatonTaskRetryQueueingProcessor
switches decaton task request format based onCONFIG_UPGRADED_FROM
V0_X_X
=> produce inDecatonTaskRequest
format same as currentCONFIG_UPGRADE_FROM
toV0_X_X
CONFIG_UPGRADE_FROM
Want discussion
CONFIG_UPGRADE_FROM
looks bit awkwardThis PR is still WIP. I'd like to complete it after we agreed about overall design.