smallpond.logical.node.PandasBatchNode#

class smallpond.logical.node.PandasBatchNode(ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[RecordBatchReader]], Iterable[Table]] | None = None, background_io_thread=True, streaming_batch_size: int | None = None, secs_checkpoint_interval: int | None = None, parquet_row_group_size: int | None = None, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, use_duckdb_reader=False, output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None)#

Run Python code to process the input datasets as a series of pandas DataFrames.

__init__(ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[RecordBatchReader]], Iterable[Table]] | None = None, background_io_thread=True, streaming_batch_size: int | None = None, secs_checkpoint_interval: int | None = None, parquet_row_group_size: int | None = None, parquet_dictionary_encoding=False, parquet_compression='ZSTD', parquet_compression_level=3, use_duckdb_reader=False, output_name: str | None = None, output_path: str | None = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: int | None = None) None#

Construct an ArrowStreamNode. See Node.__init__() for comments on the other parameters.

Parameters#

process_func, optional

User-defined process function, which should have the same signature as self.process(…). If the user-defined function takes extra parameters, use functools.partial(…) to bind them. See test_partial_process_func in test/test_execution.py for usage examples, and the sketch after this parameter list.

background_io_thread, optional

Create a background IO thread for read/write.

streaming_batch_size, optional

Split the input datasets into batches, each of which has a length less than or equal to streaming_batch_size.

secs_checkpoint_interval, optional

Create a checkpoint of the stream task every secs_checkpoint_interval seconds.

parquet_row_group_size, optional

The number of rows stored in each row group of the Parquet file. A large row group size provides more opportunities to compress the data. A small row group size can make row filtering faster and achieve higher concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size.

parquet_dictionary_encoding, optional

Specify whether dictionary encoding should be used in general or only for specific columns. See use_dictionary in https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html.

use_duckdb_reader, optional

Use duckdb (instead of the pyarrow parquet module) to load Parquet files as Arrow tables.

cpu_limit, optional

The maximum number of CPUs that may be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit.

gpu_limit, optional

The maximum number of GPUs that may be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit.

memory_limit, optional

The maximum amount of memory that may be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit.
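
A minimal sketch of passing a user-defined process_func, assuming (per process(runtime_ctx, input_dfs) below) that the function receives a RuntimeContext and a list of pandas DataFrames and returns an iterable of DataFrames. The ctx and input_node variables, the value column, and the factor parameter are illustrative assumptions, not part of this API.

    import functools
    from typing import Iterable, List

    import pandas as pd

    from smallpond.logical.node import PandasBatchNode


    # Hypothetical process function. It mirrors the signature of
    # self.process(runtime_ctx, input_dfs), plus an extra `factor` parameter
    # that is bound below with functools.partial.
    def scale_values(runtime_ctx, input_dfs: List[pd.DataFrame], factor: float) -> Iterable[pd.DataFrame]:
        # Concatenate the input partitions and scale an assumed `value` column.
        df = pd.concat(input_dfs, ignore_index=True)
        df["value"] = df["value"] * factor
        return [df]


    # `ctx` and `input_node` are assumed to be an existing Context and upstream Node.
    node = PandasBatchNode(
        ctx,
        (input_node,),
        process_func=functools.partial(scale_values, factor=2.0),
    )

Binding factor with functools.partial leaves a two-argument callable whose signature matches self.process(…), as required by process_func.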

Methods

__init__(ctx, input_deps, *[, process_func, ...])

Construct an ArrowStreamNode.

add_perf_metrics(name, value)

create_task(*args, **kwargs)

get_perf_stats(name)

process(runtime_ctx, input_dfs)

Put user-defined code here (see the sketch after this method list).

slim_copy()

spawn(*args, **kwargs)

Return an instance of a subclass of ArrowStreamTask.

task_factory(task_builder)
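
If no process_func is given, an alternative pattern suggested by the process() stub above is to subclass PandasBatchNode and override process(). A minimal sketch, assuming input_dfs arrive as pandas DataFrames and an iterable of DataFrames is returned; the DropDuplicatesNode name is illustrative.

    from typing import Iterable, List

    import pandas as pd

    from smallpond.logical.node import PandasBatchNode


    class DropDuplicatesNode(PandasBatchNode):
        # Hypothetical subclass: put user-defined code in process() instead of
        # passing a process_func to the constructor.
        def process(self, runtime_ctx, input_dfs: List[pd.DataFrame]) -> Iterable[pd.DataFrame]:
            # Drop duplicate rows from each input DataFrame.
            return [df.drop_duplicates() for df in input_dfs]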

Attributes

default_batch_size

default_row_group_size

default_secs_checkpoint_interval

enable_resource_boost

num_partitions