This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

#386: Implemented passing of CryptoKeyReader. #387

Open
wants to merge 4 commits into
base: master

objecttrouve

As requested [here](streamnative#386), it's now possible to pass a `CryptoKeyReader` (and encryption keys) to `FlinkPulsarSource` and `FlinkPulsarSink`.

Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors.

Added integration test. (Maybe it can be moved to one of the other tests to avoid overhead.)
@nlu90
Contributor

nlu90 commented Aug 10, 2021

@jianyun8023 @syhily could you help take a look?

@syhily
Contributor

syhily commented Aug 27, 2021

Hi @objecttrouve, thanks for your contribution. Using a builder pattern on the internal Fetcher doesn't sound like a good design.
Adding a builder pattern for the public source and sink is acceptable, but we may need extra validation in the final build method.

I'll give detailed review advice over the weekend.

private final PulsarSerializationSchema<T> serializationSchema;

public FlinkPulsarSink(final Builder<T> builder) {
Contributor

I don't think this should be public for a Builder pattern.

Author

Absolutely. Must have missed it. Will fix.

Author

Made c'tors private.
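
For readers following this thread, here is a minimal sketch of the shape being discussed: a builder whose target constructor is private, with validation done in the final `build()` method. `ExampleSink`, its fields, and the `with...` methods are hypothetical names chosen for illustration. They are not the connector's actual classes or API.

```java
/** Hypothetical stand-in for a sink class; not the connector's actual code. */
final class ExampleSink {
    private final String serviceUrl;
    private final String topic;

    // Private: instances can only be created through Builder.build().
    private ExampleSink(Builder builder) {
        this.serviceUrl = builder.serviceUrl;
        this.topic = builder.topic;
    }

    static Builder builder() {
        return new Builder();
    }

    String serviceUrl() {
        return serviceUrl;
    }

    String topic() {
        return topic;
    }

    static final class Builder {
        private String serviceUrl;
        private String topic;

        private Builder() {
        }

        Builder withServiceUrl(String serviceUrl) {
            this.serviceUrl = serviceUrl;
            return this;
        }

        Builder withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        ExampleSink build() {
            // Validate once, at the single point where the object is created.
            if (serviceUrl == null) {
                throw new IllegalStateException("Service URL must be set.");
            }
            if (topic == null) {
                throw new IllegalStateException("Topic must be set.");
            }
            return new ExampleSink(this);
        }
    }
}
```

With the constructor private, a caller can only write something like `ExampleSink.builder().withServiceUrl(...).withTopic(...).build()`, so every instance passes through the checks in `build()`.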

private final PulsarSerializationSchema<T> serializationSchema;

public FlinkPulsarSink(final Builder<T> builder) {
super(
new FlinkPulsarSinkBase.Config<T>()
Contributor

This looks like boilerplate. Why not simply use the ctor? Since it's a builder pattern, the end user would never touch this ctor.

Author

> The end user would never touch this ctor.

I wasn't sure which classes an end user could touch. Therefore, I tried hard not to change the public c'tors and not to add more than absolutely necessary. But if the class is just for internal use, right, it's unnecessary boilerplate. Will change it.

Author

Back to the old c'tor.


super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic);
this.serializationSchema = serializationSchema;
this(new Builder<T>()
Contributor

Refactoring the old ctor doesn't look like a good choice.

Author

The goal was basically to delegate as much as possible to a single c'tor that contains the interesting logic, instead of repeating variants of the logic inside the c'tor all over the place.
But to be honest, I don't mind very much. So I'll just revert to the old c'tor.

Author

Back to the old c'tor.
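
The delegation idea described in this thread (overloads forwarding to a single c'tor that holds the logic) can be sketched roughly as follows. `ExampleSource` and its parameters are hypothetical and exist only to illustrate the pattern. The PR itself reverted to the original c'tors.

```java
import java.util.Objects;
import java.util.Properties;

/** Hypothetical class used only to illustrate constructor delegation. */
final class ExampleSource {
    private final String serviceUrl;
    private final Properties properties;

    // Convenience overload: supplies a default and delegates.
    ExampleSource(String serviceUrl) {
        this(serviceUrl, new Properties());
    }

    // The single constructor that holds the validation and assignment logic.
    ExampleSource(String serviceUrl, Properties properties) {
        if (serviceUrl == null || serviceUrl.isEmpty()) {
            throw new IllegalArgumentException("Service URL must be set.");
        }
        Objects.requireNonNull(properties, "properties");
        this.serviceUrl = serviceUrl;
        // Defensive copy so later changes by the caller do not leak in.
        this.properties = new Properties();
        this.properties.putAll(properties);
    }

    String serviceUrl() {
        return serviceUrl;
    }

    Properties properties() {
        return properties;
    }
}
```

The trade-off raised in review still applies: rewriting existing public c'tors to delegate touches code that users already depend on, which is why the change was reverted here.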

if (clientConf == null){
throw new IllegalStateException("Client conf must be set.");
}
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){
Contributor

cryptoKeyReader requires an extra serialization check. Use `Preconditions.checkState(InstantiationUtil.isSerializable(cryptoKeyReader))`.

Author

OK, will do.

Author

Added the check.
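
As a rough illustration of the two checks in this thread (reader and keys must be set together, and the reader must actually serialize), here is a self-contained sketch. It uses plain `java.io` serialization in place of Flink's `InstantiationUtil` and `Preconditions` so that it runs without dependencies. `CryptoConfigCheck`, `KeyReader`, and both reader classes are hypothetical stand-ins, not Pulsar's or the connector's types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Set;

final class CryptoConfigCheck {
    private CryptoConfigCheck() {
    }

    /** Hypothetical stand-in for a key reader type. */
    interface KeyReader extends Serializable {
        byte[] readKey(String keyName);
    }

    /** Serializes fine: holds only serializable state. */
    static final class FixedKeyReader implements KeyReader {
        private static final long serialVersionUID = 1L;
        private final byte[] key = {1, 2, 3};

        @Override
        public byte[] readKey(String keyName) {
            return key.clone();
        }
    }

    /** Declares Serializable but fails at runtime because of its field. */
    static final class BrokenKeyReader implements KeyReader {
        private static final long serialVersionUID = 1L;
        // A plain java.lang.Object instance is not Serializable.
        private final Object handle = new Object();

        @Override
        public byte[] readKey(String keyName) {
            return new byte[0];
        }
    }

    /** Returns true if Java serialization of the value succeeds. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Sink-side validation: reader and keys go together, reader must serialize. */
    static void validateSink(KeyReader reader, Set<String> encryptionKeys) {
        if ((reader == null) != encryptionKeys.isEmpty()) {
            throw new IllegalStateException("Set both a key reader and encryption keys, or neither.");
        }
        if (!isSerializable(reader)) {
            throw new IllegalStateException("Key reader must be serializable.");
        }
    }
}
```

The runtime check matters because implementing `Serializable` is only a declaration. A reader that holds a non-serializable field compiles without complaint and fails only when the job graph is shipped to the cluster, so failing early at construction time gives a clearer error.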

@@ -86,6 +89,64 @@
@Slf4j
abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, FlinkPulsarSinkBase.PulsarTransactionState<T>, Void> implements CheckpointedFunction {

public static class Config<T> {
Contributor

Why do we need a builder for this base class?

Author

Much the same reasoning as implemented throughout. A builder or config object makes it easier to add parameters in the future, without breaking an API or resulting in more and more overloaded c'tors. I have to admit I'm not sure which classes exactly were intended strictly for private purposes, so I just applied the pattern all over the place.
(Knowing that builders and config objects imply a lot of boilerplate.)

If overloading c'tors or changing the API of the class is OK, I'm completely fine with it though. So I'll change this piece, too.

Author

Back to the old c'tor.

PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {
public FlinkPulsarSinkBase(final Config<T> config) {
Contributor

I think we can just keep this ctor.

Author

Much the same as above, I'll change it.

Author

Back to the old c'tor.

if (clientConf == null){
throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties.");
}
return new FlinkPulsarSource<>(this);
Contributor

The cryptoKeyReader doesn't require encryptionKeys here, right?

Contributor

We still need to check the serialization for encryptionKeys here.

Author

No, I think it doesn't require the encryptionKeys. But I'll double check.
OK, will check serialization.

Author

In the source, no encryption keys are required. Added serialization check.

streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP),
useMetrics
);
return new PulsarFetcher.Builder()
Contributor

Since PulsarFetcher is an internal API, please just keep the ctor.

Author

OK.

Author

Back to old c'tor.

@objecttrouve
Author

@syhily, thanks for the feedback!

I'll make the requested changes in my next free time slot.

It turned out the builder pattern wasn't necessary here. There's now one path for the CryptoKeyReader (plus the encryption keys) and possibly others where it's null (and the keys are empty).
@objecttrouve
Author

@syhily, I made the changes as I understood your requests. Please let me know if there's anything else, and whether I should squash. (For the time being, I left multiple commits so they're easier to relate to your remarks.)

@syhily
Contributor

syhily commented Sep 15, 2021

Thanks for your contribution. The PR looks reasonable on my side. I will merge this on the weekend.

@objecttrouve
Author

> Thanks for your contribution. The PR looks reasonable on my side. I will merge this on the weekend.

Thanks @syhily!
