Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… #24784

Closed
wants to merge 1 commit into from

Conversation

ldadima
Copy link
Contributor

@ldadima ldadima commented May 14, 2024

What is the purpose of the change

To fix FLINK-35351

Verifying this change

UnalignedCheckpointRescaleITCase Topology.CUSTOM_PARTITIONER

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? ( no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented May 14, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Comment on lines 427 to 444
boolean noNeedRescale =
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
.map(JobEdge::getDownstreamSubtaskStateMapper)
.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
&& stateAssignment.executionJobVertex.getInputs().stream()
.map(IntermediateResult::getProducer)
.map(vertexAssignments::get)
.anyMatch(
taskStateAssignment -> {
final int oldParallelism =
stateAssignment
.oldState
.get(stateAssignment.inputOperatorID)
.getParallelism();
return oldParallelism
== taskStateAssignment.executionJobVertex
.getParallelism();
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the logic behind this condition? Maybe both of it's parts separately?

                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
                                .map(JobEdge::getDownstreamSubtaskStateMapper)
                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))

and

 stateAssignment.executionJobVertex.getInputs().stream()
                                .map(IntermediateResult::getProducer)
                                .map(vertexAssignments::get)
                                .anyMatch(
                                        taskStateAssignment -> {
                                            final int oldParallelism =
                                                    stateAssignment
                                                            .oldState
                                                            .get(stateAssignment.inputOperatorID)
                                                            .getParallelism();
                                            return oldParallelism
                                                    == taskStateAssignment.executionJobVertex
                                                            .getParallelism();
                                        }

?

Copy link
Contributor Author

@ldadima ldadima May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separation is a good idea, ok.
The first part is to check if SubtaskStateMapper.FULL is used in any input (this mapper type always returns all I/O), the problem is reproduced only for this mapper type
The second part is to check if the parallelism of any previous operator has changed. if it has changed, it means that the number of outputs has changed.
But I made little mistake in condition. I will fix it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

But I made little mistake in condition. I will fix it

Does this bug have a test coverage? I mean, either was there some test failing or have you added a new test to cover for a future regression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mistake was semantic. It was checked that all the inputs were of FULL type, and it is necessary to have at least one. But since the type of partitioner seems to be the same for all inputs, I don't see how to add a test to check it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in StateAssignmentOperationTest create a unit test that has one FULL and one something else, and assert that the assigned states are as they should be?

Copy link
Contributor

@pnowojski pnowojski May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bumping:

Maybe in StateAssignmentOperationTest create a unit test that has one FULL and one something else, and assert that the assigned states are as they should be?

Does this bug have a test coverage? I mean, either was there some test failing or have you added a new test to cover for a future regression?

sorry to bother you again, but the unit test that you have added still doesn't have test coverage. When I try running your previous version of the code:

        boolean noNeedRescale =
                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
                                .map(JobEdge::getDownstreamSubtaskStateMapper)
                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
                        && stateAssignment.executionJobVertex.getInputs().stream()
                                .map(IntermediateResult::getProducer)
                                .map(vertexAssignments::get)
                                .anyMatch(
                                        taskStateAssignment -> {
                                            final int oldParallelism =
                                                    stateAssignment
                                                            .oldState
                                                            .get(stateAssignment.inputOperatorID)
                                                            .getParallelism();
                                            return oldParallelism
                                                    == taskStateAssignment.executionJobVertex
                                                            .getParallelism();
                                        });

        if (inputState.getParallelism() == executionJobVertex.getParallelism() && !noNeedRescale) {
            stateAssignment.inputChannelStates.putAll(
                    toInstanceMap(stateAssignment.inputOperatorID, inputOperatorState));
            return;
        }

the tests in StateAssignmentOperationTest are still green.

Copy link
Contributor Author

@ldadima ldadima May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the analysis. I think you can change a test with incomplete FULL, you are right. I hope I didn't make a mistake with the mistake note last time.

Copy link
Contributor Author

@ldadima ldadima May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier, I misjudged the error of not noticing the negation in the condition.
But by substituting anyMatch for allMatch, you can see that the test will stop passing.
Also I changed gateIdx from random generation to always zero (in StateHandleDummyUtil#createNewInputChannelStateHandle). This change have no affects for other tests. But I need zero for this test, because need stability for the number of input gates

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again thanks for the explanation. I've left a couple of more comments.


@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

private static final File CHECKPOINT_FILE = new File("src/test/resources/custom-checkpoint");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you somehow make the name of the directory more related to the test name? unaligned-checkpoint-custom-partition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I agree

import static org.junit.Assert.fail;

/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
public class UnalignedCheckpointCustomRescaleITCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: UnalignedCheckpointRescaleWithCustomPartitionITCase?

import static org.junit.Assert.fail;

/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
public class UnalignedCheckpointCustomRescaleITCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, instead of creating custom test, can you create a new Topology in the pre-existing UnalignedCheckpointRescaleITCase?

Copy link
Contributor Author

@ldadima ldadima May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though about it, but in UnalignedCheckpointRescaleITCase used Long as stream data. But for the regular repeatability of the test it was necessary to use the String. I will try to add test to UnalignedCheckpointRescaleITCase, but I'm not sure. Also in UnalignedCheckpointRescaleITCase rescale full graph (change parallelism for all vertexes), but need to change only one vertex, not all. I have no ideas, how to add new Topology for this test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But for the regular repeatability of the test it was necessary to use the String.

There are plenty of solutions to this problem:

  1. You are encoding in your String record three fields: partition, index and some payload. But I don't see you using anything but the partition, so you could convert your whole String record into Long record with value just for the partition.
  2. You can encode any number of fields in single Long just as well. The easiest would be sth like that:
long encode(int partition, int index, int payload) {
  checkArgument(partition < 1024);
  checkArgument(index < 1024);
  checkArgument(payload < Long.MAX_VALUE / (1024*1024));
  return partition + index * 1024 + payload * 1024 * 1024;
}
  1. You could always most likely change the record type for UnalignedCheckpointRescaleITCase. String there should work just as fine, but that's probably more work vs option 1.

but I'm not sure. Also in UnalignedCheckpointRescaleITCase rescale full graph (change parallelism for all vertexes), but need to change only one vertex

When creating job graph in Topology (UnalignedCheckpointRescaleITCase.Topology#create), you can set parallelism per vertex and AFAIK that will override the env.setParallelism(...) setting. So for the downstream vertex/task, that you don't want to change parallelism, you can call:

                    .addSink(new StringSink(createCheckpoint ? 100 : 1000))
                    .name("sink")
                    .setParallelism(3);

While keep the upstream source's parallelism controlled by the environment.

Copy link
Contributor Author

@ldadima ldadima May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An important point for test repeatability is to split the record into two parts (one at the sink input, one at the source output). That's why we use a string with a sufficiently long length and all bytes set to 1 to be sure to read data leading to exception in case of splitting.
That's why need to realize 3rd option. But maybe we can don't change Long to String, but just to add StringSource exactly for CustomPartitioner case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could always most likely change the record type for UnalignedCheckpointRescaleITCase. String there should work just as fine, but that's probably more work vs option 1.

I've just realised that I don't see any place in the UnalignedCheckpointRescaleITCase that hardcodes the type of the record. You can add a new Topology (UnalignedCheckpointRescaleITCase.Topology) that creates any JobGraph, so you can keep your proposed records format. Also I wouldn't mind if you re-used the same LongSource, but changed the record type from Long to some POJO of Long and String payload

public static class Record {
  private final long value;
  private final @Nullable String payload;
  (...)
}

where payload length/size would be configurable, between 0 (payload == null) to whatever value you would configured (3713?).

This way you won't be duplicating all of the setup code and you will leverage the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A difficult way out of the situation, but quite feasible, ok. Thank you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, but I think this is too complicated a way to implement the test. That's why I came up with an idea that is not as elegant and correct as yours, but is easier to implement and allows you to do without changing less code. Please check it out. Thank you for your patience

@pnowojski
Copy link
Contributor

Also please note that azure is failing on some compilation issue. Probably some check style violation.

@ldadima ldadima force-pushed the FLINK-35351 branch 2 times, most recently from 1aa9484 to f0f82d2 Compare May 20, 2024 08:48
@ldadima
Copy link
Contributor Author

ldadima commented May 20, 2024

Sorry to bother you again, but I made a separate one Pull request with hotfix

@ldadima ldadima force-pushed the FLINK-35351 branch 8 times, most recently from ca6f434 to b4b928e Compare May 22, 2024 08:50
…kpoint with custom partitioner

Co-authored-by:  Andrey Gaskov <31715230+empathy87@users.noreply.github.com>
@pnowojski
Copy link
Contributor

Thanks for the update @ldadima . I have fixed up a couple of things in #24857 so let me close this PR in favour of that one. Could you take a look at my version?

@ldadima
Copy link
Contributor Author

ldadima commented May 29, 2024

Thanks for the update @ldadima . I have fixed up a couple of things in #24857 so let me close this PR in favour of that one. Could you take a look at my version?

I checked the MR. I agree with changes about ITCase. In my case I forget about uid, that's why map vertex doesn't work for me in rescale case. Thanks for changes

@pnowojski pnowojski closed this May 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants