smallpond.execution.task.HashPartitionArrowTask

class smallpond.execution.task.HashPartitionArrowTask(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], npartitions: int, dimension: str, hash_columns: List[str], data_partition_column: str, random_shuffle: bool = False, shuffle_only: bool = False, drop_partition_column=False, use_parquet_writer=False, hive_partitioning=False, parquet_row_group_size: int = 122880, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None)
__init__(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], npartitions: int, dimension: str, hash_columns: List[str], data_partition_column: str, random_shuffle: bool = False, shuffle_only: bool = False, drop_partition_column=False, use_parquet_writer=False, hive_partitioning=False, parquet_row_group_size: int = 122880, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None) → None
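
Judging by its parameters, this task routes each input row to one of npartitions output partitions by hashing the values of hash_columns. The sketch below illustrates that idea on a plain pyarrow table; it is a conceptual illustration only (the helper hash_partition, the pure-Python hash, and the sample columns are assumptions, not the task's actual code):

    # Conceptual sketch of hash partitioning an Arrow table; smallpond's real
    # implementation may hash differently and writes each partition to Parquet.
    import pyarrow as pa
    import pyarrow.compute as pc

    def hash_partition(table: pa.Table, hash_columns: list, npartitions: int):
        """Assign each row to a partition by hashing the key columns."""
        keys = [table.column(name).to_pylist() for name in hash_columns]
        partition_ids = pa.array([hash(row) % npartitions for row in zip(*keys)])
        return [table.filter(pc.equal(partition_ids, i)) for i in range(npartitions)]

    # Split a small table into 4 hash partitions on the "user_id" column.
    tbl = pa.table({"user_id": [1, 2, 3, 4, 5], "value": [10.0, 20.0, 30.0, 40.0, 50.0]})
    parts = hash_partition(tbl, hash_columns=["user_id"], npartitions=4)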

Methods

__init__(ctx, input_deps, partition_infos, ...)

add_elapsed_time([metric_name])

Start or stop the timer.

adjust_row_group_size(nbytes, num_rows[, ...])

clean_complex_attrs()

clean_output([force])

cleanup()

Called after run() even if there is an exception.

compute_avg_row_size(nbytes, num_rows)

create(engine_type, *args, **kwargs)

dump()

exec([cq])

finalize()

Called after run() to finalize the processing.

get_partition_info(dimension)

initialize()

Called before run() to prepare for running the task.

inject_fault()

merge_metrics(metrics)

oom([nonzero_exitcode_as_oom])

parquet_kv_metadata_bytes([extra_partitions])

parquet_kv_metadata_str([extra_partitions])

partition(batch_index, input_dataset)

random_float()

random_uint32()

run()

run_on_ray()

Run the task on Ray.

set_memory_limit(soft_limit, hard_limit)

Attributes

hash_columns

data_partition_column

random_shuffle

shuffle_only

drop_partition_column

use_parquet_writer

hive_partitioning

parquet_row_group_size

parquet_row_group_bytes

parquet_dictionary_encoding

parquet_compression

parquet_compression_level

partitioned_datasets

allow_speculative_exec

Whether the task is allowed to run via speculative execution.

any_input_empty

cpu_limit

ctx

dataset

default_output_name

dimension

elapsed_time

exception

exec_cq

exec_id

exec_on_scheduler

fail_count

final_output_abspath

finish_time

fixed_rand_seeds

gpu_limit

id

input_datasets

input_deps

io_workers

key

local_gpu

Return the first GPU granted to this task.

local_gpu_ranks

Return all GPU ranks granted to this task.

local_rank

Return the first GPU rank granted to this task.

location

max_batch_size

memory_limit

node_id

npartitions

num_workers

numa_node

numpy_random_gen

output

output_deps

output_dirname

output_filename

output_name

output_root

partition_dims

partition_infos

partition_infos_as_dict

perf_metrics

perf_profile

python_random_gen

random_seed_bytes

ray_dataset_path

The path of a pickle file that contains the output dataset of the task.

ray_marker_path

The path of an empty file that is used to determine whether the task has been started.

retry_count

runtime_id

runtime_output_abspath

Output data will be produced in this directory at runtime.

runtime_state

sched_epoch

self_contained_output

Whether the output of this node is independent of all input nodes.

skip_when_any_input_empty

staging_root

If the task has a dedicated output directory, its runtime output directory will be placed under it.

start_time

status

temp_abspath

temp_output

Whether the output of this node is stored in a temporary directory.

uniform_failure_prob

write_buffer_size
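
The parquet_* parameters of the constructor correspond to standard Parquet writer options. As a hedged illustration, assuming the task ultimately writes each partition through a Parquet writer comparable to pyarrow's (the actual code path may differ), the constructor defaults map onto a plain write like this; the table contents and output filename are illustrative:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"user_id": [1, 2, 3], "value": [1.5, 2.5, 3.5]})

    pq.write_table(
        table,
        "partition-0.parquet",   # illustrative output path
        row_group_size=122880,   # parquet_row_group_size default
        use_dictionary=False,    # parquet_dictionary_encoding default
        compression="ZSTD",      # parquet_compression default
        compression_level=3,     # parquet_compression_level default
    )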