API Reference#
Smallpond provides both high-level and low-level APIs.
Note
Currently, smallpond provides two different APIs, supporting dynamic and static construction of data flow graphs respectively. Due to historical reasons, these two APIs use different scheduler backends and support different configuration options.
The High-level API currently uses Ray as the backend, supporting dynamic construction and execution of data flow graphs.
The Low-level API uses a built-in scheduler and only supports one-time execution of static data flow graphs. However, it offers more performance optimizations and richer configuration options.
We are working to merge them so that in the future, you can use a unified high-level API and freely choose between Ray or the built-in scheduler.
High-level API#
The high-level API is centered around DataFrame. It allows dynamic construction of data flow graphs, execution, and result retrieval.
A typical workflow looks like this:
import smallpond
sp = smallpond.init()
df = sp.read_parquet("path/to/dataset/*.parquet")
df = df.repartition(10)
df = df.map("x + 1")
df.write_parquet("path/to/output")
It is recommended to use the DataFrame API.
Low-level API#
In the low-level API, users manually create Nodes to construct static data flow graphs, then submit them to smallpond to generate Tasks and wait for all tasks to complete.
A complete example is shown below.
from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import Context, DataSourceNode, DataSetPartitionNode, SqlEngineNode, LogicalPlan
from smallpond.execution.driver import Driver
def my_pipeline(input_paths: List[str], npartitions: int):
ctx = Context()
dataset = ParquetDataSet(input_paths)
node = DataSourceNode(ctx, dataset)
node = DataSetPartitionNode(ctx, (node,), npartitions=npartitions)
node = SqlEngineNode(ctx, (node,), "SELECT * FROM {0}")
return LogicalPlan(ctx, node)
if __name__ == "__main__":
driver = Driver()
driver.add_argument("-i", "--input_paths", nargs="+")
driver.add_argument("-n", "--npartitions", type=int, default=10)
plan = my_pipeline(**driver.get_arguments())
driver.run(plan)
To run this script:
python script.py -i "path/to/*.parquet" -n 10