This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

[BUG] Flink pulsar source upgrade from 1.13.1.4 to 1.13.6.2 fails #608

Open
nikolasten opened this issue Jun 22, 2022 · 1 comment

@nikolasten

nikolasten commented Jun 22, 2022

Flink pulsar source upgrade from 1.13.1.4 to 1.13.6.2 fails
When upgrading a Flink pipeline that uses the 1.13.1.4 Pulsar Flink connector (more specifically org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource) to the 1.13.6.2 Pulsar Flink connector, the upgrade fails with the following error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e851a344fc332b3e7b727e57889fc262_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)
... 10 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:485)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 12 common frames omitted
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange; local class incompatible: stream classdesc serialVersionUID = -6297347936093846291, local class serialVersionUID = -4628744661831747115
at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.toObject(TopicSubscriptionSerializer.java:103)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.deserialize(TopicSubscriptionSerializer.java:121)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.deserialize(TopicSubscriptionSerializer.java:32)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
... 16 common frames omitted

To Reproduce
Steps to reproduce the behavior:

  1. Deploy a Flink pipeline with Pulsar Flink connector 1.13.1.4 and use org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource as the streaming source.
  2. Enable checkpointing and stop the Flink job with a savepoint.
  3. Upgrade the flink-pulsar dependency to 1.13.6.2.
  4. Deploy the Flink job from the savepoint.

Expected behavior
The upgrade succeeds and the job restores from the savepoint.

Additional context
The same issue seems to be happening in apache/flink-cdc#78.

@dialalpha

I have run into this exact issue. Unfortunately it doesn't seem fixable: the operator state is not compatible between the two versions. The TopicSubscription class in the state did not originally define a serialVersionUID, so the JVM generated a default version number for the serialized class. A later commit added an explicit serialVersionUID to all the serializable classes. The explicit values do not match the generated ones, so state written by 1.13.1.4 cannot be restored by any later version.
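
To make the serialVersionUID behaviour described above concrete, here is a minimal, self-contained Java sketch. The class names (SerialVersionUidDemo, ImplicitUid, ExplicitUid) are made up for illustration and are not part of the connector. It only shows that a Serializable class without a declared serialVersionUID gets a value computed from the class's shape, while a declared one is used as-is.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

    /** Hypothetical class with no declared serialVersionUID: the JVM computes one. */
    public static class ImplicitUid implements Serializable {
        int start;
        int end;
    }

    /** Same fields, but with an explicitly declared serialVersionUID. */
    public static class ExplicitUid implements Serializable {
        private static final long serialVersionUID = 1L;
        int start;
        int end;
    }

    /** Returns the serialVersionUID that Java serialization associates with the class. */
    public static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    public static void main(String[] args) {
        // The computed value is a hash over the class name, modifiers, fields and methods.
        System.out.println("implicit: " + uidOf(ImplicitUid.class));
        // The declared value is used unchanged.
        System.out.println("explicit: " + uidOf(ExplicitUid.class));
    }
}
```

If a class starts out like ImplicitUid and a later release declares a different value, every stream written by the old release is rejected with an InvalidClassException like the one in the stack trace.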

I'm not sure how critical the upgrade is for you, but you could always try manipulating the Flink state ... I haven't tried this myself. If you can tolerate restarting your application and ignoring the existing state, that's probably the simplest option. Version 1.13.1.4 seems to be a dead end.
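
For anyone who does try to read the old state, below is a rough sketch of a general Java technique for tolerating a serialVersionUID mismatch. The connector does not provide this, and nobody in this thread has tested it. Everything in it (LenientRestoreSketch, Range, LenientObjectInputStream) is hypothetical. It assumes the class's fields are identical in both versions and only the UID differs, which has not been verified for the connector classes. The mismatch is simulated by flipping one bit of the UID in a serialized byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class LenientRestoreSketch {

    /** Hypothetical stand-in for a class stored in operator state. */
    public static class Range implements Serializable {
        private static final long serialVersionUID = 1L;
        public final int start;
        public final int end;
        public Range(int start, int end) { this.start = start; this.end = end; }
    }

    /** Uses the local class descriptor whenever the stream's serialVersionUID differs. */
    public static class LenientObjectInputStream extends ObjectInputStream {
        public LenientObjectInputStream(InputStream in) throws IOException { super(in); }

        @Override
        protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
            ObjectStreamClass fromStream = super.readClassDescriptor();
            Class<?> localClass;
            try {
                localClass = Class.forName(fromStream.getName(), false,
                        LenientObjectInputStream.class.getClassLoader());
            } catch (ClassNotFoundException e) {
                return fromStream; // unknown class: let normal resolution report it
            }
            ObjectStreamClass local = ObjectStreamClass.lookup(localClass);
            boolean uidDiffers = local != null
                    && local.getSerialVersionUID() != fromStream.getSerialVersionUID();
            return uidDiffers ? local : fromStream;
        }
    }

    public static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simulates an old writer by flipping one bit of the UID stored in the stream. */
    public static byte[] withForeignUid(byte[] serialized, Class<?> cls) {
        byte[] copy = serialized.clone();
        // Layout: magic(2) version(2) TC_OBJECT(1) TC_CLASSDESC(1) nameLength(2) name UID(8)
        int uidOffset = 4 + 1 + 1 + 2 + cls.getName().length();
        copy[uidOffset + 7] ^= 0x01;
        return copy;
    }

    /** True if plain Java deserialization rejects the data with InvalidClassException. */
    public static boolean strictReadFails(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return false;
        } catch (InvalidClassException e) {
            return true;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static Range lenientRead(byte[] data) {
        try (ObjectInputStream in = new LenientObjectInputStream(new ByteArrayInputStream(data))) {
            return (Range) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] oldBytes = withForeignUid(serialize(new Range(3, 9)), Range.class);
        System.out.println("strict read fails: " + strictReadFails(oldBytes));
        Range restored = lenientRead(oldBytes);
        System.out.println("lenient read: " + restored.start + ".." + restored.end);
    }
}
```

Applying this to a real savepoint would mean running the connector's deserialization path through such a stream, for example in a patched build, and then writing a fresh savepoint. If the fields changed between versions, the lenient read could silently produce wrong data, so restarting without the old state remains the safer option.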
