
[BUG] error while writing data in Parquet format using JsonSchema as schema format #439

Open
grandimk opened this issue Sep 14, 2022 · 3 comments

@grandimk

Describe the bug
I was using the Cloud Storage Sink to collect data from Pulsar and write it to AWS S3 in Parquet. Messages were produced using a JsonSchema format. The Sink fails as soon as it tries to convert the collected data into org.apache.avro.generic.GenericRecord (within the convertGenericRecord function).

I tried producing messages from both Python and Java. Both fail, but with different stack traces.

Note: if the formatType in the configuration is set to json, everything works fine.

To Reproduce
Use this template configuration for the pulsar-io-cloud-storage v2.9.3.6:

tenant: "<theTenant>"
namespace: "schema-registry"
name: "cloud-storage-sink"
inputs: 
  - "persistent://<theTenant>/<theNamespace>/<theTopic>"
  - <otherTopicUrl>
archive: "connectors/pulsar-io-cloud-storage-2.9.3.6.nar"
parallelism: 1

configs:
  provider: "aws-s3"
  accessKeyId: "<yourAccessKeyId>"
  secretAccessKey: "<yourSecretAccessKey>"
  bucket: "<yourS3Bucket>"
  region: "<yourRegion>"
  pathPrefix: "cloud_storage_sink_parquet/"
  formatType: "parquet"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 100
  batchTimeMs: 600000
  withMetadata: false
  withTopicPartitionNumber: false

Then produce messages using JsonSchema. Here is the code for a minimal Python producer:

import pulsar
from pulsar.schema import *

class YourMessageClass(Record):
    ...

def generate_message() -> YourMessageClass:
    ...

if __name__ == '__main__':
    client = pulsar.Client('pulsar://host.docker.internal:6650')

    # Create the producer from the client instance; messages are serialized with JsonSchema
    producer = client.create_producer(
        topic='persistent://<theTenant>/<theNamespace>/<theTopic>',
        producer_name='python_producer',
        schema=pulsar.schema.JsonSchema(YourMessageClass)
    )

    # Send 100 generated messages
    for _ in range(100):
        msg = generate_message()
        producer.send(msg)

    client.close()
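Before digging into the sink, it may help to rule out a mismatch between what the producer sends and what the registered schema declares. A null value in a field the schema marks as non-nullable is one plausible source of a NullPointerException during conversion, but that is an assumption: the trace alone does not show which value was null. Below is a minimal stdlib-only sketch. It assumes the message is available as a plain dict, for example `json.loads` of the JSON body the producer sends. `find_null_violations` and its helpers are hypothetical names and are not part of the Pulsar client.

```python
from typing import Any


def _allows_null(avro_type: Any) -> bool:
    """True if the Avro type is "null" or a union that contains "null"."""
    if isinstance(avro_type, list):
        return "null" in avro_type
    return avro_type == "null"


def _record_branches(avro_type: Any) -> list[dict]:
    """Return nested record schemas found directly or inside a union."""
    branches = avro_type if isinstance(avro_type, list) else [avro_type]
    return [b for b in branches if isinstance(b, dict) and b.get("type") == "record"]


def find_null_violations(schema: dict, value: Any, path: str = "") -> list[str]:
    """List dotted paths whose value is missing or None although the schema
    does not allow null. Only nullability is checked: nested records are
    visited recursively, and other types are not validated."""
    problems: list[str] = []
    for field in schema["fields"]:
        name = field["name"]
        field_path = f"{path}.{name}" if path else name
        field_value = value.get(name) if isinstance(value, dict) else None
        if field_value is None:
            if not _allows_null(field["type"]):
                problems.append(field_path)
            continue
        for record_schema in _record_branches(field["type"]):
            problems.extend(find_null_violations(record_schema, field_value, field_path))
    return problems
```

Called as `find_null_violations(schema, message_dict)`, an empty list means every non-nullable field carries a value. Any returned path is worth fixing on the producer side before suspecting the connector.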

Expected behavior
A chunk of data containing a list of collected messages, written to the specified AWS S3 prefix in Parquet format.

Screenshots
None

Additional context
The tests were run on my laptop against an Apache Pulsar Docker container in which the schema registry was properly configured (the schema definitions of the messages had been uploaded) and pulsar-io-cloud-storage-2.9.3.6.nar was loaded.

This is the error that occurred while writing data produced by the Python producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-02/180925500946.parquet
java.lang.NullPointerException: null
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:220) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

This is the error that occurred while writing data produced by the Java producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-08/64156074046.parquet
java.util.NoSuchElementException: No value present
	at java.util.Optional.get(Optional.java:148) ~[?:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:207) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
@alpreu
Contributor

alpreu commented Oct 10, 2022

Hi @grandimk, thanks for opening this issue. Can you please provide more information about your record schema?
It looks like this is where your conversion fails: https://github.com/streamnative/pulsar-io-cloud-storage/blob/branch-2.9.3.6/src/main/java/org/apache/pulsar/io/jcloud/util/AvroRecordUtil.java#L204-L207

@grandimk
Author

Hi @alpreu, I looked at the code in the AvroRecordUtil.java file but couldn't figure out why the conversion fails.
This is the schema_info of the record I used for my tests:

{
  "type": "record",
  "name": "Quiz",
  "fields": [
    {
      "name": "audit",
      "type": {
        "type": "record",
        "name": "Audit",
        "fields": [
          {
            "name": "actor",
            "type": {
              "type": "record",
              "name": "Actor",
              "fields": [
                { "name": "actorId", "type": "string" },
                {
                  "name": "actorType",
                  "type": {
                    "type": "enum",
                    "name": "ActorType",
                    "symbols": ["person", "service"]
                  }
                },
                { "name": "ip", "type": ["null", "string"], "default": null }
              ]
            }
          },
          {
            "name": "producer",
            "type": {
              "type": "record",
              "name": "Producer",
              "fields": [
                { "name": "code", "type": "string" },
                {
                  "name": "instanceId",
                  "type": ["null", "string"],
                  "default": null
                },
                { "name": "producerType", "type": "string" },
                {
                  "name": "version",
                  "type": ["null", "string"],
                  "default": null
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "Metadata",
        "fields": [
          { "name": "eventId", "type": "string" },
          { "name": "eventTimestamp", "type": "long" },
          { "name": "eventType", "type": ["null", "string"], "default": null }
        ]
      }
    },
    {
      "name": "payload",
      "type": {
        "type": "record",
        "name": "QuizPayload",
        "fields": [
          {
            "name": "categoryTreeNodesIds",
            "type": ["null", { "type": "array", "items": "long" }],
            "default": null
          },
          { "name": "id", "type": "string" },
          { "name": "isHidden", "type": "boolean" },
          {
            "name": "knowledgeGraphNodesIds",
            "type": ["null", { "type": "array", "items": "long" }],
            "default": null
          },
          {
            "name": "properties",
            "type": {
              "type": "record",
              "name": "ContentProperties",
              "fields": [
                {
                  "name": "authorId",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "copiedFrom",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "createdAt",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "creatorId",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "ownerId",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "permission",
                  "type": {
                    "type": "record",
                    "name": "ContentPermission",
                    "fields": [
                      {
                        "name": "teams",
                        "type": [
                          "null",
                          { "type": "array", "items": "string" }
                        ],
                        "default": null
                      }
                    ]
                  }
                },
                {
                  "name": "version",
                  "type": ["null", "string"],
                  "default": null
                }
              ]
            }
          },
          {
            "name": "questions",
            "type": { "type": "array", "items": "string" }
          },
          {
            "name": "questionsNumber",
            "type": ["null", "long"],
            "default": null
          },
          { "name": "quizId", "type": ["null", "string"], "default": null },
          {
            "name": "showDetailedFeedback",
            "type": ["null", "boolean"],
            "default": null
          },
          { "name": "slug", "type": ["null", "string"], "default": null },
          { "name": "title", "type": "string" }
        ]
      }
    }
  ]
}

As an additional note, we define our events using JSON Schema and then generate the corresponding Python and Java classes from those definitions.
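The Java trace alternates between `AvroRecordUtil.java:169` and `AvroRecordUtil.java:209` before failing at line 207. This suggests the converter recurses once per nested record. Assume there is one frame at line 169 per record being converted. That is an inference from the trace and has not been checked against the source. Under that assumption, the Java failure happens on a field of a record nested four levels deep. The Python failure at line 220 happens on a field two levels deep. The stdlib-only sketch below lists schema fields by nesting depth, so the candidates can be read off the schema. `walk_fields` and `fields_at_depth` are hypothetical helpers.

```python
from typing import Any, Iterator


def walk_fields(schema: dict, depth: int = 1, path: str = "") -> Iterator[tuple[int, str]]:
    """Yield (record depth, dotted field path) for every field of an Avro record.

    Fields of the top-level record have depth 1; each nested record adds one.
    """
    for field in schema["fields"]:
        field_path = f"{path}.{field['name']}" if path else field["name"]
        yield depth, field_path
        # A nested record may appear directly or as a branch of a union.
        field_type: Any = field["type"]
        branches = field_type if isinstance(field_type, list) else [field_type]
        for branch in branches:
            if isinstance(branch, dict) and branch.get("type") == "record":
                yield from walk_fields(branch, depth + 1, field_path)


def fields_at_depth(schema: dict, depth: int) -> list[str]:
    """Return the dotted paths of all fields at the given record depth."""
    return [p for d, p in walk_fields(schema) if d == depth]
```

Apply it to the full schema_info above, loaded with `json.load`. Depth 4 then holds a single field, `payload.properties.permission.teams`, a nullable array of strings. That makes it a reasonable first field to isolate. Depth 2 covers every field of `Audit`, `Metadata`, and `QuizPayload`, so the Python failure is less constrained.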

@alpreu
Contributor

alpreu commented Oct 11, 2022

@grandimk Thanks for providing the record schema. I had another look but I cannot see an immediate issue either.
Would you be able to create a unit test in ParquetFormatTest that reproduces the issue with your generated Java record/schema?
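A reproduction needs concrete input. One option is to generate two payloads from the schema: one with every optional field null, and one with every optional field set. Comparing which variant fails then narrows the cause. Below is a minimal stdlib-only sketch for generating such fixtures. `sample_record` is a hypothetical helper, and the placeholder values ("x", 0, False, the first enum symbol) are arbitrary. Named-type references are not handled. The resulting JSON would still have to be wired into ParquetFormatTest on the Java side.

```python
from typing import Any

# Arbitrary placeholder values for Avro primitive types.
PRIMITIVES = {"string": "x", "long": 0, "int": 0, "boolean": False,
              "double": 0.0, "float": 0.0, "null": None}


def _placeholder(avro_type: Any, fill_optional: bool) -> Any:
    """Return a simple value that conforms to `avro_type`."""
    if isinstance(avro_type, list):  # union, e.g. ["null", "string"]
        non_null = [t for t in avro_type if t != "null"]
        if not non_null or ("null" in avro_type and not fill_optional):
            return None
        return _placeholder(non_null[0], fill_optional)
    if isinstance(avro_type, dict):
        kind = avro_type["type"]
        if kind == "record":
            return sample_record(avro_type, fill_optional)
        if kind == "enum":
            return avro_type["symbols"][0]
        if kind == "array":
            # One element when optional values are filled, otherwise empty.
            return [_placeholder(avro_type["items"], fill_optional)] if fill_optional else []
        return PRIMITIVES[kind]  # e.g. {"type": "string"}
    # Named-type references (a bare record name) are not supported here.
    return PRIMITIVES[avro_type]


def sample_record(schema: dict, fill_optional: bool = False) -> dict:
    """Build a dict with a placeholder value for every field of a record schema."""
    return {f["name"]: _placeholder(f["type"], fill_optional) for f in schema["fields"]}
```

`json.dumps(sample_record(schema))` and `json.dumps(sample_record(schema, fill_optional=True))` give the two fixtures. If only one of them triggers the exception, the optional fields are the place to look.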
