Replies: 2 comments
-
I have something working by using:

```scala
import zio._
import zio.stream._

trait StreamSplitter {
  def getNext: ZIO[Any, Throwable, Option[ZStream[Any, Throwable, Byte]]]
}

object StreamSplitter {
  // Requires a Scope because `toPull` does; run it inside ZIO.scoped.
  def make(input: ZStream[Any, Throwable, Byte], splitSize: Int) =
    for {
      pull     <- input.toPull
      leftover <- Ref.make(Option.empty[Chunk[Byte]])
      finished <- Ref.make(false)
    } yield {
      // Pull for one sub-stream: fails with None once `splitSize` bytes have
      // been emitted, carrying any surplus bytes over in `leftover`.
      def puller(readRef: Ref[Int]) =
        for {
          read <- readRef.get
          result <-
            if (read == splitSize) ZIO.fail(Option.empty[Throwable])
            else
              for {
                lf <- leftover.get
                n <- (lf match {
                       case Some(s) => leftover.set(None).as(s)
                       case None    => pull
                     }).either
                res <- n match {
                         case Left(e) =>
                           // End of input (or error): remember it so getNext stops.
                           finished.set(true) *> ZIO.fail(e)
                         case Right(r) =>
                           if (read + r.size > splitSize) {
                             val missing  = splitSize - read
                             val toReturn = r.take(missing)
                             val toRemain = r.drop(missing)
                             leftover.set(Some(toRemain)) *>
                               readRef.set(splitSize).as(toReturn)
                           } else
                             readRef.set(read + r.size).as(r)
                       }
              } yield res
        } yield result

      new StreamSplitter {
        def getNext = finished.get.map {
          case true  => None
          case false => Some(ZStream.fromPull(Ref.make(0).map(puller)))
        }
      }
    }
}
```

Any style tips or improvement suggestions are of course welcome.
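A usage sketch for the above (my assumption, not from the thread): drain each sub-stream in turn until `getNext` signals completion. `consume` is a hypothetical stand-in for whatever handles one piece (e.g. an upload).

```scala
import zio._
import zio.stream._

// Hypothetical driver loop, assuming the StreamSplitter definitions above.
def consumeAll(
  splitter: StreamSplitter,
  consume: ZStream[Any, Throwable, Byte] => ZIO[Any, Throwable, Unit]
): ZIO[Any, Throwable, Unit] =
  splitter.getNext.flatMap {
    case None         => ZIO.unit
    case Some(stream) => consume(stream) *> consumeAll(splitter, consume)
  }

// `make` uses toPull, which needs a Scope, so the whole traversal runs scoped:
// ZIO.scoped(StreamSplitter.make(input, 8 * 1024 * 1024).flatMap(consumeAll(_, upload)))
```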
-
I think you just want to use |
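The combinator being referred to is elided in this extract; one candidate (an assumption on my part, based on the ZIO 2 API rather than on this reply) is `rechunk` combined with `chunks`:

```scala
import zio._
import zio.stream._

// Hypothetical helper, not from the thread: cut `input` into sub-streams of at
// most `splitSize` bytes. `rechunk` normalises the chunk size, `chunks` exposes
// the chunk structure, and each chunk is lifted back into a ZStream.
def split(
  input: ZStream[Any, Throwable, Byte],
  splitSize: Int
): ZStream[Any, Throwable, ZStream[Any, Throwable, Byte]] =
  input
    .rechunk(splitSize)        // chunks of splitSize bytes (last may be smaller)
    .chunks                    // ZStream[Any, Throwable, Chunk[Byte]]
    .map(ZStream.fromChunk(_))
```

Note this materialises each piece as a single `Chunk` (e.g. 8 MB) before wrapping it, which may be acceptable here since the pieces are consumed sequentially.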
-
Hi,
I can't really seem to find how to do the following (in ZIO 2.0), which feels like it should be easy (... famous last words).
Given, for example, a stream of bytes with, say, a size of 68 megabytes, I would like to produce a series of `ZStream`s of a capped size, say 8 megabytes, which cut the original stream into 8-megabyte pieces, with the last piece being whatever is left over. The initial size is not necessarily known (or relevant), so the result could of course also just be a single sub-stream. Traversing those (8 MB) sub-streams would be done sequentially, so that the original stream is also traversed just once from front to back (it could also come from the network, and avoiding buffering would be good).
If I use `.take(..)` it doesn't advance the stream, i.e. if I `take(8mb)` twice I get the same initial 8 MB of the file instead of consecutive pieces. Any hints as to which functions would be useful for this, which methods to look at for inspiration, or which approach I could try to take, would be highly appreciated.
[Why? Splitting large files when storing them in SeaweedFS]
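The `take` behaviour described above can be sketched with plain Scala collections: `ZStream` values are immutable descriptions of a computation, so `take` never advances a shared cursor, and `LazyList` (used here purely as a stand-in for illustration) behaves the same way.

```scala
// An immutable lazy sequence standing in for the byte stream.
val bytes = LazyList.range(0, 20)

val first  = bytes.take(8).toList
val second = bytes.take(8).toList // same description => same initial prefix
// first == second: not consecutive pieces

// Cutting into consecutive capped pieces needs take *and* drop (or stateful
// pulling, as in the StreamSplitter reply above):
val pieces = bytes.grouped(8).map(_.toList).toList
// three pieces, of sizes 8, 8 and 4
```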