API Reference¶
cli¶
task¶
Classes:
BaseTask: Base class of all Tasks; internally, BaseTask iteratively fetches items emitted by Channel inputs asynchronously.
Edge: An edge represents a dependency between a channel and a task.
GetContext: Emit context endlessly; aimed at building a dependency between a flow and its input context/config.
RunTask: Subclass of BaseTask, representing tasks with a run method exposed to users to implement specific item-processing logic.
Task: Subclass of RunTask.
Exceptions:
RunDataFileNotFoundError
RunDataTypeError
class flowsaber.core.task.BaseTask(num_out: int = 1, **kwargs)¶
Base class of all Tasks. Internally, BaseTask iteratively fetches items emitted by Channel inputs asynchronously, then pushes the processed result of each item into the _output channels. All items are handled in sequence.
Methods:
enqueue_res(data[, index]): Enqueue processed data into _output channels.
handle_consumer(consumer, **kwargs): Iteratively fetch data from the consumer and call the processing function.
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
initialize_context(): Initialize some attributes of self.config into self.context.
initialize_input(*args, **kwargs): Wrap all _input channels into a consumer object for simultaneous data fetching.
initialize_output(): Create _output channels according to self.num_output.
async enqueue_res(data, index=None)¶
Enqueue processed data into _output channels.
Parameters: data, index
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Iteratively fetch data from the consumer and call the processing function.
Parameters: consumer, kwargs
async handle_input(data, *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
initialize_context()¶
Initialize some attributes of self.config into self.context.
initialize_input(*args, **kwargs)¶
Wrap all _input channels into a consumer object for simultaneous data fetching.
Parameters: args, kwargs
initialize_output()¶
Create _output channels according to self.num_output.
class flowsaber.core.task.Edge(channel: flowsaber.core.channel.Channel, task: flowsaber.core.task.BaseTask)¶
An edge represents a dependency between a channel and a task: the task consumes data emitted by the channel.
class flowsaber.core.task.GetContext(check_fn: Callable = None, **kwargs)¶
Emit context endlessly. This task is aimed at building a dependency between a flow and its input context/config. It is optional: users can also change parameters for different flowruns by rebuilding the flow. However, to make parameter changes also work in remote execution, the flow is supposed to read parameters from the output of this task. Note also that, since the flow uploaded to the server is an already-built flow, its structure is fixed, and python expressions like if will no longer change that structure. With the help of this task, some degree of dynamism can be simulated.
Methods:
run(): The method users need to implement for processing the data emitted by _input channels.
run()¶
The method users need to implement for processing the data emitted by _input channels.
Parameters: args, kwargs
exception flowsaber.core.task.RunDataFileNotFoundError¶
exception flowsaber.core.task.RunDataTypeError¶
class flowsaber.core.task.RunTask(num_out: int = 1, **kwargs)¶
RunTask is a subclass of BaseTask, representing tasks with a run method exposed to users to implement specific item-processing logic. Compared to BaseTask: 1. Runs for multiple inputs are executed in parallel. 2. Runs are executed in the main loop.
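A minimal sketch of the subclassing pattern described above (the doubling logic is only an illustration; how the task is wired into a flow and executed is omitted here):
```python
from flowsaber.core.task import RunTask

class Double(RunTask):
    def run(self, num: int) -> int:
        # each consumed item is bound to run's signature; the annotations
        # are used for type conversion (see check_run_data below)
        return num * 2
```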
Methods:
call_run(data, **kwargs): Create a fresh task object and call its run method for real data processing.
check_run_data(data, **kwargs): Match types of _input data against self.run's annotations by type conversion.
create_run_data(data): Wrap a consumer-fetched data tuple into a BoundArguments paired with self.run's signature.
handle_consumer(consumer, **kwargs): Run processing functions in parallel by submitting jobs into schedulers that return awaitable Future-like objects.
handle_run_data(data, **kwargs): This coroutine is executed in parallel and thus needs to re-enter self.context.
initialize_context(): Expose cache_type and executor_type into self.context.
run(*args, **kwargs): The method users need to implement for processing the data emitted by _input channels.
async call_run(data: inspect.BoundArguments, **kwargs)¶
Create a fresh task object and call its run method for real data processing.
async check_run_data(data: inspect.BoundArguments, **kwargs)¶
Match types of _input data against self.run's annotations by type conversion. Check file integrity.
Parameters: data, kwargs
create_run_data(data: Union[tuple, flowsaber.core.utility.target.End]) → inspect.BoundArguments¶
Wrap a consumer-fetched data tuple into a BoundArguments paired with self.run's signature.
Parameters: data
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Run processing functions in parallel by submitting jobs into schedulers that return awaitable Future-like objects.
Parameters: consumer, kwargs
async handle_run_data(data: inspect.BoundArguments, **kwargs)¶
This coroutine is executed in parallel and thus needs to re-enter self.context.
Parameters: data, kwargs
initialize_context()¶
Expose cache_type and executor_type into self.context.
run(*args, **kwargs)¶
The method users need to implement for processing the data emitted by _input channels.
Parameters: args, kwargs
class flowsaber.core.task.Task(**kwargs)¶
Task is a subclass of RunTask: 1. Each Task has a unique task_key/task_workdir. 2. Each _input's run has a unique run_key/run_workdir. 3. The task's run is executed in an executor and handled by a task runner. 4. Within the task runner, the task passes through a state machine; callbacks can be registered for each state change.
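In user-facing code, tasks like this are usually built and combined with the decorator-style helpers; the sketch below assumes `task`, `flow`, and `run` are exported by `flowsaber.api` (treat those exact names as assumptions), while `Channel.values` is documented in the channel section:
```python
from flowsaber.api import *

@task
def add(num):            # assumed: wraps the function into a Task subclass
    return num + 1

@flow
def myflow(num_ch):
    return add(add(num_ch))

num_ch = Channel.values(1, 2, 3)
run(myflow(num_ch))      # assumed entry point that executes the built flow
```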
Methods:
call_run(data, **kwargs): Call self.run under an asyncio.Lock identified by run_workdir.
clean(): Functions called after the execution of the task.
handle_res(res): Only push Success state results into _output channels.
initialize_context(): Expose cache_type and executor_type into self.context.
need_skip(data): Check if the _input can be passed directly into _output channels, as predicated by the user-specified self.skip_fn.
skip(skip_fn): A decorator/function exposed for users to specify a skip function.
Attributes:
task_hash: Get the task_hash of this task, defined by the real source code of self.run.
task_workdir: The task's workdir is resolved in a bottom-up way.
async call_run(data: inspect.BoundArguments, **kwargs) → flowsaber.core.utility.state.State¶
Call self.run under an asyncio.Lock identified by run_workdir.
Parameters: data, kwargs
clean()¶
Functions called after the execution of the task. For example, Cache needs to persist cached data.
async handle_res(res)¶
Only push Success state results into _output channels. Some states may be skipped when exceptions occur within the task runner, which then returns a Drop(Failure) state as a signal.
Parameters: res
initialize_context()¶
Expose cache_type and executor_type into self.context.
need_skip(data: inspect.BoundArguments) → bool¶
Check if the _input can be passed directly into _output channels, as predicated by the user-specified self.skip_fn.
Parameters: data
skip(skip_fn: Callable)¶
A decorator/function exposed for users to specify a skip function.
Parameters: skip_fn
property task_hash¶
Get the task_hash of this task, defined by the real source code of self.run.
property task_workdir¶
The task's workdir is resolved in a bottom-up way. In this hierarchical scheme, users can change flowrun.context['flow_workdir'] by setting up the flowrun's initial context, thus making the flow's workdir configurable.
1. If task_workdir is already absolute, use it.
2. If parent_flow_workdir/task_workdir is absolute, use it.
3. Otherwise, use top_flow_workdir/parent_flow_workdir/task_workdir as the workdir.
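The three rules above amount to the following path arithmetic (a sketch with a hypothetical helper name, not the actual implementation):
```python
from pathlib import Path

def resolve_task_workdir(top_flow_workdir: str,
                         parent_flow_workdir: str,
                         task_workdir: str) -> Path:
    task_path = Path(task_workdir)
    if task_path.is_absolute():                      # rule 1
        return task_path
    parent_path = Path(parent_flow_workdir) / task_path
    if parent_path.is_absolute():                    # rule 2
        return parent_path
    return Path(top_flow_workdir) / parent_path      # rule 3
```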
operators¶
Classes:
Branch: Dispatch _input into a specified number of channels based on the index returned by the predicate function.
Collect: Opposite to Flatten; turns a channel into a tuple.
Concat: Data in channels are concatenated in the order of the _input channels.
Constant
Count
Distinct: Remove continuously duplicated items.
Filter: Filter items by a predicate function or a comparison identity.
First: Take the first item.
Flatten: Flatten the _output of the channel.
Get: Alias for the GetItem task.
GetItem: Get an item from the output of the input channel with the specified key, like obj[key].
Group: Return a new channel with items of Tuple(key_fn(_input), grouped_data_tuple); the group size can be specified.
Last: Take the last item.
Map: Map each item to another item returned by the specified map function, into a new channel.
Max
Merge: Merge channels into a channel with _output of tuple.
Min
Mix: Data emitted by channels are mixed into a single channel.
Operator: Base class for all operators; a subclass of BaseTask. All operators run in the main loop in sequence and have no runners or run states.
Reduce: Similar to the builtin reduce.
Sample: Randomly sample at most num items emitted by the channel using the reservoir algorithm (which must visit all elements).
Select: Alias for the GetItem task.
Split: Used when the _output is a tuple/list; split each item of the tuple into a unique channel.
Subscribe: Specify on_next or on_complete functions as callbacks for these two events.
Sum
Take: Only take the first num items.
Unique: Emit items at most once (no duplicates).
Until: Take items until a stop marker is met.
View: Print each item emitted by the channel; equal to calling Subscribe(on_next=print).
var: alias of Count.
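Since every operator is also exposed as a ChannelBase method (see the channel section), operators are typically chained; a sketch, assuming it runs inside a flow definition where channels are live:
```python
from flowsaber.core.channel import Channel

ch = Channel.values(1, 2, 2, 3)
out = (ch.map(by=lambda x: x * 10)      # Map: transform each item
         .filter(by=lambda x: x > 10)   # Filter: keep items passing the predicate
         .unique()                      # Unique: drop duplicates
         .view())                       # View: print each item
```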
class flowsaber.core.operators.Branch(num: int, by: Callable, **kwargs)¶
Dispatch _input into a specified number of channels based on the index returned by the predicate function.
Methods:
handle_consumer(consumer, **kwargs): Iteratively fetch data from the consumer and call the processing function.
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Iteratively fetch data from the consumer and call the processing function.
Parameters: consumer, kwargs
class flowsaber.core.operators.Collect(num_out: int = 1, **kwargs)¶
Opposite to Flatten; turns a channel into a tuple.
Methods:
handle_consumer(consumer, **kwargs): Iteratively fetch data from the consumer and call the processing function.
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Iteratively fetch data from the consumer and call the processing function.
Parameters: consumer, kwargs
class flowsaber.core.operators.Concat(num_out: int = 1, **kwargs)¶
Data in channels are concatenated in the order of the _input channels.
Methods:
handle_consumer(consumer, **kwargs): Iteratively fetch data from the consumer and call the processing function.
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Iteratively fetch data from the consumer and call the processing function.
Parameters: consumer, kwargs
class flowsaber.core.operators.Constant(**kwargs)¶
Methods:
initialize_output(): Create _output channels according to self.num_output.
initialize_output()¶
Create _output channels according to self.num_output.
class flowsaber.core.operators.Count(**kwargs)¶
class flowsaber.core.operators.Distinct(**kwargs)¶
Remove continuously duplicated items.
class flowsaber.core.operators.Filter(by: Union[Callable[[Any], bool], object] = <function Filter.<lambda>>, **kwargs)¶
Filter items by a predicate function or a comparison identity.
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.First(**kwargs)¶
Take the first item.
class flowsaber.core.operators.Flatten(max_level: int = 1, **kwargs)¶
Flatten the _output of the channel.
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.Get(key: Any, default: Any = None, **kwargs)¶
Alias for the GetItem task.
class flowsaber.core.operators.GetItem(key: Any, default: Any = None, **kwargs)¶
Get an item from the output of the input channel with the specified key, like obj[key]. The input channel is expected to emit objects with a __getitem__ method, for example tuple, list, dict, and so on.
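A small sketch of the intended use, assuming a live flow context:
```python
# each emitted dict supports __getitem__, so GetItem can pick one field
rows = Channel.values({"name": "a"}, {"name": "b"})
names = rows.getitem(key="name", default=None)
```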
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data, *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.Group(by: Callable = <function Group.<lambda>>, num: int = inf, keep: bool = True, **kwargs)¶
Return a new channel with items of Tuple(key_fn(_input), grouped_data_tuple); the group size can be specified.
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.Last(**kwargs)¶
Take the last item.
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.Map(by: Callable, **kwargs)¶
Map each item to another item returned by the specified map function, into a new channel.
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.Max(result: float = -inf, **kwargs)¶
class flowsaber.core.operators.Merge(num_out: int = 1, **kwargs)¶
Merge channels into a channel with _output of tuple. Even if there is only one _input channel, the _output is always a tuple.
class flowsaber.core.operators.Min(result: float = inf, **kwargs)¶
class flowsaber.core.operators.Mix(num_out: int = 1, **kwargs)¶
Data emitted by channels are mixed into a single channel.
Methods:
handle_consumer(consumer, **kwargs): Iteratively fetch data from the consumer and call the processing function.
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Iteratively fetch data from the consumer and call the processing function.
Parameters: consumer, kwargs
class flowsaber.core.operators.Operator(num_out: int = 1, **kwargs)¶
Base class for all operators; a subclass of BaseTask. All operators run in the main loop in sequence and have no runners or run states.
Methods:
initialize_context(): Initialize some attributes of self.config into self.context.
initialize_context()¶
Initialize some attributes of self.config into self.context.
class flowsaber.core.operators.Reduce(by: Callable[[Any, Any], Any], result=<object object>, **kwargs)¶
Similar to the builtin reduce: for each item, the accumulated result is updated as result = by(result, item).
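A sketch of the fold described above, assuming a live flow context; result is the initial accumulator:
```python
# by folds each item into the accumulator, as in functools.reduce
total = Channel.values(1, 2, 3, 4).reduce(by=lambda acc, x: acc + x, result=0)
```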
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.Sample(num: int = 1, **kwargs)¶
Randomly sample at most num items emitted by the channel using the reservoir algorithm (which must visit all elements).
Methods:
handle_consumer(consumer, **kwargs): Iteratively fetch data from the consumer and call the processing function.
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Iteratively fetch data from the consumer and call the processing function.
Parameters: consumer, kwargs
class flowsaber.core.operators.Select(key: Any, default: Any = None, **kwargs)¶
Alias for the GetItem task.
class flowsaber.core.operators.Split(num: int, **kwargs)¶
Used when the _output is a tuple/list; split each item of the tuple into a unique channel.
class flowsaber.core.operators.Subscribe(on_next: Callable = None, on_complete: Callable = None, **kwargs)¶
Specify on_next or on_complete functions as callbacks for these two events.
Methods:
handle_consumer(consumer, **kwargs): Iteratively fetch data from the consumer and call the processing function.
async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)¶
Iteratively fetch data from the consumer and call the processing function.
Parameters: consumer, kwargs
class flowsaber.core.operators.Sum(**kwargs)¶
class flowsaber.core.operators.Take(num: int = 1, **kwargs)¶
Only take the first num items.
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data, *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.Unique(**kwargs)¶
Emit items at most once (no duplicates).
class flowsaber.core.operators.Until(by: Union[Callable[[Any], bool], object], **kwargs)¶
Take items until a stop marker is met; the stop item will not be included.
Methods:
handle_input(data, *args, **kwargs): Do nothing; send the _input data directly to the _output channel.
async handle_input(data, *args, **kwargs)¶
Do nothing; send the _input data directly to the _output channel.
Parameters: data, args, kwargs
class flowsaber.core.operators.View(fmt='{x}', **kwargs)¶
Print each item emitted by the channel; equal to calling Subscribe(on_next=print).
flowsaber.core.operators.var¶
alias of flowsaber.core.operators.Count
shell¶
Classes:
BashTask: Task that executes a bash command using subprocess.
ShellTask: Task used for composing a bash command and then executing it.
Exceptions:
CommandTaskComposeError
ShellTaskExecuteError
class flowsaber.tasks.shell.BashTask(**kwargs)¶
Task that executes a bash command using subprocess.
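A sketch of the division of labor described in this module: a channel of already-composed command strings feeds a BashTask, whose output channel carries the results. Calling a task instance on a channel follows the general task-call pattern, so treat the wiring as an assumption:
```python
from flowsaber.core.channel import Channel
from flowsaber.tasks.shell import BashTask

cmd_ch = Channel.values("echo hello", "echo world")  # composed bash commands as str
out_ch = BashTask()(cmd_ch)                          # each command runs via subprocess
```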
Methods:
execute_command(cmd, run_workdir[, envs]): Run a command in the shell.
get_publish_dirs(flow_workdir, …): Get absolute paths of the configured publish dirs.
glob_output_files(item, run_workdir, …): Iterate over the item hierarchically, convert globbed str values into Files in place, and collect them into the third parameter.
run(cmd[, output, envs]): This method should be thread safe; it cannot use functions that depend on process-level attributes like ENV.
execute_command(cmd: str, run_workdir: Union[str, pathlib.Path], envs: dict = None) → Tuple[pathlib.Path, pathlib.Path]¶
Run a command in the shell.
Parameters: cmd, run_workdir, envs
static get_publish_dirs(flow_workdir, configured_publish_dirs: List[str])¶
Get absolute paths of the configured publish dirs.
Parameters: flow_workdir, configured_publish_dirs
classmethod glob_output_files(item, run_workdir, collect_files: List[flowsaber.core.utility.target.File])¶
Iterate over the item hierarchically, convert globbed str values into Files in place, and collect them into collect_files.
Parameters: item, run_workdir, collect_files
run(cmd: str, output=None, envs: dict = None)¶
This method should be thread safe; it cannot use functions that depend on process-level attributes like ENV.
Parameters: cmd, output, envs
exception flowsaber.tasks.shell.CommandTaskComposeError¶
class flowsaber.tasks.shell.ShellTask(**kwargs)¶
Task used for composing a bash command and then executing it. Users need to implement the command method. Note that the _output channel of this task simply emits the composed bash command as a str; the command is actually executed by a BashTask.
Methods:
command(*args, **kwargs): Users need to implement this function to compose the final bash command.
run(*args, **kwargs): Users need to implement this function to compose the final bash command.
Attributes:
task_hash: Get the task_hash of this task, defined by the real source code of self.run.
command(*args, **kwargs) → str¶
Users need to implement this function to compose the final bash command.
The returned value of this method represents the expected outputs after executing the composed bash command in the shell:
None means the output is stdout.
str values are glob patterns for files in the working directory.
To tell flowsaber what the composed bash command is, users have two options:
1. Assign the composed command to a variable named CMD.
2. Write a virtual f-string as the docstring of the command method. All variables in the command method's scope can be used freely.
Here are some examples:
```python
class A(CommandTask):
    def command(self, fa, fasta):
        "bwa map -t {self.context.cpu} {fa} {fasta} -o {bam_file}"
        bam_file = "test.bam"
        return bam_file

class B(CommandTask):
    def command(self, file):
        a = "xxxx"
        b = 'xxxx'
        CMD = f"echo {a}\n" \
              f"echo {b}\n" \
              f"cat {file}"
        # a None is implicitly returned here, meaning the _output of cmd is stdout
```
Parameters: args, kwargs
run(*args, **kwargs) → str¶
Users need to implement this function to compose the final bash command.
The returned value of this method represents the expected outputs after executing the composed bash command in the shell:
None means the output is stdout.
str values are glob patterns for files in the working directory.
To tell flowsaber what the composed bash command is, users have two options:
1. Assign the composed command to a variable named CMD.
2. Write a virtual f-string as the docstring of the command method. All variables in the command method's scope can be used freely.
Here are some examples:
```python
class A(CommandTask):
    def command(self, fa, fasta):
        "bwa map -t {self.context.cpu} {fa} {fasta} -o {bam_file}"
        bam_file = "test.bam"
        return bam_file

class B(CommandTask):
    def command(self, file):
        a = "xxxx"
        b = 'xxxx'
        CMD = f"echo {a}\n" \
              f"echo {b}\n" \
              f"cat {file}"
        # a None is implicitly returned here, meaning the _output of cmd is stdout
```
Parameters: args, kwargs
property task_hash¶
Get the task_hash of this task, defined by the real source code of self.run.
exception flowsaber.tasks.shell.ShellTaskExecuteError¶
flow¶
Classes:
Flow: Represents the organizer of tasks; flows can also be used as components.
class flowsaber.core.flow.Flow(**kwargs)¶
Represents the organizer of tasks; flows can also be used as components. Except for the top-most flow, which represents the whole running unit, all inner flows are simply virtual flows and do not have a running state like flowrun or taskrun. However, flows and tasks can all have personalized configs.
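Because flows can be used as components, a flow can call another flow exactly like a task; a sketch assuming the `flow`/`task` decorators exported by `flowsaber.api`:
```python
from flowsaber.api import *

@task
def add(num):
    return num + 1

@flow
def twice(num_ch):           # an inner, "virtual" flow without its own run state
    return add(add(num_ch))

@flow
def main(num_ch):            # the top-most flow: the actual running unit
    return twice(twice(num_ch))
```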
Methods:
call_initialize(*args, **kwargs): Copy a new one and initialize some attributes.
initialize_context(): Expose some attributes of self.config into self.context.
start_execute(**kwargs): The top-most flow needs to initialize executors.
call_initialize(*args, **kwargs)¶
Copy a new one and initialize some attributes.
Parameters: args, kwargs
initialize_context()¶
Expose some attributes of self.config into self.context.
async start_execute(**kwargs)¶
The top-most flow needs to initialize executors.
Parameters: kwargs
base¶
Classes:
Component: Base class of Flow and Task.
Exceptions:
ComponentCallError
ComponentExecuteError
Functions:
aenter_context: A decorator that runs the wrapped method within a new context composed of self.context and the kwargs' context.
enter_context: A decorator that runs the wrapped method within a new context composed of self.context and the kwargs' context.
class flowsaber.core.base.Component(**kwargs)¶
Base class of Flow and Task.
Classes:
State: An enumeration.
Methods:
call_initialize(*args, **kwargs): Copy a new one and initialize some attributes.
get_full_name(): Generate a name like flow1.name|flow2.name|flow3.name|cur_task.
initialize_context(): Called by call_initialize; merge and update the self.config dict of self.context from different sources.
start(**kwargs): Start running the Flow/Task in the context of self.context; before setting the context, self.context will be merged/updated from kwargs.get('context', {}).
Attributes:
config: Return a non-editable context.
class State¶
An enumeration.
call_initialize(*args, **kwargs)¶
Copy a new one and initialize some attributes.
Parameters: args, kwargs
property config¶
Return a non-editable context.
get_full_name() → str¶
Generate a name like flow1.name|flow2.name|flow3.name|cur_task.
initialize_context()¶
Called by call_initialize; merge and update the self.config dict of self.context from different sources.
async start(**kwargs)¶
Start running the Flow/Task in the context of self.context; before setting the context, self.context will be merged/updated from kwargs.get('context', {}).
Parameters: kwargs
exception flowsaber.core.base.ComponentCallError(*args, trace_back=None)¶
exception flowsaber.core.base.ComponentExecuteError(*args, futures=None, trace_back=None)¶
flowsaber.core.base.aenter_context(method: Callable[[...], Any]) → Any¶
A decorator that runs the wrapped method within a new context composed of self.context and the kwargs' context.
Parameters: method
flowsaber.core.base.enter_context(method: Callable[[...], Any]) → Any¶
A decorator that runs the wrapped method within a new context composed of self.context and the kwargs' context.
Parameters: method
context¶
During the running of a flow, flowsaber.context is updated automatically. Ideally, at the end of a task run, the context will look like this:
```
flow_config: {}
task_config: {}
flow_id, flow_name, flow_full_name, flow_labels
task_id, task_name, task_full_name, task_labels
id, flowrun_name
```
Classes:
FlowSaberContext: The global coroutine-safe context meant to be used for inferring the running status of a flow at any time.
Functions:
inject_context_attrs: Inject context attrs into the log record based on context.context_attrs.
class flowsaber.core.context.FlowSaberContext(*args: Any, **kwargs: Any)¶
The global coroutine-safe context meant to be used for inferring the running status of a flow at any time. Compared to a raw context, this global context has intelligent properties that automatically change according to the time/position of the callee:
cache: used by the running flow/task
run_lock: used by the running task
logger: usable anywhere and anytime
executor: used by the running task
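For example, inside a running task these properties resolve against the current run (a sketch; the attribute accesses mirror the properties documented below):
```python
import flowsaber

logger = flowsaber.context.logger      # child logger named after the callee and run ids
executor = flowsaber.context.executor  # executor selected by the current executor_type
cache = flowsaber.context.cache        # cache selected by the current cache_type
```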
Attributes:
cache: Fetch a Cache based on cache_type in the current context.
executor: Fetch an initialized executor based on executor_type in the current context.
logger: Get a child logger of the flowsaber logger with a name of: callee.__name__.agent_id.flow_id.id.task_id.taskrun_id.
Methods:
lock(keys): Fetch an asyncio.Lock based on run_workdir in the current context.
property cache¶
Fetch a Cache based on cache_type in the current context.
property executor¶
Fetch an initialized executor based on executor_type in the current context.
lock(keys: List[str])¶
Fetch an asyncio.Lock based on run_workdir in the current context.
property logger¶
Get a child logger of the flowsaber logger with a name of: callee.__name__.agent_id.flow_id.id.task_id.taskrun_id
flowsaber.core.context.inject_context_attrs(factory)¶
Inject context attrs into the log record based on context.context_attrs.
Parameters: factory
channel¶
Classes:
AsyncChannel: Schedules asynchronous tasks when initialized; each async task accepts the channel as its only parameter and is expected to call self.put_nowait/self.put to send items into output queues.
Channel: Subclass of ChannelBase implementing the create_queue method; sending data to all created queues simply loops over the queues and pushes each item passed to put/put_nowait into all of them.
ChannelBase: A channel itself is an object for storing items via put/put_nowait.
ConstantChannel: A channel using ConstantQueue as its fetcher queue factory.
ConstantQueue: An async queue that emits its internal value infinitely.
Consumer: An object used for simultaneously fetching data emitted by multiple channels and outputting tuples.
EventChannel: A subclass of AsyncChannel whose only async task runs in a while loop and periodically calls the registered trigger methods.
Fetcher: Provides for/async-for support for classes implementing get/get_nowait methods.
LazyAsyncQueue: Internally an asyncio.Queue, but the inner queue is only created at the first call of its fetcher/putter methods.
Exceptions:
EventChannelCheckError
class flowsaber.core.channel.AsyncChannel(**kwargs)¶
AsyncChannel schedules asynchronous tasks when initialized. Each async task accepts the channel as its only parameter and is expected to call self.put_nowait/self.put to send items into output queues.
class flowsaber.core.channel.Channel(queue_factory: type = <class 'asyncio.queues.Queue'>, **kwargs)¶
Subclass of ChannelBase implementing the create_queue method. The mechanism for sending data to all created queues is simple: it loops over all queues and pushes each item passed to put/put_nowait into all of them. Furthermore, to make LazyAsyncQueue initialize only in a running event loop, Channel uses a buffer to hold all items pushed before entering the event loop.
class flowsaber.core.channel.ChannelBase(**kwargs)¶
A channel itself is an object for storing items via put/put_nowait. To consume data from the channel, users must call the ChannelBase.create_queue method to get a fetcher LazyAsyncQueue, then call its get/get_nowait method to fetch the data emitted by the channel.
Methods:
branch(*args, num, by, **kwargs): Dispatch _input into a specified number of channels based on the index returned by the predicate function.
collect(*args[, num_out]): Opposite to flatten; turns a channel into a tuple.
concat(*args[, num_out]): Data in channels are concatenated in the order of the _input channels.
distinct(*args, **kwargs): Remove continuously duplicated items.
filter(*args[, by]): Filter items by a predicate function or a comparison identity.
first(*args, **kwargs): Take the first item.
flatten(*args[, max_level]): Flatten the _output of the channel.
from_list([1, 2, 3, 4, 5]): A QueueChannel created by this method will always include an END signal.
get(*args, key[, default]): Alias for the GetItem task.
getitem(*args, key[, default]): Get an item from the output of the input channel with the specified key, like obj[key].
group(*args[, by, num, keep]): Return a new channel with items of Tuple(key_fn(_input), grouped_data_tuple); the group size can be specified.
last(*args, **kwargs): Take the last item.
map(*args, by, **kwargs): Map each item to another item returned by the specified map function, into a new channel.
merge(*args[, num_out]): Merge channels into a channel with _output of tuple.
mix(*args[, num_out]): Data emitted by channels are mixed into a single channel.
reduce(*args, by[, result]): Similar to the builtin reduce.
sample(*args[, num]): Randomly sample at most num items emitted by the channel using the reservoir algorithm.
select(*args, key[, default]): Alias for the GetItem task.
split(*args, num, **kwargs): Used when the _output is a tuple/list; split each item of the tuple into a unique channel.
subscribe(*args[, on_next, on_complete]): Specify on_next or on_complete functions as callbacks for these two events.
take(*args[, num]): Only take the first num items.
unique(*args, **kwargs): Emit items at most once (no duplicates).
until(*args, by, **kwargs): Take items until a stop marker is met.
value(value, **kwargs): Channel._output(1)
values(1, 2, 3, 4, 5): A QueueChannel created by this method will always include an END signal.
view(*args[, fmt]): Print each item emitted by the channel; equal to calling Subscribe(on_next=print).
branch(*args: Union[object, flowsaber.core.channel.Channel], num: int, by: Callable, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Dispatch _input into a specified number of channels based on the index returned by the predicate function.
collect(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Opposite to flatten; turns a channel into a tuple.
concat(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Data in channels are concatenated in the order of the _input channels.
distinct(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Remove continuously duplicated items.
filter(*args: Union[object, flowsaber.core.channel.Channel], by: Union[Callable[[Any], bool], object] = <function Filter.<lambda>>, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Filter items by a predicate function or a comparison identity.
first(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Take the first item.
flatten(*args: Union[object, flowsaber.core.channel.Channel], max_level: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Flatten the _output of the channel.
classmethod from_list([1, 2, 3, 4, 5])¶
A QueueChannel created by this method will always include an END signal.
get(*args: Union[object, flowsaber.core.channel.Channel], key: Any, default: Any = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Alias for the GetItem task.
getitem(*args: Union[object, flowsaber.core.channel.Channel], key: Any, default: Any = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Get an item from the output of the input channel with the specified key, like obj[key]. The input channel is expected to emit objects with a __getitem__ method, for example tuple, list, dict, and so on.
group(*args: Union[object, flowsaber.core.channel.Channel], by: Callable = <function Group.<lambda>>, num: int = inf, keep: bool = True, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Return a new channel with items of Tuple(key_fn(_input), grouped_data_tuple); the group size can be specified.
last(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Take the last item.
map(*args: Union[object, flowsaber.core.channel.Channel], by: Callable, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Map each item to another item returned by the specified map function, into a new channel.
merge(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Merge channels into a channel with _output of tuple. Even if there is only one _input channel, the _output is always a tuple.
mix(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Data emitted by channels are mixed into a single channel.
reduce(*args: Union[object, flowsaber.core.channel.Channel], by: Callable[[Any, Any], Any], result=<object object>, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Similar to the builtin reduce: for each item, the accumulated result is updated as result = by(result, item).
sample(*args: Union[object, flowsaber.core.channel.Channel], num: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Randomly sample at most num items emitted by the channel using the reservoir algorithm (which must visit all elements).
select(*args: Union[object, flowsaber.core.channel.Channel], key: Any, default: Any = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Alias for the GetItem task.
split(*args: Union[object, flowsaber.core.channel.Channel], num: int, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Used when the _output is a tuple/list; split each item of the tuple into a unique channel.
subscribe(*args: Union[object, flowsaber.core.channel.Channel], on_next: Callable = None, on_complete: Callable = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Specify on_next or on_complete functions as callbacks for these two events.
take(*args: Union[object, flowsaber.core.channel.Channel], num: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Only take the first num items.
unique(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Emit items at most once (no duplicates).
until(*args: Union[object, flowsaber.core.channel.Channel], by: Union[Callable[[Any], bool], object], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Take items until a stop marker is met; the stop item will not be included.
classmethod value(value, **kwargs) → flowsaber.core.channel.ConstantChannel¶
Channel._output(1)
classmethod values(1, 2, 3, 4, 5)¶
A QueueChannel created by this method will always include an END signal.
view(*args: Union[object, flowsaber.core.channel.Channel], fmt='{x}', **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]¶
Print each item emitted by the channel; equal to calling Subscribe(on_next=print).
class flowsaber.core.channel.ConstantChannel(**kwargs)¶
A channel using ConstantQueue as its fetcher queue factory.
class flowsaber.core.channel.ConstantQueue¶
An async queue that emits its internal value infinitely. Like an ordinary queue, the first element needs to be enqueued before fetching.
class flowsaber.core.channel.Consumer(*queues: flowsaber.core.channel.LazyAsyncQueue, **kwargs)¶
Consumer is an object used for simultaneously fetching data emitted by multiple channels and outputting tuples. An empty consumer will emit only once. The end of __next__/__anext__ is triggered when any of its source channels emits an END object. For simplicity, if there is only a single source channel, the output of the consumer will not be a tuple.
class flowsaber.core.channel.EventChannel(interval: int = 5, value=None, **kwargs)¶
A subclass of AsyncChannel whose only async task runs in a while loop and periodically calls the registered trigger methods.
exception flowsaber.core.channel.EventChannelCheckError¶
class flowsaber.core.channel.Fetcher(**kwargs)¶
Fetcher simply provides for/async-for support for classes implementing get/get_nowait methods. The end of __next__/__anext__ is triggered by the appearance of END fetched from get/get_nowait.
class flowsaber.core.channel.LazyAsyncQueue(ch, queue_factory, **kwargs)¶
Internally an asyncio.Queue, but the inner queue is only created at the first call of its fetcher/putter methods. It is designed this way to handle pickle and coroutine-loop problems.
state¶
Classes:
Cached: The result of the _input is cached.
Cancelled
Cancelling
Done: Represents the end state of a task run; should not be used directly.
Drop: The _output should be dropped and will not be passed to the _output channel.
Failure: Means some exception has been raised.
Pending
Retrying: Comes from Pending; means the task is waiting for a rerun due to the retry waiting time.
Running
Scheduled
Skip: The _output should be skipped and sent directly to the _output channel.
State: State represents the status of a flowrun/taskrun.
Success
class flowsaber.core.utility.state.Cached(state_type: str = None, result: Any = None, message: str = None)¶
The result of the _input is cached.
class flowsaber.core.utility.state.Cancelled(trace_back=None, **kwargs)¶
class flowsaber.core.utility.state.Cancelling(state_type: str = None, result: Any = None, message: str = None)¶
class flowsaber.core.utility.state.Done(state_type: str = None, result: Any = None, message: str = None)¶
Represents the end state of a task run; should not be used directly. Use Success/Failure instead.
class flowsaber.core.utility.state.Drop(state_type: str = None, result: Any = None, message: str = None)¶
This state means the _output should be dropped and will not be passed to the _output channel. Usually this is caused by a skip-on-error option set in task.config_dict.
class flowsaber.core.utility.state.Failure(trace_back=None, **kwargs)¶
Means some exception has been raised.
class flowsaber.core.utility.state.Pending(state_type: str = None, result: Any = None, message: str = None)¶
class flowsaber.core.utility.state.Retrying(state_type: str = None, result: Any = None, message: str = None)¶
This state comes from Pending and means the task is waiting for a rerun due to the retry waiting time.
class flowsaber.core.utility.state.Running(state_type: str = None, result: Any = None, message: str = None)¶
class flowsaber.core.utility.state.Scheduled(state_type: str = None, result: Any = None, message: str = None)¶
class flowsaber.core.utility.state.Skip(state_type: str = None, result: Any = None, message: str = None)¶
This state means this _output should be skipped and sent directly to the _output channel.
class flowsaber.core.utility.state.State(state_type: str = None, result: Any = None, message: str = None)¶
State represents the status of a flowrun/taskrun.
State flows and state hierarchy:
FlowRun: Scheduled -> Pending -> Running -> Done
TaskRun: Pending -> Running -> Retrying -> Running -> Done
Done
  Success
    Cached
    Skip
  Failure
    Drop
class flowsaber.core.utility.state.Success(state_type: str = None, result: Any = None, message: str = None)¶
cache¶
Classes:
Cache: Cache is used for persisting the results of previously seen inputs.
LocalCache: LocalCache treats hash keys as directories on disk and dumps/loads python objects in the corresponding directories.
Exceptions:
CacheInvalidError
class flowsaber.core.utility.cache.Cache¶
Cache is used for persisting the results of previously seen inputs. The cache should produce unique keys for unique inputs by implementing the hash method. The hash of the inputs can then be used to write/read the related data.
exception flowsaber.core.utility.cache.CacheInvalidError¶
class flowsaber.core.utility.cache.LocalCache(serializer: flowsaber.core.utility.cache.Serializer = <flowsaber.core.utility.cache.CloudPickleSerializer object>, **kwargs)¶
LocalCache treats hash keys as directories on disk and dumps/loads python objects in the corresponding directories.
Methods:
persist(): This should be called before the python program ends.
persist()¶
This should be called before the python program ends.
target¶
Classes:
End: End signal of a Channel.
File: Wrapper of pathlib.Path, with features including integrity checking.
Folder
Stdin: Represents stdin from a string or File.
Stdout: An ugly way: use File to store stdout.
Target: Target represents an item emitted by a Channel.
class flowsaber.core.utility.target.End(**kwargs)¶
End signal of a Channel.
class flowsaber.core.utility.target.File(*args, **kwargs)¶
Wrapper of pathlib.Path, with features including integrity checking and more.
class flowsaber.core.utility.target.Folder(*args, **kwargs)¶
class flowsaber.core.utility.target.Stdin(src: Union[str, flowsaber.core.utility.target.File], **kwargs)¶
Represents stdin from a string or File.
class flowsaber.core.utility.target.Stdout(*args, **kwargs)¶
An ugly way: use File to store stdout. In an idealistic implementation, stdout and stdin could be piped/linked across machines over the network.
class flowsaber.core.utility.target.Target(**kwargs)¶
Target represents an item emitted by a Channel. Theoretically, every item emitted by a Channel should be wrapped in a Target.
executor¶
Borrowed from prefect.executors.dask
Classes:
DaskExecutor: An executor that runs all functions using the dask.distributed scheduler.
Executor: Async Executor is aimed at running submitted jobs in an asynchronous way.
Local: Executor that runs jobs in the main loop of the current thread.
class flowsaber.core.utility.executor.DaskExecutor(address: str = None, cluster_class: Union[str, Callable] = None, cluster_kwargs: dict = None, adapt_kwargs: dict = None, client_kwargs: dict = None, debug: bool = False, **kwargs)¶
An executor that runs all functions using the dask.distributed scheduler.
Check https://docs.dask.org/en/latest/setup.html for all kinds of cluster types.
By default a temporary distributed.LocalCluster is created (and subsequently torn down) within the start_loop() contextmanager. To use a different cluster class (e.g. dask_kubernetes.KubeCluster, https://kubernetes.dask.org/), you can specify cluster_class/cluster_kwargs.
Alternatively, if you already have a dask cluster running, you can provide the address of the scheduler via the address kwarg.
Note that if you have tasks with tags of the form "dask-accum_resource:KEY=NUM", they will be parsed and passed as Worker Resources (https://distributed.dask.org/en/latest/resources.html) of the form {"KEY": float(NUM)} to the Dask TaskScheduler.
Args:
address (string, optional): address of a currently running dask scheduler; if one is not provided, a temporary cluster will be created in executor.start_loop(). Defaults to None.
cluster_class (string or callable, optional): the cluster class to use when creating a temporary dask cluster. Can be either the full class name (e.g. "distributed.LocalCluster") or the class itself.
cluster_kwargs (dict, optional): additional kwargs to pass to the cluster_class when creating a temporary dask cluster.
adapt_kwargs (dict, optional): additional kwargs to pass to cluster.adapt when creating a temporary dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided.
client_kwargs (dict, optional): additional kwargs to use when creating a dask.distributed.Client (https://distributed.dask.org/en/latest/api.html#client).
debug (bool, optional): when running with a local cluster, setting debug=True will increase dask's logging level, providing potentially useful debug info. Defaults to the debug value in your Prefect configuration.
Examples:
Using a temporary local dask cluster:
```python
executor = DaskExecutor()
```
Using a temporary cluster running elsewhere. Any Dask cluster class should work; here we use dask-cloudprovider (https://cloudprovider.dask.org):
```python
executor = DaskExecutor(
    cluster_class="dask_cloudprovider.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
        ...
    },
)
```
Connecting to an existing dask cluster:
```python
executor = DaskExecutor(address="192.0.2.255:8786")
```
Methods:
run(fn, *args[, extra_context]): Submit a function to the executor for execution.
async run(fn: Callable, *args: Any, extra_context: dict = None, **kwargs: Any) → concurrent.futures._base.Future¶
Submit a function to the executor for execution. Returns a Future object.
Returns:
Future: a Future-like object that represents the computation of fn(*args, **kwargs)
class flowsaber.core.utility.executor.Executor¶
Async Executor is aimed at running submitted jobs in an asynchronous way. Executors need to implement the three async functions __aenter__/__aexit__/run, for initializing, running, and cleaning up.
class flowsaber.core.utility.executor.Local(**kwargs)¶
Executor that runs jobs in the main loop of the current thread.
runner¶
Some code is borrowed from https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/runner.py
Exceptions:
RunException
RunnerExecuteError
Classes:
Runner: Base runner class, intended to be the state manager of runnable objects like flows and tasks.
RunnerExecutor
Functions:
call_state_change_handlers: A decorator that checks the difference between the _input and _output states of the wrapped method; if the two states are not identical, it triggers the runner's handle_state_change method to call all state change handlers.
catch_to_failure: A decorator that wraps a method into one that automatically captures exceptions into a Failure state.
run_timeout_signal: Run a function in the main thread on unix systems with a timeout, using the SIGALRM signal.
run_timeout_thread: Run the task within a timeout in a thread pool.
exception flowsaber.core.engine.runner.RunException(*args, state: flowsaber.core.utility.state.State = None)¶
class flowsaber.core.engine.runner.Runner(server_address: str = None, id: str = None, name: str = None, labels: list = None, **kwargs)¶
Base runner class, intended to be the state manager of runnable objects like flows and tasks.
Users need to add state change handlers in order to be informed of state changes. Methods of the runner should both accept and return a state, and need to be decorated with the call_state_change_handlers decorator.
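Handlers receive the old and new state, matching handle_state_change below; the registration hook shown here is hypothetical, since this reference does not name it:
```python
from flowsaber.core.engine.runner import Runner

def log_transition(old_state, new_state):
    # called whenever a decorated runner method returns a different state
    print(f"{old_state} -> {new_state}")

runner = Runner()
runner.add_handler(log_transition)  # hypothetical; consult the Runner source for the real hook
```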
Methods:
handle_state_change(prev_state, cur_state): Call all registered state change handlers with the old and new state.
send_logs(): This should be run in runner.executor's async main loop.
update_run_state(): Does not use create_task, as these jobs have a strict order.
handle_state_change(prev_state, cur_state)¶
Call all registered state change handlers with the old and new state.
Parameters: prev_state, cur_state
send_logs() → Tuple[Coroutine, Callable]¶
This should be run in runner.executor's async main loop.
update_run_state() → Tuple[Coroutine, Callable]¶
Does not use create_task, as these jobs have a strict order.
exception flowsaber.core.engine.runner.RunnerExecuteError(*args, futures=None)¶
class flowsaber.core.engine.runner.RunnerExecutor(context=<class 'dict'>, **kwargs)¶
Miscellaneous:
DoneException
Methods:
join([timeout]): Wait until the thread terminates.
main_loop(): Endlessly fetch async tasks and schedule them in the asyncio event loop until some task raises an exception.
run(): Method representing the thread's activity.
start(): Start the thread's activity.
exception DoneException¶
join(timeout: Optional[float] = Ellipsis) → None¶
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
async main_loop()¶
Endlessly fetch async tasks and schedule them in the asyncio event loop until some task raises an exception.
run()¶
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
start() → None¶
Start the thread's activity.
It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the same thread object.
flowsaber.core.engine.runner.call_state_change_handlers(method: Callable[[...], flowsaber.core.utility.state.State]) → Callable[[...], flowsaber.core.utility.state.State]¶
A decorator that checks the difference between the _input and _output states of the wrapped method; if the two states are not identical, it triggers the runner's handle_state_change method to call all state change handlers.
Parameters: method
flowsaber.core.engine.runner.catch_to_failure(method: Callable[[...], flowsaber.core.utility.state.State]) → Callable[[...], flowsaber.core.utility.state.State]¶
A decorator that wraps a method into one that automatically captures exceptions into a Failure state.
Parameters: method
flowsaber.core.engine.runner.run_timeout_signal(timeout: int, func: Callable, *args, **kwargs)¶
Run a function in the main thread on unix systems with a timeout, using the SIGALRM signal.
See https://github.com/pnpnpn/timeout-decorator/blob/master/timeout_decorator/timeout_decorator.py
Parameters: func, timeout, args, kwargs
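The underlying pattern is roughly the following generic SIGALRM sketch (not the actual implementation; unix-only, and signal handlers can only be set from the main thread):
```python
import signal

def run_with_alarm(timeout: int, func, *args, **kwargs):
    def handler(signum, frame):
        raise TimeoutError(f"timed out after {timeout}s")
    old_handler = signal.signal(signal.SIGALRM, handler)  # install temporary handler
    signal.alarm(timeout)                                 # schedule SIGALRM
    try:
        return func(*args, **kwargs)
    finally:
        signal.alarm(0)                                   # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)        # restore previous handler
```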
flowsaber.core.engine.runner.run_timeout_thread(timeout: int, *args, **kwargs)¶
Run the task within a timeout in a thread pool. Note that flowsaber.context would be corrupted in the new thread.
Parameters: func, timeout, kwargs
task_runner¶
Classes:
TaskRunner: The task runner moves the task state forward through a complicated process including retry, cache read/write, skip, and drop.
class flowsaber.core.engine.task_runner.TaskRunner(task: Task, inputs: inspect.BoundArguments, **kwargs)¶
The task runner moves the task state forward through a complicated process including retry, cache read/write, skip, and drop.
Methods:
run_task_timeout(**kwargs): Call task.run with timeout handling by using signals.
run_task_timeout(**kwargs)¶
Call task.run with timeout handling by using signals.
Parameters: kwargs
flow_runner¶
Classes:
FlowRunner: Aimed at executing a flow and maintaining/recording/responding to state changes of the flow.
class flowsaber.core.engine.flow_runner.FlowRunner(flow: flowsaber.core.flow.Flow, **kwargs)¶
Aimed at executing a flow and maintaining/recording/responding to state changes of the flow.