Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

:trigger/emit not working with :onyx.triggers/watermark #839

Open
neverfox opened this issue Jan 12, 2018 · 5 comments
Open

:trigger/emit not working with :onyx.triggers/watermark #839

neverfox opened this issue Jan 12, 2018 · 5 comments

Comments

@neverfox
Copy link

When using a watermark trigger, emitted segments don't flow downstream (as they do with a segment-based trigger, for instance). Perhaps it's unsupported for that trigger type, though I cannot imagine why that would be the case since I confirmed that the emit function is firing as expected. Here's a runnable example:

(ns traffic-events.core
  (:require
   [clojure.core.async :refer [chan >!! <!! close!]]
   [clojure.pprint]
   [onyx.api]
   [onyx.plugin.core-async :refer [take-segments!]])
  (:gen-class))

;; Random UUID generated once when the namespace loads. It is used as the
;; :onyx/tenancy-id in both env-config and peer-config so the two agree.
(def id (java.util.UUID/randomUUID))

;; Map handed to onyx.api/start-env. Points at ZooKeeper on 127.0.0.1:2188.
;; NOTE(review): :zookeeper/server? true presumably asks Onyx to start its own
;; ZooKeeper on :zookeeper.server/port -- confirm against the Onyx docs.
(def env-config
  {:zookeeper/address     "127.0.0.1:2188"
   :zookeeper/server?     true
   :zookeeper.server/port 2188
   :onyx/tenancy-id       id})

;; Map handed to onyx.api/start-peer-group, submit-job and
;; await-job-completion. Uses the same ZooKeeper address and tenancy id as
;; env-config and selects Aeron messaging bound to localhost on port 40200.
(def peer-config
  {:zookeeper/address        "127.0.0.1:2188"
   :onyx/tenancy-id          id
   :onyx.peer/job-scheduler  :onyx.job-scheduler/balanced
   :onyx.messaging/impl      :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr "localhost"})

;; :onyx/batch-size shared by every task in the catalog.
(def batch-size 10)

;; Linear pipeline: :in -> :count -> :out.
(def workflow
  [[:in :count]
   [:count :out]])

(defn watermark-fn
  "Returns the result of calling `.getTime` on the segment's :event-time.
  With the #inst values used in this example that is a java.util.Date, so the
  result is milliseconds since the epoch."
  [segment]
  (let [event-time (:event-time segment)]
    (.getTime event-time)))

;; Task definitions for the three workflow nodes (:in, :count, :out).
(def catalog
  ;; Input task: core.async reader, limited to one peer.
  ;; ::watermark-fn resolves to :traffic-events.core/watermark-fn.
  [{:onyx/name                :in
    :onyx/plugin              :onyx.plugin.core-async/input
    :onyx/type                :input
    :onyx/medium              :core.async
    :onyx/assign-watermark-fn ::watermark-fn
    :onyx/batch-size          batch-size
    :onyx/max-peers           1
    :onyx/doc                 "Reads segments from a core.async channel"}

   ;; Windowed task: segments are grouped by :url.
   ;; NOTE(review): with :onyx/type :reduce the task presumably only sends
   ;; segments downstream via :trigger/emit -- confirm against the Onyx docs.
   {:onyx/name         :count
    :onyx/type         :reduce
    :onyx/group-by-key :url
    :onyx/flux-policy  :continue
    :onyx/batch-size   batch-size}

   ;; Output task: core.async writer, limited to one peer.
   {:onyx/name       :out
    :onyx/plugin     :onyx.plugin.core-async/output
    :onyx/type       :output
    :onyx/medium     :core.async
    :onyx/max-peers  1
    :onyx/batch-size batch-size
    :onyx/doc        "Writes segments to a core.async channel"}])

;; Buffer size used for both the input and the output channel.
(def capacity 1000)

;; Channel and buffer atom handed to the :in task by inject-in-ch.
(def input-chan (chan capacity))
(def input-buffer (atom {}))

;; Channel handed to the :out task by inject-out-ch.
(def output-chan (chan capacity))

