smallpond.logical.node.PythonScriptNode

class smallpond.logical.node.PythonScriptNode(ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[DataSet], str], bool] | None = None, output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None)

Run Python code to process the input datasets with PythonScriptNode.process(…).

If the code needs to access attributes of the runtime task, e.g. local_rank, random_seed_long, numpy_random_gen:

  • create a subclass of PythonScriptTask, which implements PythonScriptTask.process(…),

  • override PythonScriptNode.spawn(…) and return an instance of the subclass.

Alternatively, access the task through runtime_ctx.task inside the process(runtime_ctx: RuntimeContext, …) function.
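A minimal sketch of the runtime_ctx.task option follows. It runs without smallpond installed: SimpleNamespace objects stand in for RuntimeContext and the runtime task. The function name process_with_task, the output file name, the integer seed value, and the reading of the return value (True for success) are assumptions made for illustration, not part of the documented API.

```python
import os
import random
import tempfile
from types import SimpleNamespace
from typing import Any, List


def process_with_task(runtime_ctx: Any, input_datasets: List[Any], output_path: str) -> bool:
    # runtime_ctx.task gives access to the runtime task attributes.
    task = runtime_ctx.task
    # Seed a local generator from the task's seed so reruns are repeatable
    # (assumes random_seed_long is an integer).
    rng = random.Random(task.random_seed_long)
    draw = rng.random()
    # Hypothetical output layout: one small text file per local rank.
    out_file = os.path.join(output_path, f"rank-{task.local_rank}.txt")
    with open(out_file, "w") as f:
        f.write(f"{len(input_datasets)} {draw}\n")
    return True  # assumed to signal success


# Stand-ins for the objects smallpond would pass in at run time.
fake_ctx = SimpleNamespace(task=SimpleNamespace(local_rank=0, random_seed_long=1234))
with tempfile.TemporaryDirectory() as tmp_dir:
    ok = process_with_task(fake_ctx, [], tmp_dir)
    written = sorted(os.listdir(tmp_dir))
```

Because the function only relies on the three-argument signature and on runtime_ctx.task, it should be usable as a process_func without subclassing PythonScriptTask or overriding spawn(…).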

__init__(ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[DataSet], str], bool] | None = None, output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None)

Construct a PythonScriptNode. See Node.__init__() for descriptions of the other parameters.

Parameters

process_func, optional

User-defined process function, which must have the same signature as self.process(…). If the user-defined function takes extra parameters, use functools.partial(…) to bind them in advance. See test_partial_process_func in test/test_execution.py for usage examples.
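The binding step can be sketched as follows. This is an illustration only, not the code from test/test_execution.py: the function write_marker, its marker parameter, and the output file name are hypothetical, and the call is exercised locally with stand-in arguments instead of a real RuntimeContext.

```python
import functools
import os
import tempfile
from typing import Any, List


def write_marker(runtime_ctx: Any, input_datasets: List[Any], output_path: str, marker: str) -> bool:
    # The first three parameters mirror process(runtime_ctx, input_datasets, output_path);
    # `marker` is the extra parameter that has to be bound in advance.
    with open(os.path.join(output_path, "marker.txt"), "w") as f:
        f.write(marker)
    return True  # assumed to signal success


# Bind the extra argument so the callable takes exactly three arguments.
process_func = functools.partial(write_marker, marker="done")

# With smallpond available, the bound callable would be passed as
#   PythonScriptNode(ctx, (input_node,), process_func=process_func)
# where ctx and input_node are placeholders for an existing Context and Node.

# Local check with stand-in arguments (None for the runtime context, no datasets).
with tempfile.TemporaryDirectory() as tmp_dir:
    result = process_func(None, [], tmp_dir)
    with open(os.path.join(tmp_dir, "marker.txt")) as f:
        content = f.read()
```

Binding by keyword keeps the three positional parameters free for the values supplied at run time.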

Methods

__init__(ctx, input_deps, *[, process_func, ...])

Construct a PythonScriptNode.

add_perf_metrics(name, value)

create_task(*args, **kwargs)

get_perf_stats(name)

process(runtime_ctx, input_datasets, output_path)

Put user-defined code here.

slim_copy()

spawn(*args, **kwargs)

Return an instance of a subclass of PythonScriptTask.

task_factory(task_builder)

Attributes

enable_resource_boost

num_partitions