nested package

Submodules

nested.Agent module

This module contains the Agent class. The Agent and Stream classes are the building blocks of PythonStreams.

class nested.Agent.Agent(in_streams, out_streams, transition, state=None, call_streams=None, stream_manager=None, name=None)

Bases: object

An agent is an automaton: a state-transition machine. An agent is initialized in __init__ and a state transition is executed by next().

An agent has lists of input streams, output streams and call streams. Streams are described in Stream.py.

During a state transition an agent may read values from its input streams, append values to its output streams, change its state and carry out operations on other objects.

When a call stream is modified the agent’s next() method is called which causes the agent to execute a state transition.

The default is that every input stream is also a call stream, i.e., the agent executes a state transition when any of its input streams is modified. For performance reasons, we may not want the agent to execute state transitions when some input streams are modified; in this case, the sets of call and input streams will be different.

InList : a named_tuple with arguments:

list, start, stop An InList defines the list slice:

list[start:stop]

in_streams : list of streams out_streams : list of streams call_streams : list of streams

When a new value is added to a stream in this list a state transition is invoked. This the usual way (but not the only way) in which state transitions occur.
state: object
The state of the agent. The state is updated after a transition.
transition: function
This function is called by next() which is the state-transition operation for this agent. An agent’s state transition is specified by its transition function.
stream_manager : function
Each stream has management variables, such as whether the stream is open or closed. After a state-transition the agent executes the stream_manager function to modify the management variables of the agent’s output and call streams.
name : str, optional
name of this agent
_in_lists: list of InList
InList defines the slice of a list. The j-th element of _in_lists is an InList that defines the slice of the j-th input stream that may be read by this agent in a state transition.
_out_lists: list
The j-th element of the list is the list of values to be appended to the j-th output stream after the state transition.
next(stream_name=None)
Execute a state transition. The method has 3 parts:
  1. set up the data structures to execute a state transition,

  2. call transition to: (a) get the values to be appended to output streams, (b) get the next state, and (c) update pointers into input streams identifying what

    parts of the stream may be read in the future.

  3. update data structures after the transition.

next(stream_name=None)

Execute the next state transition.

This function does the following: Part 1: set up data structures for the state transition. Part 2: execute the state transition by calling self.transition Part 3: update data structures after the transition.

This method can be called by any agent and is called whenever a value is appended to any stream in call_streams

stream_name : str, optional
A new value was appended to the stream with name stream_name as a result of which this agent executes a state transition.
class nested.Agent.InList(list, start, stop)

Bases: tuple

list

Alias for field number 0

start

Alias for field number 1

stop

Alias for field number 2

nested.Animation module

Using a JSON file in my special format, this module generates the Javascript data structures for the animation:

  1. The graph

    ie. how are the nodes/edges arranged?

  2. The animation sequence

    ie. which values are at each edge at each time step

nested.Animation.make_graph(agent_descriptor_dict, stream_names_tuple)

Returns the 2 strings whose values are the edge and node arrays for the JS file

agent_descriptor_dict : dict

Dict form of JSON in our special format. Component names paired with the associated:

in streams, out streams, function, parameters, type, state
stream_names_tuple : list
List of all stream names.
nodes : str
String representation of the ‘nodes’ array for Cytoscape JS to create a graph
edges : str
String representation of the ‘edges’ array for Cytoscape JS to create a graph
nested.Animation.make_js(json_file)

Using a JSON file of my format, generate Javascript text the fills in a template .js file with:

graph configuration (draws the graph) animation sequence (animates graph)

Opens default browser to display animation.

json_file : json
JSON file object of a JSON in my special format

None

nested.Animation.make_seq(agent_descriptor_dict, stream_names_tuple)

Executes graph. All components are fired at every time step and value/edge pairs are stored and returned to populate the JS file.

agent_descriptor_dict : dict

Dict form of JSON in our special format. Component names paired with the associated:

in streams, out streams, function, parameters, type, state
stream_names_tuple : list
List of all stream names.

stream_str + ‘

‘ + selector_str + ‘ ‘ + val_str : str

