
Things in (?question blocks?) are unresolved.

Foundations

  • Simple: The thing you would draw at the whiteboard, spoken aloud, is the thing you write.

  • Scalable: Scalability is more important than performance. Scalability is important even when there are no performance concerns -- scalable for robots means scalable for people.

  • Readable: The plot should be as simple as the story. Orthogonal concerns (schema, transformation, topology, transport, resources, and configuration) should be crisply separated.

Crucibles

Executor independence

Every Hanuman graph can be expressed in any of the following forms:

  • Buffered flow -- Flume
  • Event-driven flow -- cat, HTTP post, Hadoop
  • Picture -- graphviz etc
  • Explanation -- each stage appends a plain-language description of the action it takes on the record

Graph is a graph

Wherever concepts can be paired across the following, they should be, until they prove how they are distinct:

  • micro dataflow
  • macro dataflow
  • workflow
  • subroutine
  • system diagram

Petri Net / Graph Dual

  • every edge connects an action to a product or a product to an action
    • The product might be implied by the edge, and elided from the declaration.

Comparables

  • Kafka, ZeroMQ, Storm, Esper, Flume, Pusher
  • Rake, Chef, Shell scripts, Thor
  • Rack
  • Hadoop

Objects

  • stages

    • products
    • actions -- alter, augment, or act on data. Some important types of actions (a toy illustration in code follows this Objects list):
      • transformers -- actions with exactly one input and one output
      • sources -- actions with zero inputs and (?one or more outputs?). Sources may be different in another way, not sure yet
      • sinks -- actions with (?one or more inputs?) and no output.
      • taps -- actions that make no changes to the data (?can a tap add metadata? can it mutate metadata?)
    • graphs -- contains other stages. A graph is also an action.
  • schema

  • record

  • control path

  • configuration

  • topic -- a filter. This filter is special because the transport can ...

  • runner -- pairs a graph with concrete executors and products
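
A toy illustration of the action taxonomy, classified purely by input/output arity. These are hypothetical classes, not the real Wukong hierarchy; emit stands in for handing the record to whatever sink is wired downstream.

module Emits
  attr_accessor :sink
  def emit(rec) ; sink.process(rec) if sink ; end   # hand off to the wired-up sink
end

class Transformer ; include Emits      # exactly one input, exactly one output
  def process(rec) ; emit(rec.to_s.upcase) ; end
end

class Source ; include Emits           # zero inputs; it drives records downstream
  def drive ; $stdin.each_line{ |line| emit(line.chomp) } ; end
end

class Sink                             # one (or more) inputs, no output
  def process(rec) ; $stdout.puts(rec) ; end
end

class Tap ; include Emits              # observes the record, changes nothing
  def process(rec) ; $stderr.puts("saw: #{rec}") ; emit(rec) ; end
end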

(?questions exist about the following?)

  • partition --
  • chain -- (?a group of sequential edges understood to be related in an important way?)
  • delivery guarantee --
  • contract

Key Decisions

Data is primary. Data is represented as an "arbitrary record with sideband metadata". That is, rather than an event wrapper with a message body you access, the data is primary and the metadata is accessible through it.
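
As a sketch of that distinction (the MetadataRecord name and accessors are made up, not the actual Wukong classes): the data is the object you hold; the metadata rides sideband and is reached through it.

require 'delegate'

# Hypothetical: the record itself is primary; metadata is sideband.
class MetadataRecord < SimpleDelegator
  attr_reader :metadata

  def initialize(record, metadata = {})
    super(record)            # every record method passes straight through
    @metadata = metadata     # provenance, topic, timestamps, ... ride alongside
  end
end

rec = MetadataRecord.new({ word: "hello", count: 3 }, topic: :words)
rec[:word]            # => "hello"  -- you work with the data directly
rec.metadata[:topic]  # => :words   -- metadata is reachable through the record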

Flows are understandable in isolation:

Unless a processor explicitly maintains state or mixes in external entropy, the outcome of a data stream may be completely characterized by

  • Processor
  • Configuration (static parameters delivered at run-time to the processors on the graph)
  • Data record itself
  • Metadata attached to that record

The important point is that the outcome of a processor is not affected by changes to

  • the graph as a whole, including the stages that it pulls from and writes to
  • the machine or machines it runs on
  • the transport layer handling the flow
  • where, spatially or temporally, the data originates from or proceeds to.

As long as it is fed the same data and the same metadata with the same configuration, a processor doesn't care what's running it, where it is, who fed it the data or who is consuming it.
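
A sketch of a processor written to that rule (hypothetical class): its output is a pure function of its configuration plus the record and metadata it is handed, so any executor anywhere gets the same answer.

# Hypothetical isolated processor: no knowledge of the graph, machine, or transport.
class SuffixAdder
  def initialize(config = {})
    @suffix = config.fetch(:suffix, "!")    # static parameter, fixed at run-time
  end

  # same record + metadata in => same record out, wherever and however it runs
  def process(record, metadata = {})
    "#{record}#{@suffix}"
  end
end

SuffixAdder.new(suffix: "?!").process("hello")   # => "hello?!"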

Orthogonal Capabilities

  • guarantees

    • end-to-end -- ensures acknowledgement by destination
    • next-hop -- guarantees successful handoff to next stage, but nothing more
    • best effort -- fire and forget

Key Mysteries

  • naming stages -- here are some desirable features:

    • unambiguously referencable
    • name doesn't change if irrelevant changes are made to the graph
    • name does change if relevant changes are made
    • name is predictable (by looking at the graph)
    • name is readable
    • name is unaffected by configuration
  • fanout: when several stages consume a given stage's outputs,

    • does the stage have one output (a resource) and the product has many consuming stages,
    • or does the stage have multiple outputs, and the emit method needs a re-think?
  • messages

    • can stages message each other directly?
    • or do they speak events onto a sideband that other stages can consume?
    • or are they broadcast to all nodes?
  • amelioration: when is each of the following appropriate?

    • mixin of a module (calling super)
    • wrapping a stage (before/after/around filter)
    • flow insertion (put module before / after it in the flow)
  • delivery guarantees

  • control path

  • biographers

Have / Want

processor widgets

  • choke -- emits all records it receives, but not faster than a given averaged rate (sketched in code after this list)
  • Uniform consistent sampling
  • dashpot tap
  • channel topic
  • aggregates -- sum, avg/stdev, %ile
  • graph change stream -- an activity stream of macro graph actions (topology or configuration changes).
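
A sketch of the choke widget named above (hypothetical; real Wukong widgets look different) -- a simple pacing loop that lets every record through, just never faster than the target average rate.

# Hypothetical choke: passes every record along, throttled to rate_per_sec on average.
class Choke
  def initialize(rate_per_sec)
    @interval = 1.0 / rate_per_sec
    @next_ok  = Time.now
  end

  def process(record)
    now = Time.now
    sleep(@next_ok - now) if @next_ok > now      # stall until we are back under the rate
    @next_ok = [@next_ok, now].max + @interval   # schedule the next allowed emit
    emit(record)                                 # nothing is dropped, only delayed
  end

  def emit(record) ; puts record ; end           # stand-in for the downstream sink
end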

topology widgets

  • many-to-many -- all input records sent to all output slots. (?input partition is somehow recorded in event metadata?).
  • switch -- each record sent to exactly one out slot.

contract

  • window --

  • cogroup

  • sort

  • grouping "fences" -- indicate start/end boundaries of a group in the stream (or in metadata) rather than by combining into a rolled-up record

  • buffer -- ...

  • retry --

  • load balance --

  • barrier -- execution pauses until some condition met

  • schedule -- ...

central-ish

  • counters, timers, gauges, values
  • announce / discover

understanding

  • simulate -- sampled yet illustrative flow goes end-to-end
  • explain -- processors explain what they do, appending it to the explanation passed along the graph
  • audit -- metadata element showing full provenance of the record

demo

  • Hamming's Problem
  • Newton's method

Further reading

Juneteenth: definition of interface

Contract

Stage

Front-facing:

  • .make(*, {}) -- new stage
  • #output(out_name) -- the stage, if any, that the named output slot of this stage feeds
  • #outputs -- an ordered list of stages this one feeds
  • #owner -- graph containing this stage. Every stage has an owner except Universe
  • #into(other, from_slot, into_slot) -- asks its graph to connect self to other; returns other

Framework:

  • #set_output
  • #set_owner

Inessential (maybe):

  • #name -- name of stage; is unique on its graph
  • #fullname -- globally unique name of stage, formed as "#{owner.name}.#{self.name}"
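
A minimal sketch of a class meeting that contract (slot handling and error checking omitted; this is illustrative, not the shipped Hanuman::Stage):

class Stage
  attr_reader :owner, :name

  def self.make(attrs = {}) ; new(attrs) ; end

  def initialize(attrs = {})
    @name    = attrs[:name]
    @outputs = {}                      # out-slot name => stage it feeds
  end

  def output(out_name) ; @outputs[out_name] ; end
  def outputs          ; @outputs.values    ; end
  def fullname         ; "#{owner.name}.#{name}" ; end

  # asks its graph to wire self into other; returns other so calls can chain
  def into(other, from_slot = :default, into_slot = :default)
    owner.connect(self, other, from_slot, into_slot)
    other
  end

  # framework-facing
  def set_output(from_slot, stage) ; @outputs[from_slot] = stage ; end
  def set_owner(graph)             ; @owner = graph              ; end
end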

Decisions to make / Restrictions we are committing to:

  • !! each stage is owned by exactly one graph (except Universe)
  • !! stages cannot fruitfully exist in the absence of a graph
  • names are:
    • ??determined only by graph??
    • ??immutable on object??
    • ??only given in special cases??

Graph

Front-facing:

  • stages -- list of all stages added to graph
  • inputs -- list of named input slots
  • drive(source) -- asks the named source to drive. ??run?? ??drive??
  • ?? fetch(sink) -- fetches next record from named sink ??

Framework:

  • set_stage(name, stage) -- (might need to be (stage, name))
    • adds it to the stages collection. A named stage replaces any existing stage with that key, ??at the same position it held??.
    • note that you cannot unset_stage. Feels dangerous.
  • connect(from, into, from_slot, into_slot)
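
And a matching sketch of the graph side (same caveats: hypothetical and simplified):

class Graph
  attr_reader :name, :stages, :inputs

  def initialize(name)
    @name   = name
    @stages = {}      # label => stage; a ruby hash keeps insertion order
    @inputs = {}      # named input slots
  end

  # framework-facing: a named stage replaces any prior stage stored under that key
  def set_stage(name, stage)
    stage.set_owner(self)
    @stages[name] = stage
  end

  def connect(from, into, from_slot = :default, into_slot = :default)
    from.set_output(from_slot, into)
    into
  end

  def drive(source_name)
    @stages[source_name].drive      # ask the named source to start pushing records
  end
end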

how are names chosen?

  • optional arbitrary name supplied at time of storage. Only some things are named.
  • optional arbitrary name supplied at time of storage. Otherwise a mangled name is supplied
  • calls #name on objects. You have to keep things straight. (This is trouble).

(one could separate naming a stage from adding it to graph. still don't know if names are intrinsic)

Sugar:

  • <magic stage method> -- stage types register a magic method (eg. StdinSource is stdin, MapProcessor is map, RegexpFilter is re). This:
    • calls .make on that class
    • adds the stage to itself (which handles owner and naming)
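
One way that registration could be wired up. register_stage and the use of next_name_for here are guesses at the mechanism, not the real implementation:

module Hanuman
  # Hypothetical: a stage class registers a builder method on Graph, so that
  # `stdin`, `map`, `re`, ... become one-word calls inside a graph definition.
  def self.register_stage(magic_name, klass)
    Graph.send(:define_method, magic_name) do |*args, &blk|
      stage = klass.make(*args, &blk)                       # .make builds the stage
      set_stage(stage.name || next_name_for(klass), stage)  # the graph owns and names it
    end
  end
end

# Hanuman.register_stage(:stdin, StdinSource)
# Hanuman.register_stage(:map,   MapProcessor)
# Hanuman.register_stage(:re,    RegexpFilter)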



currently, from pry (lightly organized):

Gorillib::Model#methods            :
  ==  inspect
  read_attribute unset_attribute write_attribute attribute_set?
  receive!   update_attributes  attribute_values  attributes  compact_attributes
  read_unset_attribute  handle_extra_attributes
  as_json  to_json  to_tsv  to_wire
Gorillib::Builder#methods          :
  getset  getset_member  inspect_helper
  collection_of  get_collection_item  getset_collection_item  has_collection_item? set_collection_item

Meta::Hanuman::StageType#methods   : name  name?  receive_name owner  owner?  receive_owner doc receive_doc
Hanuman::Stage#methods             : report  setup  stop  fullname --- configure  key_method lookup  notify  to_key
#
Hanuman::Action                    :
#
Hanuman::IsOwnInputSlot#methods    : inputs
Hanuman::IsOwnOutputSlot#methods   : outputs
Hanuman::Inlinkable#methods        : <<  from  set_input
Hanuman::Outlinkable#methods       : >  into  set_output
#
Wukong::Processor#methods          : bad_record  emit input  input?  output  output? receive_input  receive_output
Wukong::Map#methods                : blk  process call

Meta::Hanuman::StageType#methods   : doc  name  name?  owner  owner?  receive_doc  receive_name receive_owner
Hanuman::Stage#methods             : configure  fullname  key_method  notify  report  to_key
#
Hanuman::Action                    :
#
Hanuman::Inlinkable#methods        : <<  from
Hanuman::SplatInputs#methods       : has_input?  inslots  set_input
Hanuman::Outlinkable#methods       : >
Hanuman::SplatOutputs#methods      : into  outputs
Hanuman::Slottable#methods         : handle_extra_attributes  inputs
#
Hanuman::Graph#methods             : declare_stage  connect  action  next_name_for  product tree
  as_is  file_sink  file_source  flatten  foreach  from_json  from_tsv  graph  integers  limit lookup  map  not_re  null  re  shell  stderr  stdin  stdout  to_json  to_tsv
Meta::Hanuman::GraphType#methods   : stage  stages has_stage?  receive_stages  edges receive_edges  input?  output? receive_input  receive_output
#
Wukong::Dataflow#methods           : setup  stop drive  input  output  process  process_stages reject  select  set_output  sink_stages  source_stages
Meta::Wukong::DataflowType#methods : has_outslot?  has_splat_inslot?  has_splat_outslot? outslot outslots  receive_outslots  receive_splat_inslots  receive_splat_outslots  splat_inslot splat_inslots  splat_outslot  splat_outslots



"code is written thoughfully but run recklessly"

make vs new/initialize vs receive

  • interface to receive is (*args, {attrs}):
    • the last element must be a hash of attributes
    • args must win out over attrs

The rule is "last given arg is always the attrs" -- in cases like

def foo(alpha=nil, beta=nil, gamma={}, attrs={}) ; end

 input                     alpha  beta  gamma        attrs
-------                    -----  ----  -----        -----
foo                      # nil    nil   {}           {}
foo({})                  # nil    nil   {}           {}
foo 7                    # 7      nil   {}           {}
foo :a => :b             # nil    nil   {}           {:a => :b}
foo 7, 3, :a => :b       # 7      3     {}           {:a => :b}
foo 7, 3, {:a => :b}, {} # 7      3     {:a => :b}   {}

If any positional arg is a hash, you must slap an extra hash on the end. Also, don't use hashes for positional args.
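
A sketch of how that parse could be done (illustrative helper, not the Gorillib implementation):

# Hypothetical: apply the "last given arg is always the attrs" rule.
def split_args_and_attrs(*args)
  attrs = args.last.is_a?(Hash) ? args.pop : {}   # the trailing hash, if any, is attrs
  [args, attrs]
end

split_args_and_attrs(7)                      # => [[7], {}]
split_args_and_attrs(7, 3, :a => :b)         # => [[7, 3], {:a => :b}]
split_args_and_attrs(7, 3, {:a => :b}, {})   # => [[7, 3, {:a => :b}], {}]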

is there a non-receive path for initialization?

You cannot have all of the following desirable properties, but you can have the right tasteful subset of them:

  • I get to choose any signature for initialize
  • The signature for receive is predictable, so I can provide generic advice to an object
  • It is predictable whether attributes are receive_xxx'd or set directly
  • only type-converts once and only evals block once
  • objects are fully-formed when initialize is done
  • I can initialize an object directly, without passing values through the receive door, if I'm willing to take responsibility that the objects are safe.

Case A: Objects own their own new/initialize

You can initialize an object directly, with no fuckery in between.

  • new calls initialize (and in every other way is untouched)
  • initialize(*args,{attrs}) is whatever you want, but it must be sensible to call it with no args
  • receive(*args,{attrs},&block) calls
    • new -- with no args
    • receive! -- with the args it was given

In this world,

  • an object must be comfortable with being incomplete even after initialize finishes
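
Sketched (attribute handling hand-waved, attrs-only for brevity):

# Hypothetical Case A: the class owns initialize; receive goes in through a side door.
class Widget
  attr_accessor :size, :color

  def initialize(size = nil, color = nil)   # any signature, but callable with no args
    @size, @color = size, color
  end

  def receive!(attrs = {})
    attrs.each{ |k, v| public_send("#{k}=", v) }
    self
  end

  def self.receive(attrs = {}, &block)
    obj = new                 # no args -- the object may be incomplete at this point
    obj.receive!(attrs)
    obj.instance_eval(&block) if block
    obj
  end
end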

Case B:

  • new(*args,{attrs},&block)
    • calls initialize (and in every other way is untouched)
  • initialize(*args,{attrs},&block)
    • you can do what you want,
    • ...but you must call super (or otherwise ensure that receive! is called).
  • .receive(*args,{attrs},&block) just calls new, which calls initialize, which calls receive!

In this world,

  • you can't choose your own initialize signature
  • there's no way to construct a model that doesn't incur the receive chain business (apart from adding a _native bailout flag -- I'm thinking ahead to avro)
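
Case B, sketched the same way:

# Hypothetical Case B: every construction path funnels through receive!.
class Widget
  attr_accessor :size, :color

  def initialize(attrs = {}, &block)
    receive!(attrs, &block)         # add what you like, but receive! must run
  end

  def receive!(attrs = {}, &block)
    attrs.each{ |k, v| public_send("#{k}=", v) }
    instance_eval(&block) if block
    self
  end

  def self.receive(attrs = {}, &block)
    new(attrs, &block)              # new -> initialize -> receive!
  end
end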

Names

You cannot have all of the following desirable properties, but you can have the right tasteful subset of them:

  • Stages have a terse name that doesn't look like robot spew
  • Stages can produce a globally unique name that doesn't look like robot spew.
  • I can label a spot on a graph, assign to it directly (replacing whatever's there), and refer to it uniquely (getting back the last stage assigned to that spot).
  • If an object has a clear intrinsic name it becomes (or heavily informs) its label on the graph
  • Objects are encouraged to have names and then make opinionated assumptions about how to configure themselves based on it. (This is really important, as proven by chef; it consequently means that names don't exist at the graph's primary convenience)
  • I can have multiple differently-named instances of a stage template on the same graph
  • I can have multiple identically-named instances of a stage template on the same graph, and I don't have to do anything special -- eg several pig('dump_to_s3') stages

Hmm... what if stages weren't named but rather labeled -- that is, the label is just a retrieval key for a stage on a graph.

  • in workflow world esp., things are very excited to learn their names/labels, and can do a lot of nice magic with them.
    • when names always become labels, you can run into trouble with the unique-name-on-graph rule.
  • we want to distribute configuration by letting it drape naturally over the graph. (That is: the configuration key crips.snoop.gat becomes available to the snoop node on the crips graph as the gat configuration setting -- sketched in code after this list.)
    • This argues that as macro flows start to get more complex, you'll want to start labelling things more.
  • the number 11 doesn't know that I've labeled it as awesomeness; rather, its binding knows how to retrieve the object labeled awesomeness.
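
A sketch of that draping (hypothetical lookup; the crips/snoop/gat names come from the example above):

# Hypothetical: hand each labeled stage the slice of configuration that drapes over it.
def settings_for(config, graph_label, stage_label)
  config.fetch(graph_label, {}).fetch(stage_label, {})
end

config = { crips: { snoop: { gat: true } } }
settings_for(config, :crips, :snoop)    # => { :gat => true }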

Collection

We have lots of examples where we want a collection of objects that meets the following:

  • retrieval like a hash: objects retrieved by key, key maps uniquely to object, adding object with same key replaces former contents

  • iterates like an array -- each returns the values only

  • serializes as an array

  • Objects all adhere to similar-enough contract

  • receiving a group of objects passes each to a factory for creation

  • I don't want the kitchen sink of enumerable methods

  • things are stored in order they are added, and retrievable by index

  • things can be retrieved by name:

    • how is that name chosen?
      • optional arbitrary name supplied at time of storage
      • collection calls #name on objects
      • collection calls certain method on objects, uses that as retrieval key
    • regardless: if I replace the object in the foo slot, the index does not change. (trying to balance needs of positional slots with keyword slots)
  • retrieval should be fairly direct -- would rather not iterate over whole collection each time

How aware of its objects should the collection be?

  • if a method is used to assign names, they must all have that method

  • Does the container add anything to the attributes hash as it goes by? It does now, which is probably a mistake.

  • receive! should pass contents to a factory.

  • ... which means find_or_create seems plausible

  • ... create seems a bit less so

  • #[] -- gets object with given name

  • #[]= -- adds object with given name

  • #<< -- adds object

  • delete --

  • fetch --

  • include? -- true if has given key

  • to_a -- all the objects

  • empty? -- true if has no objects

  • blank? -- true if empty?

  • present? -- true if not blank?

  • receive! -- merges in contents of given enumerable.

  • receive -- nem

inessential, but reasonable:

  • each -- iterates over the values
  • each_pair -- iterates over key-value pairs
  • values -- same as to_a
  • to_hash -- hash of key-value pairs, in same order as collection
  • keys -- all the keys
  • has_key? -- same as include?
  • length -- number of items
  • size -- same as length
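
A sketch of a collection with those properties (hand-rolled and illustrative; the real Gorillib collection differs in detail):

# Hypothetical: keyed like a hash, iterates and serializes like an array,
# keeps insertion order, replaces in place on key collision.
class ModelCollection
  def initialize(factory)
    @factory = factory
    @store   = {}            # a ruby hash: ordered, and re-assigning a key keeps its slot
  end

  def [](key)       ; @store[key]              ; end
  def []=(key, obj) ; @store[key] = obj        ; end
  def <<(obj)       ; @store[obj.name] = obj ; self ; end   # assumes members respond to #name
  def include?(key) ; @store.key?(key)         ; end
  def to_a          ; @store.values            ; end
  def each(&blk)    ; @store.values.each(&blk) ; end        # values only, not pairs
  def empty?        ; @store.empty?            ; end

  # merge in an enumerable of raw attribute hashes, passing each through the factory
  def receive!(items)
    items.each{ |attrs| self << @factory.receive(attrs) }
    self
  end
end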

  • all stages have a source and sink; emit(result) calls sink.process(result) / emit(:spam, result) calls sink(:spam).process(result). This method always refers to a concrete stage, the one that has by whatever action of fate been wired to it.

  • input and output refer to the slots of

  • The inslot -- holds the schema & the label

  • a data source -- for argument's sake, a driving stage (it will call process on the stage it is wired to)

  • from inside the graph, the stage that gets data sent through the port

    src inslot

             +-------------------
             |
             +---+
             | _ |    +---+
             |[_]+----|   |---
             |   |    +---+
             +---+
             |
             +----------------------
    

    http_source > chain(:stuff){ input > parse > fiddle > output } > hbase_sink

    http_source > chain(:stuff){ parse > fiddle } > hbase_sink

    chain(:stuff){ http_source > parse > fiddle } > hbase_sink

    chain(:ln){
      consumes(lines: String, ints: Integer) ; produces(numbered_lines: String)
      numberer << lines << ints > numbered_lines
    }

    chain(:ln){
      consumes(lines: String, ints: Integer) ; produces(numbered_lines: String)
      lines > numberer ; ints > numberer ; numberer > numbered_lines
    }

    chain(:ln){
      consumes(lines: String, ints: Integer) ; produces(numbered_lines: String)
      input(:lines).into numberer, :lines    # connect( ??, ??, numberer, :lines)
      input(:ints).into numberer, :ints      # connect( ??, ??, numberer, :ints)
      numbered_lines.into output
    }


    stdin > chain(:ln).lines ; integers > chain(:ln).ints ; chain(:ln) > stdout

    not allowing (positional assignment): chain(:ln) << stdin << ints > stdout

    chain(:ln) << { lines: stdin, ints: integers } > stdout

    chain(:ln).from(lines: stdin, ints: integers).into(stdout)

    dataflow(:word_count_map){ input > tokenize > flatten > output }
    dataflow(:word_count_red){ input > count > output }

    .

actions are 1-in, many-out; branching uses a channel

all stages have one source and multiple (anonymous) sinks

def emit(rec) ; sinks.each{ |sink| sink.process(rec) } ; end

To have topological branching, label data with a topic and use a switch:

class Switch
  def emit(rec) ; sinks.each{ |sink| sink.process(rec) if match?(sink, rec._channel) } ; end
end

Actions declare inputs and outputs; sinks/sources are 1:1 with inputs/outputs

stages can have named outputs with (optional) splat outputs, or have a singular output

# named outputs
def emit_foo(rec) ; emit(:foo, rec) ; end
# splat outputs (also used by named outputs)
def emit(label, rec) ; sink(label).process(rec) ; end

# singular output
def emit(rec) ; sink.process(rec) ; end

Inputs are just labels; you only connect stages to stages

Inputs are real; they are a type of product.

class Transformer # (singular in, singular out)
  consumes String
  produces Integer
end

class ProcessorWithMultInMultOut
  consumes :lines,   String
  consumes :numbers, Integer
  produces :even_numbered_lines, String
  produces :odd_numbered_lines,  String
end

class ProcessorWithMultInSplatInSplatOut
  consumes :splats, String
end