Skip to content

Documenting Communications Layer

Andrew Clark edited this page Jan 20, 2015 · 7 revisions

Comms Layer

An outline of how the communication layer works in cylc 6.1.2.

Interfaces

Two interfaces provided are via Pyro for communicating with suite:

  • A "command_queue" - from cylc.suite_cmd_interface import comqueue
  • A "info_interface" - from cylc.suite_info_interface import info_interface

Also using Pyro (for reference down the line) is a log interface via:

  • "log_interface" - from cylc.suite_log_interface import log_interface

The commands these work with are defined in scheduler.py under def configure which creates a dict defining the command names and the appropriate methods to link them to, outlined below.

A command in the command_queue is processed in scheduler.py by a call to def process_command_queue( self ) which performs name, args = queue.get(False) to extract the command and it's arguments.

If a piece of code wishes to interact with the suite and put entries in the queue it will first request a proxy from the pyro client, targeted at the appropriate named interface e.g.:

    proxy = cylc_pyro_client.client( suite, pphrase, options.owner,
            options.host, options.pyro_timeout,
            options.port ).get_proxy( 'command-interface' )

Commands can then be issued using this proxy as:

    proxy.put( command, *args )

Commands for retrieving info from a suite

All of these require access to the suite passphrase. A number of them could potentially be made available as accessible by anyone e.g. "get cylc version".

Command: ping suite
Calls: info_ping_suite( self )
Returns: True
Notes:
Command: ping task
Calls: info_ping_task( self, task_id )
Returns: self.pool.ping_task( task_id )
Notes:
Command: suite info
Calls: info_get_suite_info( self )
Returns: [ self.config.cfg['title'], user ]
Notes:
Command: task info
Calls: info_get_task_info( self, task_names )
Returns: info
Notes: info is a dict with keys for each of the task names requested
Command: all families
Calls: info_get_all_families( self, exclude_root=False )
Returns: List of families including or excluding the "root" family
Notes:
Command: triggering families
Calls: info_get_triggering_families( self )
Returns: self.config.triggering_families
Notes:
Command: first-parent ancestors
Calls: info_get_first_parent_ancestors( self, pruned=False )
Returns: deepcopy(self.config.get_first_parent_ancestors(pruned) )
Notes: single-inheritance hierarchy based on first parents
Command: first-parent descendants
Calls: info_get_first_parent_descendants( self )
Returns: deepcopy(self.config.get_first_parent_descendants())
Notes:
Command: graph raw
Calls: info_get_graph_raw( self, cto, ctn, group_nodes, ungroup_nodes,ungroup_recursive, group_all, ungroup_all )
Returns: self.config.get_graph_raw( cto, ctn, group_nodes, ungroup_nodes, ungroup_recursive, group_all, ungroup_all), self.config.suite_polling_tasks, self.config.leaves, self.config.feet
Notes:
Command: task requisites
Calls: info_get_task_requisites( self, in_ids )
Returns: self.pool.get_task_requisites( ids )
Notes:
Command: get cylc version
Calls: info_get_cylc_version(self)
Returns: CYLC_VERSION
Notes:

Commands for controlling the suite

As with the info commands, these all require access to the suite passphrase.

Command: stop cleanly
Calls: command_set_stop_cleanly(self, kill_active_tasks=False)
Returns: -
Dependency negotiation run after this?
Notes: Sets shutdown flag in scheduler.py
Command: stop now
Calls: command_stop_now(self)
Returns: -
Dependency negotiation run after this?
Notes: Results in "immediate" suite shutdown
Command: stop after point
Calls: command_set_stop_after_point( self, point_string )
Returns: -
Dependency negotiation run after this?
Notes: Calls self.set_stop_point( point_string ) in scheduler
Command: stop after clock time
Calls: command_set_stop_after_clock_time( self, arg )
Returns: -
Dependency negotiation run after this?
Notes: Parses an input time and sets suite to stop then.
Command: stop after task
Calls: command_set_stop_after_task(self, tid)
Returns: -
Dependency negotiation run after this?
Notes: Results in call to self.set_stop_task(tid)
Command: release suite
Calls: command_release_suite( self )
Returns: -
Dependency negotiation run after this? Y
Notes: Calls self.release_suite()
Command: release task
Calls: command_release_task( self, name, point_string, is_family )
Returns: -
Dependency negotiation run after this? Y
Notes: Parses inputs to result in call to self.pool.release_tasks( task_ids )
Command: remove cycle
Calls: command_remove_cycle( self, point_string, spawn )
Returns: -
Dependency negotiation run after this?
Notes: Results in call to self.pool.remove_entire_cycle( point, spawn )
Command: remove task
Calls: command_remove_task( self, name, point_string, is_family, spawn )
Returns: -
Dependency negotiation run after this?
Notes: Results in call to self.pool.remove_tasks( task_ids, spawn )
Command: hold suite now
Calls: command_hold_suite( self )
Returns: -
Dependency negotiation run after this?
Notes: Just calls self.hold_suite() - ultimately an alias
Command: hold task now
Calls: command_hold_task( self, name, point_string, is_family )
Returns: -
Dependency negotiation run after this?
Notes: Processes inputs and calls self.pool.hold_tasks( task_ids )
Command: set runahead
Calls: command_set_runahead( self, *args )
Returns: -
Dependency negotiation run after this? Y
Notes: Calls self.pool.set_runahead(*args) - just an alias.
Command: set verbosity
Calls: command_set_verbosity(self, lvl)
Returns: True, 'OK'
Dependency negotiation run after this?
Notes:
Command: purge tree
Calls: command_purge_tree( self, id, stop )
Returns: -
Dependency negotiation run after this? Y
Notes: Leads to call to self.pool.purge_tree( id, get_point(stop) )
Command: reset task state
Calls: command_reset_task_state( self, name, point_string, state, is_family )
Returns:
Dependency negotiation run after this? Y
Notes: Results in call to self.pool.reset_task_states( task_ids, state )
Command: trigger task
Calls: command_trigger_task( self, name, point_string, is_family )
Returns: -
Dependency negotiation run after this? Y
Notes: Calls self.pool.trigger_tasks( task_ids )
Command: nudge suite
Calls: command_nudge( self )
Returns:
Dependency negotiation run after this? Y
Notes: Puts an entry in the command queue
Command: insert task
Calls: command_insert_task( self, name, point_string, is_family, stop_point_string )
Returns: -
Dependency negotiation run after this? Y
Notes:
Command: reload suite
Calls: command_reload_suite( self )
Returns: -
Dependency negotiation run after this? Y
Notes: Just calls self.reconfigure()
Command: add prerequisite
Calls: command_add_prerequisite( self, task_id, message )
Returns: -
Dependency negotiation run after this?
Notes: Calls self.pool.add_prereq_to_task( task_id, message )
Command: poll tasks
Calls: command_poll_tasks( self, name, point_string, is_family )
Returns: -
Dependency negotiation run after this?
Notes: Extracts task ids and calls self.pool.poll_tasks( task_ids )
Command: kill tasks
Calls: command_kill_tasks( self, name, point_string, is_family )
Returns: -
Dependency negotiation run after this?
Notes: Extracts task ids and calls self.pool.kill_tasks( task_ids )

Other events that trigger "dependency negotiation":

'kill cycle',
'kill task',
'prerequisite'