Philip (flip) Kromer edited this page Aug 12, 2012 · 4 revisions

Things in (?question blocks?) are unresolved.


  • Simple: The thing you would draw at the whiteboard, spoken aloud, is the thing you write.

  • Scalable: Scalability is more important than performance. Scalability is important even when there are no performance concerns -- scalable for robots means scalable for people.

  • Readable: The plot should be as simple as the story. Orthogonal concerns (schema, transformation, topology, transport, resources, and configuration) should be crisply separated.


Executor independence

Every Hanuman graph can be expressed as each of the following:

  • Buffered flow -- Flume
  • Event-driven flow -- cat, HTTP post, Hadoop
  • Picture -- graphviz etc
  • Explanation -- each record appends the plain-language action it takes on the record
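As a minimal sketch of executor independence, assume a graph is nothing more than an ordered list of named stages. The `TinyGraph` class and its methods are hypothetical, not the Hanuman API. The same description can then be rendered as a Graphviz picture or driven record by record:

```ruby
# Hypothetical TinyGraph: one graph description, more than one executor.
class TinyGraph
  Stage = Struct.new(:name, :fn)

  def initialize
    @stages = []
  end

  # Append a stage to a linear chain; returns self so calls can be chained.
  def stage(name, &fn)
    @stages << Stage.new(name, fn)
    self
  end

  # Executor 1: a picture, emitted as Graphviz DOT text.
  def to_dot
    edges = @stages.each_cons(2).map { |a, b| "  #{a.name} -> #{b.name};" }
    (["digraph G {"] + edges + ["}"]).join("\n")
  end

  # Executor 2: event-driven flow; one record is pushed through every stage.
  def process(record)
    @stages.reduce(record) { |rec, st| st.fn.call(rec) }
  end
end

graph = TinyGraph.new
graph.stage(:parse) { |line| Integer(line) }
graph.stage(:double) { |n| n * 2 }
graph.process("21") # => 42
graph.to_dot        # => "digraph G {\n  parse -> double;\n}"
```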

Graph is a graph

Wherever concepts can be paired across the following, they should be, until they prove to be distinct:

  • micro dataflow
  • macro dataflow
  • workflow
  • ----~
  • subroutine
  • system diagram

Petri Net / Graph Dual

  • every edge connects an action to a product or a product to an action
    • The product might be implied by the edge, and elided from the declaration.
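A sketch of the dual view, assuming edges are `[from, into]` pairs and products are known by name: any action-to-action edge gets its implied product inserted, so every edge then touches a product. The helper name and the `<action>_out` naming scheme are assumptions for illustration.

```ruby
# Expand elided products: an edge between two actions becomes
# action -> implied product -> action. The "<from>_out" name is hypothetical.
def expand_implied_products(edges, products)
  edges.flat_map do |from, into|
    if products.include?(from) || products.include?(into)
      [[from, into]]                      # edge already touches a product
    else
      implied = :"#{from}_out"
      [[from, implied], [implied, into]]  # make the elided product explicit
    end
  end
end

products = [:lines_file]
edges    = [[:lines_file, :parse], [:parse, :count]]
expand_implied_products(edges, products)
# => [[:lines_file, :parse], [:parse, :parse_out], [:parse_out, :count]]
```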


  • Kafka, ZeroMQ, Storm, Esper, Flume, Pusher
  • Rake, Chef, Shell scripts, Thor
  • Rack
  • Hadoop


  • stages

    • products
    • actions -- alter, augment or act on data. Some important types of actions:
      • transformers -- actions with exactly one input and one output
      • sources -- actions with zero inputs and (?one or more outputs?). Sources may be different in another way, not sure yet
      • sinks -- actions with (?one or more inputs?) and no output.
      • taps -- actions that make no changes to the data (?can a tap add metadata? can it mutate metadata?)
    • graphs -- contain other stages. A graph is also an action.
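The action types above differ only in how many inputs and outputs they declare and whether they change the data. Here is a hypothetical classifier. It settles the open (?...?) questions by assumption: a source has one or more outputs, and a sink has one or more inputs.

```ruby
# Classify an action by its declared input/output counts.
# The thresholds for sources and sinks are assumptions, not settled design.
def action_kind(inputs:, outputs:, mutates: true)
  return :tap         unless mutates                # passes data through unchanged
  return :source      if inputs.zero? && outputs >= 1
  return :sink        if inputs >= 1 && outputs.zero?
  return :transformer if inputs == 1 && outputs == 1
  :action                                           # any other shape
end

action_kind(inputs: 0, outputs: 1)                 # => :source
action_kind(inputs: 1, outputs: 1, mutates: false) # => :tap
```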
  • schema

  • record

  • control path

  • configuration

  • topic -- a filter This filter is special because the transport can

  • runner -- pairs a graph with concrete executors and products

(?questions exist about the following?)

  • partition --
  • chain -- (?a group of sequential edges understood to be related in an important way?)
  • delivery guarantee --
  • contract

Key Decisions

Data is primary: data is represented as an "arbitrary record with sideband metadata". That is, rather than an event wrapper holding a message body that you access, the data itself is primary and the metadata is accessible through it.
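One way to get "data is primary" in Ruby, sketched here as an assumption rather than the Hanuman implementation, is to wrap the record in the standard library's `SimpleDelegator`. Callers then use the record as-is and reach the metadata through it. `Record` and `#metadata` are hypothetical names.

```ruby
require 'delegate'

# The wrapped object answers every method the record itself does;
# metadata rides alongside and is reached through the record.
class Record < SimpleDelegator
  attr_reader :metadata

  def initialize(data, metadata = {})
    super(data)
    @metadata = metadata
  end
end

line = Record.new("hello world", { source: "stdin", lineno: 1 })
line.upcase            # => "HELLO WORLD" (ordinary String behavior)
line.metadata[:lineno] # => 1
```

Note that a transform such as `upcase` returns a bare `String`, so a real design still has to decide how metadata follows the record from stage to stage.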

Flows are understandable in isolation:

Unless a processor explicitly maintains state or mixes in external entropy, the outcome of a data stream may be completely characterized by:

  • Processor
  • Configuration (static parameters delivered at run-time to the processors on the graph)
  • Data record itself
  • Metadata attached to that record

The important point is that the outcome of a processor is not affected by changes to:

  • the graph as a whole, including the stages that it pulls from and writes to
  • the machine or machines it runs on
  • the transport layer handling the flow
  • spatially or temporally where the data originates from or proceeds to.

As long as it is fed the same data and the same metadata with the same configuration, a processor doesn't care what's running it, where it is, who fed it the data or who is consuming it.

Orthogonal Capabilities

  • guarantees

    • end-to-end -- ensures acknowledgement by destination
    • next-hop -- guarantees successful handoff to next stage, but nothing more
    • best effort -- fire and forget

Key Mysteries

  • naming stages -- here are some desirable features:

    • unambiguously referencable
    • name doesn't change if irrelevant changes are made to the graph
    • name does change if relevant changes are made
    • name is predictable (by looking at the graph)
    • name is readable
    • name is unaffected by configuration
  • fanout: when several stages consume a given stage's outputs,

    • does the stage have one output (a resource) whose product has many consuming stages
    • or does the stage have multiple outputs, and the emit method needs a rethink?
  • messages

    • can stages message each other directly?
    • or do they emit events onto a sideband that other stages can consume
    • or are they broadcast to all nodes
  • amelioration: when are

    • mixin of a module (calling super)
    • wrapping a stage (before/after/around filter)
    • flow insertion (put module before / after it in the flow)
  • delivery guarantees

  • control path

  • biographers

Have / Want

processor widgets

  • choke -- emits all records it receives but not faster than a given averaged rate
  • Uniform consistent sampling
  • dashpot tap
  • channel topic
  • aggregates -- sum, avg/stdev, %ile
  • graph change stream -- an activity stream of macro graph actions (topology or configuration changes).
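For the "uniform consistent sampling" widget, a common approach (assumed here, not taken from the source) is to hash a stable key and keep the record when the hash falls below the sampling fraction. Every machine and every run then makes the same keep/drop decision for the same key. `ConsistentSampler` is a hypothetical name, and CRC32 from Ruby's `zlib` is simply a convenient stable hash.

```ruby
require 'zlib'

# Keep a record iff the CRC32 of its key falls below fraction * 2**32.
# The decision depends only on the key, never on time, host, or order.
class ConsistentSampler
  BUCKETS = 2**32

  def initialize(fraction)
    @threshold = (fraction * BUCKETS).floor
  end

  def keep?(key)
    Zlib.crc32(key.to_s) < @threshold
  end
end

sampler = ConsistentSampler.new(0.1)
sampler.keep?("user-42") == ConsistentSampler.new(0.1).keep?("user-42") # => true
```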

topology widgets

  • many-to-many -- all input records sent to all output slots. (?input partition is somehow recorded in event metadata?).
  • switch -- each record sent to exactly one out slot.


  • window --

  • cogroup

  • sort

  • grouping "fences" -- indicate start/end boundaries of a group in the stream (or in metadata) rather than by combining into a rolled-up record

  • buffer -- ...

  • retry --

  • load balance --

  • barrier -- execution pauses until some condition met

  • schedule -- ...
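Grouping fences can be sketched as marker records wrapped around each run of same-keyed records, rather than a single rolled-up record. This assumes the stream is already sorted by key (say, by an upstream sort stage). The `[:start, key]` / `[:end, key]` marker shape is hypothetical.

```ruby
# Emit [:start, key] before and [:end, key] after each run of records that
# share a key. Records themselves pass through untouched.
def fence_groups(records, &key_fn)
  out = []
  current = nil
  records.each do |rec|
    key = key_fn.call(rec)
    if out.empty? || key != current
      out << [:end, current] unless out.empty?  # close the previous group
      out << [:start, key]
      current = key
    end
    out << rec
  end
  out << [:end, current] unless out.empty?      # close the final group
  out
end

fence_groups(%w[apple avocado banana]) { |w| w[0] }
# => [[:start, "a"], "apple", "avocado", [:end, "a"], [:start, "b"], "banana", [:end, "b"]]
```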


  • counters, timers, gauges, values
  • announce / discover


  • simulate -- sampled yet illustrative flow goes end-to-end
  • explain -- processors explain what they do, appending it to the explanation passed along the graph
  • audit -- metadata element showing full provenance of the record
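The explain capability might look like the following. Each processor is wrapped so that it appends a plain-language description to the explanation carried alongside the record. `Explained` and `explaining` are hypothetical names.

```ruby
# A value plus the running, plain-language explanation of what happened to it.
Explained = Struct.new(:value, :explanation)

# Wrap a processor so it appends its description as it runs.
def explaining(description, &fn)
  lambda do |rec|
    Explained.new(fn.call(rec.value), rec.explanation + [description])
  end
end

steps = [
  explaining("strip surrounding whitespace", &:strip),
  explaining("convert to an integer") { |s| Integer(s) },
]
result = steps.reduce(Explained.new("  7 ", [])) { |rec, step| step.call(rec) }
result.value       # => 7
result.explanation # => ["strip surrounding whitespace", "convert to an integer"]
```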


  • Hamming's Problem
  • Newton's method
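Hamming's problem asks for the numbers whose only prime factors are 2, 3 and 5, listed in increasing order and without duplicates. As a dataflow it is a feedback loop: the output stream is fed back through "multiply by 2", "by 3" and "by 5" stages, and their three streams are merged. A single-process sketch can simulate that loop with one read cursor per multiplier:

```ruby
# Each cursor marks how far its "multiply" stage has read into the output.
# The merge step takes the smallest candidate; every cursor that produced
# that value advances, which removes duplicates such as 6 = 2*3 = 3*2.
def hamming(count)
  out = [1]
  cursors = { 2 => 0, 3 => 0, 5 => 0 }
  while out.size < count
    nxt = cursors.map { |factor, i| factor * out[i] }.min
    out << nxt
    cursors.each_key { |f| cursors[f] += 1 if f * out[cursors[f]] == nxt }
  end
  out
end

hamming(10) # => [1, 2, 3, 4, 5, 6, 8, 9, 10, 12]
```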

Further reading

Juneteenth: definition of interface




  • .make(*, {}) -- new stage
  • #output(out_name) -- the stage, if any, that the named output slot of this stage feeds
  • #outputs -- an ordered list of stages this one feeds
  • #owner -- graph containing this stage. Every stage has an owner except Universe
  • #into(other, from_slot, into_slot) -- asks its graph to connect self to other; returns other


  • #set_output
  • #set_owner
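A minimal sketch of the essential stage interface, assuming the owning graph exposes `connect(from, into, from_slot, into_slot)`. `Stage`, `Graph` and the default slot names `:out` / `:in` are hypothetical.

```ruby
class Stage
  attr_reader :name, :owner

  # .make is the public constructor; here it simply forwards to new.
  def self.make(name, owner)
    new(name, owner)
  end

  def initialize(name, owner)
    @name    = name
    @owner   = owner
    @outputs = {}   # output slot name => downstream stage
  end

  def output(slot)
    @outputs[slot]
  end

  def outputs
    @outputs.values
  end

  def set_output(slot, stage)
    @outputs[slot] = stage
  end

  # Ask the owning graph to wire self to other; return other for chaining.
  def into(other, from_slot = :out, into_slot = :in)
    owner.connect(self, other, from_slot, into_slot)
    other
  end
end

# Minimal owning graph: records each edge and sets the upstream output slot.
class Graph
  attr_reader :edges

  def initialize
    @edges = []
  end

  def connect(from, into, from_slot, into_slot)
    @edges << [from.name, from_slot, into.name, into_slot]
    from.set_output(from_slot, into)
  end
end

graph = Graph.new
parse = Stage.make(:parse, graph)
count = Stage.make(:count, graph)
parse.into(count) # returns count
graph.edges       # => [[:parse, :out, :count, :in]]
```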

Inessential (maybe):

  • #name -- name of stage; is unique on its graph
  • #fullname -- globally unique name of stage, formed as "#{}.#{}"

Decisions to make / Restrictions we are committing to:

  • !! each stage is owned by exactly one graph (except Universe)
  • !! stages cannot fruitfully exist in the absence of a graph
  • names are:
    • ??determined only by graph??
    • ??immutable on object??
    • ??only given in special cases??



  • stages -- list of all stages added to graph
  • inputs -- list of named input slots
  • drive(source) -- asks the named source to drive. ??run?? ??drive??
  • ?? fetch(sink) -- fetches next record from named sink ??


  • set_stage(name, stage) -- (might need to be (stage, name))
    • adds it to the stages collection. A named stage replaces any existing stage with that key, ??at the same position it held??.
    • note that you cannot unset_stage. Feels dangerous.
  • connect(from, into, from_slot, into_slot)
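A sketch of `set_stage` and `connect` that takes the "keeps its position" answer to the open question above. It relies on two properties of Ruby hashes: they preserve insertion order, and reassigning an existing key leaves that key where it was. `StageTable` and the default slot names are hypothetical.

```ruby
class StageTable
  attr_reader :edges

  def initialize
    @stages = {}   # name => stage, in insertion order
    @edges  = []
  end

  # Replacing an existing name keeps the stage at its original position.
  # (There is deliberately no unset_stage.)
  def set_stage(name, stage)
    @stages[name] = stage
  end

  def stages
    @stages.values
  end

  def connect(from, into, from_slot = :out, into_slot = :in)
    @edges << [from, from_slot, into, into_slot]
  end
end

table = StageTable.new
table.set_stage(:parse, "parser v1")
table.set_stage(:count, "counter")
table.set_stage(:parse, "parser v2")
table.stages # => ["parser v2", "counter"]
```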

how are names chosen?

  • optional arbitrary name supplied at time of storage. Only some things are named.
  • optional arbitrary name supplied at time of storage. Otherwise a mangled name is supplied
  • calls #name on objects. You have to keep things straight. (This is trouble).

(one could separate naming a stage from adding it to graph. still don't know if names are intrinsic)


  • <magic stage method> -- stage types register for a magic method (e.g. StdinSource is stdin, MapProcessor is map, RegexpFilter is re). This method:
    • calls .make on that class
    • adds the stage to itself (which handles owner and naming)
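The magic-method registry can be sketched with `method_missing`. Stage classes register a short name, and the graph answers that name by calling `.make` on the class and adding the result to itself. `StageTypes`, `MagicGraph` and this `StdinSource` stand-in are hypothetical.

```ruby
module StageTypes
  REGISTRY = {}   # magic method name => stage class

  def self.register(label, klass)
    REGISTRY[label] = klass
  end
end

class StdinSource
  attr_reader :args
  def self.make(*args); new(*args); end
  def initialize(*args); @args = args; end
end
StageTypes.register(:stdin, StdinSource)

class MagicGraph
  attr_reader :stages

  def initialize; @stages = []; end

  # graph.stdin(...) becomes StdinSource.make(...) followed by add_stage.
  def method_missing(label, *args, &block)
    klass = StageTypes::REGISTRY[label]
    return super unless klass
    add_stage(klass.make(*args, &block))
  end

  def respond_to_missing?(label, include_private = false)
    StageTypes::REGISTRY.key?(label) || super
  end

  private

  # Stand-in for the step that handles owner and naming.
  def add_stage(stage)
    @stages << stage
    stage
  end
end

graph = MagicGraph.new
graph.stdin(:lines) # => a StdinSource, now in graph.stages
```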

currently, from pry (lightly organized):

Gorillib::Model#methods            :
  ==  inspect
  read_attribute unset_attribute write_attribute attribute_set?
  receive!   update_attributes  attribute_values  attributes  compact_attributes
  read_unset_attribute  handle_extra_attributes
  as_json  to_json  to_tsv  to_wire
Gorillib::Builder#methods          :
  getset  getset_member  inspect_helper
  collection_of  get_collection_item  getset_collection_item  has_collection_item? set_collection_item

Meta::Hanuman::StageType#methods   : name  name?  receive_name owner  owner?  receive_owner doc receive_doc
Hanuman::Stage#methods             : report  setup  stop  fullname --- configure  key_method lookup  notify  to_key
Hanuman::Action                    :
Hanuman::IsOwnInputSlot#methods    : inputs
Hanuman::IsOwnOutputSlot#methods   : outputs
Hanuman::Inlinkable#methods        : <<  from  set_input
Hanuman::Outlinkable#methods       : >  into  set_output
Wukong::Processor#methods          : bad_record  emit input  input?  output  output? receive_input  receive_output
Wukong::Map#methods                : blk  process call

Meta::Hanuman::StageType#methods   : doc  name  name?  owner  owner?  receive_doc  receive_name receive_owner
Hanuman::Stage#methods             : configure  fullname  key_method  notify  report  to_key
Hanuman::Action                    :
Hanuman::Inlinkable#methods        : <<  from
Hanuman::SplatInputs#methods       : has_input?  inslots  set_input
Hanuman::Outlinkable#methods       : >
Hanuman::SplatOutputs#methods      : into  outputs
Hanuman::Slottable#methods         : handle_extra_attributes  inputs
Hanuman::Graph#methods             : declare_stage  connect  action  next_name_for  product tree
  as_is  file_sink  file_source  flatten  foreach  from_json  from_tsv  graph  integers  limit lookup  map  not_re  null  re  shell  stderr  stdin  stdout  to_json  to_tsv
Meta::Hanuman::GraphType#methods   : stage  stages has_stage?  receive_stages  edges receive_edges  input?  output? receive_input  receive_output
Wukong::Dataflow#methods           : setup  stop drive  input  output  process  process_stages reject  select  set_output  sink_stages  source_stages
Meta::Wukong::DataflowType#methods : has_outslot?  has_splat_inslot?  has_splat_outslot? outslot outslots  receive_outslots  receive_splat_inslots  receive_splat_outslots  splat_inslot splat_inslots  splat_outslot  splat_outslots

"code is written thoughtfully but run recklessly"

make vs new/initialize vs receive

  • interface to receive is (*args, {attrs}):
    • the last element must be a hash of attributes
    • args must win out over attrs

The rule is "the last given arg, if it is a hash, is always the attrs" -- in cases like

def foo(alpha=nil, beta=nil, gamma={}, attrs={}) ; end

 input                     alpha  beta  gamma        attrs
-------                    -----  ----  -----        -----
foo                      # nil    nil   {}           {}
foo({})                  # nil    nil   {}           {}
foo 7                    # 7      nil   {}           {}
foo :a => :b             # nil    nil   {}           {:a => :b}
foo 7, 3, :a => :b       # 7      3     {}           {:a => :b}
foo 7, 3, {:a => :b}, {} # 7      3     {:a => :b}   {}
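Applied by hand, the rule amounts to popping a trailing hash off the argument list before binding the positionals. This hypothetical `foo` takes `*args` and returns its bindings, reproducing each row of the table. Ruby passes a braceless `:a => :b` to a method with no keyword parameters as a trailing positional hash.

```ruby
# Returns [alpha, beta, gamma, attrs] under the "trailing hash is attrs" rule.
def foo(*args)
  attrs = args.last.is_a?(Hash) ? args.pop : {}
  alpha, beta, gamma = args
  [alpha, beta, gamma || {}, attrs]
end

foo(7, 3, :a => :b)       # => [7, 3, {}, {:a=>:b}]
foo(7, 3, { :a => :b }, {}) # => [7, 3, {:a=>:b}, {}]
```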

If any positional arg is a hash, you must slap an extra hash on the end. Also, don't use hashes for positional args.

is there a non-receive path for initialization?

You cannot have all of the following desirable properties, but you can have the right tasteful subset of them:

  • I get to choose any signature for initialize
  • The signature for receive is predictable, so I can provide generic advice to an object
  • It is predictable whether attributes are receive_xxd or set directly
  • only type-converts once and only evals block once
  • objects are fully-formed when initialize is done
  • I can initialize an object directly, without passing values through the receive door, if I'm willing to take responsibility that the objects are safe.

Case A: Objects own their own new/initialize

You can initialize an object directly, with no fuckery in between.

  • new calls initialize (and in every other way is untouched)
  • initialize(*args,{attrs}) is whatever you want, but it must be sensible to call it with no args
  • receive(*args,{attrs},&block) calls
    • new -- with no args
    • receive! -- with the args it was given

In this world,

  • an object must be comfortable with being incomplete even after initialize finishes

Case B:

  • new(*args,{attrs},&block)
    • calls initialize (and in every other way is untouched)
  • initialize(*args,{attrs},&block)
    • you can do what you want,
    • ...but you must call super (or otherwise ensure that receive! is called).
  • .receive(*args,{attrs},&block) just calls new, which calls initialize, which calls receive!

In this world,

  • you can't choose your own initialize signature
  • there's no way to construct a model that doesn't incur the receive chain business (apart from adding a _native bailout flag -- I'm thinking ahead to avro)
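The two cases are sketched below, reduced to an attribute hash only (the `*args` and block handling is left out). `CaseA` and `CaseB` are illustrative names.

```ruby
# Case A: objects own new/initialize. .receive builds an empty object with
# new (no args) and then passes the attributes to receive!.
class CaseA
  attr_accessor :name, :size

  def self.receive(attrs = {})
    new.receive!(attrs)
  end

  # Any signature you like, as long as it works with no arguments.
  def initialize(name = nil)
    @name = name
  end

  def receive!(attrs = {})
    attrs.each { |key, val| public_send("#{key}=", val) }
    self
  end
end

# Case B: initialize must route through receive!, so .receive is just new.
class CaseB
  attr_accessor :name, :size

  def self.receive(attrs = {})
    new(attrs)
  end

  def initialize(attrs = {})
    receive!(attrs)
  end

  def receive!(attrs = {})
    attrs.each { |key, val| public_send("#{key}=", val) }
    self
  end
end

CaseA.new("direct").size               # => nil
CaseA.receive(name: "a", size: 3).size # => 3
CaseB.receive(name: "b").name          # => "b"
```

In Case A, `CaseA.new("direct")` bypasses `receive!` entirely, at the cost of leaving `size` unset. In Case B every construction path goes through `receive!`, but `initialize` is locked to the attribute-hash signature.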


You cannot have all of the following desirable properties, but you can have the right tasteful subset of them:

  • Stages have a terse name that doesn't look like robot spew
  • Stages can produce a globally unique name that doesn't look like robot spew.
  • I can label a spot on a graph, assign to it directly (replacing whatever's there), and refer to it uniquely (getting back the last stage assigned to that spot).
  • If an object has a clear intrinsic name it becomes (or heavily informs) its label on the graph
  • Objects are encouraged to have names and then make opinionated assumptions about how to configure themselves based on it. (This is really important, as proven by Chef; it consequently means that names don't exist at the graph's primary convenience)
  • I can have multiple differently-named instances of a stage template on the same graph
  • I can have multiple identically-named instances of a stage template on the same graph, and I don't have to do anything special -- eg several pig('dump_to_s3') stages

Hmm... what if stages weren't named but rather labeled -- that is, the label is just a retrieval key for a stage on a graph.

  • in workflow world esp., things are very excited to learn their names/labels, and can do a lot of nice magic with them.
    • you can cause trouble with the unique-name-on-graph rule when names always become labels.
  • we want to distribute configuration by letting it drape naturally over the graph. (That is: the configuration key crips.snoop.gat becomes available to the snoop node on the crips graph as the gat configuration setting).
    • This argues that as macro flows start to get more complex, you'll want to start labelling things more.
  • the number 11 doesn't know that I've labeled it as awesomeness; rather, its binding knows how to retrieve the object labeled awesomeness.
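Configuration draping can be sketched as splitting each dotted key into a stage path and a setting name, so that `crips.snoop.gat` lands on the stage at `crips.snoop` as its `gat` setting. The flat-hash input format and the `drape` helper are assumptions.

```ruby
# Group flat dotted keys by stage path; the last segment is the setting name.
def drape(flat_config)
  flat_config.each_with_object(Hash.new { |h, k| h[k] = {} }) do |(key, value), by_stage|
    *path, setting = key.split(".")
    by_stage[path.join(".")][setting.to_sym] = value
  end
end

settings = drape("crips.snoop.gat" => 3, "crips.snoop.rate" => 10, "crips.limit" => 5)
settings["crips.snoop"] # => {gat: 3, rate: 10}
settings["crips"]       # => {limit: 5}
```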


We have lots of examples where we want a collection of objects that meets the following:

  • retrieval like a hash: objects retrieved by key, key maps uniquely to object, adding object with same key replaces former contents

  • iterates like an array -- each returns the values only

  • serializes as an array

  • Objects all adhere to similar-enough contract

  • receiving a group of objects passes each to a factory for creation

  • I don't want the kitchen sink of enumerable methods

  • things are stored in order they are added, and retrievable by index

  • things can be retrieved by name:

    • how is that name chosen?
      • optional arbitrary name supplied at time of storage
      • collection calls #name on objects
      • collection calls a certain method on objects and uses that as the retrieval key
    • regardless: if I replace the object in the foo slot, the index does not change. (trying to balance needs of positional slots with keyword slots)
  • retrieval should be fairly direct -- would rather not iterate over whole collection each time

How aware of its objects should the collection be?

  • if a method is used to assign names, they must all have that method

  • Does the container add anything to the attributes hash as it goes by? It does now, is probably a mistake.

  • receive! should pass contents to a factory.

  • ... which means find_or_create seems plausible

  • ... create seems a bit less so

  • #[] -- gets object with given name

  • #[]= -- adds object with given name

  • #<< -- adds object

  • delete --

  • fetch --

  • include? -- true if has given key

  • to_a -- all the objects

  • empty? -- true if has no objects

  • blank? -- true if empty?

  • present? -- true if not blank?

  • receive! -- merges in contents of given enumerable.

  • receive -- nem

inessential, but reasonable:

  • each -- iterates over the values
  • each_pair -- iterates over key-value pairs
  • values -- same as to_a
  • to_hash -- hash of key-value pairs, in same order as collection
  • keys -- all the keys
  • has_key? -- same as include?
  • length -- number of items
  • size -- same as length
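A minimal keyed collection covering most of the wish list: hash-like retrieval, values-only iteration, replacement in place, and a small surface instead of all of `Enumerable`. It takes the "collection calls a method on each object" option for keys. `KeyedCollection` and its `factory:` option are hypothetical.

```ruby
class KeyedCollection
  def initialize(key_method: :name, factory: nil)
    @key_method = key_method
    @factory    = factory
    @items      = {}   # Ruby hashes keep insertion order
  end

  def [](key);           @items[key];               end
  def []=(key, obj);     @items[key] = obj;         end
  def <<(obj);           self[obj.public_send(@key_method)] = obj; self; end
  def include?(key);     @items.key?(key);          end
  def to_a;              @items.values;             end
  def empty?;            @items.empty?;             end
  def length;            @items.length;             end
  def keys;              @items.keys;               end
  def each(&block);      @items.each_value(&block); self; end
  def each_pair(&block); @items.each_pair(&block);  self; end
  def to_hash;           @items.dup;                end

  # Merge in an enumerable of attribute hashes, building each via the factory.
  def receive!(attr_list)
    attr_list.each { |attrs| self << @factory.call(attrs) }
    self
  end
end

Item  = Struct.new(:name, :rank)
items = KeyedCollection.new(factory: ->(attrs) { Item.new(attrs[:name], attrs[:rank]) })
items.receive!([{ name: :a, rank: 1 }, { name: :b, rank: 2 }])
items << Item.new(:a, 9)   # replaces :a without moving it
items.keys                 # => [:a, :b]
items.to_a.map(&:rank)     # => [9, 2]
```

Index-based retrieval is left out: `@items.values[i]` would work but builds a fresh array on every call, which conflicts with the "fairly direct" retrieval goal.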

  • all stages have a source and sink; emit(result) calls sink.process(result) / emit(:spam, result) calls sink(:spam).process(result). This method always refers to a concrete stage, the one that has by whatever action of fate been wired to it.

  • input and output refer to the slots of

  • The inslot -- holds the schema & the label

  • a data source -- for argument's sake, a driving stage (it will call process on the stage it is wired to)

  • from inside the graph, the stage that gets data sent through the port

    src       inslot
     _        | |    +---+
    [_]-------|-|----|   |---
              | |    +---+

    src inslot
             | _ |    +---+
             |[_]+----|   |---
             |   |    +---+

    http_source > chain(:stuff){ input > parse > fiddle > output } > hbase_sink

    http_source > chain(:stuff){ parse > fiddle } > hbase_sink

    chain(:stuff){ http_source > parse > fiddle } > hbase_sink

    chain(:ln){
      consumes(lines: String, ints: Integer)
      produces(numbered_lines: String)
      numberer << lines << ints > numbered_lines
    }

    chain(:ln){
      consumes(lines: String, ints: Integer)
      produces(numbered_lines: String)
      lines > numberer
      ints > numberer
      numberer > numbered_lines
    }

    chain(:ln){
      consumes(lines: String, ints: Integer)
      produces(numbered_lines: String)
      input(:lines).into numberer, :lines   # connect( ??, ??, numberer, :lines)
      input(:ints).into numberer, :ints     # connect( ??, ??, numberer, :ints)
      numbered_lines.into output
    }


    stdin > chain(:ln).lines ; integers > chain(:ln).ints ; chain(:ln) > stdout

    not allowing (positional assignment): chain(:ln) << stdin << ints > stdout

    chain(:ln) << { lines: stdin, ints: integers } > stdout

    chain(:ln).from(lines: stdin, ints: integers).into(stdout)

    dataflow(:word_count_map){ input > tokenize > flatten > output }
    dataflow(:word_count_red){ input > count > output }
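The `>` operator in these examples can be sketched as follows: `a > b` records the edge and returns `b`, so a left-to-right chain wires each stage to the next. Ruby's `>` is left-associative, so `src > parse > sink` evaluates as `(src > parse) > sink`. `ChainStage` is hypothetical.

```ruby
class ChainStage
  attr_reader :name, :outputs

  def initialize(name)
    @name, @outputs = name, []
  end

  # Wire self to other and return other, so chains read left to right.
  def >(other)
    @outputs << other
    other
  end
end

src, parse, sink = %i[http_source parse hbase_sink].map { |n| ChainStage.new(n) }
tail = src > parse > sink
src.outputs.map(&:name)   # => [:parse]
parse.outputs.map(&:name) # => [:hbase_sink]
```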


actions are 1 in many out, branching uses channel

all stages have one source multiple (anonymous) sinks

def emit(rec) sinks.each{|sink| sink.process(rec) } ; end

To have topological branching, label data with a topic and use a switch:

class Switch
  def emit(rec) sinks.each{|sink| sink.process(rec) if match?(sink, rec._channel) } ; end
end
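Here is a runnable version of the two emit styles. It assumes sinks respond to `process` and that a `Switch` matches a sink's topic against the record's channel. `Collector`, `Rec`, `Broadcast` and the `topic`/`channel` attributes are hypothetical.

```ruby
class Collector
  attr_reader :topic, :seen
  def initialize(topic); @topic, @seen = topic, []; end
  def process(rec); @seen << rec; end
end

Rec = Struct.new(:body, :channel)

# Plain stage: every record goes to every sink.
class Broadcast
  def initialize(sinks); @sinks = sinks; end
  def emit(rec); @sinks.each { |sink| sink.process(rec) }; end
end

# Switch: a record goes only to sinks whose topic matches its channel.
class Switch < Broadcast
  def emit(rec)
    @sinks.each { |sink| sink.process(rec) if match?(sink, rec.channel) }
  end

  private

  def match?(sink, channel)
    sink.topic == channel
  end
end

spam, ham = Collector.new(:spam), Collector.new(:ham)
Switch.new([spam, ham]).emit(Rec.new("buy now", :spam))
Broadcast.new([spam, ham]).emit(Rec.new("hello", :ham))
spam.seen.map(&:body) # => ["buy now", "hello"]
ham.seen.map(&:body)  # => ["hello"]
```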

Actions declare inputs and outputs; sinks/sources are 1:1 with inputs/outputs

stages can have named outputs with (optional) splat outputs, or be singular output

# named outputs
def emit_foo(rec) emit(:foo, rec) ; end
# splat outputs (and used by named outputs)
def emit(label, rec) sink(label).process(rec) ; end

# singular output
def emit(rec) sink.process(rec) ; end
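Named outputs can be sketched with a class macro that generates one `emit_<label>` helper per declared output, each routing through the splat-style `emit(label, rec)`. `NamedOutputStage`, `outputs` and `wire` are hypothetical names.

```ruby
class NamedOutputStage
  # Define emit_<label> for each declared output.
  def self.outputs(*labels)
    labels.each do |label|
      define_method(:"emit_#{label}") { |rec| emit(label, rec) }
    end
  end

  def initialize; @sinks = {}; end
  def wire(label, sink); @sinks[label] = sink; end
  def sink(label); @sinks.fetch(label); end
  def emit(label, rec); sink(label).process(rec); end
end

class EvenOdd < NamedOutputStage
  outputs :even, :odd

  def process(n)
    n.even? ? emit_even(n) : emit_odd(n)
  end
end

Bucket = Struct.new(:items) do
  def process(rec); items << rec; end
end

evens, odds = Bucket.new([]), Bucket.new([])
stage = EvenOdd.new
stage.wire(:even, evens)
stage.wire(:odd, odds)
(1..5).each { |n| stage.process(n) }
evens.items # => [2, 4]
odds.items  # => [1, 3, 5]
```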

Inputs are just labels; you only connect stages to stages

Inputs are real; they are a type of product.

class Transformer # (singular in, singular out)
  consumes String
  produces Integer
end

class ProcessorWithMultInMultOut
  consumes :lines,   String
  consumes :numbers, Integer
  produces :even_numbered_lines, String
  produces :odd_numbered_lines,  String
end

class ProcessorWithMultInSplatInSplatOut
  consumes :splats, String
end
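The `consumes` / `produces` declarations could be backed by a small module of class macros: a bare type declares the singular slot, and a label plus a type declares a named slot. The `Declares` module and the default `:in` / `:out` labels are assumptions. The two classes are re-declared here so the block runs on its own.

```ruby
module Declares
  # consumes(Type) declares the singular :in slot; consumes(:label, Type) a named one.
  def consumes(label_or_type, type = nil)
    label, type = type ? [label_or_type, type] : [:in, label_or_type]
    inslots[label] = type
  end

  def produces(label_or_type, type = nil)
    label, type = type ? [label_or_type, type] : [:out, label_or_type]
    outslots[label] = type
  end

  def inslots;  @inslots  ||= {}; end
  def outslots; @outslots ||= {}; end
end

class Transformer
  extend Declares
  consumes String
  produces Integer
end

class ProcessorWithMultInMultOut
  extend Declares
  consumes :lines,   String
  consumes :numbers, Integer
  produces :even_numbered_lines, String
  produces :odd_numbered_lines,  String
end

Transformer.inslots[:in]                 # => String
ProcessorWithMultInMultOut.outslots.keys # => [:even_numbered_lines, :odd_numbered_lines]
```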