[Discuss]SDK supports batching protobuf format #542

Open
mxsm opened this issue Mar 31, 2023 · 5 comments
Labels
enhancement New feature or request

Comments

@mxsm
Contributor

mxsm commented Mar 31, 2023

Batching support was added to the Protobuf format. Does the SDK support it now?

@mxsm mxsm changed the title [Question]SDK supports batching protobuf format? SDK supports batching protobuf format? Apr 1, 2023
@pierDipi
Member

pierDipi commented Apr 6, 2023

We don't support batching yet, PRs welcome!

@pierDipi pierDipi added the enhancement New feature or request label Apr 6, 2023
@mxsm
Contributor Author

mxsm commented Apr 17, 2023

I will try

@pierDipi
Member

pierDipi commented Apr 17, 2023

Since this is a large feature, would you mind sharing the high-level approach you're planning to follow so that we can discuss it a bit? If there are multiple approaches, we can discuss the pros and cons of each.

@mxsm
Contributor Author

mxsm commented Apr 17, 2023

Sure, I'll write up a document and post it here for discussion.

@mxsm
Contributor Author

mxsm commented Apr 30, 2023

Hi @pierDipi, here is my thinking and some design ideas for this new feature. Overall, the goal is to make minimal changes to the existing code while meeting the requirements and leaving room for future extension. If you have any suggestions or comments, let's discuss and exchange ideas. If changes turn out to be necessary later, please leave a comment on this issue to explain them. Once we finalize the plan through discussion, I will start coding. Thank you!

1. Batch Data

The specification places no restriction on batched events sharing the same source, producer, content type, and so on. The only constraint is that all CloudEvents in the same batch must have the same value for the specversion attribute; in other words, every event in the batch is defined by the same set of context attribute fields.

For version 0.3:

Required fields: "specversion", "id", "type", "source"

Optional fields: "datacontenttype", "datacontentencoding", "schemaurl", "subject", "time"

For version 1.0:

Required fields: "specversion", "id", "type", "source"

Optional fields: "datacontenttype", "dataschema", "subject", "time"

The extension attributes and data fields may differ between events in the same batch.
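For illustration, here is a minimal sketch of two events that could legally share a batch under this constraint, built with the SDK's existing io.cloudevents.core.builder.CloudEventBuilder; the ids, types, sources, and payloads are made-up examples. Both events use specversion 1.0, while their other attributes, extensions, and data differ.

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import java.net.URI;
import java.util.Arrays;
import java.util.List;

public class BatchExample {

    public static List<CloudEvent> buildBatch() {
        // Both events use specversion 1.0; their other attributes, extensions and data differ.
        CloudEvent first = CloudEventBuilder.v1()
            .withId("id-1")
            .withType("com.example.order.created")
            .withSource(URI.create("/orders"))
            .withData("application/json", "{\"order\":1}".getBytes())
            .build();

        CloudEvent second = CloudEventBuilder.v1()
            .withId("id-2")
            .withType("com.example.payment.received")
            .withSource(URI.create("/payments"))
            .withExtension("traceid", "abc-123")
            .build();

        return Arrays.asList(first, second);
    }
}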

2. Define the interface

Add a new interface CloudEventBatch that extends CloudEvent. A CloudEventBatch can essentially be treated as a special CloudEvent: the specversion attribute is the same for all batched events, while the data and extension attributes of each CloudEvent may differ.

public interface CloudEventBatch extends CloudEvent {

    /**
     * @return the data payloads of the batched events, or {@code null} if there are none
     */
    @Nullable
    List<CloudEventData> getDataBatch();

    /**
     * @return the batched events, or {@code null} if there are none
     */
    @Nullable
    List<CloudEvent> getCloudEventBatch();
}
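As a usage sketch only (CloudEventBatch does not exist in the SDK yet), a consumer could read the shared context from the batch and iterate the individual events; the process method below is hypothetical:

import io.cloudevents.CloudEvent;

import java.util.List;

public class BatchConsumerSketch {

    // Hypothetical consumer of the proposed CloudEventBatch interface.
    public void process(CloudEventBatch batch) {
        // Shared context attribute: every batched event carries the same specversion.
        System.out.println("specversion: " + batch.getSpecVersion());

        List<CloudEvent> events = batch.getCloudEventBatch();
        if (events != null) {
            for (CloudEvent event : events) {
                // Per-event data and extensions may differ.
                System.out.println(event.getId() + " -> " + event.getData());
            }
        }
    }
}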

Add a method that supports batch CloudEventData to the CloudEventWriter interface.

public interface CloudEventWriter<R> extends CloudEventContextWriter {

    /**
     * End the write with a data payload.
     *
     * @param data the data to write
     * @return an eventual return value
     * @throws CloudEventRWException if the message writer cannot be ended.
     */
    R end(CloudEventData data) throws CloudEventRWException;

    /**
     * Newly added method to support a batch of {@link CloudEventData}.
     * The default implementation falls back to ending the write with the first payload;
     * batch-aware writers are expected to override it.
     *
     * @param datas the data payloads to write
     * @return an eventual return value
     * @throws CloudEventRWException if the message writer cannot be ended.
     */
    default R end(List<? extends CloudEventData> datas) throws CloudEventRWException {
        if (datas == null || datas.isEmpty()) {
            return null;
        }
        return end(datas.get(0));
    }

    /**
     * End the write.
     *
     * @return an eventual return value
     * @throws CloudEventRWException if the message writer cannot be ended.
     */
    R end() throws CloudEventRWException;

}
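A sketch of how a caller might use the proposed end(List) default, assuming the modification above; BytesCloudEventData.wrap is the SDK's existing helper, while endWithBatch and the payload contents are illustrative:

import io.cloudevents.CloudEventData;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;

import java.util.Arrays;
import java.util.List;

public class BatchWriteSketch {

    // Illustrative only: ends a write with several payloads via the proposed default method.
    public static <R> R endWithBatch(CloudEventWriter<R> writer) throws CloudEventRWException {
        List<CloudEventData> payloads = Arrays.asList(
            BytesCloudEventData.wrap("{\"a\":1}".getBytes()),
            BytesCloudEventData.wrap("{\"b\":2}".getBytes())
        );
        // With the default above this delegates to end(payloads.get(0));
        // a format-specific writer would override it to serialize the whole batch.
        return writer.end(payloads);
    }
}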

Modify the CloudEventBuilder as follows.

@ParametersAreNullableByDefault
public interface CloudEventBuilder<R extends CloudEvent> extends CloudEventWriter<R> {

    // newly added factory method for building a CloudEvent batch (v1.0)
    static io.cloudevents.core.v1.CloudEventBatchBuilder batchV1() {
        return new io.cloudevents.core.v1.CloudEventBatchBuilder();
    }

}
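A usage sketch of the proposed batchV1() entry point; nothing here exists in the SDK yet, and the exact builder methods available on CloudEventBatchBuilder are assumptions based on BaseCloudEventBuilder:

import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData;

import java.net.URI;
import java.util.Arrays;

public class BatchBuilderSketch {

    // Hypothetical usage of the proposed batch builder.
    public static CloudEventBatch buildBatch() {
        return CloudEventBuilder.batchV1()
            .withId("batch-1")                          // shared context attributes
            .withType("com.example.order.created")
            .withSource(URI.create("/orders"))
            .end(Arrays.asList(                         // per-event payloads via the new end(List)
                BytesCloudEventData.wrap("{\"order\":1}".getBytes()),
                BytesCloudEventData.wrap("{\"order\":2}".getBytes())
            ));
    }
}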

Add CloudEventBatchBuilder implementations for CloudEventBuilder.

// builder in the io.cloudevents.core.v1 package
public final class CloudEventBatchBuilder extends BaseCloudEventBuilder<CloudEventBatchBuilder, CloudEventBatchV1> {

}

// v0.3 counterpart, presumably in the io.cloudevents.core.v03 package
public final class CloudEventBatchBuilder extends BaseCloudEventBuilder<CloudEventBatchBuilder, CloudEventBatchV03> {

}

Then define the implementations of CloudEventBatch.

public final class CloudEventBatchV1 extends BaseCloudEventBatch {

}

public final class CloudEventBatchV03 extends BaseCloudEventBatch {

}

Modify the EventFormat as follows.

@ParametersAreNonnullByDefault
public interface EventFormat {

    /**
     * Serialize a {@link CloudEvent} to a byte array.
     *
     * @param event the event to serialize.
     * @return the byte representation of the provided event.
     * @throws EventSerializationException if something goes wrong during serialization.
     */
    <T extends CloudEvent> byte[] serialize(T event) throws EventSerializationException;

    /**
     * Like {@link #deserialize(byte[], CloudEventDataMapper)}, but with the identity {@link CloudEventDataMapper}.
     *
     * @see #deserialize(byte[], CloudEventDataMapper)
     */
    default <T extends CloudEvent> T deserialize(byte[] bytes) throws EventDeserializationException {
        return this.deserialize(bytes, CloudEventDataMapper.identity());
    }

    /**
     * Deserialize a byte array to a {@link CloudEvent}.
     *
     * @param bytes  the serialized event.
     * @param mapper the mapper to use to map the data.
     * @return the deserialized event.
     * @throws EventDeserializationException if something goes wrong during deserialization.
     */
    <T extends CloudEvent> T deserialize(byte[] bytes, CloudEventDataMapper<? extends CloudEventData> mapper) throws EventDeserializationException;

}

Make the corresponding modifications to the existing EventFormat implementations.
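As a sketch of how a batch could then flow through an EventFormat once the implementations are updated: EventFormatProvider, resolveFormat, and the "application/cloudevents+protobuf" content type already exist in the SDK, while the batch-aware behavior described in the comments is the proposed extension.

import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;

public class BatchFormatSketch {

    // Sketch: with the generic serialize(T extends CloudEvent) signature, a CloudEventBatch
    // can be passed through the same entry point; the protobuf EventFormat implementation
    // would detect the batch and emit the protobuf batch message instead of a single
    // CloudEvent message.
    public static byte[] serializeBatch(CloudEventBatch batch) {
        EventFormat format = EventFormatProvider.getInstance()
            .resolveFormat("application/cloudevents+protobuf"); // requires the protobuf module on the classpath
        return format.serialize(batch); // works because CloudEventBatch extends CloudEvent
    }
}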

3. Design Explanation

  1. CloudEventBatch is abstracted as a special CloudEvent object that extends the CloudEvent interface, so the rest of the implementation also builds on CloudEvent.
  2. To support batches of CloudEventData, the CloudEventWriter interface gains a default R end(List<? extends CloudEventData> datas) throws CloudEventRWException method.
  3. Corresponding CloudEventBatch and CloudEventBatchBuilder implementations are provided for the v0.3 and v1.0 versions.

@mxsm mxsm changed the title SDK supports batching protobuf format? [Discuss]SDK supports batching protobuf format Apr 30, 2023