smallpond.logical.node.ArrowComputeNode#
- class smallpond.logical.node.ArrowComputeNode(ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[Table]], Table] | None = None, parquet_row_group_size: int | None = None, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, use_duckdb_reader=False, output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None)#
Run Python code to process the input datasets, which have been loaded as Apache Arrow tables. See https://arrow.apache.org/docs/python/generated/pyarrow.Table.html.
If the code needs to access attributes of the runtime task, e.g. local_rank, random_seed_long, or numpy_random_gen,
either create a subclass of ArrowComputeTask that implements ArrowComputeTask.process(…)
and override ArrowComputeNode.spawn(…) to return an instance of that subclass,
or read runtime_ctx.task inside the process(runtime_ctx: RuntimeContext, …) function.
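A minimal sketch of the second approach (reading runtime_ctx.task inside process_func). The names ctx and upstream are hypothetical: they stand for an existing Context and a previously built Node in the surrounding logical plan.

```python
import pyarrow as pa

from smallpond.logical.node import ArrowComputeNode

def shuffle_rows(runtime_ctx, input_tables: list) -> pa.Table:
    # runtime_ctx.task exposes task attributes such as numpy_random_gen.
    (table,) = input_tables
    perm = runtime_ctx.task.numpy_random_gen.permutation(table.num_rows)
    return table.take(perm)

# `ctx` (a Context) and `upstream` (an upstream Node) are assumed to come
# from the surrounding logical plan.
node = ArrowComputeNode(ctx, (upstream,), process_func=shuffle_rows)
```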
- __init__(ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[Table]], Table] | None = None, parquet_row_group_size: int | None = None, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, use_duckdb_reader=False, output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None) None #
Construct an ArrowComputeNode. See
Node.__init__()
to find comments on the other parameters.
Parameters#
- process_func, optional
User-defined process function, which should have the same signature as self.process(…). If the user-defined function has extra parameters, use functools.partial(…) to bind arguments. See test_partial_process_func in test/test_execution.py, and the sketch after this parameter list, for usage examples.
- parquet_row_group_size, optional
The number of rows stored in each row group of the parquet file. A large row group size provides more opportunities to compress the data. A small row group size makes row filtering faster and allows higher concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size.
- parquet_dictionary_encoding, optional
Specify whether dictionary encoding should be used in general or only for some columns. See use_dictionary in https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html.
- use_duckdb_reader, optional
Use DuckDB (instead of the pyarrow.parquet module) to load parquet files as Arrow tables.
- cpu_limit, optional
The maximum number of CPUs used by tasks generated from this node. This is a resource requirement specified by the user to guide task scheduling; smallpond does NOT enforce this limit.
- gpu_limit, optional
The maximum number of GPUs used by tasks generated from this node. This is a resource requirement specified by the user to guide task scheduling; smallpond does NOT enforce this limit.
- memory_limit, optional
The maximum amount of memory used by tasks generated from this node. This is a resource requirement specified by the user to guide task scheduling; smallpond does NOT enforce this limit.
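A sketch combining a process_func with extra arguments bound via functools.partial and a few of the parquet/resource options above. As before, ctx and upstream are hypothetical objects from the surrounding logical plan.

```python
import functools

import pyarrow as pa
import pyarrow.compute as pc

from smallpond.logical.node import ArrowComputeNode

def scale_column(runtime_ctx, input_tables: list, *, column: str, factor: float) -> pa.Table:
    # Extra keyword arguments (column, factor) are bound below with functools.partial.
    (table,) = input_tables
    idx = table.schema.get_field_index(column)
    return table.set_column(idx, column, pc.multiply(table.column(column), factor))

node = ArrowComputeNode(
    ctx,                                # Context of the logical plan (assumed to exist)
    (upstream,),                        # single upstream Node (assumed to exist)
    process_func=functools.partial(scale_column, column="price", factor=1.1),
    parquet_row_group_size=122880,      # larger groups compress better, smaller groups filter faster
    parquet_compression="ZSTD",
    parquet_compression_level=3,
    cpu_limit=2,                        # scheduling hint only; not enforced by smallpond
)
```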
Methods
- __init__(ctx, input_deps, *[, process_func, ...]): Construct an ArrowComputeNode.
- add_perf_metrics(name, value)
- create_task(*args, **kwargs)
- get_perf_stats(name)
- process(runtime_ctx, input_tables): Put user-defined code here.
- slim_copy()
- spawn(*args, **kwargs): Return an instance of a subclass of ArrowComputeTask.
- task_factory(task_builder)
Attributes
- default_row_group_size
- enable_resource_boost
- num_partitions
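For the subclass approach mentioned in the class description (a custom ArrowComputeTask returned by an overridden spawn(…)), a minimal sketch; the import path of ArrowComputeTask and the exact arguments forwarded by spawn(…) are assumptions:

```python
import pyarrow as pa

from smallpond.logical.node import ArrowComputeNode
# The import path of ArrowComputeTask below is an assumption.
from smallpond.execution.task import ArrowComputeTask

class ShuffleTask(ArrowComputeTask):
    def process(self, runtime_ctx, input_tables: list) -> pa.Table:
        # Task attributes such as numpy_random_gen are available on self.
        (table,) = input_tables
        return table.take(self.numpy_random_gen.permutation(table.num_rows))

class ShuffleNode(ArrowComputeNode):
    def spawn(self, *args, **kwargs) -> ShuffleTask:
        # Forward whatever arguments the base class would pass to its task.
        return ShuffleTask(*args, **kwargs)
```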