Nodes#

Nodes are the fundamental building blocks of a data processing pipeline. Each node encapsulates a specific operation or transformation applied to a dataset. Nodes can be chained together to form a logical plan: a directed acyclic graph (DAG) of nodes that represents the overall data processing workflow.

A typical workflow to create a logical plan is as follows:

# Imports (module paths assume the smallpond package layout)
from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import (
    Context, DataSourceNode, DataSetPartitionNode, SqlEngineNode, LogicalPlan
)

# Create a global context
ctx = Context()

# Create a dataset
dataset = ParquetDataSet("path/to/dataset/*.parquet")

# Create a data source node
node = DataSourceNode(ctx, dataset)

# Partition the data
node = DataSetPartitionNode(ctx, (node,), npartitions=2)

# Create a SQL engine node to transform the data;
# "{0}" refers to the output of the first input dependency
node = SqlEngineNode(ctx, (node,), "SELECT * FROM {0}")

# Create a logical plan from the root node
plan = LogicalPlan(ctx, node)

You can then create tasks from the logical plan; see Tasks.
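
Executing the plan then goes through the execution driver. The sketch below assumes the driver API lives at smallpond.execution.driver.Driver and only shows where the finished LogicalPlan is handed off; see Tasks for how tasks are created and scheduled.

# Hedged sketch: hand the plan to the execution driver, which turns the
# logical plan into tasks and runs them (module path is an assumption)
from smallpond.execution.driver import Driver

driver = Driver()
driver.run(plan)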

Notable properties of Node:

  1. Nodes are partitioned. Each Node generates a series of tasks, with each task processing one partition of data.

  2. The inputs and outputs of a Node are each a series of partitioned Datasets. A Node may write data to shared storage and return a new Dataset, or it may simply recombine the input Datasets.

Context#

Context()

Global context for each logical plan.

NodeId

A unique identifier for each node.

LogicalPlan#

LogicalPlan(ctx, root_node)

The logical plan that defines a directed acyclic computation graph.

LogicalPlanVisitor(*args, **kwds)

Visit the nodes of a logical plan in depth-first order.
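
A LogicalPlan is built from a single root node. When a pipeline ends in more than one node, the RootNode documented in the listing below can assemble the terminal nodes under one root. A minimal sketch using only constructors documented on this page (the column names, SQL text, and path are illustrative):

# Two independent branches reading the same source
ctx = Context()
source = DataSourceNode(ctx, ParquetDataSet("path/to/dataset/*.parquet"))
branch_a = SqlEngineNode(ctx, (source,), "SELECT a, b FROM {0}")
branch_b = SqlEngineNode(ctx, (source,), "SELECT * FROM {0} WHERE a > 0")

# RootNode assembles both branches and itself outputs nothing
root = RootNode(ctx, (branch_a, branch_b))
plan = LogicalPlan(ctx, root)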

Nodes#

Node(ctx, input_deps[, output_name, ...])

The base class for all nodes.

DataSetPartitionNode(ctx, input_deps, ...[, ...])

Partition the outputs of input_deps into n partitions.

ArrowBatchNode(ctx, input_deps, *[, ...])

Run user-defined code to process the input datasets as a series of arrow tables.

ArrowComputeNode(ctx, input_deps, *[, ...])

Run Python code to process the input datasets, which have been loaded as Apache Arrow tables.

ArrowStreamNode(ctx, input_deps, *[, ...])

Run Python code to process the input datasets, which have been loaded as RecordBatchReader.

ConsolidateNode(ctx, input_dep, dimensions)

Consolidate partitions into larger ones.

DataSinkNode(ctx, input_deps, output_path[, ...])

Collect the output files of input_deps to output_path.

DataSourceNode(ctx, dataset)

All inputs of a logical plan are represented as DataSourceNode.

EvenlyDistributedPartitionNode(ctx, ...[, ...])

Evenly distribute the output files or rows of input_deps into n partitions.

HashPartitionNode(ctx, input_deps, npartitions)

Partition the outputs of input_deps into n partitions based on the hash values of hash_columns.

LimitNode(ctx, input_dep, limit)

Limit the number of rows of the output of an input node.

LoadPartitionedDataSetNode(ctx, input_deps, ...)

Load existing partitioned dataset (only parquet files are supported).

PandasBatchNode(ctx, input_deps, *[, ...])

Run Python code to process the input datasets as a series of pandas DataFrames.

PandasComputeNode(ctx, input_deps, *[, ...])

Run Python code to process the input datasets as a single pandas DataFrame.

PartitionNode(ctx, input_deps, npartitions)

The base class for all partition nodes.

ProjectionNode(ctx, input_dep[, columns, ...])

Select columns from output of an input node.

PythonScriptNode(ctx, input_deps, *[, ...])

Run Python code to process the input datasets with PythonScriptNode.process(...).

RangePartitionNode(ctx, input_deps, split_points)

Partition the outputs of input_deps into partitions defined by split_points.

RepeatPartitionNode(ctx, input_deps, ...[, ...])

Create a new partition dimension by repeating the input_deps.

RootNode(ctx, input_deps)

A virtual node that assembles multiple nodes and outputs nothing.

ShuffleNode(ctx, input_deps, npartitions[, ...])

Shuffle the outputs of input_deps into n partitions.

SqlEngineNode(ctx, input_deps, sql_query, *)

Run SQL query against the outputs of input_deps.

UnionNode(ctx, input_deps)

Union two or more nodes into one flow of data.

UserDefinedPartitionNode(ctx, input_deps, ...)

Distribute the output files or rows of input_deps into n partitions based on user code.

UserPartitionedDataSourceNode(ctx, ...[, ...])