;; Six sample segments over two URLs, with event times between 03:00 and 03:15.
(def input-segments
  [{:url "http://example.com/0" :event-time #inst "2015-09-13T03:00:00.829-00:00"}
   {:url "http://example.com/1" :event-time #inst "2015-09-13T03:02:00.829-00:00"}
   {:url "http://example.com/1" :event-time #inst "2015-09-13T03:03:00.829-00:00"}
   {:url "http://example.com/0" :event-time #inst "2015-09-13T03:07:00.829-00:00"}
   {:url "http://example.com/0" :event-time #inst "2015-09-13T03:11:00.829-00:00"}
   {:url "http://example.com/1" :event-time #inst "2015-09-13T03:15:00.829-00:00"}])

;; Pre-load the input channel at namespace load time. The channel buffer
;; (capacity) is larger than the number of segments, so these puts do not block.
(run! (fn [seg] (>!! input-chan seg))
      input-segments)

;; Onyx environment, started as a side effect of loading this namespace.
(def env
  (onyx.api/start-env env-config))

;; Peer group, started after the environment.
(def peer-group
  (onyx.api/start-peer-group peer-config))

;; One virtual peer per distinct task name appearing in the workflow.
(def n-peers
  (->> workflow
       (apply concat)
       set
       count))

(def v-peers
  (onyx.api/start-peers n-peers peer-group))

(defn inject-in-ch
  "Before-task-start hook for the :in task. Ignores both arguments and returns
  a map carrying the input channel and its buffer atom."
  [event lifecycle]
  {:core.async/chan   input-chan
   :core.async/buffer input-buffer})

;; Lifecycle calls map referenced from `lifecycles` as ::in-calls.
(def in-calls
  {:lifecycle/before-task-start inject-in-ch})

(defn inject-out-ch
  "Before-task-start hook for the :out task. Ignores both arguments and returns
  a map carrying the output channel."
  [event lifecycle]
  {:core.async/chan output-chan})

;; Lifecycle calls map referenced from `lifecycles` as ::out-calls.
(def out-calls
  {:lifecycle/before-task-start inject-out-ch})

;; Lifecycle entries for the two core.async tasks. Each task gets one entry
;; pointing at this namespace's channel-injecting calls map (::in-calls /
;; ::out-calls) and one pointing at the plugin's own reader/writer calls.
(def lifecycles
  [{:lifecycle/task  :in
    :lifecycle/calls ::in-calls}
   {:lifecycle/task  :in
    :lifecycle/calls :onyx.plugin.core-async/reader-calls}
   {:lifecycle/task  :out
    :lifecycle/calls ::out-calls}
   {:lifecycle/task  :out
    :lifecycle/calls :onyx.plugin.core-async/writer-calls}])

;; One sliding window on the :count task: 5-minute range, sliding every
;; minute, keyed on each segment's :event-time, aggregated with a count.
(def windows
  [{:window/id          :count-events
    :window/task        :count
    :window/type        :sliding
    :window/aggregation :onyx.windowing.aggregation/count
    :window/window-key  :event-time
    :window/range       [5 :minutes]
    :window/slide       [1 :minute]}])

;; One watermark trigger on the :count-events window. It names
;; ::emit-window! (defined later in this namespace) as its :trigger/emit
;; function; this is the configuration the issue reports as emitting nothing.
;; NOTE(review): :trigger/id is :sync although the trigger uses :trigger/emit;
;; print-window! is not referenced by this vector.
(def triggers
  [{:trigger/window-id     :count-events
    :trigger/id            :sync
    :trigger/on            :onyx.triggers/watermark
    :trigger/state-context [:window-state]
    :trigger/emit          ::emit-window!}])

(defn print-window!
  "Prints one line describing a window extent: its lower and upper bounds, the
  group key, and the window state. `event`, `window` and `trigger` are unused.
  Returns nil."
  [event window trigger window-data state]
  (let [{:keys [lower-bound upper-bound group-key]} window-data
        line (format "Window extent [%s - %s] contents: %s: %s"
                     lower-bound upper-bound group-key state)]
    (println line)))

(defn emit-window!
  "Builds the segment for a fired trigger: a map with the window's group key
  under :url and the window state under :count. `event`, `window` and
  `trigger` are unused."
  [event window trigger window-data state]
  (let [{:keys [group-key]} window-data]
    {:url   group-key
     :count state}))

(defn -main
  "Submits the job, closes the input channel, waits for the job to complete,
  and pretty-prints whatever segments reached the output channel.

  The virtual peers, peer group and environment are shut down in a `finally`
  clause so they are released even if waiting for the job or reading the
  output throws. Command-line `args` are ignored."
  [& args]
  (try
    (let [submission (onyx.api/submit-job peer-config
                                          {:workflow       workflow
                                           :catalog        catalog
                                           :lifecycles     lifecycles
                                           :windows        windows
                                           :triggers       triggers
                                           :task-scheduler :onyx.task-scheduler/balanced})]
      ;; All input was queued at load time; the channel is closed only after
      ;; the job has been submitted, as in the original reproducer.
      (close! input-chan)
      (onyx.api/await-job-completion peer-config (:job-id submission)))

    (clojure.pprint/pprint (take-segments! output-chan 50))

    (finally
      ;; Tear down in the same order as before: peers, peer group, then env.
      (doseq [v-peer v-peers]
        (onyx.api/shutdown-peer v-peer))
      (onyx.api/shutdown-peer-group peer-group)
      (onyx.api/shutdown-env env))))
@lbradstreet
Copy link
Member

Is this the same problem as #840?

@neverfox
Copy link
Author

neverfox commented Jan 21, 2018

I don't believe so, as there's no Kafka involved. Here I was trying to use the new emit capability but couldn't get it to work with a watermark trigger. It's possible that there's just something else I don't understand about watermarks. The trigger fires if I use :trigger/sync, but nothing reaches output-chan if I use :trigger/emit instead.

@lbradstreet
Copy link
Member

lbradstreet commented Jan 21, 2018 via email

@wehu
Copy link

wehu commented Feb 1, 2018

I'm running into the same issue. :trigger/emit used to work with :onyx.triggers/watermark in 0.12.2, but it seems to be broken in recent releases.

@wehu
Copy link

wehu commented Feb 1, 2018

It looks like I have to set the batch size to 1 to get all outputs when using the trigger. If the batch size is larger than the number of input segments, I get nothing. If the batch size is greater than 1 but smaller than the number of input segments, some data at the end is missing.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants