smallpond.execution.task.Task
- class smallpond.execution.task.Task(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None)
The base class for all tasks.
Task is the basic unit of work in smallpond. Each task represents a specific operation that takes a series of input datasets and produces an output dataset. Tasks can depend on other tasks, forming a directed acyclic graph (DAG). Tasks can specify resource requirements such as CPU, GPU, and memory limits. Tasks should be idempotent, because a failed task may be retried.
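The DAG and retry behaviour described above can be sketched in plain Python. This is a minimal sketch, not smallpond's API: MiniTask, execution_order and run_with_retries are hypothetical names. The dependency graph is ordered with the standard library's graphlib.TopologicalSorter, and a retry simply re-invokes the same callable, which is only safe under the idempotency assumption.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import Callable, List, Optional


@dataclass(eq=False)  # identity-based hashing so tasks can be graph nodes
class MiniTask:
    """Hypothetical stand-in for a task node; not smallpond's Task class."""
    name: str
    input_deps: List["MiniTask"] = field(default_factory=list)
    cpu_limit: int = 1
    gpu_limit: float = 0
    memory_limit: Optional[int] = None


def execution_order(sink: MiniTask) -> List[str]:
    """Return task names so that every task comes after all of its inputs."""
    graph = {}
    stack = [sink]
    while stack:
        task = stack.pop()
        if task in graph:
            continue
        graph[task] = task.input_deps  # node -> its predecessors
        stack.extend(task.input_deps)
    # TopologicalSorter raises graphlib.CycleError if the graph is not a DAG.
    return [t.name for t in TopologicalSorter(graph).static_order()]


def run_with_retries(fn: Callable[[], object], max_attempts: int = 3) -> object:
    """Re-run fn after a failure; only safe because fn is assumed idempotent."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            return fn()
        except Exception as exc:  # a real scheduler would also log the failure
            last_error = exc
    raise last_error
```

For a diamond-shaped graph (scan feeding filter and project, both feeding join), execution_order returns "scan" first and "join" last; the order of the two middle tasks is not fixed, since either is valid.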
Lifetime of a task object:
1. Instantiated at planning time on the scheduler node.
2. Pickled and sent to a worker node.
3. initialize() is called to prepare for execution.
4. run() is called to execute the task.
5. finalize() or cleanup() is called to release resources.
6. Pickled and sent back to the scheduler node.
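The lifetime above can be simulated end to end with the standard pickle module. This is a minimal sketch, assuming that finalize() runs only after a successful run() and that cleanup() always runs, as the method summaries suggest; DemoTask and execute_on_worker are hypothetical names and do not reproduce smallpond's own exec() logic.

```python
import pickle


class DemoTask:
    """Hypothetical task that records each lifecycle hook it goes through."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail
        self.events = []
        self.exception = None

    def initialize(self) -> None:
        self.events.append("initialize")

    def run(self) -> bool:
        self.events.append("run")
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        return True

    def finalize(self) -> None:
        self.events.append("finalize")

    def cleanup(self) -> None:
        self.events.append("cleanup")


def execute_on_worker(payload: bytes) -> bytes:
    """Simulate a worker: unpickle the task, run its hooks, pickle it back."""
    task = pickle.loads(payload)
    try:
        task.initialize()
        if task.run():
            task.finalize()
    except Exception as exc:
        task.exception = exc  # keep the failure so the scheduler can inspect it
    finally:
        task.cleanup()  # runs whether or not run() raised
    return pickle.dumps(task)
```

On the "scheduler" side, a call such as pickle.loads(execute_on_worker(pickle.dumps(DemoTask("t1")))) returns a copy of the task whose events list shows which hooks ran on the worker.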
- __init__(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None) -> None
Methods
- __init__(ctx, input_deps, partition_infos[, ...])
- add_elapsed_time([metric_name]): Start or stop the timer.
- adjust_row_group_size(nbytes, num_rows[, ...])
- clean_complex_attrs()
- clean_output([force])
- cleanup(): Called after run() even if there is an exception.
- compute_avg_row_size(nbytes, num_rows)
- dump()
- exec([cq])
- finalize(): Called after run() to finalize the processing.
- get_partition_info(dimension)
- initialize(): Called before run() to prepare for running the task.
- inject_fault()
- merge_metrics(metrics)
- oom([nonzero_exitcode_as_oom])
- parquet_kv_metadata_bytes([extra_partitions])
- parquet_kv_metadata_str([extra_partitions])
- random_float()
- random_uint32()
- run()
- run_on_ray(): Run the task on Ray.
- set_memory_limit(soft_limit, hard_limit)

Attributes
- ctx
- id
- node_id
- sched_epoch
- output_name
- output_root
- dataset
- input_deps
- output_deps
- perf_metrics
- perf_profile
- runtime_state
- input_datasets
- allow_speculative_exec: Whether the task is allowed to be executed by speculative execution.
- any_input_empty
- cpu_limit
- default_output_name
- elapsed_time
- exception
- exec_cq
- exec_id
- exec_on_scheduler
- fail_count
- final_output_abspath
- finish_time
- gpu_limit
- key
- local_gpu: Return the first GPU granted to this task.
- local_gpu_ranks: Return all GPU ranks granted to this task.
- local_rank: Return the first GPU rank granted to this task.
- location
- memory_limit
- numa_node
- numpy_random_gen
- output
- output_dirname
- output_filename
- partition_dims
- partition_infos
- partition_infos_as_dict
- python_random_gen
- random_seed_bytes
- ray_dataset_path: The path of a pickle file that contains the output dataset of the task.
- ray_marker_path: The path of an empty file that is used to determine if the task has been started.
- retry_count
- runtime_id
- runtime_output_abspath: Output data will be produced in this directory at runtime.
- self_contained_output: Whether the output of this node is not dependent on any input nodes.
- skip_when_any_input_empty
- staging_root: If the task has a special output directory, its runtime output directory will be under it.
- start_time
- status
- temp_abspath
- temp_output: Whether the output of this node is stored in a temporary directory.
- uniform_failure_prob
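The ray_marker_path and ray_dataset_path attributes describe a simple file-based status protocol: an empty marker file signals that a task has started, and a pickle file holds its output dataset. The sketch below illustrates that idea under stated assumptions; MarkerProtocol and demo are hypothetical names, the file naming is invented for illustration, and treating the presence of the pickle file as "finished" is an inference rather than documented behaviour. The temp-file-then-rename write is also a design choice of this sketch, so a reader never observes a half-written pickle on the same filesystem.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Optional, Tuple


class MarkerProtocol:
    """Hypothetical helper mirroring ray_marker_path / ray_dataset_path."""

    def __init__(self, root: Path, task_key: str) -> None:
        self.marker_path = root / f"{task_key}.started"  # hypothetical naming
        self.dataset_path = root / f"{task_key}.pickle"

    def mark_started(self) -> None:
        # An empty file is enough: only its existence carries information.
        self.marker_path.touch()

    def save_output(self, dataset: Any) -> None:
        # Write to a temporary file first, then rename it into place.
        tmp = self.dataset_path.with_suffix(".tmp")
        tmp.write_bytes(pickle.dumps(dataset))
        tmp.replace(self.dataset_path)

    def state(self) -> str:
        if self.dataset_path.exists():
            return "finished"  # assumption: output file implies completion
        if self.marker_path.exists():
            return "started"
        return "pending"

    def load_output(self) -> Optional[Any]:
        if not self.dataset_path.exists():
            return None
        return pickle.loads(self.dataset_path.read_bytes())


def demo() -> Tuple[list, Any]:
    """Walk one hypothetical task through the pending/started/finished states."""
    states = []
    with tempfile.TemporaryDirectory() as d:
        proto = MarkerProtocol(Path(d), "task_42")
        states.append(proto.state())
        proto.mark_started()
        states.append(proto.state())
        proto.save_output({"paths": ["part-0.parquet"]})
        states.append(proto.state())
        output = proto.load_output()
    return states, output
```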