smallpond.execution.task.Task

class smallpond.execution.task.Task(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None)

The base class for all tasks.

Task is the basic unit of work in smallpond. Each task represents a specific operation that takes a series of input datasets and produces an output dataset. Tasks can depend on other tasks, forming a directed acyclic graph (DAG). Tasks can specify resource requirements such as CPU, GPU, and memory limits. Tasks should be idempotent, since a failed task may be retried.
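
The DAG-of-idempotent-tasks model described above can be illustrated with a minimal, self-contained sketch. The `MiniTask` class and its fields are hypothetical stand-ins for illustration, not smallpond APIs:

```python
from typing import Callable, List, Optional


class MiniTask:
    """A toy task: runs its input dependencies, then its own function."""

    def __init__(self, name: str, input_deps: List["MiniTask"],
                 fn: Callable[[list], object]):
        self.name = name
        self.input_deps = input_deps  # edges of the DAG
        self.fn = fn
        self.output: Optional[object] = None
        self.done = False

    def run(self, max_retries: int = 3) -> object:
        if self.done:
            # Idempotent: re-running a finished task just returns its output.
            return self.output
        # Resolve dependencies first (depth-first over the DAG).
        inputs = [dep.run(max_retries) for dep in self.input_deps]
        # Retry on failure -- safe only because tasks are idempotent.
        for attempt in range(max_retries):
            try:
                self.output = self.fn(inputs)
                self.done = True
                return self.output
            except Exception:
                if attempt == max_retries - 1:
                    raise
```

For example, a `sum` task depending on a `load` task produces its output by first running `load`, and calling `run()` twice does not recompute anything.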

Lifetime of a task object:

  • instantiated at planning time on the scheduler node

  • pickled and sent to a worker node

  • initialize() is called to prepare for execution

  • run() is called to execute the task

  • finalize() or cleanup() is called to release resources

  • pickled and sent back to the scheduler node
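
A worker-side driver for the lifecycle above might look like the following sketch. This is an assumption about the control flow implied by the method docstrings (finalize() on success, cleanup() even on exception), not smallpond's actual scheduler code:

```python
class LifecycleTask:
    """Hypothetical task recording which lifecycle hooks were called."""

    def __init__(self):
        self.log = []

    def initialize(self):
        self.log.append("initialize")   # prepare for execution

    def run(self):
        self.log.append("run")          # do the actual work

    def finalize(self):
        self.log.append("finalize")     # called after run() on success

    def cleanup(self):
        self.log.append("cleanup")      # called even if run() raised


def execute(task):
    """Drive one task through its lifecycle on a worker node."""
    task.initialize()
    try:
        task.run()
        task.finalize()   # reached only if run() did not raise
    finally:
        task.cleanup()    # always runs, matching the cleanup() docstring
    return task
```

In the real system the task object would be unpickled before `execute` and pickled again afterwards to return results to the scheduler.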

__init__(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None) → None

Methods

__init__(ctx, input_deps, partition_infos[, ...])

add_elapsed_time([metric_name])

Start or stop the timer.

adjust_row_group_size(nbytes, num_rows[, ...])

clean_complex_attrs()

clean_output([force])

cleanup()

Called after run() even if there is an exception.

compute_avg_row_size(nbytes, num_rows)

dump()

exec([cq])

finalize()

Called after run() to finalize the processing.

get_partition_info(dimension)

initialize()

Called before run() to prepare for running the task.

inject_fault()

merge_metrics(metrics)

oom([nonzero_exitcode_as_oom])

parquet_kv_metadata_bytes([extra_partitions])

parquet_kv_metadata_str([extra_partitions])

random_float()

random_uint32()

run()

run_on_ray()

Run the task on Ray.

set_memory_limit(soft_limit, hard_limit)

Attributes

ctx

id

node_id

sched_epoch

output_name

output_root

dataset

input_deps

output_deps

perf_metrics

perf_profile

runtime_state

input_datasets

allow_speculative_exec

Whether the task is allowed to be executed speculatively.

any_input_empty

cpu_limit

default_output_name

elapsed_time

exception

exec_cq

exec_id

exec_on_scheduler

fail_count

final_output_abspath

finish_time

gpu_limit

key

local_gpu

Return the first GPU granted to this task.

local_gpu_ranks

Return all GPU ranks granted to this task.

local_rank

Return the first GPU rank granted to this task.

location

memory_limit

numa_node

numpy_random_gen

output

output_dirname

output_filename

partition_dims

partition_infos

partition_infos_as_dict

python_random_gen

random_seed_bytes

ray_dataset_path

The path of a pickle file that contains the output dataset of the task.

ray_marker_path

The path of an empty file used to determine whether the task has been started.

retry_count

runtime_id

runtime_output_abspath

Output data will be produced in this directory at runtime.

self_contained_output

Whether the output of this node is independent of all input nodes.

skip_when_any_input_empty

staging_root

If the task has a special output directory, its runtime output directory will be under it.

start_time

status

temp_abspath

temp_output

Whether the output of this node is stored in a temporary directory.

uniform_failure_prob