Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: chunk get together when random rechunk #3205

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

yyy1000
Copy link
Contributor

@yyy1000 yyy1000 commented Apr 12, 2023

This wants to fix #3195

Consider the case: if (size < acc.size)

val (out, rem) = acc.splitAt(size - 1)
Pull.output(out) >> go(rem ++ hd, -1, tl)

The code will continue to append the 'hd' to 'rem', which may be larger than the 'size', to the next 'go' method.
And this will lead to the 'tl' being consumed totally without restricting the size of 'Chunk'.
:)

@@ -2413,7 +2413,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Pull.output(acc) >> go(hd, size, tl)
else {
val (out, rem) = acc.splitAt(size - 1)
Pull.output(out) >> go(rem ++ hd, -1, tl)
Pull.output(out) >> go(rem, -1, Pull.output(hd) >> tl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Pull.output(hd) >> tl equivalent to s?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I should change it to s. :) Thanks for that.

@@ -2413,7 +2413,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Pull.output(acc) >> go(hd, size, tl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is another error. The size should ideally be the desired size of the next output chunk, but that is the one being emited in acc. Instead should be -1, so that the next iteration recomputes a new size based on the first chunk of tl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! I realize it now.

@yyy1000
Copy link
Contributor Author

yyy1000 commented Apr 12, 2023

Uh, it seems that it will lead the time to be longer and fail the pipeline. :(

test("rechunkRandomly sometimes emits everything in a single chunk") {
val fiveChunks = Stream(1) ++ Stream(2) ++ Stream(3) ++ Stream(4) ++ Stream(5)

val source = fiveChunks.rechunkRandomlyWithSeed(0.1, 2.0)(5).chunks.toList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the way factor and nextSize are implemented now, I don't think factor can ever reach maxFactor. So for this example nextSize can only be 0 or 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true. Now the range is [minFactor, maxFactor).

@armanbilge
Copy link
Member

I can replicate the CI failures locally, can you replicate them as well? Not sure what's wrong, could it be getting into some sort of loop or something?

sbt:root> coreJVM/testOnly fs2.JvmNativeCompressionSuite
fs2.JvmNativeCompressionSuite:
  + inflate please wrap 0.493s
  + inflate please nowrap 0.005s
  + deflate input 0.437s
  + inflate input 0.1s
  + inflate input (deflated larger than inflated) 0.005s
==> X fs2.JvmNativeCompressionSuite.deflate |> inflate ~= id  0.222s munit.FailException: Failing seed: 2jaVSELncu5xoNkpBuxj5VZNUOP7iDEt_rzLvxoOMCD=
You can reproduce this failure by adding the following override to your suite:

  override def scalaCheckInitialSeed = "2jaVSELncu5xoNkpBuxj5VZNUOP7iDEt_rzLvxoOMCD="

Exception raised on property evaluation.
> ARG_0: "\u0003"
> ARG_1: true
> ARG_2: NINE
> ARG_3: HUFFMAN_ONLY
> ARG_4: SYNC_FLUSH
> ARG_0_ORIGINAL: "砱弚豗"
> Exception: java.util.zip.DataFormatException: Insufficient data
    at munit.Assertions.fail(Assertions.scala:252)
    at munit.Assertions.fail$(Assertions.scala:246)
    at munit.FunSuite.fail(FunSuite.scala:11)
    at munit.ScalaCheckEffectSuite.munit$ScalaCheckEffectSuite$$parseTestResult(ScalaCheckEffectSuite.scala:80)
    at munit.ScalaCheckEffectSuite.$anonfun$checkPropF$2(ScalaCheckEffectSuite.scala:68)
    at munit.ScalaCheckEffectSuite.$anonfun$checkPropF$2$adapted(ScalaCheckEffectSuite.scala:68)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at modify @ fs2.internal.Scope.close(Scope.scala:262)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at rethrow$extension @ fs2.Compiler$Target.$anonfun$compile$1(Compiler.scala:157)
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Pull$.$anonfun$compile$21(Pull.scala:1214)
    at update @ fs2.internal.Scope.releaseChildScope(Scope.scala:224)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
  + deflate.compresses input 0.005s
  + deflate and inflate are reusable 0.573s
  + gzip.compresses input 0.017s
  + gzip and gunzip are reusable 0.358s
  + maybeGunzip - not gzip 0.307s
  + maybeGunzip - gzip 0.326s
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id  120.013s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)  120.005s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:90m467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id (mutually prime chunk sizes, decompression larger)  120.012s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
  + gzip |> GZIPInputStream ~= id 0.048s
  + gzip.compresses input, with FLG.FHCRC set 0.013s
  + gunzip limit fileName and comment length 4.22s
  + unix.gzip |> gunzip 0.007s
[error] Failed: Total 19, Failed 4, Errors 0, Passed 15
[error] Failed tests:
[error]         fs2.JvmNativeCompressionSuite
[error] (coreJVM / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 375 s (06:15), completed Apr 14, 2023, 5:57:37 PM

@yyy1000
Copy link
Contributor Author

yyy1000 commented Apr 15, 2023

I can replicate the CI failures locally, can you replicate them as well? Not sure what's wrong, could it be getting into some sort of loop or something?

sbt:root> coreJVM/testOnly fs2.JvmNativeCompressionSuite
fs2.JvmNativeCompressionSuite:
  + inflate please wrap 0.493s
  + inflate please nowrap 0.005s
  + deflate input 0.437s
  + inflate input 0.1s
  + inflate input (deflated larger than inflated) 0.005s
==> X fs2.JvmNativeCompressionSuite.deflate |> inflate ~= id  0.222s munit.FailException: Failing seed: 2jaVSELncu5xoNkpBuxj5VZNUOP7iDEt_rzLvxoOMCD=
You can reproduce this failure by adding the following override to your suite:

  override def scalaCheckInitialSeed = "2jaVSELncu5xoNkpBuxj5VZNUOP7iDEt_rzLvxoOMCD="

Exception raised on property evaluation.
> ARG_0: "\u0003"
> ARG_1: true
> ARG_2: NINE
> ARG_3: HUFFMAN_ONLY
> ARG_4: SYNC_FLUSH
> ARG_0_ORIGINAL: "砱弚豗"
> Exception: java.util.zip.DataFormatException: Insufficient data
    at munit.Assertions.fail(Assertions.scala:252)
    at munit.Assertions.fail$(Assertions.scala:246)
    at munit.FunSuite.fail(FunSuite.scala:11)
    at munit.ScalaCheckEffectSuite.munit$ScalaCheckEffectSuite$$parseTestResult(ScalaCheckEffectSuite.scala:80)
    at munit.ScalaCheckEffectSuite.$anonfun$checkPropF$2(ScalaCheckEffectSuite.scala:68)
    at munit.ScalaCheckEffectSuite.$anonfun$checkPropF$2$adapted(ScalaCheckEffectSuite.scala:68)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at modify @ fs2.internal.Scope.close(Scope.scala:262)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at rethrow$extension @ fs2.Compiler$Target.$anonfun$compile$1(Compiler.scala:157)
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Pull$.$anonfun$compile$21(Pull.scala:1214)
    at update @ fs2.internal.Scope.releaseChildScope(Scope.scala:224)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
  + deflate.compresses input 0.005s
  + deflate and inflate are reusable 0.573s
  + gzip.compresses input 0.017s
  + gzip and gunzip are reusable 0.358s
  + maybeGunzip - not gzip 0.307s
  + maybeGunzip - gzip 0.326s
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id  120.013s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)  120.005s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:90m467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id (mutually prime chunk sizes, decompression larger)  120.012s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
  + gzip |> GZIPInputStream ~= id 0.048s
  + gzip.compresses input, with FLG.FHCRC set 0.013s
  + gunzip limit fileName and comment length 4.22s
  + unix.gzip |> gunzip 0.007s
[error] Failed: Total 19, Failed 4, Errors 0, Passed 15
[error] Failed tests:
[error]         fs2.JvmNativeCompressionSuite
[error] (coreJVM / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 375 s (06:15), completed Apr 14, 2023, 5:57:37 PM

Thanks! I replicated it locally but didn't discover the details. I should check it carefully. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

rechunkRandomly sometimes emits everything in a single chunk
4 participants