Skip to content

Commit df0d966

Browse files
authored
use a single row_count column during predicate pruning instead of one per column (#14295)
* use a single row_count column during predicate pruning instead of one per column * fix tests * fix conflicts and test: * lint * fix assertions
1 parent 0f9773a commit df0d966

File tree

5 files changed

+98
-59
lines changed

5 files changed

+98
-59
lines changed

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1687,7 +1687,7 @@ mod tests {
16871687

16881688
assert_contains!(
16891689
&display,
1690-
"pruning_predicate=c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
1690+
"pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
16911691
);
16921692

16931693
assert_contains!(&display, r#"predicate=c1@0 != bar"#);

datafusion/physical-optimizer/src/pruning.rs

Lines changed: 82 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -807,11 +807,22 @@ impl RequiredColumns {
807807
column: &phys_expr::Column,
808808
statistics_type: StatisticsType,
809809
) -> Option<usize> {
810-
self.columns
811-
.iter()
812-
.enumerate()
813-
.find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
814-
.map(|(i, (_c, _t, _f))| i)
810+
match statistics_type {
811+
StatisticsType::RowCount => {
812+
// Use the first row count we find, if any
813+
self.columns
814+
.iter()
815+
.enumerate()
816+
.find(|(_i, (_c, t, _f))| t == &statistics_type)
817+
.map(|(i, (_c, _t, _f))| i)
818+
}
819+
_ => self
820+
.columns
821+
.iter()
822+
.enumerate()
823+
.find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
824+
.map(|(i, (_c, _t, _f))| i),
825+
}
815826
}
816827

817828
/// Rewrites column_expr so that all appearances of column
@@ -834,15 +845,15 @@ impl RequiredColumns {
834845
None => (self.columns.len(), true),
835846
};
836847

837-
let suffix = match stat_type {
838-
StatisticsType::Min => "min",
839-
StatisticsType::Max => "max",
840-
StatisticsType::NullCount => "null_count",
841-
StatisticsType::RowCount => "row_count",
848+
let column_name = column.name();
849+
let stat_column_name = match stat_type {
850+
StatisticsType::Min => format!("{column_name}_min"),
851+
StatisticsType::Max => format!("{column_name}_max"),
852+
StatisticsType::NullCount => format!("{column_name}_null_count"),
853+
StatisticsType::RowCount => "row_count".to_string(),
842854
};
843855

844-
let stat_column =
845-
phys_expr::Column::new(&format!("{}_{}", column.name(), suffix), idx);
856+
let stat_column = phys_expr::Column::new(&stat_column_name, idx);
846857

847858
// only add statistics column if not previously added
848859
if need_to_insert {
@@ -2189,6 +2200,38 @@ mod tests {
21892200
}
21902201
}
21912202

2203+
/// Row count should only be referenced once in the pruning expression, even if we need the row count
2204+
/// for multiple columns.
2205+
#[test]
2206+
fn test_unique_row_count_field_and_column() {
2207+
// c1 = 100 AND c2 = 200
2208+
let schema: SchemaRef = Arc::new(Schema::new(vec![
2209+
Field::new("c1", DataType::Int32, true),
2210+
Field::new("c2", DataType::Int32, true),
2211+
]));
2212+
let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2213+
let expr = logical2physical(&expr, &schema);
2214+
let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2215+
// note pruning expression refers to row_count twice
2216+
assert_eq!(
2217+
"c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2218+
p.predicate_expr.to_string()
2219+
);
2220+
2221+
// Fields in required schema should be unique, otherwise when creating batches
2222+
// it will fail because of duplicate field names
2223+
let mut fields = HashSet::new();
2224+
for (_col, _ty, field) in p.required_columns().iter() {
2225+
let was_new = fields.insert(field);
2226+
if !was_new {
2227+
panic!(
2228+
"Duplicate field in required schema: {:?}. Previous fields:\n{:#?}",
2229+
field, fields
2230+
);
2231+
}
2232+
}
2233+
}
2234+
21922235
#[test]
21932236
fn prune_all_rows_null_counts() {
21942237
// if null_count = row_count then we should prune the container for i = 0
@@ -2475,7 +2518,7 @@ mod tests {
24752518
fn row_group_predicate_eq() -> Result<()> {
24762519
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
24772520
let expected_expr =
2478-
"c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2521+
"c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
24792522

24802523
// test column on the left
24812524
let expr = col("c1").eq(lit(1));
@@ -2496,7 +2539,7 @@ mod tests {
24962539
fn row_group_predicate_not_eq() -> Result<()> {
24972540
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
24982541
let expected_expr =
2499-
"c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2542+
"c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
25002543

25012544
// test column on the left
25022545
let expr = col("c1").not_eq(lit(1));
@@ -2516,7 +2559,7 @@ mod tests {
25162559
#[test]
25172560
fn row_group_predicate_gt() -> Result<()> {
25182561
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2519-
let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_max@0 > 1";
2562+
let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
25202563

25212564
// test column on the left
25222565
let expr = col("c1").gt(lit(1));
@@ -2536,7 +2579,7 @@ mod tests {
25362579
#[test]
25372580
fn row_group_predicate_gt_eq() -> Result<()> {
25382581
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2539-
let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_max@0 >= 1";
2582+
let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
25402583

25412584
// test column on the left
25422585
let expr = col("c1").gt_eq(lit(1));
@@ -2555,7 +2598,7 @@ mod tests {
25552598
#[test]
25562599
fn row_group_predicate_lt() -> Result<()> {
25572600
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2558-
let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < 1";
2601+
let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
25592602

25602603
// test column on the left
25612604
let expr = col("c1").lt(lit(1));
@@ -2575,7 +2618,7 @@ mod tests {
25752618
#[test]
25762619
fn row_group_predicate_lt_eq() -> Result<()> {
25772620
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2578-
let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 <= 1";
2621+
let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
25792622

25802623
// test column on the left
25812624
let expr = col("c1").lt_eq(lit(1));
@@ -2600,7 +2643,7 @@ mod tests {
26002643
]);
26012644
// test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
26022645
let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2603-
let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < 1";
2646+
let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
26042647
let predicate_expr =
26052648
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
26062649
assert_eq!(predicate_expr.to_string(), expected_expr);
@@ -2666,7 +2709,7 @@ mod tests {
26662709
#[test]
26672710
fn row_group_predicate_lt_bool() -> Result<()> {
26682711
let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2669-
let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < true";
2712+
let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
26702713

26712714
// DF doesn't support arithmetic on boolean columns so
26722715
// this predicate will error when evaluated
@@ -2689,16 +2732,12 @@ mod tests {
26892732
let expr = col("c1")
26902733
.lt(lit(1))
26912734
.and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2692-
let expected_expr = "c1_null_count@1 != c1_row_count@2 \
2693-
AND c1_min@0 < 1 AND (\
2694-
c2_null_count@5 != c2_row_count@6 \
2695-
AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR \
2696-
c2_null_count@5 != c2_row_count@6 AND c2_min@3 <= 3 AND 3 <= c2_max@4\
2697-
)";
2735+
let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
26982736
let predicate_expr =
26992737
test_build_predicate_expression(&expr, &schema, &mut required_columns);
27002738
assert_eq!(predicate_expr.to_string(), expected_expr);
2701-
// c1 < 1 should add c1_min
2739+
println!("required_columns: {:#?}", required_columns); // for debugging assertions below
2740+
// c1 < 1 should add c1_min
27022741
let c1_min_field = Field::new("c1_min", DataType::Int32, false);
27032742
assert_eq!(
27042743
required_columns.columns[0],
@@ -2718,14 +2757,14 @@ mod tests {
27182757
c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
27192758
)
27202759
);
2721-
// c1 < 1 should add c1_row_count
2722-
let c1_row_count_field = Field::new("c1_row_count", DataType::UInt64, false);
2760+
// c1 < 1 should add row_count
2761+
let row_count_field = Field::new("row_count", DataType::UInt64, false);
27232762
assert_eq!(
27242763
required_columns.columns[2],
27252764
(
27262765
phys_expr::Column::new("c1", 0),
27272766
StatisticsType::RowCount,
2728-
c1_row_count_field.with_nullable(true) // could be nullable if stats are not present
2767+
row_count_field.with_nullable(true) // could be nullable if stats are not present
27292768
)
27302769
);
27312770
// c2 = 2 should add c2_min and c2_max
@@ -2757,18 +2796,18 @@ mod tests {
27572796
c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
27582797
)
27592798
);
2760-
// c2 = 2 should add c2_row_count
2761-
let c2_row_count_field = Field::new("c2_row_count", DataType::UInt64, false);
2799+
// c2 = 1 should add row_count
2800+
let row_count_field = Field::new("row_count", DataType::UInt64, false);
27622801
assert_eq!(
2763-
required_columns.columns[6],
2802+
required_columns.columns[2],
27642803
(
2765-
phys_expr::Column::new("c2", 1),
2804+
phys_expr::Column::new("c1", 0),
27662805
StatisticsType::RowCount,
2767-
c2_row_count_field.with_nullable(true) // could be nullable if stats are not present
2806+
row_count_field.with_nullable(true) // could be nullable if stats are not present
27682807
)
27692808
);
27702809
// c2 = 3 shouldn't add any new statistics fields
2771-
assert_eq!(required_columns.columns.len(), 7);
2810+
assert_eq!(required_columns.columns.len(), 6);
27722811

27732812
Ok(())
27742813
}
@@ -2785,7 +2824,7 @@ mod tests {
27852824
vec![lit(1), lit(2), lit(3)],
27862825
false,
27872826
));
2788-
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2827+
let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
27892828
let predicate_expr =
27902829
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
27912830
assert_eq!(predicate_expr.to_string(), expected_expr);
@@ -2821,7 +2860,7 @@ mod tests {
28212860
vec![lit(1), lit(2), lit(3)],
28222861
true,
28232862
));
2824-
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2863+
let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
28252864
let predicate_expr =
28262865
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
28272866
assert_eq!(predicate_expr.to_string(), expected_expr);
@@ -2867,7 +2906,7 @@ mod tests {
28672906
// test c1 in(1, 2) and c2 BETWEEN 4 AND 5
28682907
let expr3 = expr1.and(expr2);
28692908

2870-
let expected_expr = "(c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != c2_row_count@6 AND c2_max@4 >= 4 AND c2_null_count@5 != c2_row_count@6 AND c2_min@7 <= 5";
2909+
let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
28712910
let predicate_expr =
28722911
test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
28732912
assert_eq!(predicate_expr.to_string(), expected_expr);
@@ -2894,7 +2933,7 @@ mod tests {
28942933
#[test]
28952934
fn row_group_predicate_cast() -> Result<()> {
28962935
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2897-
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
2936+
let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
28982937

28992938
// test cast(c1 as int64) = 1
29002939
// test column on the left
@@ -2910,7 +2949,7 @@ mod tests {
29102949
assert_eq!(predicate_expr.to_string(), expected_expr);
29112950

29122951
let expected_expr =
2913-
"c1_null_count@1 != c1_row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
2952+
"c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
29142953

29152954
// test column on the left
29162955
let expr =
@@ -2942,7 +2981,7 @@ mod tests {
29422981
],
29432982
false,
29442983
));
2945-
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
2984+
let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
29462985
let predicate_expr =
29472986
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
29482987
assert_eq!(predicate_expr.to_string(), expected_expr);
@@ -2956,7 +2995,7 @@ mod tests {
29562995
],
29572996
true,
29582997
));
2959-
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
2998+
let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
29602999
let predicate_expr =
29613000
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
29623001
assert_eq!(predicate_expr.to_string(), expected_expr);

datafusion/sqllogictest/test_files/parquet.slt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ SELECT * FROM test_table ORDER BY int_col;
7979
5 eee 500 1970-01-06
8080
6 fff 600 1970-01-07
8181

82-
# Check output plan, expect no "output_ordering" clause in the physical_plan -> DataSourceExec:
82+
# Check output plan, expect no "output_ordering" clause in the physical_plan -> ParquetExec:
8383
query TT
8484
EXPLAIN SELECT int_col, string_col
8585
FROM test_table
@@ -109,7 +109,7 @@ STORED AS PARQUET
109109
WITH ORDER (string_col ASC NULLS LAST, int_col ASC NULLS LAST)
110110
LOCATION 'test_files/scratch/parquet/test_table';
111111

112-
# Check output plan, expect an "output_ordering" clause in the physical_plan -> DataSourceExec:
112+
# Check output plan, expect an "output_ordering" clause in the physical_plan -> ParquetExec:
113113
query TT
114114
EXPLAIN SELECT int_col, string_col
115115
FROM test_table
@@ -130,7 +130,7 @@ STORED AS PARQUET;
130130
----
131131
3
132132

133-
# Check output plan again, expect no "output_ordering" clause in the physical_plan -> DataSourceExec,
133+
# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec,
134134
# due to there being more files than partitions:
135135
query TT
136136
EXPLAIN SELECT int_col, string_col
@@ -625,7 +625,7 @@ physical_plan
625625
01)CoalesceBatchesExec: target_batch_size=8192
626626
02)--FilterExec: column1@0 LIKE f%
627627
03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
628-
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[]
628+
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[]
629629

630630
statement ok
631631
drop table foo

0 commit comments

Comments
 (0)