DataFrame#

DataFrame is the main class in smallpond. It represents a lazily computed, partitioned data set.

A typical workflow looks like this:

import smallpond

# Initialize the smallpond session
sp = smallpond.init()

# Lazily read a partitioned Parquet dataset
df = sp.read_parquet("path/to/dataset/*.parquet")
# Split the data into 10 partitions
df = df.repartition(10)
# Evaluate a SQL expression on each row
df = df.map("x + 1")
# Trigger execution and write the result as Parquet files
df.write_parquet("path/to/output")

Initialization#

smallpond.init([job_id, job_time, job_name, ...])

Initialize the smallpond environment.
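
For example (a minimal sketch; job_name is one of the optional arguments listed above, and the value shown is illustrative):

import smallpond

sp = smallpond.init()
# or label the job so it is easier to find later
sp = smallpond.init(job_name="daily_etl")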

Loading Data#

Session.from_items(items)

Create a DataFrame from a list of local Python objects.
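
For example (a minimal sketch, assuming sp from smallpond.init() and that each item is a dict of column values; any local Python object can be used as an item):

df = sp.from_items([{"x": 1}, {"x": 2}, {"x": 3}])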

Session.from_arrow(table)

Create a DataFrame from a pyarrow Table.
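
For example:

import pyarrow as pa

table = pa.table({"x": [1, 2, 3]})
df = sp.from_arrow(table)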

Session.from_pandas(df)

Create a DataFrame from a pandas DataFrame.
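
For example:

import pandas as pd

pdf = pd.DataFrame({"x": [1, 2, 3]})
df = sp.from_pandas(pdf)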

Session.read_csv(paths, schema[, delim])

Create a DataFrame from CSV files.
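
A minimal sketch; passing the schema as a pyarrow schema and the paths as a list are assumptions based on the signature above:

import pyarrow as pa

schema = pa.schema([("ticker", pa.string()), ("price", pa.float64())])
df = sp.read_csv(["path/to/prices.csv"], schema, delim=",")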

Session.read_json(paths, schema)

Create a DataFrame from JSON files.
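
A minimal sketch under the same assumptions as the read_csv example above:

import pyarrow as pa

schema = pa.schema([("event", pa.string()), ("count", pa.int64())])
df = sp.read_json(["path/to/events.json"], schema)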

Session.read_parquet(paths[, recursive, ...])

Create a DataFrame from Parquet files.
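
For example (the recursive flag appears in the signature above; treating the argument as a directory to scan recursively is an assumption):

df = sp.read_parquet("path/to/dataset/*.parquet")
# or scan a directory tree instead of a glob (assumed behavior of the recursive flag)
df = sp.read_parquet("path/to/dataset", recursive=True)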

Partitioning Data#

DataFrame.repartition(npartitions[, ...])

Repartition the data into the given number of partitions.
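
For example (a sketch; hash_by as the keyword for hash-partitioning on a column is an assumption about the elided optional arguments):

df = df.repartition(10)                    # redistribute rows into 10 partitions
df = df.repartition(10, hash_by="ticker")  # assumed: co-locate rows with the same key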

Transformations#

Each of these methods lazily applies a transformation and returns a new DataFrame; nothing runs until the result is consumed or computed.

Session.partial_sql(query, *inputs, **kwargs)

Execute a SQL query on each partition of the input DataFrames.
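
Input DataFrames are referenced by position in the query; a sketch assuming {0} stands for the first input:

df = sp.partial_sql("SELECT ticker, min(price) AS low FROM {0} GROUP BY ticker", df)
# note: the query runs on each partition independently, so this aggregation is per partition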

DataFrame.map(sql_or_func, *[, schema])

Apply a function to each row.
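
For example (a sketch; the SQL-expression form follows the workflow above, and the row-dict convention for the function form is an assumption):

df = df.map("x + 1 as y")                 # SQL expression evaluated per row
df = df.map(lambda r: {"y": r["x"] + 1})  # Python function (assumed to take and return a row dict)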

DataFrame.map_batches(func, *[, batch_size])

Apply the given function to batches of data.
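
A sketch assuming each batch is passed to the function as a pyarrow Table and a Table is returned:

import pyarrow.compute as pc

def add_one(batch):
    # append a derived column to the batch (assumed to be a pyarrow Table)
    return batch.append_column("y", pc.add(batch["x"], 1))

df = df.map_batches(add_one, batch_size=1024)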

DataFrame.flat_map(sql_or_func, *[, schema])

Apply a function to each row and flatten the result.
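
A sketch assuming the function returns a list of row dicts, each of which becomes an output row:

df = df.flat_map(lambda r: [{"x": r["x"]}, {"x": -r["x"]}])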

DataFrame.filter(sql_or_func, **kwargs)

Filter out rows that don't satisfy the given predicate.
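
For example (a sketch; the Python form is assumed to receive a row dict):

df = df.filter("x > 0")               # SQL predicate
df = df.filter(lambda r: r["x"] > 0)  # Python predicate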

DataFrame.limit(limit)

Limit the number of rows to the given number.

DataFrame.partial_sort(by, **kwargs)

Sort rows by the given columns in each partition.

DataFrame.random_shuffle(**kwargs)

Randomly shuffle all rows globally.
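
These compose like any other transformation; a sketch (passing a list of column names to partial_sort is an assumption):

df = df.limit(1000)             # keep at most 1000 rows
df = df.partial_sort(by=["x"])  # sort within each partition, not globally
df = df.random_shuffle()        # shuffle rows across all partitions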

Consuming Data#

These operations will trigger execution of the lazy transformations performed on this DataFrame.

DataFrame.count()

Count the number of rows.

DataFrame.take(limit)

Return up to limit rows.

DataFrame.take_all()

Return all rows.

DataFrame.to_arrow()

Convert to a pyarrow Table.

DataFrame.to_pandas()

Convert to a pandas DataFrame.
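
For example, each of these runs the pipeline and materializes the result locally (the exact row representation returned by take is not specified here):

n = df.count()         # number of rows
rows = df.take(5)      # up to 5 rows
table = df.to_arrow()  # pyarrow Table
pdf = df.to_pandas()   # pandas DataFrame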

DataFrame.write_parquet(path)

Write data to a series of parquet files under the given path.

DataFrame.write_parquet_lazy(path)

Lazily write data to a series of parquet files under the given path.
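
For example (a sketch; the lazy variant is assumed to return a handle whose write happens only once it is computed):

df.write_parquet("path/to/output")             # runs the pipeline and writes files now
out = df.write_parquet_lazy("path/to/output")  # assumed: deferred until the result is computed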

Execution#

DataFrames are computed lazily. Use these methods to trigger computation manually or to check whether the data has already been computed.

DataFrame.compute()

Compute the data.

DataFrame.is_computed()

Check if the data is ready on disk.

DataFrame.recompute()

Always recompute the data regardless of whether it's already computed.

Session.wait(*dfs)

Wait for all DataFrames to be computed.
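
For example (a sketch assuming two DataFrames built with the session sp):

df_a = sp.read_parquet("path/to/a/*.parquet")
df_b = sp.read_parquet("path/to/b/*.parquet")

df_a.compute()             # run the pipeline and materialize the result on disk
print(df_a.is_computed())  # True once the data is ready
df_a.recompute()           # force re-execution even if results already exist
sp.wait(df_a, df_b)        # block until both DataFrames are computed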