API Reference

cli

task

Classes:

BaseTask([num_out])

Base class of all Tasks, internally, BaseTask iteratively fetch items emitted by Channel inputs asynchronously.

Edge(channel, task)

A edge represents a dependency between a channel and a task.

GetContext([check_fn])

Emit context endlessly, this is aimed for building dependency between flow and input context/config.

RunTask([num_out])

RunTask is subclass of BaseTask, representing tasks with run method exposed to users to implement specific item processing logics.

Task(**kwargs)

Task is subclass of RunTask: 1.

Exceptions:

RunDataFileNotFoundError

RunDataTypeError

class flowsaber.core.task.BaseTask(num_out: int = 1, **kwargs)

Base class of all Tasks, internally, BaseTask iteratively fetch items emitted by Channel inputs asynchronously. And then push the processed result of each item into the _output channels. All items are handled in sequence.

Methods:

enqueue_res(data[, index])

Enqueue processed data into _output channels.

handle_consumer(consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

initialize_context()

Initialize some attributes of self.config into self.context

initialize_input(*args, **kwargs)

Wrap all _input channels into a consumer object for simultaneous data ferching,

initialize_output()

Create _output channels according to self.num_output

async enqueue_res(data, index=None)

Enqueue processed data into _output channels. Parameters ———- data index

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

consumer kwargs

async handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

initialize_context()

Initialize some attributes of self.config into self.context

initialize_input(*args, **kwargs)

Wrap all _input channels into a consumer object for simultaneous data ferching,

args kwargs

initialize_output()

Create _output channels according to self.num_output

class flowsaber.core.task.Edge(channel: flowsaber.core.channel.Channel, task: flowsaber.core.task.BaseTask)

A edge represents a dependency between a channel and a task. the Task consumes data emited by the channel.

class flowsaber.core.task.GetContext(check_fn: Callable = None, **kwargs)

Emit context endlessly, this is aimed for building dependency between flow and input context/config. it’s optional, users can also change parameters for different flowruns by rebuilding the flow. However, to make it also work in remote execution, the flow is supposed to read parameters from output of this task. Also note that, since the flow upload to server are built flow, the structure of flow is fixed, python expressions like if, will not work again as to change the structure. With the help of this task, it can simulate some sort of dynamics.

Methods:

run()

The method users need to implement for processing the data emited by _input channels.

run()

The method users need to implement for processing the data emited by _input channels. Parameters ———- args kwargs

exception flowsaber.core.task.RunDataFileNotFoundError
exception flowsaber.core.task.RunDataTypeError
class flowsaber.core.task.RunTask(num_out: int = 1, **kwargs)

RunTask is subclass of BaseTask, representing tasks with run method exposed to users to implement specific item processing logics. Compared to BaseTask: 1. Runs of multiple inputs will be executed in parallel. 2. Runs will be executed in the main loop.

Methods:

call_run(data, **kwargs)

Create a fresh task object and call it’s run method for real data processing.

check_run_data(data, **kwargs)

Match types of _input datas into self.run’s annotations by type conversion.

create_run_data(data)

Wrap consumer fetched data tuple into a BoundArgument paired with self.run’s signature.

handle_consumer(consumer, **kwargs)

Run processing functions in parallel by submitting jobs into schedulers that return awaitable Future-like objects.

handle_run_data(data, **kwargs)

This coroutine will be executed in parallel, thus need to re-enter self.context.

initialize_context()

Expose cache_type and executor_type into self.context

run(*args, **kwargs)

The method users need to implement for processing the data emited by _input channels.

async call_run(data: inspect.BoundArguments, **kwargs)

Create a fresh task object and call it’s run method for real data processing.

async check_run_data(data: inspect.BoundArguments, **kwargs)

Match types of _input datas into self.run’s annotations by type conversion. Check file integrity. Parameters ———- data kwargs

create_run_data(data: Union[tuple, flowsaber.core.utility.target.End]) → inspect.BoundArguments

Wrap consumer fetched data tuple into a BoundArgument paired with self.run’s signature. Parameters ———- data

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Run processing functions in parallel by submitting jobs into schedulers that return awaitable Future-like objects. Parameters ———- consumer kwargs

async handle_run_data(data: inspect.BoundArguments, **kwargs)

This coroutine will be executed in parallel, thus need to re-enter self.context. Parameters ———- data kwargs

initialize_context()

Expose cache_type and executor_type into self.context

run(*args, **kwargs)

The method users need to implement for processing the data emited by _input channels. Parameters ———- args kwargs

class flowsaber.core.task.Task(**kwargs)

Task is subclass of RunTask: 1. Each Task will have a unique task_key/task_workdir 2. Each _input’s run will have a unique run_key/task_workdir. 3. Task’s run will be executed in executor and handled by a task runner. 4. Within the task runner, task will pass through a state machine, callbacks can be registered to each state changes.

Methods:

call_run(data, **kwargs)

Call self.run within the control of a asyncio.Lock identified by run_workdir Parameters ———- data kwargs

clean()

Functions called after the execution of task.

handle_res(res)

Only push Success state result into _output channels.

initialize_context()

Expose cache_type and executor_type into self.context

need_skip(data)

Check if the _input can be directly passed into _output channels by predicate of user specified self.skip_fn Parameters ———- data

skip(skip_fn)

A decorator/function exposed for users to specify skip function.

Attributes:

task_hash

get the task_hash of this task defined by the real source code of self.run

task_workdir

Task’s workdir is resolved in a bottom up way.

async call_run(data: inspect.BoundArguments, **kwargs) → flowsaber.core.utility.state.State

Call self.run within the control of a asyncio.Lock identified by run_workdir Parameters ———- data kwargs

clean()

Functions called after the execution of task. For example, Cache need to persist cached data.

async handle_res(res)

Only push Success state result into _output channels. Some state may be skipped in case of Exceptions occurred within the task runner and thus return a Drop(Failure) state as a signal. Parameters ———- res

initialize_context()

Expose cache_type and executor_type into self.context

need_skip(data: inspect.BoundArguments) → bool

Check if the _input can be directly passed into _output channels by predicate of user specified self.skip_fn Parameters ———- data

skip(skip_fn: Callable)

A decorator/function exposed for users to specify skip function.

skip_fn

property task_hash

get the task_hash of this task defined by the real source code of self.run

property task_workdir

Task’s workdir is resolved in a bottom up way. In this hierarchical way, users can change flowrun.context[‘flow_workdir’] by setting up the flowrun’s initial context, thus make the flow’s workdir configurable.

1: if the task_workdir is already absolute, then use it 2: if parent_flow_workdir/task_workdir is already absolute, then use it 3: otherwise use top_flow_workdir/parent_flow_workdir/task_workdir as the workdir

operators

Classes:

Branch(num, by, **kwargs)

Dispatch _input into specified number of channels base on the returned index of the predicate function.

Collect([num_out])

Opposite to flatten, turns a channel into a tuple

Concat([num_out])

Data in channels are concatenated in the order of _input channels.

Constant(**kwargs)

Count(**kwargs)

Distinct(**kwargs)

Remove continuously duplicated item.

Filter([by])

Filter item by the predicate function or the comparing identity.

First(**kwargs)

Take the first item

Flatten([max_level])

Flatten the _output of channel.

Get(key[, default])

Alias for GetItem task

GetItem(key[, default])

Get item from the output of the input channel with specified key, like obj[key].

Group([by, num, keep])

return a new channel with item of Tuple(key_fn(_input), grouped_data_tuple), the group size can be specified.

Last(**kwargs)

Take the last item

Map(by, **kwargs)

Map each item to another item return by the specified map function into a new channel.

Max([result])

Merge([num_out])

Merge channels into a channel with _output of tuple.

Min([result])

Mix([num_out])

Data emitted bu channels are mixed into a single channel.

Operator([num_out])

Base class for all operators, subclass of BaseTask, all operators runs in the main loop in sequence and do not have runners and run states.

Reduce(by[, result])

Similar to normal reduce.

Sample([num])

Randomly sample at most num number of _input emitted by the channel using reservoir algorithm(should visit all elements.).

Select(key[, default])

Alias for GetItem task.

Split(num, **kwargs)

Used when _output is tuple/list, use split to split each item of the tuple into a unique channel.

Subscribe([on_next, on_complete])

specify on_next or on_complete function as callbacks of these two event.

Sum(**kwargs)

Take([num])

Only take the first num number of items

Unique(**kwargs)

Emit items at most once(no duplicate).

Until(by, **kwargs)

“Take items until meet a stop marker.

View([fmt])

Print each item emitted by the channel, Equals to call Subscribe(on_next=print)

var

alias of flowsaber.core.operators.Count

class flowsaber.core.operators.Branch(num: int, by: Callable, **kwargs)

Dispatch _input into specified number of channels base on the returned index of the predicate function.

Methods:

handle_consumer(consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

consumer kwargs

class flowsaber.core.operators.Collect(num_out: int = 1, **kwargs)

Opposite to flatten, turns a channel into a tuple

Methods:

handle_consumer(consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

consumer kwargs

class flowsaber.core.operators.Concat(num_out: int = 1, **kwargs)

Data in channels are concatenated in the order of _input channels.

Methods:

handle_consumer(consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

consumer kwargs

class flowsaber.core.operators.Constant(**kwargs)

Methods:

initialize_output()

Create _output channels according to self.num_output

initialize_output()

Create _output channels according to self.num_output

class flowsaber.core.operators.Count(**kwargs)
class flowsaber.core.operators.Distinct(**kwargs)

Remove continuously duplicated item.

class flowsaber.core.operators.Filter(by: Union[Callable[[Any], bool], object] = <function Filter.<lambda>>, **kwargs)

Filter item by the predicate function or the comparing identity.

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.First(**kwargs)

Take the first item

class flowsaber.core.operators.Flatten(max_level: int = 1, **kwargs)

Flatten the _output of channel.

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.Get(key: Any, default: Any = None, **kwargs)

Alias for GetItem task

class flowsaber.core.operators.GetItem(key: Any, default: Any = None, **kwargs)

Get item from the output of the input channel with specified key, like obj[key]. The input channel is excepted to emit objects with __getitem__ method. For example, tuple, list, dict ….

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.Group(by: Callable = <function Group.<lambda>>, num: int = inf, keep: bool = True, **kwargs)

return a new channel with item of Tuple(key_fn(_input), grouped_data_tuple), the group size can be specified.

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.Last(**kwargs)

Take the last item

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.Map(by: Callable, **kwargs)

Map each item to another item return by the specified map function into a new channel.

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.Max(result: float = -inf, **kwargs)
class flowsaber.core.operators.Merge(num_out: int = 1, **kwargs)

Merge channels into a channel with _output of tuple. Even if there is only one channel _input, always _output a tuple

class flowsaber.core.operators.Min(result: float = inf, **kwargs)
class flowsaber.core.operators.Mix(num_out: int = 1, **kwargs)

Data emitted bu channels are mixed into a single channel.

Methods:

handle_consumer(consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

consumer kwargs

class flowsaber.core.operators.Operator(num_out: int = 1, **kwargs)

Base class for all operators, subclass of BaseTask, all operators runs in the main loop in sequence and do not have runners and run states.

Methods:

initialize_context()

Initialize some attributes of self.config into self.context

initialize_context()

Initialize some attributes of self.config into self.context

class flowsaber.core.operators.Reduce(by: Callable[[Any, Any], Any], result=<object object>, **kwargs)

Similar to normal reduce. results = f(n, f(n - 1))

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data: Union[tuple, flowsaber.core.utility.target.End], *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.Sample(num: int = 1, **kwargs)

Randomly sample at most num number of _input emitted by the channel using reservoir algorithm(should visit all elements.).

Methods:

handle_consumer(consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

consumer kwargs

class flowsaber.core.operators.Select(key: Any, default: Any = None, **kwargs)

Alias for GetItem task.

class flowsaber.core.operators.Split(num: int, **kwargs)

Used when _output is tuple/list, use split to split each item of the tuple into a unique channel.

class flowsaber.core.operators.Subscribe(on_next: Callable = None, on_complete: Callable = None, **kwargs)

specify on_next or on_complete function as callbacks of these two event.

Methods:

handle_consumer(consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

async handle_consumer(consumer: flowsaber.core.channel.Consumer, **kwargs)

Iteratively fetch data from consumer and then call processing function

consumer kwargs

class flowsaber.core.operators.Sum(**kwargs)
class flowsaber.core.operators.Take(num: int = 1, **kwargs)

Only take the first num number of items

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.Unique(**kwargs)

Emit items at most once(no duplicate).

class flowsaber.core.operators.Until(by: Union[Callable[[Any], bool], object], **kwargs)

“Take items until meet a stop marker. the stop item will not be included

Methods:

handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

async handle_input(data, *args, **kwargs)

Do nothing, send the _input data directly to _output channel Parameters ———- data args kwargs

class flowsaber.core.operators.View(fmt='{x}', **kwargs)

Print each item emitted by the channel, Equals to call Subscribe(on_next=print)

flowsaber.core.operators.var

alias of flowsaber.core.operators.Count

shell

Classes:

BashTask(**kwargs)

Task that execute bash command by using subprocess.

ShellTask(**kwargs)

Task used for composing bash command and then execute the command.

Exceptions:

CommandTaskComposeError

ShellTaskExecuteError

class flowsaber.tasks.shell.BashTask(**kwargs)

Task that execute bash command by using subprocess.

Methods:

execute_command(cmd, run_workdir[, envs])

Run command in shell

get_publish_dirs(flow_workdir, …)

Get absolute path of configured publish dirs.

glob_output_files(item, run_workdir, …)

Iterate over the item hierarchically and convert in-place and glob found str into Files and collect them into the third parameter.

run(cmd[, output, envs])

This method should be thread safe, can not run functions depends one process-base attributes like ENV, ….

execute_command(cmd: str, run_workdir: Union[str, pathlib.Path], envs: dict = None) → Tuple[pathlib.Path, pathlib.Path]

Run command in shell

cmd run_workdir envs

static get_publish_dirs(flow_workdir, configured_publish_dirs: List[str])

Get absolute path of configured publish dirs.

flow_workdir configured_publish_dirs

classmethod glob_output_files(item, run_workdir, collect_files: List[flowsaber.core.utility.target.File])

Iterate over the item hierarchically and convert in-place and glob found str into Files and collect them into the third parameter.

item run_workdir collect_files

run(cmd: str, output=None, envs: dict = None)

This method should be thread safe, can not run functions depends one process-base attributes like ENV, ….

cmd output envs: dict

exception flowsaber.tasks.shell.CommandTaskComposeError
class flowsaber.tasks.shell.ShellTask(**kwargs)

Task used for composing bash command and then execute the command. Users need to implement the command method. Note that the _output Channel of this task simply emits composed bash command in str type, and this bash command needs to be actually executed by ShellTask.

Methods:

command(*args, **kwargs)

Users need to implement this function to compose the final bash command.

run(*args, **kwargs)

Users need to implement this function to compose the final bash command.

Attributes:

task_hash

get the task_hash of this task defined by the real source code of self.run

command(*args, **kwargs) → str

Users need to implement this function to compose the final bash command.

The returned value of this method represents the expected outputs after executing the composed bash command in shell:

  1. None represents the output is stdout.

  2. str variables represents glob syntax for files in the working directory.

To tell flowsaber what’s the composed bash command, users has two options:

1: Assign the composed command to a variable named CMD. 2: Write virtual fstring as the docstring of command method. All variables in the command method

scoped can be used freely.

Here are some examples:

class A(CommandTask):
def command(self, fa, fasta):

“bwa map -t {self.context.cpu} {fa} {fasta} -o {bam_file}” bam_file = “test.bam” return bam_file

class B(CommandTask):
def command(self, file):

a = “xxxx” b = ‘xxxx’ CMD = f”echo {a}

f”echo {b}

f”cat {file}”

# here implicitly returned a None, represents the _output of cmd is stdout

args kwargs

run(*args, **kwargs) → str

Users need to implement this function to compose the final bash command.

The returned value of this method represents the expected outputs after executing the composed bash command in shell:

  1. None represents the output is stdout.

  2. str variables represents glob syntax for files in the working directory.

To tell flowsaber what’s the composed bash command, users has two options:

1: Assign the composed command to a variable named CMD. 2: Write virtual fstring as the docstring of command method. All variables in the command method

scoped can be used freely.

Here are some examples:

class A(CommandTask):
def command(self, fa, fasta):

“bwa map -t {self.context.cpu} {fa} {fasta} -o {bam_file}” bam_file = “test.bam” return bam_file

class B(CommandTask):
def command(self, file):

a = “xxxx” b = ‘xxxx’ CMD = f”echo {a}

f”echo {b}

f”cat {file}”

# here implicitly returned a None, represents the _output of cmd is stdout

args kwargs

property task_hash

get the task_hash of this task defined by the real source code of self.run

exception flowsaber.tasks.shell.ShellTaskExecuteError

flow

Classes:

Flow(**kwargs)

Represents the organizer of tasks, flows can also be used as components.

class flowsaber.core.flow.Flow(**kwargs)

Represents the organizer of tasks, flows can also be used as components. Except for the top-most flow which represents the whole running unit, all flows within are simply virtual flows and don’t have running state like flowrun or taskrun. However, flows and tasks all can have personalized configs.

Methods:

call_initialize(*args, **kwargs)

Copy a new one and initialize some attributes.

initialize_context()

Expose some attributes of self.config into self.context.

start_execute(**kwargs)

The up most flow needd to initialize executors.

call_initialize(*args, **kwargs)

Copy a new one and initialize some attributes. Parameters ———- args kwargs

initialize_context()

Expose some attributes of self.config into self.context.

async start_execute(**kwargs)

The up most flow needd to initialize executors. Parameters ———- kwargs

base

Classes:

Component(**kwargs)

Base class of Flow and Task

Exceptions:

ComponentCallError(*args[, trace_back])

ComponentExecuteError(*args[, futures, …])

Functions:

aenter_context(method)

A decorator runs the wrapped method within a new context composed of self.context and kwargs’ context.

enter_context(method)

A decorator runs the wrapped method within a new context composed of self.context and kwargs’ context.

class flowsaber.core.base.Component(**kwargs)

Base class of Flow and Task

Classes:

State

An enumeration.

Methods:

call_initialize(*args, **kwargs)

Copy a new one and initialize some attributes.

get_full_name()

Generate a name like flow1.name|flow2.name|flow3.name|cur_task

initialize_context()

Called by call_initialize, merge and update self.config dict of self.context from different sources.

start(**kwargs)

Start running the Flow/Task in the context of self.context, before setting the context, self.context will be merged/updated from kwargs.get(‘context’, {}) Parameters ———- kwargs

Attributes:

config

return a non-editable context

class State

An enumeration.

call_initialize(*args, **kwargs)

Copy a new one and initialize some attributes. Parameters ———- args kwargs

property config

return a non-editable context

get_full_name() → str

Generate a name like flow1.name|flow2.name|flow3.name|cur_task

initialize_context()

Called by call_initialize, merge and update self.config dict of self.context from different sources.

async start(**kwargs)

Start running the Flow/Task in the context of self.context, before setting the context, self.context will be merged/updated from kwargs.get(‘context’, {}) Parameters ———- kwargs

exception flowsaber.core.base.ComponentCallError(*args, trace_back=None)
exception flowsaber.core.base.ComponentExecuteError(*args, futures=None, trace_back=None)
flowsaber.core.base.aenter_context(method: Callable[[...], Any]) → Any

A decorator runs the wrapped method within a new context composed of self.context and kwargs’ context.

method

flowsaber.core.base.enter_context(method: Callable[[...], Any]) → Any

A decorator runs the wrapped method within a new context composed of self.context and kwargs’ context.

method

context

During the running of flow, flowsaber.context will automatically be updated, Ideally, at the end of a task run, context will be looked like this:

``` flow_config: {} task_config: {}

flow_id flow_name flow_full_name flow_labels

task_id task_name task_full_name task_labels

id flowrun_name

taskrun_id ```

Classes:

FlowSaberContext(*args, **kwargs)

The global coroutine-safe context meant to be used for inferring the running status of a flow at any time.

Functions:

inject_context_attrs(factory)

Inject context attrs into the log record base on context.context_attrs.

class flowsaber.core.context.FlowSaberContext(*args: Any, **kwargs: Any)

The global coroutine-safe context meant to be used for inferring the running status of a flow at any time. Compared to raw context, this global context has intelligent(automatically change according to the time/position of the callee) properties:

cache: used by running flow/task run_lock: used by running task logger: used in anywhere and anytime executor: used by running task

Attributes:

cache

Fetch a Cache based on cache_type in the current context.

executor

Fetch an initialized executor based on executor_type in the current context.

logger

Get a child logger of flowsaber logger with name of: callee.__name__.agent_id.flow_id.id.task_id.taskrun_id

Methods:

lock(keys)

Fetch a asyncio.Lock based on run_workdir in the current context.

property cache

Fetch a Cache based on cache_type in the current context. Returns ——-

property executor

Fetch an initialized executor based on executor_type in the current context. Returns ——-

lock(keys: List[str])

Fetch a asyncio.Lock based on run_workdir in the current context. Returns ——-

property logger

Get a child logger of flowsaber logger with name of: callee.__name__.agent_id.flow_id.id.task_id.taskrun_id

flowsaber.core.context.inject_context_attrs(factory)

Inject context attrs into the log record base on context.context_attrs. Parameters ———- factory

channel

Classes:

AsyncChannel(**kwargs)

AsyncChannel schedules asynchronous tasks when initialized, the async task accept the channel as the only parameter, and it’s expected to call self.put_nowait/self.put method as to send items into output queues.

Channel([queue_factory])

Subclass of ChannelBase implemented create_queue method, the mechanism for sending data to all created queue is simple, it just loop over all queues and push the item specified by call of put/put_nowait into all queues.

ChannelBase(**kwargs)

A channel it self is an object for storing items by using put/put_nowait.

ConstantChannel(**kwargs)

A channel use ConstantQueue as it’s fetcher queue factory.

ConstantQueue()

A async queue will emit it’s internal value infinitely.

Consumer(*queues, **kwargs)

Consumer is an object used for simultaneously fetching data emitted by multiple channels and output tuples.

EventChannel([interval, value])

A subclass of AsyncChannel, it’s only async task runs in a while loop and periodically call the registered trigger methods

Fetcher(**kwargs)

Fetch simple provide a for/async for method support for classes implemented with get/get_nowait methods.

LazyAsyncQueue(ch, queue_factory, **kwargs)

Internally it’s an asyncio.Queue, but the inner queue will only be created when meets the first call it’s fetcher/putter methods.

Exceptions:

EventChannelCheckError

class flowsaber.core.channel.AsyncChannel(**kwargs)

AsyncChannel schedules asynchronous tasks when initialized, the async task accept the channel as the only parameter, and it’s expected to call self.put_nowait/self.put method as to send items into output queues.

class flowsaber.core.channel.Channel(queue_factory: type = <class 'asyncio.queues.Queue'>, **kwargs)

Subclass of ChannelBase implemented create_queue method, the mechanism for sending data to all created queue is simple, it just loop over all queues and push the item specified by call of put/put_nowait into all queues. Furthermore, to make LazyAsyncQueue initialized only in a running event loop, Channel uses a buffer to buffer all items pushed before entering the event loop.

class flowsaber.core.channel.ChannelBase(**kwargs)

A channel it self is an object for storing items by using put/put_nowait. To consume data from the channel, users must call ChannelBase.create_queue method to get a fetcher LazyAsyncQueue, and then call it’s get/get_nowait method to fetched the data emitted by the channel.

Methods:

branch(*args, num, by, **kwargs)

Dispatch _input into specified number of channels base on the returned index of the predicate function.

collect(*args[, num_out])

Opposite to flatten, turns a channel into a tuple

concat(*args[, num_out])

Data in channels are concatenated in the order of _input channels.

distinct(*args, **kwargs)

Remove continuously duplicated item.

filter(*args[, by])

Filter item by the predicate function or the comparing identity.

first(*args, **kwargs)

Take the first item

flatten(*args[, max_level])

Flatten the _output of channel.

from_list([1, 2, 3, 4, 5])

QueueChannel created by this method will always include a END signal

get(*args, key[, default])

Alias for GetItem task

getitem(*args, key[, default])

Get item from the output of the input channel with specified key, like obj[key].

group(*args[, by, num, keep])

return a new channel with item of Tuple(key_fn(_input), grouped_data_tuple), the group size can be specified.

last(*args, **kwargs)

Take the last item

map(*args, by, **kwargs)

Map each item to another item return by the specified map function into a new channel.

merge(*args[, num_out])

Merge channels into a channel with _output of tuple.

mix(*args[, num_out])

Data emitted bu channels are mixed into a single channel.

reduce(*args, by[, result])

Similar to normal reduce.

sample(*args[, num])

Randomly sample at most num number of _input emitted by the channel using reservoir algorithm(should visit all elements.).

select(*args, key[, default])

Alias for GetItem task.

split(*args, num, **kwargs)

Used when _output is tuple/list, use split to split each item of the tuple into a unique channel.

subscribe(*args[, on_next, on_complete])

specify on_next or on_complete function as callbacks of these two event.

take(*args[, num])

Only take the first num number of items

unique(*args, **kwargs)

Emit items at most once(no duplicate).

until(*args, by, **kwargs)

“Take items until meet a stop marker.

value(value, **kwargs)

Channel._output(1)

values(1, 2, 3, 4, 5)

QueueChannel created by this method will always include a END signal

view(*args[, fmt])

Print each item emitted by the channel, Equals to call Subscribe(on_next=print)

branch(*args: Union[object, flowsaber.core.channel.Channel], num: int, by: Callable, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Dispatch _input into specified number of channels base on the returned index of the predicate function.

collect(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Opposite to flatten, turns a channel into a tuple

concat(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Data in channels are concatenated in the order of _input channels.

distinct(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Remove continuously duplicated item.

filter(*args: Union[object, flowsaber.core.channel.Channel], by: Union[Callable[[Any], bool], object] = <function Filter.<lambda>>, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Filter item by the predicate function or the comparing identity.

first(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Take the first item

flatten(*args: Union[object, flowsaber.core.channel.Channel], max_level: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Flatten the _output of channel.

classmethod from_list([1, 2, 3, 4, 5])

QueueChannel created by this method will always include a END signal

get(*args: Union[object, flowsaber.core.channel.Channel], key: Any, default: Any = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Alias for GetItem task

getitem(*args: Union[object, flowsaber.core.channel.Channel], key: Any, default: Any = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Get item from the output of the input channel with specified key, like obj[key]. The input channel is excepted to emit objects with __getitem__ method. For example, tuple, list, dict ….

group(*args: Union[object, flowsaber.core.channel.Channel], by: Callable = <function Group.<lambda>>, num: int = inf, keep: bool = True, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

return a new channel with item of Tuple(key_fn(_input), grouped_data_tuple), the group size can be specified.

last(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Take the last item

map(*args: Union[object, flowsaber.core.channel.Channel], by: Callable, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Map each item to another item return by the specified map function into a new channel.

merge(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Merge channels into a channel with _output of tuple. Even if there is only one channel _input, always _output a tuple

mix(*args: Union[object, flowsaber.core.channel.Channel], num_out: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Data emitted bu channels are mixed into a single channel.

reduce(*args: Union[object, flowsaber.core.channel.Channel], by: Callable[[Any, Any], Any], result=<object object>, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Similar to normal reduce. results = f(n, f(n - 1))

sample(*args: Union[object, flowsaber.core.channel.Channel], num: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Randomly sample at most num number of _input emitted by the channel using reservoir algorithm(should visit all elements.).

select(*args: Union[object, flowsaber.core.channel.Channel], key: Any, default: Any = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Alias for GetItem task.

split(*args: Union[object, flowsaber.core.channel.Channel], num: int, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Used when _output is tuple/list, use split to split each item of the tuple into a unique channel.

subscribe(*args: Union[object, flowsaber.core.channel.Channel], on_next: Callable = None, on_complete: Callable = None, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

specify on_next or on_complete function as callbacks of these two event.

take(*args: Union[object, flowsaber.core.channel.Channel], num: int = 1, **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Only take the first num number of items

unique(*args: Union[object, flowsaber.core.channel.Channel], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Emit items at most once(no duplicate).

until(*args: Union[object, flowsaber.core.channel.Channel], by: Union[Callable[[Any], bool], object], **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

“Take items until meet a stop marker. the stop item will not be included

classmethod value(value, **kwargs) → flowsaber.core.channel.ConstantChannel

Channel._output(1)

classmethod values(1, 2, 3, 4, 5)

QueueChannel created by this method will always include a END signal

view(*args: Union[object, flowsaber.core.channel.Channel], fmt='{x}', **kwargs) → Union[Sequence[flowsaber.core.channel.Channel], flowsaber.core.channel.Channel]

Print each item emitted by the channel, Equals to call Subscribe(on_next=print)

class flowsaber.core.channel.ConstantChannel(**kwargs)

A channel use ConstantQueue as it’s fetcher queue factory.

class flowsaber.core.channel.ConstantQueue

A async queue will emit it’s internal value infinitely. Like ordinary queue, the first element needs to be enqueued before fetching

class flowsaber.core.channel.Consumer(*queues: flowsaber.core.channel.LazyAsyncQueue, **kwargs)

Consumer is an object used for simultaneously fetching data emitted by multiple channels and output tuples. Empty consumer will emit only once. The end of __next__/__anext__ will be triggered if any of it’s source channel emits a END object. For simplicity, if there is only a single source channel, the output of consumer will not be a tuple.

class flowsaber.core.channel.EventChannel(interval: int = 5, value=None, **kwargs)

A subclass of AsyncChannel, it’s only async task runs in a while loop and periodically call the registered trigger methods

exception flowsaber.core.channel.EventChannelCheckError
class flowsaber.core.channel.Fetcher(**kwargs)

Fetch simple provide a for/async for method support for classes implemented with get/get_nowait methods. The end of __next__/__anext__ is triggered by the appearance of END fetched from get/get_nowait.

class flowsaber.core.channel.LazyAsyncQueue(ch, queue_factory, **kwargs)

Internally it’s an asyncio.Queue, but the inner queue will only be created when meets the first call it’s fetcher/putter methods. It’s designed like this to handler pickle or coroutine-loop problems.

state

Classes:

Cached([state_type, result, message])

The result of the _input is cached.

Cancelled([trace_back])

Cancelling([state_type, result, message])

Done([state_type, result, message])

Represent the end state of a task run, should not be directly used.

Drop([state_type, result, message])

This state means the _output should be dropped and will not be passed to the _output channel.

Failure([trace_back])

Means some Exception has been raised.

Pending([state_type, result, message])

Retrying([state_type, result, message])

This state comes from Pending, means the task is waiting for rerun due to retry’s waiting time

Running([state_type, result, message])

Scheduled([state_type, result, message])

Skip([state_type, result, message])

This state means this _output should be skipped and directly send to the _output channel

State([state_type, result, message])

State represents status of flowrun/taskrun.

Success([state_type, result, message])

class flowsaber.core.utility.state.Cached(state_type: str = None, result: Any = None, message: str = None)

The result of the _input is cached.

class flowsaber.core.utility.state.Cancelled(trace_back=None, **kwargs)
class flowsaber.core.utility.state.Cancelling(state_type: str = None, result: Any = None, message: str = None)
class flowsaber.core.utility.state.Done(state_type: str = None, result: Any = None, message: str = None)

Represent the end state of a task run, should not be directly used. Use Success/Failure instead.

class flowsaber.core.utility.state.Drop(state_type: str = None, result: Any = None, message: str = None)

This state means the _output should be dropped and will not be passed to the _output channel. Usually this is caused by settled skip on error option in task.config_dict

class flowsaber.core.utility.state.Failure(trace_back=None, **kwargs)

Means some Exception has been raised.

class flowsaber.core.utility.state.Pending(state_type: str = None, result: Any = None, message: str = None)
class flowsaber.core.utility.state.Retrying(state_type: str = None, result: Any = None, message: str = None)

This state comes from Pending, means the task is waiting for rerun due to retry’s waiting time

class flowsaber.core.utility.state.Running(state_type: str = None, result: Any = None, message: str = None)
class flowsaber.core.utility.state.Scheduled(state_type: str = None, result: Any = None, message: str = None)
class flowsaber.core.utility.state.Skip(state_type: str = None, result: Any = None, message: str = None)

This state means this _output should be skipped and directly send to the _output channel

class flowsaber.core.utility.state.State(state_type: str = None, result: Any = None, message: str = None)

State represents status of flowrun/taskrun.

State flows and state hierarchy:

FlowRun:

Scheduled Pending Running Done

TaskRun:

Pending Running Retrying Running Done

Done
Success

Cached Skip

Failure

Drop

class flowsaber.core.utility.state.Success(state_type: str = None, result: Any = None, message: str = None)

cache

Classes:

Cache

Cache is used for persisting results of met inputs.

LocalCache([serializer])

LocalCache treat hash keys as directories in disk and dump/load python objects in the corresponding directories.

Exceptions:

CacheInvalidError

class flowsaber.core.utility.cache.Cache

Cache is used for persisting results of met inputs. The cache should produce unique keys for unique inputs by implementing hash method. Hash of inputs can be further used to write/read related data.

exception flowsaber.core.utility.cache.CacheInvalidError
class flowsaber.core.utility.cache.LocalCache(serializer: flowsaber.core.utility.cache.Serializer = <flowsaber.core.utility.cache.CloudPickleSerializer object>, **kwargs)

LocalCache treat hash keys as directories in disk and dump/load python objects in the corresponding directories.

Methods:

persist()

This should be called before python program ends

persist()

This should be called before python program ends

target

Classes:

End(**kwargs)

End signal of a Channel.

File(*args, **kwargs)

Wrapping of pathlib.Path, with features including integrity checking …

Folder(*args, **kwargs)

Stdin(src, **kwargs)

Represents stdin from a string or File.

Stdout(*args, **kwargs)

Ugly way, Use File to store stdout.

Target(**kwargs)

Target represents item emitted by Channel.

class flowsaber.core.utility.target.End(**kwargs)

End signal of a Channel.

class flowsaber.core.utility.target.File(*args, **kwargs)

Wrapping of pathlib.Path, with features including integrity checking …

class flowsaber.core.utility.target.Folder(*args, **kwargs)
class flowsaber.core.utility.target.Stdin(src: Union[str, flowsaber.core.utility.target.File], **kwargs)

Represents stdin from a string or File.

class flowsaber.core.utility.target.Stdout(*args, **kwargs)

Ugly way, Use File to store stdout. In the idealist implementation, stdout and stdin can be piped/linked across machines over network.

class flowsaber.core.utility.target.Target(**kwargs)

Target represents item emitted by Channel. Theoretically all item emitted by Channel should be wrapped by a Target

executor

Borrowed from prefect.executors.dask

Classes:

DaskExecutor([address, cluster_class, …])

An executor that runs all functions using the dask.distributed scheduler.

Executor

Async Executor is aimed for running submitted jobs in an asynchronous way.

Local(**kwargs)

Executor run jobs in the main loop of the current thread,

class flowsaber.core.utility.executor.DaskExecutor(address: str = None, cluster_class: Union[str, Callable] = None, cluster_kwargs: dict = None, adapt_kwargs: dict = None, client_kwargs: dict = None, debug: bool = False, **kwargs)

An executor that runs all functions using the dask.distributed scheduler.

Check https://docs.dask.org/en/latest/setup.html for all kinds of cluster types.

By default a temporary distributed.LocalCluster is created (and subsequently torn down) within the start_loop() contextmanager. To use a different cluster class (e.g. [dask_kubernetes.KubeCluster](https://kubernetes.dask.org/)), you can specify cluster_class/cluster_kwargs.

Alternatively, if you already have a dask cluster _running, you can provide the address of the scheduler via the address kwarg.

Note that if you have tasks with tags of the form “dask-accum_resource:KEY=NUM” they will be parsed and passed as [Worker Resources](https://distributed.dask.org/en/latest/resources.html) of the form {“KEY”: float(NUM)} to the Dask TaskScheduler.

Args:
  • address (string, optional): address of a currently _running dask

    scheduler; if one is not provided, a temporary cluster will be created in executor.start_loop(). Defaults to None.

  • cluster_class (string or callable, optional): the cluster class to use

    when creating a temporary dask cluster. Can be either the full class name (e.g. “distributed.LocalCluster”), or the class itself.

  • cluster_kwargs (dict, optional): addtional kwargs to pass to the

    cluster_class when creating a temporary dask cluster.

  • adapt_kwargs (dict, optional): additional kwargs to pass to cluster.adapt

    when creating a temporary dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided.

  • client_kwargs (dict, optional): additional kwargs to use when creating a

    [dask.distributed.Client](https://distributed.dask.org/en/latest/api.html#client).

  • debug (bool, optional): When _running with a local cluster, setting

    debug=True will increase dask’s logging level, providing potentially useful debug info. Defaults to the debug value in your Prefect configuration.

Examples:

Using a temporary local dask cluster:

`python executor = DaskExecutor() `

Using a temporary cluster _running elsewhere. Any Dask cluster class should work, here we use [dask-cloudprovider](https://cloudprovider.dask.org):

```python executor = DaskExecutor(

cluster_class=”dask_cloudprovider.FargateCluster”, cluster_kwargs={

“image”: “prefecthq/prefect:latest”, “n_workers”: 5, …

},

Connecting to an existing dask cluster

`python executor = DaskExecutor(address="192.0.2.255:8786") `

Methods:

run(fn, *args[, extra_context])

Submit a function to the executor for execution.

async run(fn: Callable, *args: Any, extra_context: dict = None, **kwargs: Any) → concurrent.futures._base.Future

Submit a function to the executor for execution. Returns a Future object.

Args:
  • fn (Callable): function that is being submitted for execution

  • *args (Any): arguments to be passed to fn

  • extra_context (dict, optional): an optional dictionary with extra information

    about the submitted task

  • **kwargs (Any): keyword arguments to be passed to fn

Returns:
  • Future: a Future-like object that represents the computation of fn(*args, **kwargs)

class flowsaber.core.utility.executor.Executor

Async Executor is aimed for running submitted jobs in an asynchronous way. Executor need to implement three __aenter__/__aexit__/run async functions for initializing/running/cleaning.

class flowsaber.core.utility.executor.Local(**kwargs)

Executor run jobs in the main loop of the current thread,

runner

Some codes are borrowed from https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/runner.py

Exceptions:

RunException(*args[, state])

RunnerExecuteError(*args[, futures])

Classes:

Runner([server_address, id, name, labels])

Base runner class, intended to be the state manager of runnable object like flow and task.

RunnerExecutor([context])

Functions:

call_state_change_handlers(method)

A decorator checks the difference between _input and _output state of the wrapped method, if two states are not identical, trigger runner’s handle_state_change method for calling all state change handlers.

catch_to_failure(method)

A decorator that wraps method into a method that automatically capture exceptions into Failure state.

run_timeout_signal(timeout, func, *args, …)

Run function in main thread in unix system with timeout using SIGALARM signal.

run_timeout_thread(timeout, *args, **kwargs)

Run the task within timeout within a thread pool.

exception flowsaber.core.engine.runner.RunException(*args, state: flowsaber.core.utility.state.State = None)
class flowsaber.core.engine.runner.Runner(server_address: str = None, id: str = None, name: str = None, labels: list = None, **kwargs)

Base runner class, intended to be the state manager of runnable object like flow and task.

Users need to add state change handlers in order to be informed when meeting state changes of some method. Methods of runner should both accept and return state, and need to be decorated with call_state_change_handlers decorator.

Methods:

handle_state_change(prev_state, cur_state)

Call all registered state change handlers with parameter of old_state and new_state.

send_logs()

This should be ran in runner.executor’s async main loop

update_run_state()

Does not use create_task as these jobs has a strict order

handle_state_change(prev_state, cur_state)

Call all registered state change handlers with parameter of old_state and new_state.

prev_state cur_state

send_logs() → Tuple[Coroutine, Callable]

This should be ran in runner.executor’s async main loop

update_run_state() → Tuple[Coroutine, Callable]

Does not use create_task as these jobs has a strict order

runner

exception flowsaber.core.engine.runner.RunnerExecuteError(*args, futures=None)
class flowsaber.core.engine.runner.RunnerExecutor(context=<class 'dict'>, **kwargs)

Miscellaneous:

DoneException

Methods:

join([timeout])

Wait until the thread terminates.

main_loop()

Endlessly fetch async task anc schedule for running in asyncio envent loop untill some task raise an exception.

run()

Method representing the thread’s activity.

start()

Start the thread’s activity.

exception DoneException
join(timeout: Optional[float] = Ellipsis) → None

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

async main_loop()

Endlessly fetch async task anc schedule for running in asyncio envent loop untill some task raise an exception.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start() → None

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

flowsaber.core.engine.runner.call_state_change_handlers(method: Callable[[...], flowsaber.core.utility.state.State]) → Callable[[...], flowsaber.core.utility.state.State]

A decorator checks the difference between _input and _output state of the wrapped method, if two states are not identical, trigger runner’s handle_state_change method for calling all state change handlers.

method

flowsaber.core.engine.runner.catch_to_failure(method: Callable[[...], flowsaber.core.utility.state.State]) → Callable[[...], flowsaber.core.utility.state.State]

A decorator that wraps method into a method that automatically capture exceptions into Failure state.

method

flowsaber.core.engine.runner.run_timeout_signal(timeout: int, func: Callable, *args, **kwargs)

Run function in main thread in unix system with timeout using SIGALARM signal.

https://github.com/pnpnpn/timeout-decorator/blob/master/timeout_decorator/timeout_decorator.py

func timeout args kwargs

flowsaber.core.engine.runner.run_timeout_thread(timeout: int, *args, **kwargs)

Run the task within timeout within a thread pool. Note that the flowsaber.context would be corrupted in the new thread.

func timeout kwargs

task_runner

Classes:

TaskRunner(task, inputs, **kwargs)

The task runner moves the task state forward through a complicated process including: retry, cache read/write, skip, drop …

class flowsaber.core.engine.task_runner.TaskRunner(task: Task, inputs: inspect.BoundArguments, **kwargs)

The task runner moves the task state forward through a complicated process including: retry, cache read/write, skip, drop …

Methods:

run_task_timeout(**kwargs)

Call task.run with timeout handling by using signal.

run_task_timeout(**kwargs)

Call task.run with timeout handling by using signal. Parameters ———- kwargs

flow_runner

Classes:

FlowRunner(flow, **kwargs)

Aimed for executing flow and maintaining/recording/responding state changes of the flow.

class flowsaber.core.engine.flow_runner.FlowRunner(flow: flowsaber.core.flow.Flow, **kwargs)

Aimed for executing flow and maintaining/recording/responding state changes of the flow.

models