
Commit dc509c3

Merge branch 'apache:main' into fix/support-duplicate-column-names-6543

2 parents db4927c + 2b986c8

File tree: 37 files changed, +1272 −537 lines


datafusion-examples/README.md

Lines changed: 13 additions & 12 deletions
@@ -88,18 +88,19 @@ cargo run --example dataframe -- dataframe
 
 #### Category: Single Process
 
-| Subcommand | File Path | Description |
-| --- | --- | --- |
-| catalog | [`data_io/catalog.rs`](examples/data_io/catalog.rs) | Register tables into a custom catalog |
-| json_shredding | [`data_io/json_shredding.rs`](examples/data_io/json_shredding.rs) | Implement filter rewriting for JSON shredding |
-| parquet_adv_idx | [`data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs) | Create a secondary index across multiple parquet files |
-| parquet_emb_idx | [`data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs) | Store a custom index inside Parquet files |
-| parquet_enc | [`data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs) | Read & write encrypted Parquet files |
-| parquet_enc_with_kms | [`data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs) | Encrypted Parquet I/O using a KMS-backed factory |
-| parquet_exec_visitor | [`data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs) | Extract statistics by visiting an ExecutionPlan |
-| parquet_idx | [`data_io/parquet_index.rs`](examples/data_io/parquet_index.rs) | Create a secondary index |
-| query_http_csv | [`data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs) | Query CSV files via HTTP |
-| remote_catalog | [`data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs) | Interact with a remote catalog |
+| Subcommand | File Path | Description |
+| --- | --- | --- |
+| catalog | [`data_io/catalog.rs`](examples/data_io/catalog.rs) | Register tables into a custom catalog |
+| in_memory_object_store | [`data_io/in_memory_object_store.rs`](examples/data_io/in_memory_object_store.rs) | Read CSV from an in-memory object store (pattern applies to JSON/Parquet) |
+| json_shredding | [`data_io/json_shredding.rs`](examples/data_io/json_shredding.rs) | Implement filter rewriting for JSON shredding |
+| parquet_adv_idx | [`data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs) | Create a secondary index across multiple parquet files |
+| parquet_emb_idx | [`data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs) | Store a custom index inside Parquet files |
+| parquet_enc | [`data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs) | Read & write encrypted Parquet files |
+| parquet_enc_with_kms | [`data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs) | Encrypted Parquet I/O using a KMS-backed factory |
+| parquet_exec_visitor | [`data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs) | Extract statistics by visiting an ExecutionPlan |
+| parquet_idx | [`data_io/parquet_index.rs`](examples/data_io/parquet_index.rs) | Create a secondary index |
+| query_http_csv | [`data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs) | Query CSV files via HTTP |
+| remote_catalog | [`data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs) | Interact with a remote catalog |
 
 ## DataFrame Examples

datafusion-examples/examples/data_io/in_memory_object_store.rs

Lines changed: 81 additions & 0 deletions

@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! See `main.rs` for how to run it.
+//!
+//! This follows the recommended approach: implement the `ObjectStore` trait
+//! (or use an existing implementation), register it with DataFusion, and then
+//! read a URL "path" from that store.
+//! See the in-memory reference implementation:
+//! https://docs.rs/object_store/latest/object_store/memory/struct.InMemory.html
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::assert_batches_eq;
+use datafusion::common::Result;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::prelude::{CsvReadOptions, SessionContext};
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
+
+/// Demonstrates reading CSV data from an in-memory object store.
+///
+/// The same pattern applies to JSON/Parquet: register a store for a URL
+/// prefix, write bytes into the store, then read via that URL.
+pub async fn in_memory_object_store() -> Result<()> {
+    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+    let ctx = SessionContext::new();
+    let object_store_url = ObjectStoreUrl::parse("memory://")?;
+    // Register a URL prefix to route reads through this object store.
+    ctx.register_object_store(object_store_url.as_ref(), Arc::clone(&store));
+
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("name", DataType::Utf8, false),
+    ]);
+
+    println!("=== CSV from memory ===");
+    let csv_path = Path::from("/people.csv");
+    let csv_data = b"id,name\n1,Alice\n2,Bob\n";
+    // Write bytes into the in-memory object store.
+    store
+        .put(&csv_path, PutPayload::from_static(csv_data))
+        .await?;
+    // Read using the URL that matches the registered prefix.
+    let csv = ctx
+        .read_csv(
+            "memory:///people.csv",
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?
+        .collect()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+----+-------+",
+        "| id | name  |",
+        "+----+-------+",
+        "| 1  | Alice |",
+        "| 2  | Bob   |",
+        "+----+-------+",
+    ];
+    assert_batches_eq!(expected, &csv);
+
+    Ok(())
+}
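The new example's core pattern is: register a store under a URL scheme, write bytes in by path, then read back through a URL that matches the registered prefix. A minimal std-only sketch of that register-then-resolve pattern, using a `HashMap` as a toy stand-in for the `InMemory` store (all names here are illustrative, not DataFusion or object_store APIs):

```rust
use std::collections::HashMap;

/// Toy stand-in for an object store: bytes keyed by path, reachable
/// through a registered `memory://` URL prefix.
struct ToyMemoryStore {
    objects: HashMap<String, Vec<u8>>,
}

impl ToyMemoryStore {
    fn new() -> Self {
        Self { objects: HashMap::new() }
    }

    /// Analogue of `store.put(&path, payload)`: write bytes at a path.
    fn put(&mut self, path: &str, bytes: &[u8]) {
        self.objects.insert(path.to_string(), bytes.to_vec());
    }

    /// Analogue of reading via the registered prefix: strip the URL
    /// scheme, then look the remaining path up in the store.
    fn get_by_url(&self, url: &str) -> Option<&[u8]> {
        let path = url.strip_prefix("memory://")?;
        self.objects.get(path).map(|v| v.as_slice())
    }
}

fn main() {
    let mut store = ToyMemoryStore::new();
    store.put("/people.csv", b"id,name\n1,Alice\n2,Bob\n");

    // "memory:///people.csv" resolves to path "/people.csv" in the store.
    let bytes = store.get_by_url("memory:///people.csv").expect("registered");
    let text = String::from_utf8(bytes.to_vec()).unwrap();
    let rows: Vec<&str> = text.lines().collect();
    assert_eq!(rows[0], "id,name");
    assert_eq!(rows.len(), 3); // header + two data rows
}
```

The real example delegates the URL-to-store routing to `SessionContext::register_object_store`; the sketch only shows why `memory:///people.csv` finds the object written at `Path::from("/people.csv")`.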

datafusion-examples/examples/data_io/main.rs

Lines changed: 9 additions & 1 deletion
@@ -21,7 +21,7 @@
 //!
 //! ## Usage
 //! ```bash
-//! cargo run --example data_io -- [all|catalog|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
+//! cargo run --example data_io -- [all|catalog|in_memory_object_store|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
 //! ```
 //!
 //! Each subcommand runs a corresponding example:
@@ -30,6 +30,9 @@
 //! - `catalog`
 //!   (file: catalog.rs, desc: Register tables into a custom catalog)
 //!
+//! - `in_memory_object_store`
+//!   (file: in_memory_object_store.rs, desc: Read CSV from an in-memory object store (pattern applies to JSON/Parquet))
+//!
 //! - `json_shredding`
 //!   (file: json_shredding.rs, desc: Implement filter rewriting for JSON shredding)
 //!
@@ -58,6 +61,7 @@
 //!   (file: remote_catalog.rs, desc: Interact with a remote catalog)
 
 mod catalog;
+mod in_memory_object_store;
 mod json_shredding;
 mod parquet_advanced_index;
 mod parquet_embedded_index;
@@ -77,6 +81,7 @@ use strum_macros::{Display, EnumIter, EnumString, VariantNames};
 enum ExampleKind {
     All,
     Catalog,
+    InMemoryObjectStore,
     JsonShredding,
     ParquetAdvIdx,
     ParquetEmbIdx,
@@ -104,6 +109,9 @@ impl ExampleKind {
                 }
             }
             ExampleKind::Catalog => catalog::catalog().await?,
+            ExampleKind::InMemoryObjectStore => {
+                in_memory_object_store::in_memory_object_store().await?
+            }
             ExampleKind::JsonShredding => json_shredding::json_shredding().await?,
             ExampleKind::ParquetAdvIdx => {
                 parquet_advanced_index::parquet_advanced_index().await?
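The `main.rs` changes extend a common subcommand-dispatch shape: a string is parsed into an `ExampleKind` variant (here via strum derives such as `EnumString`), then a `match` routes to the example function. A hand-rolled std-only sketch of that shape, with illustrative names and a `FromStr` impl standing in for what the derive generates (the actual generated code and strum attributes may differ):

```rust
use std::str::FromStr;

/// Illustrative stand-in for the strum-derived subcommand enum.
#[derive(Debug, PartialEq)]
enum ExampleKind {
    All,
    Catalog,
    InMemoryObjectStore,
}

impl FromStr for ExampleKind {
    type Err = String;

    // A derive like strum's `EnumString` generates a match of this shape
    // from the variant names (assuming snake_case serialization).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "all" => Ok(Self::All),
            "catalog" => Ok(Self::Catalog),
            "in_memory_object_store" => Ok(Self::InMemoryObjectStore),
            other => Err(format!("unknown subcommand: {other}")),
        }
    }
}

/// Dispatch: each arm would call the corresponding example function.
fn run(kind: &ExampleKind) -> &'static str {
    match kind {
        ExampleKind::All => "running every example",
        ExampleKind::Catalog => "catalog::catalog()",
        ExampleKind::InMemoryObjectStore => {
            "in_memory_object_store::in_memory_object_store()"
        }
    }
}

fn main() {
    let kind: ExampleKind = "in_memory_object_store".parse().expect("valid subcommand");
    assert_eq!(kind, ExampleKind::InMemoryObjectStore);
    println!("{}", run(&kind));
}
```

Adding a new example then means touching three places, exactly as the diff does: the variant, the parse arm (derived), and the dispatch arm.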

datafusion-examples/examples/query_planning/expr_api.rs

Lines changed: 9 additions & 6 deletions
@@ -175,9 +175,10 @@ fn simplify_demo() -> Result<()> {
     // the ExecutionProps carries information needed to simplify
     // expressions, such as the current time (to evaluate `now()`
     // correctly)
-    let context = SimplifyContext::default()
+    let context = SimplifyContext::builder()
         .with_schema(schema)
-        .with_current_time();
+        .with_current_time()
+        .build();
     let simplifier = ExprSimplifier::new(context);
 
     // And then call the simplify_expr function:
@@ -192,9 +193,10 @@ fn simplify_demo() -> Result<()> {
 
     // here are some other examples of what DataFusion is capable of
     let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
-    let context = SimplifyContext::default()
+    let context = SimplifyContext::builder()
         .with_schema(Arc::clone(&schema))
-        .with_current_time();
+        .with_current_time()
+        .build();
     let simplifier = ExprSimplifier::new(context);
 
     // basic arithmetic simplification
@@ -554,9 +556,10 @@ fn type_coercion_demo() -> Result<()> {
     assert!(physical_expr.evaluate(&batch).is_ok());
 
     // 2. Type coercion with `ExprSimplifier::coerce`.
-    let context = SimplifyContext::default()
+    let context = SimplifyContext::builder()
         .with_schema(Arc::new(df_schema.clone()))
-        .with_current_time();
+        .with_current_time()
+        .build();
     let simplifier = ExprSimplifier::new(context);
     let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
     let physical_expr = datafusion::physical_expr::create_physical_expr(
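Each hunk above makes the same migration: `SimplifyContext::default()` followed by `with_*` setters becomes `SimplifyContext::builder()`, the same setters, then a final `.build()`. A generic std-only sketch of that builder shape, with hypothetical `Context`/`ContextBuilder` types (not DataFusion's actual structs):

```rust
/// Finished, immutable configuration produced by `build()`.
#[derive(Debug, PartialEq)]
struct Context {
    schema: Option<String>,
    use_current_time: bool,
}

/// Accumulates configuration; consumed by `build()`.
#[derive(Default)]
struct ContextBuilder {
    schema: Option<String>,
    use_current_time: bool,
}

impl Context {
    fn builder() -> ContextBuilder {
        ContextBuilder::default()
    }
}

impl ContextBuilder {
    // Each setter takes `self` by value and returns it, enabling chaining.
    fn with_schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    fn with_current_time(mut self) -> Self {
        self.use_current_time = true;
        self
    }

    fn build(self) -> Context {
        Context {
            schema: self.schema,
            use_current_time: self.use_current_time,
        }
    }
}

fn main() {
    // Same call shape as the updated example code: configure, then build.
    let context = Context::builder()
        .with_schema("i: Int64")
        .with_current_time()
        .build();
    assert_eq!(context.schema.as_deref(), Some("i: Int64"));
    assert!(context.use_current_time);
}
```

The payoff of the builder split is that the built `Context` can be immutable while all mutation is confined to the builder, which the final `.build()` consumes.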

datafusion/catalog/src/table.rs

Lines changed: 51 additions & 29 deletions
@@ -84,10 +84,10 @@ pub trait TableProvider: Debug + Sync + Send {
         None
     }
 
-    /// Create an [`ExecutionPlan`] for scanning the table with optionally
-    /// specified `projection`, `filter` and `limit`, described below.
+    /// Create an [`ExecutionPlan`] for scanning the table with optional
+    /// `projection`, `filter`, and `limit`, described below.
     ///
-    /// The `ExecutionPlan` is responsible scanning the datasource's
+    /// The returned `ExecutionPlan` is responsible for scanning the datasource's
     /// partitions in a streaming, parallelized fashion.
     ///
     /// # Projection
@@ -96,33 +96,30 @@ pub trait TableProvider: Debug + Sync + Send {
     /// specified. The projection is a set of indexes of the fields in
     /// [`Self::schema`].
     ///
-    /// DataFusion provides the projection to scan only the columns actually
-    /// used in the query to improve performance, an optimization called
-    /// "Projection Pushdown". Some datasources, such as Parquet, can use this
-    /// information to go significantly faster when only a subset of columns is
-    /// required.
+    /// DataFusion provides the projection so the scan reads only the columns
+    /// actually used in the query, an optimization called "Projection
+    /// Pushdown". Some datasources, such as Parquet, can use this information
+    /// to go significantly faster when only a subset of columns is required.
     ///
     /// # Filters
     ///
     /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
     /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
-    /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the
-    /// expressions are `AND`ed together).
+    /// which *all* of the `Expr`s evaluate to `true` must be returned (that is,
+    /// the expressions are `AND`ed together).
     ///
-    /// To enable filter pushdown you must override
-    /// [`Self::supports_filters_pushdown`] as the default implementation does
-    /// not and `filters` will be empty.
+    /// To enable filter pushdown, override
+    /// [`Self::supports_filters_pushdown`]. The default implementation does not
+    /// push down filters, and `filters` will be empty.
     ///
-    /// DataFusion pushes filtering into the scans whenever possible
-    /// ("Filter Pushdown"), and depending on the format and the
-    /// implementation of the format, evaluating the predicate during the scan
-    /// can increase performance significantly.
+    /// DataFusion pushes filters into scans whenever possible ("Filter
+    /// Pushdown"). Depending on the data format and implementation, evaluating
+    /// predicates during the scan can significantly improve performance.
     ///
     /// ## Note: Some columns may appear *only* in Filters
     ///
-    /// In certain cases, a query may only use a certain column in a Filter that
-    /// has been completely pushed down to the scan. In this case, the
-    /// projection will not contain all the columns found in the filter
+    /// In some cases, a query may use a column only in a filter and the
+    /// projection will not contain all columns referenced by the filter
     /// expressions.
     ///
     /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
@@ -154,15 +151,40 @@ pub trait TableProvider: Debug + Sync + Send {
     ///
     /// # Limit
     ///
-    /// If `limit` is specified, must only produce *at least* this many rows,
-    /// (though it may return more). Like Projection Pushdown and Filter
-    /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
-    /// possible, called "Limit Pushdown" as some sources can use this
-    /// information to improve their performance. Note that if there are any
-    /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is
-    /// because inexact filters do not guarantee that every filtered row is
-    /// removed, so applying the limit could lead to too few rows being available
-    /// to return as a final result.
+    /// If `limit` is specified, the scan must produce *at least* this many
+    /// rows, though it may return more. Like Projection Pushdown and Filter
+    /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
+    /// possible. This is called "Limit Pushdown", and some sources can use the
+    /// information to improve performance.
+    ///
+    /// Note: If any pushed-down filters are `Inexact`, the `LIMIT` cannot be
+    /// pushed down. Inexact filters do not guarantee that every filtered row is
+    /// removed, so applying the limit could leave too few rows to return in the
+    /// final result.
+    ///
+    /// # Evaluation Order
+    ///
+    /// The logical evaluation order is `filters`, then `limit`, then
+    /// `projection`.
+    ///
+    /// Note that `limit` applies to the filtered result, not to the unfiltered
+    /// input, and `projection` affects only which columns are returned, not
+    /// which rows qualify.
+    ///
+    /// For example, if a scan receives:
+    ///
+    /// - `projection = [a]`
+    /// - `filters = [b > 5]`
+    /// - `limit = Some(3)`
+    ///
+    /// It must logically produce results equivalent to:
+    ///
+    /// ```text
+    /// PROJECTION a (LIMIT 3 (SCAN WHERE b > 5))
+    /// ```
+    ///
+    /// As noted above, columns referenced only by pushed-down filters may be
+    /// absent from `projection`.
    async fn scan(
        &self,
        state: &dyn Session,
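The new "Evaluation Order" docs can be made concrete with a toy std-only sketch: rows are `(a, b)` pairs, and the function applies `filters`, then `limit`, then `projection`, matching the doc's `PROJECTION a (LIMIT 3 (SCAN WHERE b > 5))` example. Names mirror the doc's example, not any DataFusion API:

```rust
/// Toy illustration of the documented order for `scan`:
/// filters first, then limit, then projection.
fn scan_equivalent(rows: &[(i64, i64)], limit: Option<usize>) -> Vec<i64> {
    rows.iter()
        // filters = [b > 5]: drop non-qualifying rows first
        .filter(|(_, b)| *b > 5)
        // limit = Some(n): applies to the *filtered* result
        .take(limit.unwrap_or(usize::MAX))
        // projection = [a]: finally select which columns come back
        .map(|(a, _)| *a)
        .collect()
}

fn main() {
    let rows = [(1, 10), (2, 1), (3, 7), (4, 9), (5, 8), (6, 6)];
    // Equivalent to: PROJECTION a (LIMIT 3 (SCAN WHERE b > 5))
    let out = scan_equivalent(&rows, Some(3));
    // Row (2, 1) fails the filter before the limit counts anything,
    // so the first three *qualifying* rows are (1,10), (3,7), (4,9).
    assert_eq!(out, vec![1, 3, 4]);
    println!("{out:?}");
}
```

Swapping the `take` before the `filter` would demonstrate the bug the docs warn about: limiting unfiltered input can leave too few qualifying rows, which is also why inexact pushed-down filters block limit pushdown.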

0 commit comments