Wikipedia/Wikimedia stream "canary" records cause NullPointerException due to missing "bot" field #451

javabrett opened this issue Jan 8, 2024 · 0 comments

Description

A while back, the Wikimedia streams (recentchange, at least) began emitting "canary" records to help with heartbeat/keepalive. Helper libraries were updated to filter these out automatically. Canary records can be identified by meta.domain == 'canary'. See https://phabricator.wikimedia.org/T266798
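A minimal Java sketch of the meta.domain == 'canary' check, assuming the event has already been parsed into a nested Map (for example by a JSON library). The Map shape and the CanaryCheck name are hypothetical illustrations, not the connector's actual record types:

```java
import java.util.Map;

/**
 * Hypothetical helper: detects Wikimedia "canary" events by checking meta.domain.
 * Assumes the event was parsed into a nested Map; the real record type may differ.
 */
final class CanaryCheck {
    private CanaryCheck() {}

    static boolean isCanary(Map<String, ?> event) {
        if (event == null) {
            return false;
        }
        Object meta = event.get("meta");
        if (!(meta instanceof Map)) {
            return false; // no meta object, so it cannot be identified as a canary
        }
        Object domain = ((Map<?, ?>) meta).get("domain");
        // Compare against the constant so a missing (null) domain is simply "not a canary".
        return "canary".equals(domain);
    }
}
```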

These canary records are thin: in particular, they lack the boolean bot data field that is present on all other records. As a result, filtering on the bot field as a boolean throws a NullPointerException:

[wikipedia-activity-monitor-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskExecutor - stream-thread [wikipedia-activity-monitor-StreamThread-1] Failed to process stream task 0_1 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=wikipedia.parsed, partition=1, offset=1139, stacktrace=java.lang.NullPointerException
        at io.confluent.demos.common.wiki.WikipediaActivityMonitor.lambda$createMonitorStream$1(WikipediaActivityMonitor.java:119)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)

        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: java.lang.NullPointerException
        at io.confluent.demos.common.wiki.WikipediaActivityMonitor.lambda$createMonitorStream$1(WikipediaActivityMonitor.java:119)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        ... 6 more
[wikipedia-activity-monitor-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [wikipedia-activity-monitor] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
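The filter lambda at WikipediaActivityMonitor.java:119 is not quoted in this issue, so the following is an assumption about the mechanism: an NPE thrown from inside a filter predicate on a Boolean field usually comes from auto-unboxing a null Boolean (for example, negating a getBot() result). This hypothetical sketch shows the failing pattern next to a null-safe alternative; the BotFilter name and methods are illustrative only:

```java
/**
 * Hypothetical sketch: why a null Boolean "bot" field throws, and a null-safe check.
 * The parameter stands in for whatever accessor the real record type exposes.
 */
final class BotFilter {
    private BotFilter() {}

    // Unsafe: '!' auto-unboxes the Boolean, so a null bot flag throws NullPointerException.
    static boolean notBotUnsafe(Boolean bot) {
        return !bot;
    }

    // Null-safe: only an explicit false passes; a missing flag (e.g. on a canary) is rejected.
    static boolean notBotNullSafe(Boolean bot) {
        return Boolean.FALSE.equals(bot);
    }
}
```

Rejecting records with a null bot flag keeps the stream thread running instead of letting the exception reach the handler that chose SHUTDOWN_CLIENT; whether a missing flag should be rejected or counted differently depends on what the monitor is measuring.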

Here's a sample canary record as parsed by our connector. Every field except META is null, including BOT:

{
  "BOT": null,
  "COMMENT": null,
  "ID": null,
  "LENGTH": null,
  "LOG_ACTION": null,
  "LOG_ACTION_COMMENT": null,
  "LOG_ID": null,
  "LOG_TYPE": null,
  "META": {
    "DOMAIN": "canary",
    "DT": 1704674745330,
    "ID": "343e51c8-035e-44e1-9f2f-d755afcd0cf1",
    "REQUEST_ID": "374ea0fa-3ac0-49e4-a656-9454a5d59c06",
    "STREAM": "mediawiki.recentchange",
    "URI": null
  },
  "MINOR": null,
  "NAMESPACE": null,
  "PARSEDCOMMENT": null,
  "PATROLLED": null,
  "REVISION": null,
  "SERVER_NAME": null,
  "SERVER_SCRIPT_PATH": null,
  "SERVER_URL": null,
  "TIMESTAMP": null,
  "TITLE": null,
  "TYPE": null,
  "USER": null,
  "WIKI": null
}
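Because every field except META is null in this sample, the canary check has to run before BOT is read. A sketch of a combined predicate using the uppercase field names from the sample, assuming (hypothetically) that the value is exposed as a Map and that the monitor keeps non-bot edits; the issue does not say which way the real filter goes:

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical filters over a record shaped like the parsed sample (uppercase field names). */
final class ParsedRecordFilters {
    private ParsedRecordFilters() {}

    /** True when META.DOMAIN equals "canary". */
    static boolean isCanary(Map<String, ?> value) {
        Object meta = value.get("META");
        return meta instanceof Map && "canary".equals(((Map<?, ?>) meta).get("DOMAIN"));
    }

    /**
     * Keeps only non-bot edits. Canaries are rejected before BOT is consulted, and a null BOT
     * on any other record is rejected too instead of being unboxed.
     */
    static final Predicate<Map<String, ?>> NON_BOT_EDIT =
            value -> !isCanary(value) && Boolean.FALSE.equals(value.get("BOT"));
}
```

In Kafka Streams, a predicate like this could be wired in with something like stream.filter((key, value) -> ParsedRecordFilters.NON_BOT_EDIT.test(value)), adapted to the application's real value type.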

You can also observe these canary records with:

curl -s https://stream.wikimedia.org/v2/stream/recentchange | grep canary
data: {"$schema":"/mediawiki/recentchange/1.0.1","meta":{"stream":"mediawiki.recentchange","domain":"canary","id":"a691352b-9c23-4001-ab62-b373aa521904","dt":"2024-01-07T22:15:41.978Z","request_id":"7db98c7f-fbbf-4436-a912-ff0a5e2eaa0e","topic":"codfw.mediawiki.recentchange","partition":0,"offset":910279214}}
data: {"$schema":"/mediawiki/recentchange/1.0.1","meta":{"stream":"mediawiki.recentchange","domain":"canary","id":"d27abe41-c87e-4949-a9a1-14408d5323eb","dt":"2024-01-07T22:15:41.912Z","request_id":"5dd83696-ae8c-4da4-b216-ff4095b0206d","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":4957728053}}
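A rough Java equivalent of grepping the stream for canaries, offered as a sketch: it looks only at SSE "data: " lines and matches "domain":"canary" textually, which approximates (but is not the same as) parsing the JSON and reading meta.domain. The SseCanaryScan name is hypothetical:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Hypothetical helper: returns the JSON payloads of SSE "data: " lines whose text contains
 * "domain":"canary". A textual approximation; a real consumer should parse the JSON.
 */
final class SseCanaryScan {
    private static final String DATA_PREFIX = "data: ";
    private static final Pattern CANARY_DOMAIN = Pattern.compile("\"domain\"\\s*:\\s*\"canary\"");

    private SseCanaryScan() {}

    static List<String> canaryPayloads(List<String> sseLines) {
        return sseLines.stream()
                .filter(line -> line.startsWith(DATA_PREFIX))          // keep only event payload lines
                .map(line -> line.substring(DATA_PREFIX.length()))     // strip the "data: " prefix
                .filter(payload -> CANARY_DOMAIN.matcher(payload).find())
                .collect(Collectors.toList());
    }
}
```

For a live check, lines could be read with java.net.http.HttpClient and HttpResponse.BodyHandlers.ofLines(), but the stream is unbounded, so a List-based helper like this only suits a captured sample.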