smallpond.logical.node.SqlEngineNode

class smallpond.logical.node.SqlEngineNode(ctx: Context, input_deps: Tuple[Node, ...], sql_query: str | Iterable[str], *, udfs: List[str | UserDefinedFunction] | None = None, per_thread_output=True, materialize_output=True, materialize_in_memory=False, relax_memory_if_oom=None, batched_processing=False, extension_paths: List[str] | None = None, udf_module_paths: List[str] | None = None, enable_temp_directory=False, parquet_row_group_size: int | None = None, parquet_dictionary_encoding: bool = False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None, cpu_overcommit_ratio: float = 1.0, memory_overcommit_ratio: float = 0.9)

Run a SQL query against the outputs of input_deps.

__init__(ctx: Context, input_deps: Tuple[Node, ...], sql_query: str | Iterable[str], *, udfs: List[str | UserDefinedFunction] | None = None, per_thread_output=True, materialize_output=True, materialize_in_memory=False, relax_memory_if_oom=None, batched_processing=False, extension_paths: List[str] | None = None, udf_module_paths: List[str] | None = None, enable_temp_directory=False, parquet_row_group_size: int | None = None, parquet_dictionary_encoding: bool = False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None, cpu_overcommit_ratio: float = 1.0, memory_overcommit_ratio: float = 0.9) -> None

Construct a SqlEngineNode. See Node.__init__() for descriptions of the other parameters.

Parameters

sql_query

A SQL query string or a list of query strings. DuckDB query syntax is currently supported; see https://duckdb.org/docs/sql/query_syntax/select. All queries are executed, but only the result of the last query is persisted as the output.
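The "all queries run, only the last result is kept" behaviour can be illustrated with a minimal sketch. It uses Python's built-in sqlite3 module as a stand-in for DuckDB, and the helper name run_queries is hypothetical; it is not part of smallpond.

```python
import sqlite3
from typing import Iterable, List, Tuple, Union


def run_queries(conn: sqlite3.Connection,
                sql_query: Union[str, Iterable[str]]) -> List[Tuple]:
    """Execute every query in order and return only the last result."""
    # Accept a single string or an iterable of strings, like sql_query does.
    queries = [sql_query] if isinstance(sql_query, str) else list(sql_query)
    result: List[Tuple] = []
    for query in queries:
        # Each query is executed; earlier results are overwritten.
        result = conn.execute(query).fetchall()
    return result


conn = sqlite3.connect(":memory:")
last_result = run_queries(conn, [
    "create table t (x integer)",        # setup statement, result discarded
    "insert into t values (1), (2)",     # setup statement, result discarded
    "select sum(x) from t",              # only this result is returned
])
```

Earlier statements are typically useful for setup work such as creating temporary tables or setting engine options before the final select.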

The output dataset of each node in input_deps can be referenced as {0}, {1}, {2}, etc. For example, the following query counts the product items from {0} whose category_id appears in {1}.

select count(product_item.id) from {0}
where product_item.id > 0 and
    product_item.category_id in ( select category_id from {1} )
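The positional placeholders follow the same shape as Python's str.format fields. The sketch below only illustrates that shape: the two read_parquet(...) expressions are hypothetical stand-ins for whatever table expressions the input datasets resolve to, and the source does not describe how smallpond performs the substitution internally.

```python
# Hypothetical table expressions for the outputs of input_deps[0] and input_deps[1].
inputs = [
    "read_parquet('items/*.parquet')",
    "read_parquet('categories/*.parquet')",
]

query_template = """
select count(product_item.id) from {0}
where product_item.id > 0 and
    product_item.category_id in ( select category_id from {1} )
"""

# {0} and {1} are filled in by position, one per input dataset.
rendered = query_template.format(*inputs)
print(rendered)
```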

The following placeholders are supported in the query:

  • {batch_index}: the index of the current batch.

  • {query_index}: the index of the current query.

  • {rand_seed}: the random seed of the current query.

  • {__data_partition__}: the index of the current data partition.
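As a rough illustration, the named placeholders can be mixed with the positional ones in a single template. The values and the table name input_table below are made up for the example, and plain str.format is used as an assumption about the substitution style, not as a description of smallpond's implementation.

```python
query_template = (
    "select *, {__data_partition__} as partition_id, {batch_index} as batch_id, "
    "{query_index} as query_id, {rand_seed} as seed from {0}"
)

# Hypothetical values that the engine would supply at run time.
context = {
    "batch_index": 0,
    "query_index": 1,
    "rand_seed": 42,
    "__data_partition__": 7,
}

# "input_table" is a made-up stand-in for the first input dataset.
rendered = query_template.format("input_table", **context)
print(rendered)
```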

udfs, optional

A list of user-defined functions to be referenced in sql_query. Each element can be one of the following:

  • A @udf decorated function.

  • A path to a DuckDB extension file, e.g. path/to/udf.duckdb_extension.

  • A string returned by ctx.create_function() or ctx.create_duckdb_extension().

If udfs is not empty, the resource requirement is capped at min(cpu_limit, 3) and min(memory_limit, 50*GB), since UDF execution in DuckDB is not highly parallel.
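The cap amounts to the following arithmetic. This is a minimal sketch: the function and constant names are hypothetical, and GB is assumed to mean 1024**3 bytes, which the text above does not state.

```python
from typing import Sequence, Tuple

GB = 1024 ** 3                 # assumption: binary gigabytes
MAX_UDF_CPU_LIMIT = 3          # from min(cpu_limit, 3)
MAX_UDF_MEMORY_LIMIT = 50 * GB # from min(memory_limit, 50*GB)


def cap_resources_for_udfs(cpu_limit: int, memory_limit: int,
                           udfs: Sequence[object]) -> Tuple[int, int]:
    """Return the (cpu, memory) request after applying the UDF cap."""
    if udfs:
        return (min(cpu_limit, MAX_UDF_CPU_LIMIT),
                min(memory_limit, MAX_UDF_MEMORY_LIMIT))
    # Without UDFs the requested limits are left untouched.
    return cpu_limit, memory_limit


capped = cap_resources_for_udfs(16, 64 * GB, ["my_udf"])
```

Requests that are already below the cap are unchanged, so the rule never raises a limit.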

per_thread_output, optional

If the final number of Parquet files is not important, writing one file per thread can significantly improve performance. Also see https://duckdb.org/docs/data/parquet/tips.html#enabling-per_thread_output.

materialize_output, optional

If enabled, the query result is materialized to the underlying filesystem as Parquet files.

materialize_in_memory, optional

Materialize query result in memory before writing to the underlying filesystem, by default False.

relax_memory_if_oom, optional

Double the memory limit and retry if the SQL engine runs out of memory (OOM), by default False.
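A double-and-retry policy of this kind could look like the sketch below. It is an illustration only: Python's MemoryError stands in for an engine OOM, and the helper name and the max_retries bound are hypothetical, since the text above does not say how many times the limit may be doubled.

```python
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def run_with_memory_relaxation(run_query: Callable[[int], T],
                               memory_limit: int,
                               max_retries: int = 3) -> Tuple[T, int]:
    """Run run_query(memory_limit); on OOM, double the limit and retry."""
    for attempt in range(max_retries + 1):
        try:
            return run_query(memory_limit), memory_limit
        except MemoryError:
            if attempt == max_retries:
                raise  # give up after the last allowed retry
            memory_limit *= 2
    raise AssertionError("unreachable")


def fake_query(limit: int) -> str:
    # Pretend the query needs at least 4 units of memory.
    if limit < 4:
        raise MemoryError("out of memory")
    return "ok"


outcome = run_with_memory_relaxation(fake_query, memory_limit=1)
```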

batched_processing, optional

Split the input dataset into multiple batches, each of which fits within the memory limit, and then run the SQL query against each batch. Enabled only if len(input_deps) == 1.
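The text above does not say how batches are formed. One plausible approach, shown here purely as an assumption, is to group input files greedily so that each batch's total size stays within the memory limit; the function name and the use of file sizes as the measure are hypothetical.

```python
from typing import List


def split_into_batches(file_sizes: List[int], memory_limit: int) -> List[List[int]]:
    """Greedily group file indices so each batch's total size fits the limit.

    A single file larger than the limit still gets a batch of its own.
    """
    batches: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(file_sizes):
        # Start a new batch when adding this file would exceed the limit.
        if current and current_size + size > memory_limit:
            batches.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        batches.append(current)
    return batches


batches = split_into_batches([4, 4, 4, 10, 1], memory_limit=8)
```

Each batch would then be queried separately, with {batch_index} identifying the batch inside the query.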

extension_paths, optional

A list of DuckDB extension paths to be loaded at runtime.

enable_temp_directory, optional

Write temp files when memory is low, by default False.

parquet_row_group_size, optional

The number of rows stored in each row group of a Parquet file. A large row group size provides more opportunities to compress the data. A small row group size can make filtering rows faster and allow higher concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size.

parquet_dictionary_encoding, optional

Specifies whether to use dictionary encoding for all columns or only for some. While encoding a column, if the dictionary grows too large, the column falls back to PLAIN encoding. The signature above declares the default as False. Set it to False to disable dictionary encoding, or pass in column names to enable it only for specific columns, e.g. parquet_dictionary_encoding=['column_1'].

cpu_limit, optional

The max number of CPUs used by the SQL engine.

memory_limit, optional

The max memory used by the SQL engine.

cpu_overcommit_ratio, optional

The effective number of threads used by the SQL engine is: cpu_limit * cpu_overcommit_ratio.

memory_overcommit_ratio, optional

The effective size of memory used by the SQL engine is: memory_limit * memory_overcommit_ratio.
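The two overcommit formulas can be worked through as follows. This is a minimal sketch: the helper name is hypothetical, GB is assumed to be 1024**3 bytes, and any rounding the engine applies to the thread count is not specified above, so the raw products are returned.

```python
from typing import Tuple

GB = 1024 ** 3  # assumption: binary gigabytes


def effective_resources(cpu_limit: int, memory_limit: int,
                        cpu_overcommit_ratio: float = 1.0,
                        memory_overcommit_ratio: float = 0.9) -> Tuple[float, float]:
    """Apply the overcommit ratios to the configured limits."""
    threads = cpu_limit * cpu_overcommit_ratio
    memory = memory_limit * memory_overcommit_ratio
    return threads, memory


# With the default ratios, 8 CPUs and 10 GB give 8 threads and about 9 GB.
threads, memory = effective_resources(cpu_limit=8, memory_limit=10 * GB)
```

A memory ratio below 1.0 leaves headroom between what the SQL engine is told it may use and what the task has actually been allocated.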

Methods

__init__(ctx, input_deps, sql_query, *[, ...])

Construct a SqlEngineNode.

add_perf_metrics(name, value)

create_task(*args, **kwargs)

get_perf_stats(name)

slim_copy()

spawn(*args, **kwargs)

task_factory(task_builder)

Attributes

default_cpu_limit

default_memory_limit

default_row_group_size

enable_resource_boost

max_udf_cpu_limit

num_partitions

oneline_query