Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

euphoria-core: unbounded source without explicit windowing defined #38

Open
xitep opened this issue Feb 24, 2017 · 5 comments
Open

euphoria-core: unbounded source without explicit windowing defined #38

xitep opened this issue Feb 24, 2017 · 5 comments

Comments

@xitep
Copy link
Contributor

xitep commented Feb 24, 2017

Executing a stateful operator which itself does not define a windowing strategy and which consumes - directly or indirectly - a non-windowed, unbounded data set is basically undefined. This is, such an operator, e.g. ReduceByKey, consumes an infinite stream of data never reaching a point at which results can be emitted. Note:

  • stateful: consuming non-windowed, unbounded data sources through stateless operators, e.g. FlatMap, Union, Repartition is defined. There's no problem for them. These emit results immediately upon receiving input.
  • non-windowed and unbounded: this is, all along the way from the unbounded data source down to the operator in question, there is no explicit windowing part of the game. since euphoria has the notion of "attached windowing", one windowing on the way to the operator in question, makes the processing defined again - even if that operator itself has no explicit windowing defined.

It'd be good to immediately fail the attempt to translate a flow with the described situation. The situation is mostly unintentional, but the mistake is hard to spot at runtime - basically leaving the programmer wondering why no output is produced.

One might argue, though, that the situation is practically valid (e.g. it works nicely on flink actually): the semantic of such a situation might well be that the results are produced when the unbounded data source is closed/cancelled. however, such computed results are non-deterministic and unsound with the theory.

@je-ik
Copy link
Contributor

je-ik commented Feb 27, 2017

I'm not sure if this is directly related to this - but there is at least one valid use case of stateful non-windowed operation - that is a non-windowed stream join. The example would look like this:

  Dataset<?> stream = ...;
  Dataset<?> table = ...;
  Join.of(stream, table)
    .by(...)
    .using(...);    

This can work without windowing assuming that there is a possibility to convert the table into random access storage. This can be done in many ways, depending on the physical storage of the dataset (e.g. if it is a Kafka stream, then in can be cached locally in the way KTable works in KStreams). There is no windowing needed, because the stateful operation is effectively turned into a stateless, one-element-at-a-time operation.

@xitep
Copy link
Contributor Author

xitep commented Feb 27, 2017

true enough; looks like the special case mentioned in #41

@xitep
Copy link
Contributor Author

xitep commented Feb 28, 2017

@je-ik i think your case suggests that the validation mentioned in this ticket is to be done only after the expansion of to operators to their basic "atoms." (or rather more precisely spoken, after the translation layer performed optimizations.) if we can be optimize the join operator example into an "on-the-fly left/right map-side join" (basically turning it into a stateless operation) the validation would naturally not trigger, in any other case i still see the operation undefined, for which we'd like to trigger the validation error. do you agree?

@je-ik
Copy link
Contributor

je-ik commented Feb 28, 2017

I agree there is a strong connection between my remark and the issue #41. The main difference that I see here, is that it is effectively possible (under some circumstances) to turn a stream into a random access storage locally (e.g. by reading the whole kafka topic and storing it to memory of local db). Therefore, it is kind of valid not to specify windowing in joining of two streams, provided that at least one of them is type of stream that is in fact a commit log (definition of a commit log is that you can seek inside it and therefore read old data from the stream). When user does not specify windowing in such a case, it is needed at the API level to make sure that one stream is treated as real stream, and the other one is treated as a stream of updates to a stateful database (that is called table-stream duality in the sense of KafkaStreams).

When we identify that we need to cache locally one of the input streams (and I am for simplicity omitting the problem of partitioning), then we can turn the join into "standard" map-side join as desribed in #41. Therefore I see my note as a little generalization of #41. Does that make any sense? :)

@xitep
Copy link
Contributor Author

xitep commented Mar 3, 2017

@je-ik sorry for the delay. i think i got your point. yes, it makes sense, absolutely.

i just don't have any particular idea about how we could differentiate the two situations - the one you mentioned and the one where none of the streams is "random access" - at the API level. maybe we don't need to. maybe a runtime check - as initially suggested - can handle both cases based on runtime properties of the two data sets. given we - on purpose - do not distinguish between bounded and unbounded data sets on API level, i even think we should not make a difference on API level between streams which do and don't support "random-access".

right now, we don't have any support for random-access, but that will naturally arise from #41. therefore, i think we can proceed to implement the suggested validation and only later extend it with support for "table-streams".

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants