smallpond.logical.node.SqlEngineNode#
- class smallpond.logical.node.SqlEngineNode(ctx: Context, input_deps: Tuple[Node, ...], sql_query: str | Iterable[str], *, udfs: List[str | UserDefinedFunction] | None = None, per_thread_output=True, materialize_output=True, materialize_in_memory=False, relax_memory_if_oom=None, batched_processing=False, extension_paths: List[str] | None = None, udf_module_paths: List[str] | None = None, enable_temp_directory=False, parquet_row_group_size: int | None = None, parquet_dictionary_encoding: bool = False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None, cpu_overcommit_ratio: float = 1.0, memory_overcommit_ratio: float = 0.9)#
Run a SQL query against the outputs of input_deps.
- __init__(ctx: Context, input_deps: Tuple[Node, ...], sql_query: str | Iterable[str], *, udfs: List[str | UserDefinedFunction] | None = None, per_thread_output=True, materialize_output=True, materialize_in_memory=False, relax_memory_if_oom=None, batched_processing=False, extension_paths: List[str] | None = None, udf_module_paths: List[str] | None = None, enable_temp_directory=False, parquet_row_group_size: int | None = None, parquet_dictionary_encoding: bool = False, parquet_compression='ZSTD', parquet_compression_level=3, output_name: str | None = None, output_path: str | None = None, cpu_limit: int | None = None, memory_limit: int | None = None, cpu_overcommit_ratio: float = 1.0, memory_overcommit_ratio: float = 0.9) → None#
Construct a SqlEngineNode. See Node.__init__() for comments on the other parameters.
Parameters#
- sql_query
SQL query string or a list of query strings. Currently DuckDB query syntax is supported, see https://duckdb.org/docs/sql/query_syntax/select. All queries are executed, but only the result of the last query is persisted as the output.
The output dataset of each node in input_deps can be referenced as {0}, {1}, {2}, etc. For example, the following query counts the total number of product items from {0} whose category_id is included in {1} (a full construction sketch follows the parameter list).
select count(product_item.id) from {0} where product_item.id > 0 and product_item.category_id in ( select category_id from {1} )
The following placeholders are supported in the query:
{batch_index}: the index of the current batch.
{query_index}: the index of the current query.
{rand_seed}: the random seed of the current query.
{__data_partition__}: the index of the current data partition.
- udfs, optional
A list of user-defined functions to be referenced in sql_query. Each element can be one of the following:
A @udf decorated function.
A path to a duckdb extension file, e.g. path/to/udf.duckdb_extension.
A string returned by ctx.create_function() or ctx.create_duckdb_extension().
If udfs is not empty, the resource requirement is downgraded to min(cpu_limit, 3) CPUs and min(memory_limit, 50*GB) memory, since UDF execution in DuckDB is not highly parallelized.
- per_thread_output, optional
If the final number of Parquet files is not important, writing one file per thread can significantly improve performance. Also see https://duckdb.org/docs/data/parquet/tips.html#enabling-per_thread_output.
- materialize_output, optional
If enabled, the query result is materialized to the underlying filesystem as Parquet files.
- materialize_in_memory, optional
Materialize query result in memory before writing to the underlying filesystem, by default False.
- relax_memory_if_oom, optional
Double the memory limit and retry if the SQL engine runs out of memory, by default False.
- batched_processing, optional
Split the input dataset into multiple batches, each of which fits within the memory limit, then run the SQL query against each batch. Enabled only if len(input_deps) == 1.
- extension_paths, optional
A list of duckdb extension paths to be loaded at runtime.
- enable_temp_directory, optional
Write temp files when memory is low, by default False.
- parquet_row_group_size, optional
The number of rows stored in each row group of the Parquet file. A large row group size provides more opportunities to compress the data; a small row group size can make row filtering faster and achieve higher concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size.
- parquet_dictionary_encoding, optional
Specify whether dictionary encoding should be used in general or only for some columns. When encoding a column, if the dictionary size is too large, the column falls back to PLAIN encoding. By default, dictionary encoding is enabled for all columns. Set it to False to disable dictionary encoding, or pass a list of column names to enable it only for those columns, e.g. parquet_dictionary_encoding=['column_1'].
- cpu_limit, optional
The max number of CPUs used by the SQL engine.
- memory_limit, optional
The max memory used by the SQL engine.
- cpu_overcommit_ratio, optional
The effective number of threads used by the SQL engine is: cpu_limit * cpu_overcommit_ratio.
- memory_overcommit_ratio, optional
The effective size of memory used by the SQL engine is: memory_limit * memory_overcommit_ratio.
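The sketch below shows how these parameters fit together. It is illustrative only: the upstream nodes product_items and categories and the Context instance ctx are hypothetical placeholders, and only SqlEngineNode and the keyword arguments shown are taken from the signature above.

from smallpond.logical.node import SqlEngineNode

# Hypothetical placeholders: in a real logical plan, `ctx` is the shared Context
# and `product_items` / `categories` are upstream Node instances whose output
# datasets become {0} and {1} in the query below.
ctx = ...
product_items = ...
categories = ...

count_node = SqlEngineNode(
    ctx,
    (product_items, categories),
    "select count(product_item.id) from {0} "
    "where product_item.id > 0 "
    "and product_item.category_id in (select category_id from {1})",
    per_thread_output=True,   # default: one output file per writer thread
    cpu_limit=4,              # cap the SQL engine at 4 CPUs
)

With cpu_overcommit_ratio left at its default of 1.0, this node runs with 4 * 1.0 = 4 effective threads; likewise, its effective memory budget is memory_limit * memory_overcommit_ratio.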
Methods
- __init__(ctx, input_deps, sql_query, *[, ...]): Construct a SqlEngineNode.
- add_perf_metrics(name, value)
- create_task(*args, **kwargs)
- get_perf_stats(name)
- slim_copy()
- spawn(*args, **kwargs)
- task_factory(task_builder)
Attributes
default_cpu_limit
default_memory_limit
default_row_group_size
enable_resource_boost
max_udf_cpu_limit
num_partitions
oneline_query