-
I have a task combinator that repeatedly executes the task, retries if it fails, and reports the execution status to the provided ZSink. The simplified code (without retrying, for brevity):
I have some troubles with the "report" part. As you can see, I use a
This works, but the I would (probably) be able to achieve this if I could feed Also, it's entirely possible that I'm trying to use inappropriate abstractions for the thing I'm trying to accomplish. For example, I might have used a Also, I could try to return a (This was also posted on StackOverflow. Let me know if such cross-posting is inappropriate.) |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 10 replies
-
If you want to run the sink before repeating you could do something like |
Beta Was this translation helpful? Give feedback.
-
Isn't this just something like (working off your original example)? import zio._
import zio.stream._
object Example {
sealed trait RunStatus
object RunStatus {
case object Running extends RunStatus
case object Starting extends RunStatus
}
def repeatAndFeedStatus(
initTask: Task[Unit],
repeatTask: Task[Unit],
repeatDelay: Duration,
retrySchedule: Schedule[Any, Throwable, Unit],
statusSink: Sink[Throwable, RunStatus, Nothing, Unit]
): Task[Unit] = {
val repeatSchedule: Schedule[Any, Any, Any] =
Schedule.fixed(repeatDelay)
val stream: ZStream[Any, Throwable, RunStatus] =
ZStream.succeed(RunStatus.Starting) ++
ZStream.repeatZIOWithSchedule(initTask *> repeatTask *> ZIO.succeed(RunStatus.Running), repeatSchedule) ++
ZStream.fail(new IllegalStateException("Repeat isn't expected to complete"))
stream.run(statusSink)
}
} I think the fundamental problem is you are using a streaming data type here with a sink and that is going to work best if you use it with other streaming data types or if you don't want to do that just use |
Beta Was this translation helpful? Give feedback.
-
So apparently the answer to the question "Can I feed items into ZSink manually?" is "No". However, thanks to @adamgfraser's suggestion, I was able to implement the After some more thought, it occured to me that I actually wanted some kind of coroutine mechanics, using a sink as "something to give intermediate results to". I'll write about this separately. |
Beta Was this translation helpful? Give feedback.
So apparently the answer to the question "Can I feed items into ZSink manually?" is "No".
However, thanks to @adamgfraser's suggestion, I was able to implement the
repeatAndFeedStatus
function via streams, see here.After some more thought, it occured to me that I actually wanted some kind of coroutine mechanics, using a sink as "something to give intermediate results to". I'll write about this separately.