Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ai/skills/check-upstream/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all"
**Evaluated and not requiring separate Python exposure:**
- `show_limit` — already covered by `DataFrame.show()`, which provides the same functionality with a simpler API
- `with_param_values` — already covered by the `param_values` argument on `SessionContext.sql()`, which accomplishes the same thing more robustly
- `union_by_name_distinct` — already covered by `DataFrame.union_by_name(distinct=True)`, which provides a more Pythonic API

**How to check:**
1. Fetch the upstream DataFrame documentation page listing all methods
Expand Down
157 changes: 126 additions & 31 deletions crates/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,14 @@ impl PyDataFrame {
Ok(Self::new(df))
}

/// Apply window function expressions to the DataFrame.
#[pyo3(signature = (*exprs))]
fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
    // Convert the Python-side wrappers into native DataFusion expressions.
    let converted: Vec<_> = exprs.into_iter().map(Into::into).collect();
    Ok(Self::new(self.df.as_ref().clone().window(converted)?))
}

fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> {
let df = self.df.as_ref().clone().filter(predicate.into())?;
Ok(Self::new(df))
Expand Down Expand Up @@ -804,9 +812,27 @@ impl PyDataFrame {
}

/// Print the query plan
#[pyo3(signature = (verbose=false, analyze=false))]
fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyDataFusionResult<()> {
let df = self.df.as_ref().clone().explain(verbose, analyze)?;
#[pyo3(signature = (verbose=false, analyze=false, format=None))]
fn explain(
    &self,
    py: Python,
    verbose: bool,
    analyze: bool,
    format: Option<&str>,
) -> PyDataFusionResult<()> {
    // Resolve the requested output format; when the caller passes nothing we
    // fall back to the indented text representation.
    let explain_format = match format {
        None => datafusion::common::format::ExplainFormat::Indent,
        Some(f) => f
            .parse::<datafusion::common::format::ExplainFormat>()
            .map_err(|e| {
                PyDataFusionError::Common(format!("Invalid explain format '{}': {}", f, e))
            })?,
    };
    let options = datafusion::logical_expr::ExplainOption::default()
        .with_verbose(verbose)
        .with_analyze(analyze)
        .with_format(explain_format);
    print_dataframe(py, self.df.as_ref().clone().explain_with_options(options)?)
}

Expand Down Expand Up @@ -864,22 +890,14 @@ impl PyDataFrame {
Ok(Self::new(new_df))
}

/// Calculate the distinct union of two `DataFrame`s. The
/// two `DataFrame`s must have exactly the same schema
fn union_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
    let other = py_df.df.as_ref().clone();
    let unioned = self.df.as_ref().clone().union_distinct(other)?;
    Ok(Self::new(unioned))
}

#[pyo3(signature = (column, preserve_nulls=true))]
fn unnest_column(&self, column: &str, preserve_nulls: bool) -> PyDataFusionResult<Self> {
// TODO: expose RecursionUnnestOptions
// REF: https://github.com/apache/datafusion/pull/11577
let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
#[pyo3(signature = (column, preserve_nulls=true, recursions=None))]
fn unnest_column(
&self,
column: &str,
preserve_nulls: bool,
recursions: Option<Vec<(String, String, usize)>>,
) -> PyDataFusionResult<Self> {
let unnest_options = build_unnest_options(preserve_nulls, recursions);
let df = self
.df
.as_ref()
Expand All @@ -888,15 +906,14 @@ impl PyDataFrame {
Ok(Self::new(df))
}

#[pyo3(signature = (columns, preserve_nulls=true))]
#[pyo3(signature = (columns, preserve_nulls=true, recursions=None))]
fn unnest_columns(
&self,
columns: Vec<String>,
preserve_nulls: bool,
recursions: Option<Vec<(String, String, usize)>>,
) -> PyDataFusionResult<Self> {
// TODO: expose RecursionUnnestOptions
// REF: https://github.com/apache/datafusion/pull/11577
let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
let unnest_options = build_unnest_options(preserve_nulls, recursions);
let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
let df = self
.df
Expand All @@ -907,21 +924,79 @@ impl PyDataFrame {
}

/// Calculate the intersection of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
fn intersect(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
let new_df = self
.df
.as_ref()
.clone()
.intersect(py_df.df.as_ref().clone())?;
#[pyo3(signature = (py_df, distinct=false))]
fn intersect(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
    let lhs = self.df.as_ref().clone();
    let rhs = py_df.df.as_ref().clone();
    // `distinct` chooses INTERSECT (set semantics) over INTERSECT ALL (bag semantics).
    let result = if distinct {
        lhs.intersect_distinct(rhs)?
    } else {
        lhs.intersect(rhs)?
    };
    Ok(Self::new(result))
}

/// Calculate the set difference (EXCEPT) of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
fn except_all(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
let new_df = self.df.as_ref().clone().except(py_df.df.as_ref().clone())?;
#[pyo3(signature = (py_df, distinct=false))]
fn except_all(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
    let lhs = self.df.as_ref().clone();
    let rhs = py_df.df.as_ref().clone();
    // `distinct` chooses EXCEPT (set semantics) over EXCEPT ALL (bag semantics).
    let result = if distinct {
        lhs.except_distinct(rhs)?
    } else {
        lhs.except(rhs)?
    };
    Ok(Self::new(result))
}

/// Union two DataFrames, matching columns by name rather than by position.
#[pyo3(signature = (py_df, distinct=false))]
fn union_by_name(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
    let lhs = self.df.as_ref().clone();
    let rhs = py_df.df.as_ref().clone();
    // When `distinct` is set, duplicate rows are removed from the result.
    let result = if distinct {
        lhs.union_by_name_distinct(rhs)?
    } else {
        lhs.union_by_name(rhs)?
    };
    Ok(Self::new(result))
}

/// Deduplicate rows based on specific columns, keeping the first row per group.
fn distinct_on(
    &self,
    on_expr: Vec<PyExpr>,
    select_expr: Vec<PyExpr>,
    sort_expr: Option<Vec<PySortExpr>>,
) -> PyDataFusionResult<Self> {
    // Convert the Python wrappers into native DataFusion expressions.
    let on: Vec<_> = on_expr.into_iter().map(Into::into).collect();
    let select: Vec<_> = select_expr.into_iter().map(Into::into).collect();
    let sort = sort_expr.map(to_sort_expressions);
    let df = self.df.as_ref().clone().distinct_on(on, select, sort)?;
    Ok(Self::new(df))
}

/// Sort by column expressions with ascending order and nulls last.
fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
    let sort_exprs: Vec<_> = exprs.into_iter().map(Into::into).collect();
    Ok(Self::new(self.df.as_ref().clone().sort_by(sort_exprs)?))
}

/// Return fully qualified column expressions for the given column names.
fn find_qualified_columns(&self, names: Vec<String>) -> PyDataFusionResult<Vec<PyExpr>> {
    let name_refs: Vec<&str> = names.iter().map(String::as_str).collect();
    // Wrap each resolved qualified column back into a Python-facing expression.
    let exprs = self
        .df
        .find_qualified_columns(&name_refs)?
        .into_iter()
        .map(|col| Expr::Column(Column::from(col)).into())
        .collect();
    Ok(exprs)
}

/// Write a `DataFrame` to a CSV file.
fn write_csv(
&self,
Expand Down Expand Up @@ -1295,6 +1370,26 @@ impl PyDataFrameWriteOptions {
}
}

/// Build `UnnestOptions` from the Python-level arguments shared by the
/// `unnest_column` and `unnest_columns` methods.
fn build_unnest_options(
    preserve_nulls: bool,
    recursions: Option<Vec<(String, String, usize)>>,
) -> UnnestOptions {
    let mut options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
    // Each (input, output, depth) triple describes one recursive unnest level;
    // an absent list leaves the default (empty) recursions untouched.
    options.recursions = recursions
        .unwrap_or_default()
        .into_iter()
        .map(
            |(input, output, depth)| datafusion::common::RecursionUnnestOption {
                input_column: datafusion::common::Column::from(input.as_str()),
                output_column: datafusion::common::Column::from(output.as_str()),
                depth,
            },
        )
        .collect();
    options
}

/// Print DataFrame
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
// Get string representation of record batches
Expand Down
33 changes: 33 additions & 0 deletions docs/source/user-guide/common-operations/joins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,36 @@ In contrast to the above example, if we wish to get both columns:
.. ipython:: python

left.join(right, "id", how="inner", coalesce_duplicate_keys=False)

Disambiguating Columns with ``DataFrame.col()``
------------------------------------------------

When both DataFrames contain non-key columns with the same name, you can use
:py:meth:`~datafusion.dataframe.DataFrame.col` on each DataFrame **before** the
join to create fully qualified column references. These references can then be
used in the join predicate and when selecting from the result.

This is especially useful with :py:meth:`~datafusion.dataframe.DataFrame.join_on`,
which accepts expression-based predicates.

.. ipython:: python

left = ctx.from_pydict(
{
"id": [1, 2, 3],
"val": [10, 20, 30],
}
)

right = ctx.from_pydict(
{
"id": [1, 2, 3],
"val": [40, 50, 60],
}
)

joined = left.join_on(
right, left.col("id") == right.col("id"), how="inner"
)

joined.select(left.col("id"), left.col("val"), right.col("val"))
2 changes: 2 additions & 0 deletions python/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from .dataframe import (
DataFrame,
DataFrameWriteOptions,
ExplainFormat,
InsertOp,
ParquetColumnOptions,
ParquetWriterOptions,
Expand Down Expand Up @@ -82,6 +83,7 @@
"DataFrameWriteOptions",
"Database",
"ExecutionPlan",
"ExplainFormat",
"Expr",
"InsertOp",
"LogicalPlan",
Expand Down
Loading
Loading