String representation of 3 JS arrays of (# of time steps) * (# of streams) elements, where elements at each index correspond to each other

‘stream_str’: ‘stream_name’ array with names of all streams ‘selector_str’: ‘edge’ array is ‘stream_name’ array but

formatted as Cytoscape edge selectors

‘val_str’: ‘value’ array with stream values

nested.MakeNetwork module

Handle JSON <–> agent descriptor dict -> Streams network

nested.MakeNetwork.JSON_to_descriptor_dict_and_stream_names(my_json_file_name)
nested.MakeNetwork.make_agent_descriptor_dict(instance_dict, comp_list)
nested.MakeNetwork.make_my_JSON(instance_dict, comp_list, json_data)
nested.MakeNetwork.make_network(stream_names_tuple, agent_descriptor_dict)

This function makes a network of agents given the names of the streams in the network and a description of the agents in the network.

stream_names_tuple: tuple of lists
A tuple consisting of names of streams in the network. Each stream in the network must have a unique name.
agent_descriptor_dict: dict of tuples

The key is an agent name The value is a tuple:

in_list, out_list, f, f_type, f_args, state where:

in_list: list of input stream names out_list: list of output stream names f: function associated with the agent f_type: ‘element’, ‘list’, ‘window’, etc f_args: tuple of arguments for functions f state: the state associated with this agent.
stream_dict: dict
key: stream name value: Stream
agent_dict: dict

key: agent name value: agent with the specified description:

in_list, out_list, f, f_type, f_args, state, call_streams=[timer_stream] where one timer stream is associated with each agent.
agent_timer_dict: dict
key: agent_name value: Stream The value is the timer stream associated with the agent. When the timer stream has a message, the agent is made to execute a step.
nested.MakeNetwork.make_stream_names_tuple(instance_dict, comp_list)

nested.Operators module

This module has functions that convert operations on standard Python data structures to operations on streams.

The module has three collections of functions: (1) functions that convert operations on standard Python data structures to operations on streams. These functions operate on a list of input streams to generate a list of output streams. The functions deal with the following data structures: lists, elements of lists, (moving) windows, and timed windows. (2) functions that map the general case of multiple input streams and multiple output streams described above to the following special cases:

  1. merge: an arbitrary number of input streams and a single output stream.
  2. split: a single input stream and an arbitrary number of output streams.
  3. op: a single input stream and a single output stream.
  4. source: no input and an arbitrary number of output streams.

(e) sink: no ouput and an arbitrary number of input streams. These special cases simplify that functions that need to be written for standard Python data structures. You can always use the arbitrary number of inputs and outputs case even if there is only one or zero input or output streams. The functions for merge, split, op, source, and sink are simpler than the general case; so use them, where appropriate.

(3) a function that provides a single common signature for converting operations on Python structures to operations on streams regardless of whether the function has no inputs, a single input stream, a list of input streams, or no outputs, a single output stream or a list of output streams.

nested.Operators.assert_is_list(x)
nested.Operators.assert_is_list_of_lists(x, list_size=None)
nested.Operators.assert_is_list_of_streams(x)
nested.Operators.assert_is_list_of_streams_or_None(x)
nested.Operators.assert_is_list_or_None(x)
nested.Operators.asynch_element_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
nested.Operators.asynch_element_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.element_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
nested.Operators.element_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.h(f_type, *args)
nested.Operators.h_agent(f_type, *args)
nested.Operators.list_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
nested.Operators.list_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.list_index_for_timestamp(in_list, start_index, timestamp)

A helper function for timed operators. The basic idea is to return the earliest index in in_list.list with a time field that is greater than or equal to timestamp. If no such index exists then return a negative number.

Returns positive integer i where: either: ‘FOUND TIME WINDOW IN IN_LIST’

i > start_index and i <= in_list.stop and in_list.list[i-1].time >= timestamp and (i == start_index+1 or in_list.list[i-2].time < timestamp)
or: ‘NO TIME WINDOW IN IN_LIST’
i < 0 and
(in_list.list[in_list.stop-1] <= timestamp
or

(in_list.start = in_list.stop)

Requires:
start_index >= in_list.start and start_index < in_list.stop
nested.Operators.main()
nested.Operators.many_outputs_source(f_type, f, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.many_outputs_source_agent(f_type, f, f_args, outputs, state, call_streams, window_size, step_size)
nested.Operators.many_to_many(f_type, f, in_streams, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.many_to_many_agent(f_type, f, f_args, in_streams, out_streams, state, call_streams, window_size, step_size)
nested.Operators.merge(f_type, f, in_streams, state, call_streams, window_size, step_size)
nested.Operators.merge_agent(f_type, f, f_args, in_streams, out_stream, state, call_streams, window_size, step_size)
nested.Operators.op(f_type, f, in_stream, state, call_streams, window_size, step_size)
nested.Operators.op_agent(f_type, f, f_args, in_stream, out_stream, state, call_streams, window_size, step_size)
nested.Operators.remove_novalue_and_open_multivalue(l)

This function returns a list which is the same as the input parameter l except that (1) _no_value elements in l are deleted and (2) each _multivalue element in l is opened

i.e., for an object _multivalue(list_x) each element of list_x appears in the returned list.
nested.Operators.single_output_source(f_type, f, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.single_output_source_agent(f_type, f, f_args, out_stream, state, call_streams, window_size, step_size)
nested.Operators.sink(f_type, f, in_stream, state, call_streams, window_size, step_size)
nested.Operators.sink_agent(f_type, f, f_args, in_stream, state, call_streams, window_size, step_size)
nested.Operators.split(f_type, f, in_stream, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.split_agent(f_type, f, f_args, in_stream, out_streams, state, call_streams, window_size, step_size)
nested.Operators.stream_agent(inputs, outputs, f_type, f, f_args, state=None, call_streams=None, window_size=None, step_size=None)

Provides a common signature for converting functions f on standard Python data structures to streams.

f_type : {‘element’, ‘list’, ‘window’, ‘timed’, ‘asynch_element’}
f_type identifies the type of function f where f is the next parameter.

f : function inputs : {Stream, list of Streams}

When stream_func has:
no input streams, inputs is None a single input Stream, inputs is a single Stream multiple input Streams, inputs is a list of Streams.

outputs : list of Streams state : object

state is None or is an arbitrary object. The state captures all the information necessary to continue processing the input streams.
call_streams : None or list of Stream
If call_streams is None then the program sets it to inputs (converting inputs to a list of Stream if necessary). This function is called when, and only when any stream in call_streams is modified.
window_size : None or int
window_size must be a positive integer if f_type is ‘window’ or ‘timed’. window_size is the size of the moving window on which the function operates.
step_size : None or int
step_size must be a positive integer if f_type is ‘window’ or ‘timed’. step_size is the number of steps by which the moving window moves on each execution of the function.

None

nested.Operators.stream_func(inputs, f_type, f, num_outputs, state=None, call_streams=None, window_size=None, step_size=None)

Provides a common signature for converting functions f on standard Python data structures to streams.

f_type : {‘element’, ‘list’, ‘window’, ‘timed’, ‘asynch_element’}
f_type identifies the type of function f where f is the next parameter.

f : function inputs : {Stream, list of Streams}

When stream_func has:
no input streams, inputs is None a single input Stream, inputs is a single Stream multiple input Streams, inputs is a list of Streams.
num_outputs : int
A nonnegative integer which is the number of output streams of this function.
state : object
state is None or is an arbitrary object. The state captures all the information necessary to continue processing the input streams.
call_streams : None or list of Stream
If call_streams is None then the program sets it to inputs (converting inputs to a list of Stream if necessary). This function is called when, and only when any stream in call_streams is modified.
window_size : None or int
window_size must be a positive integer if f_type is ‘window’ or ‘timed’. window_size is the size of the moving window on which the function operates.
step_size : None or int
step_size must be a positive integer if f_type is ‘window’ or ‘timed’. step_size is the number of steps by which the moving window moves on each execution of the function.
list of Streams
Function f is applied to the appropriate data structure in the input streams to put values in the output streams. stream_func returns the output streams.
nested.Operators.timed_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
nested.Operators.timed_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
nested.Operators.window_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
nested.Operators.window_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)

nested.Stream module

This module contains the Stream class. The Stream and Agent classes are the building blocks of PythonStreams.

class nested.Stream.Stream(name='No Name', proc_name='Unkown Process')

Bases: object

A stream is a sequence of values. Agents can: (1) Append values to the tail of stream. (2) Read a stream. (3) Subscribe to be notified when a value is added to a stream.

The ONLY way in which a stream can be modified is that values can be appended to its tail. The length of a stream (number of elements in its sequence) can stay the same or increase, but never decreases. If at some point, the length of a stream is k, then from that point onwards, the first k elements of the stream remain unchanged.

A stream is written by only one agent. Any number of agents can read a stream, and any number of agents can subscribe to a stream. An agent can be a reader and a subscriber and a writer of the same stream.

If agent x is a subscriber to a stream s then x.next() — a state transition of x — is invoked whenever s is modified.

READING A STREAM An agent can read a stream only after it registers with the stream as a reader. An agents r registers with a stream s by executing s.reader(r).

After a reader of a stream s reads the first k values s, the reader may determine that from that point onwards, it will no longer read the first j values of the stream for some j where j <= k. A reader r of a stream s can inform s at a point in the computation that from that point onwards r will no longer read the first j values of the stream, for some j. Stream s uses this information to manage its memory.

Associated with each stream s is a list s.recent which consists of the most recent elements of s. s.recent is a tailing subsequence (or suffix) of s. If the value of s is a sequence s[0], ..., s[n-1], at a point in a computation then at that point, s.recent is a list s[m], .., s[n-1] for some m.

The length of s.recent is large enough so that at each point in a computation, all readers of stream s only read elements of s that are in s.recent. Operations on streams are implemented as operations on lists. A reader reads a stream s by reading the list s.recent.

Associated with a reader r of stream s is an integer s.start[r]. Reader r can only read the slice s.recent[s.start[r] : ] of the list recent. Reader r informs stream s that it will only read values in the list recent with indexes greater than or equal to j by calling

s.set_start(r, j)

which causes s.start[r] to be set to j.

For readers r1 and r2 of a stream s the values s.start[r1] and s.start[r2] may be different.

WRITING A STREAM An agent adds elements to a stream s by calling s.append(value) or s.extend(value_list); these operations are similar to operations on lists. s.append(value) appends the single value to the tail of the list and s.extend(value_list) extends the stream by the sequence of values in the list value_list.

SUBSCRIBE TO BE CALLED WHEN A STREAM IS MODIFIED An agent x subscribes to a stream s by executing

s.call(x).

Then, when stream s is modified, s calls x.next(s) where next() executes a state-transition. An agent x unsubscribe from a stream s by executing

s.delete_caller(x)

CLOSING A STREAM A stream can be closed or open (i.e., not closed). Initially a stream is open. The agent that writes a stream s can close s by executing s.close(). A closed stream cannot be modified.

Associated with a stream s is: (1) a list, s.recent. (2) a nonnegative integer s.stop where:

(a) the slice s.recent[:s.stop] contains the most recent values of stream s, and (b) the slice s.recent[s.stop:] is padded with padding values (either 0 or 0.0).
  1. a nonnegative integer s.offset where
    recent[i] = stream[i + offset]

    for 0 <= i < s.stop

For example, if the stream s consists of range(950), i.e., 0, 1, .., 949, and s.offset is 900, then s.recent[i] = s[900+i] for i in range(50).

Note that the number of entries in stream s is: s.offset + s.stop

name: str (optional)
name of the stream. Though the name is optional a named stream helps with debugging.
recent: list
A list of the most recent values of the stream.
stop: index into the list recent.
s.recent[:s.stop] contains the s.stop most recent values of stream s. s.recent[s.stop:] contains padded values.
offset: index into the stream.
For a stream s: s.recent[i] = s[i + offset] for i in range(s.stop)
start: dict of readers.
key = reader value = start index of the reader Reader r can read the slide recent[start[r] : ]
subscribers_set: set
the set of subscribers for this stream, agents to be notified when an element is added to the stream.
closed: boolean
True if and only if the stream is closed. A closed stream is not modified.
_buffer_size: nonnegative integer
Used to manage the recent list.
_begin: index into the list recent
recent[_begin:] mqy be read by some reader. recent[:_begin] is not being accessed by any reader; therefore recent[:_begin] can be safely deleted.
append(value)

Append a single value to the end of the stream.

call(agent)

Register a subscriber for this stream.

close()

Close this stream.”

delete_caller(agent)

Delete a subscriber for this stream.

delete_reader(reader)

Delete this reader from this stream.

extend(value_list)

Extend the stream by the list of values, value_list.

value_list: list
print_recent()
reader(reader, start=0)

Register a reader.

The newly registered reader starts reading list recent from index start, i.e., reads the slice recent[start:s.stop] If reader has already been registered with this stream its start value is updated to the parameter in the call.

set_name(name)
set_start(reader, start)

The reader tells the stream that it is only accessing elements of the list recent with index start or higher.

class nested.Stream.StreamArray(name=None)

Bases: nested.Stream.Stream

extend(a)

Extend the stream by an numpy ndarray.

a: np.ndarray or list
class nested.Stream.StreamSeries(name=None)

Bases: nested.Stream.Stream

class nested.Stream.StreamTimed(name=None)

Bases: nested.Stream.Stream

class nested.Stream.TimeAndValue(time, value)

Bases: tuple

time

Alias for field number 0

value

Alias for field number 1

nested.Subgraph module

This module handles unwrapping nested subgraphs, provided the JSON files of each graph.

nested.Subgraph.make_json(json_file_name)

Checks and converts input JSON file to a JSON file in my special format if it’s not already

json_file_name : str
Path to JSON file to be converted
my_json_file_name : str
Path to converted JSON file
nested.Subgraph.unwrap_subgraph(my_json_file_name)

Recursively exposes nested subgraphs to be executed for the animation.

my_json_file_name : str
Path to JSON file of my special format to be converted
“json_file.json” : str
json_file.json is the name of the file with the fully exposed graph

nested.SystemParameters module

SYSTEM_PARAMETERS

nested.components module

This module holds all the basic Python functions that each component represents. Include your own functions here.

nested.components.consecutive_ints(state)
nested.components.generate_of_random_integers(f_args=(100, ))

generate_of_random_integers() generates a random integer

f_args : tuple
First element is the maximum integer generated
randint(0, max_integer) : int
Integer between 0 and ‘max_integer’
nested.components.make_circles(curr_num)
nested.components.make_rectangles(curr_num)
nested.components.make_triangles(curr_num)
nested.components.multiply_elements(v, f_args)

multiply_elements() returns the product of 2 numbers

v : int/float
Number, potentially from a stream
f_args : list
First element is another number. Constant parameter.
multiplier * v : list
Product
nested.components.print_value(v, index)

print_value() prints out to console the value it was passed.

v : any
Value to be printed
index : int
Index of the value in the stream
index + 1 : int
Index of next element to be printed
nested.components.show(curr_num, stop_num)
nested.components.split(m, f_args)

split() returns the input number as the second value in a 2-element list, where the first value is ‘_no_value_’ based on whether it’s a multiple of the argument parameter

m : int/float
Number, potentially from a stream
f_args : list
List where 1st element is a number that you’re comparing ‘m’ to
[_no_value, m] or [m, _no_value] : list
m is sorted into the 1st or 2nd element of the list
nested.components.split_into_even_odd(m)

split_into_even_off() returns an even number as the second value in a 2-element list, where the first value is ‘_no_value_’ (vice versa for odd numbers).

m : int/float
Number, potentially from a stream
[_no_value, m] or [m, _no_value] : list
m is sorted into the 1st or 2nd element of the list based on its parity

nested.helper module

Helper functions that aid in renaming streams and components and creating data structures that we need to convert from the Flowhub UI’s JSON to my special JSON.

nested.helper.cast(s)

cast() automatically converts a str to the object type associated with its value (float, int or str)

s : str
String of possibly a number
s : int/float/str
Same thing as input arg but as the appropriate object type
nested.helper.clean_id(component)

clean_id() splits and returns the component name with id in two strings

component : str
Component names with random id
label : str
Plain component name with no id
cid : str
The id that was appended to the component
nested.helper.make_comp_list(instance_dict)

make_comp_list() creates a dict used for replacing the random 4 or 5 char id associated with each instance of a component with a shorter integer in name_with_new_id()

instance_dict : dict
Component names with random id’s paired with dict of it’s ‘in’ and ‘out’ ports
comp_list : dict
Plain component name paired with list of id’s associated with it
nested.helper.make_instance_dict(data, instances)

make_instance_dict() makes a dict of component names with id paired with a dict of streams that go in and out of that component instance Eg. {component: {‘in’: [in_stream], ‘out’: [out_stream]}}

data : dict
Dict created from a Flowhub UI generated JSON file with the key ‘connections’
instances : list
List of each unique component-with-id name
instance_dict : dict
Dict of each unique component-with-id with a dict of in and out streams
nested.helper.name_with_new_id(comp_list, name, id)

name_with_new_id() replaces a component name, if there are multiple instances of it, with the random id with an integer (1, 2, 3,...) based on the index of the random id in comp_list.

comp_list : dict
Dict of each component name paired with list of id’s associated with it
name : str
Plain component name
id: str
Random id attached to component name
name : str
Component name appended with new integer id

nested.run module