smallpond.execution.task.ArrowComputeTask
- class smallpond.execution.task.ArrowComputeTask(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], process_func: Callable[[RuntimeContext, List[Table]], Table] | None = None, parquet_row_group_size: int = 122880, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, use_duckdb_reader=False, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, gpu_limit: float | None = None, memory_limit: int | None = None)
- __init__(ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], process_func: Callable[[RuntimeContext, List[Table]], Table] | None = None, parquet_row_group_size: int = 122880, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, use_duckdb_reader=False, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, gpu_limit: float | None = None, memory_limit: int | None = None) → None
Methods
- __init__(ctx, input_deps, partition_infos[, ...])
- add_elapsed_time([metric_name]): Start or stop the timer.
- adjust_row_group_size(nbytes, num_rows[, ...])
- clean_complex_attrs()
- clean_output([force])
- cleanup(): Called after run(), even if an exception was raised.
- compute_avg_row_size(nbytes, num_rows)
- create_input_views(conn, input_datasets[, ...])
- dump()
- dump_output(output_table)
- exec([cq])
- exec_query(conn, query_statement[, ...])
- finalize(): Called after run() to finalize the processing.
- get_partition_info(dimension)
- initialize(): Called before run() to prepare for running the task.
- inject_fault()
- merge_metrics(metrics)
- oom([nonzero_exitcode_as_oom])
- parquet_kv_metadata_bytes([extra_partitions])
- parquet_kv_metadata_str([extra_partitions])
- prepare_connection(conn)
- process(runtime_ctx, input_tables): Can be overridden in subclasses of ArrowComputeTask.
- random_float()
- random_uint32()
- run()
- run_on_ray(): Run the task on Ray.
- set_memory_limit(soft_limit, hard_limit)

Attributes
- process_func
- parquet_row_group_size
- parquet_row_group_bytes
- parquet_dictionary_encoding
- use_duckdb_reader
- allow_speculative_exec: Whether the task may be run by speculative execution.
- any_input_empty
- compression_level_str
- compression_options
- compression_type_str
- cpu_limit
- cpu_overcommit_ratio
- ctx
- dataset
- default_output_name
- elapsed_time
- enable_temp_directory
- exception
- exec_cq
- exec_id
- exec_on_scheduler
- fail_count
- final_output_abspath
- finish_time
- gpu_limit
- id
- input_datasets
- input_deps
- input_udfs
- input_view_index
- key
- local_gpu: Return the first GPU granted to this task.
- local_gpu_ranks: Return all GPU ranks granted to this task.
- local_rank: Return the first GPU rank granted to this task.
- location
- memory_limit
- memory_overcommit_ratio
- node_id
- numa_node
- numpy_random_gen
- output
- output_deps
- output_dirname
- output_filename
- output_name
- output_root
- parquet_compression
- parquet_compression_level
- partition_dims
- partition_infos
- partition_infos_as_dict
- perf_metrics
- perf_profile
- python_random_gen
- query_udfs
- rand_seed_float
- rand_seed_uint32
- random_seed_bytes
- ray_dataset_path: The path of a pickle file that contains the task's output dataset.
- ray_marker_path: The path of an empty file used to determine whether the task has started.
- retry_count
- runtime_id
- runtime_output_abspath: Directory in which output data is produced at runtime.
- runtime_state
- sched_epoch
- self_contained_output: Whether this node's output is independent of any input nodes.
- skip_when_any_input_empty
- staging_root: If the task has a special output directory, its runtime output directory is placed under it.
- start_time
- status
- temp_abspath
- temp_output: Whether this node's output is stored in a temporary directory.
- udfs
- uniform_failure_prob