smallpond.execution.task.HashPartitionTask
- class smallpond.execution.task.HashPartitionTask(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], npartitions: int, dimension: str, hash_columns: List[str], data_partition_column: str, random_shuffle: bool = False, shuffle_only: bool = False, drop_partition_column=False, use_parquet_writer=False, hive_partitioning=False, parquet_row_group_size: int = 122880, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None)
- __init__(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], npartitions: int, dimension: str, hash_columns: List[str], data_partition_column: str, random_shuffle: bool = False, shuffle_only: bool = False, drop_partition_column=False, use_parquet_writer=False, hive_partitioning=False, parquet_row_group_size: int = 122880, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None) → None
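HashPartitionTask is normally constructed by smallpond's planner rather than by user code. Conceptually, it routes each input row to one of npartitions output buckets by hashing the values of hash_columns (or assigns buckets at random when random_shuffle is set). The sketch below illustrates only that bucketing idea; the assign_buckets helper and the use of Python's built-in hash() are illustrative assumptions, not the library's actual hash function.

```python
import pyarrow as pa

def assign_buckets(table: pa.Table, hash_columns: list[str], npartitions: int) -> list[int]:
    """Illustrative only: map each row to a partition by hashing its key columns.

    The real task delegates hashing to the underlying engine; Python's
    built-in hash() merely stands in for that here.
    """
    keys = zip(*(table.column(name).to_pylist() for name in hash_columns))
    return [hash(key) % npartitions for key in keys]

# Rows with equal values in hash_columns always land in the same bucket.
t = pa.table({"ticker": ["A", "B", "A"], "price": [1.0, 2.0, 3.0]})
buckets = assign_buckets(t, hash_columns=["ticker"], npartitions=3)
assert buckets[0] == buckets[2]
```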
Methods

- __init__(ctx, input_deps, partition_infos, ...)
- add_elapsed_time([metric_name]): Start or stop the timer.
- adjust_row_group_size(nbytes, num_rows[, ...])
- clean_complex_attrs()
- clean_output([force])
- cleanup(): Called after run() even if there is an exception.
- compute_avg_row_size(nbytes, num_rows)
- create(engine_type, *args, **kwargs)
- dump()
- exec([cq])
- finalize(): Called after run() to finalize the processing.
- get_partition_info(dimension)
- initialize(): Called before run() to prepare for running the task.
- inject_fault()
- merge_metrics(metrics)
- oom([nonzero_exitcode_as_oom])
- parquet_kv_metadata_bytes([extra_partitions])
- parquet_kv_metadata_str([extra_partitions])
- partition(input_dataset)
- random_float()
- random_uint32()
- run()
- run_on_ray(): Run the task on Ray.
- set_memory_limit(soft_limit, hard_limit)
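The method summaries above imply a fixed lifecycle: initialize() runs before run(), finalize() follows a successful run(), and cleanup() runs in every case. A minimal sketch of that contract (the execute() driver below is hypothetical; only the call order is taken from the summaries):

```python
def execute(task):
    """Hypothetical driver showing the lifecycle order implied above."""
    task.initialize()      # "Called before run() to prepare for running the task."
    try:
        task.run()
        task.finalize()    # "Called after run() to finalize the processing."
    finally:
        task.cleanup()     # "Called after run() even if there is an exception."
```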
Attributes

- hash_columns
- data_partition_column
- random_shuffle
- shuffle_only
- drop_partition_column
- use_parquet_writer
- hive_partitioning
- parquet_row_group_size
- parquet_row_group_bytes
- parquet_dictionary_encoding
- parquet_compression
- parquet_compression_level
- partitioned_datasets
- allow_speculative_exec: Whether the task is allowed to be executed by speculative execution.
- any_input_empty
- cpu_limit
- ctx
- dataset
- default_output_name
- dimension
- elapsed_time
- exception
- exec_cq
- exec_id
- exec_on_scheduler
- fail_count
- final_output_abspath
- finish_time
- gpu_limit
- id
- input_datasets
- input_deps
- io_workers
- key
- local_gpu: Return the first GPU granted to this task.
- local_gpu_ranks: Return all GPU ranks granted to this task.
- local_rank: Return the first GPU rank granted to this task.
- location
- max_batch_size
- memory_limit
- node_id
- npartitions
- num_workers
- numa_node
- numpy_random_gen
- output
- output_deps
- output_dirname
- output_filename
- output_name
- output_root
- partition_dims
- partition_infos
- partition_infos_as_dict
- perf_metrics
- perf_profile
- python_random_gen
- random_seed_bytes
- ray_dataset_path: The path of a pickle file that contains the output dataset of the task.
- ray_marker_path: The path of an empty file that is used to determine if the task has been started.
- retry_count
- runtime_id
- runtime_output_abspath: Output data will be produced in this directory at runtime.
- runtime_state
- sched_epoch
- self_contained_output: Whether the output of this node is not dependent on any input nodes.
- skip_when_any_input_empty
- staging_root: If the task has a special output directory, its runtime output directory will be under it.
- start_time
- status
- temp_abspath
- temp_output: Whether the output of this node is stored in a temporary directory.
- uniform_failure_prob
- write_buffer_size
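In practice a HashPartitionTask usually enters a plan through smallpond's high-level DataFrame API rather than direct construction. A sketch under that assumption, following the repartition(..., hash_by=...) usage shown in the project README (the file paths and column name are hypothetical):

```python
import smallpond

sp = smallpond.init()
df = sp.read_parquet("prices/*.parquet")  # hypothetical input path
# Request hash partitioning into 3 buckets keyed on the "ticker" column;
# the planner executes this with hash-partition tasks like the one above.
df = df.repartition(3, hash_by="ticker")
df.write_parquet("out/")                  # hypothetical output path
```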