-
Notifications
You must be signed in to change notification settings - Fork 1
/
StatefulMapConcatWithCompleteFlow.scala
88 lines (73 loc) · 2.56 KB
/
StatefulMapConcatWithCompleteFlow.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package dev.chopsticks.stream
import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy
import org.apache.pekko.stream._
import org.apache.pekko.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable
import scala.util.control.NonFatal
final class StatefulMapConcatWithCompleteFlow[In, Out](
val funs: () => (In => immutable.Iterable[Out], () => immutable.Iterable[Out])
) extends GraphStage[FlowShape[In, Out]] {
val in: Inlet[In] = Inlet[In]("StatefulMapConcatWithCompleteFlow.in")
val out: Outlet[Out] = Outlet[Out]("StatefulMapConcatWithCompleteFlow.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = Attributes.name("statefulMapConcatWithComplete")
//noinspection TypeAnnotation
// scalastyle:off
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var currentIterator: Iterator[Out] = _
var (plainFun, completeFun) = funs()
var isCompleted = false
def hasNext = if (currentIterator != null) currentIterator.hasNext else false
setHandlers(in, out, this)
private def maybeComplete(drain: Boolean) = {
if (!isCompleted) {
isCompleted = true
currentIterator = completeFun().iterator
if (drain) pushPull()
}
else completeStage()
}
def pushPull(): Unit =
if (hasNext) {
push(out, currentIterator.next())
if (!hasNext && isClosed(in)) {
maybeComplete(false)
}
}
else if (!isClosed(in))
pull(in)
else {
maybeComplete(true)
}
def onFinish(): Unit = if (!hasNext) {
maybeComplete(isAvailable(out))
}
override def onPush(): Unit =
try {
currentIterator = plainFun(grab(in)).iterator
pushPull()
}
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => failStage(ex)
case Supervision.Resume => if (!hasBeenPulled(in)) pull(in)
case Supervision.Restart =>
restartState()
if (!hasBeenPulled(in)) pull(in)
}
}
override def onUpstreamFinish(): Unit = onFinish()
override def onPull(): Unit = {
pushPull()
}
private def restartState(): Unit = {
val fs = funs()
plainFun = fs._1
completeFun = fs._2
currentIterator = null
}
}
override def toString = "StatefulMapConcatWithCompleteFlow"
}