Skip to content

Commit 806037c

Browse files
PARQUET-41: Add bloom filter (#757)
* PARQUET-1328: Add Bloom filter reader and writer (#587) * PARQUET-1516: Store Bloom filters near to footer (#608) * PARQUET-1391: Integrate Bloom filter logic (#619) * PARQUET-1660: align Bloom filter implementation with format (#686)
1 parent 7469e87 commit 806037c

36 files changed

+2094
-62
lines changed

parquet-column/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@
5858
<artifactId>fastutil</artifactId>
5959
<version>${fastutil.version}</version>
6060
</dependency>
61+
<dependency>
62+
<groupId>net.openhft</groupId>
63+
<artifactId>zero-allocation-hashing</artifactId>
64+
<version>${net.openhft.version}</version>
65+
</dependency>
6166

6267
<dependency>
6368
<groupId>com.carrotsearch</groupId>

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,24 @@
1818
*/
1919
package org.apache.parquet.column;
2020

21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Objects;
27+
import java.util.Set;
28+
2129
import org.apache.parquet.Preconditions;
2230
import org.apache.parquet.bytes.ByteBufferAllocator;
2331
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
2432
import org.apache.parquet.bytes.HeapByteBufferAllocator;
25-
26-
import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
27-
28-
import java.util.Objects;
29-
3033
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
3134
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
3235
import org.apache.parquet.column.page.PageWriteStore;
3336
import org.apache.parquet.column.values.ValuesWriter;
3437
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
38+
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
3539
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
3640
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
3741
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
@@ -56,6 +60,7 @@ public class ParquetProperties {
5660
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
5761
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
5862
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
63+
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
5964

6065
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
6166

@@ -96,6 +101,11 @@ public static WriterVersion fromString(String name) {
96101
private final ValuesWriterFactory valuesWriterFactory;
97102
private final int columnIndexTruncateLength;
98103
private final int statisticsTruncateLength;
104+
105+
// The key-value pair represents the column name and its expected distinct number of values in a row group.
106+
private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
107+
private final int maxBloomFilterBytes;
108+
private final List<String> bloomFilterColumns;
99109
private final int pageRowCountLimit;
100110
private final boolean pageWriteChecksumEnabled;
101111
private final boolean enableByteStreamSplit;
@@ -115,6 +125,9 @@ private ParquetProperties(Builder builder) {
115125
this.valuesWriterFactory = builder.valuesWriterFactory;
116126
this.columnIndexTruncateLength = builder.columnIndexTruncateLength;
117127
this.statisticsTruncateLength = builder.statisticsTruncateLength;
128+
this.bloomFilterExpectedDistinctNumbers = builder.bloomFilterColumnExpectedNDVs;
129+
this.bloomFilterColumns = builder.bloomFilterColumns;
130+
this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
118131
this.pageRowCountLimit = builder.pageRowCountLimit;
119132
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
120133
this.enableByteStreamSplit = builder.enableByteStreamSplit;
@@ -189,11 +202,24 @@ public ByteBufferAllocator getAllocator() {
189202

190203
public ColumnWriteStore newColumnWriteStore(MessageType schema,
191204
PageWriteStore pageStore) {
205+
switch (writerVersion) {
206+
case PARQUET_1_0:
207+
return new ColumnWriteStoreV1(schema, pageStore, this);
208+
case PARQUET_2_0:
209+
return new ColumnWriteStoreV2(schema, pageStore, this);
210+
default:
211+
throw new IllegalArgumentException("unknown version " + writerVersion);
212+
}
213+
}
214+
215+
public ColumnWriteStore newColumnWriteStore(MessageType schema,
216+
PageWriteStore pageStore,
217+
BloomFilterWriteStore bloomFilterWriteStore) {
192218
switch (writerVersion) {
193219
case PARQUET_1_0:
194-
return new ColumnWriteStoreV1(schema, pageStore, this);
220+
return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
195221
case PARQUET_2_0:
196-
return new ColumnWriteStoreV2(schema, pageStore, this);
222+
return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
197223
default:
198224
throw new IllegalArgumentException("unknown version " + writerVersion);
199225
}
@@ -231,6 +257,22 @@ public boolean getPageWriteChecksumEnabled() {
231257
return pageWriteChecksumEnabled;
232258
}
233259

260+
public Map<String, Long> getBloomFilterColumnExpectedNDVs() {
261+
return bloomFilterExpectedDistinctNumbers;
262+
}
263+
264+
public Set<String> getBloomFilterColumns() {
265+
if (bloomFilterColumns != null && bloomFilterColumns.size() > 0){
266+
return new HashSet<>(bloomFilterColumns);
267+
}
268+
269+
return bloomFilterExpectedDistinctNumbers.keySet();
270+
}
271+
272+
public int getMaxBloomFilterBytes() {
273+
return maxBloomFilterBytes;
274+
}
275+
234276
public static Builder builder() {
235277
return new Builder();
236278
}
@@ -250,6 +292,9 @@ public String toString() {
250292
+ "Max row count for page size check is: " + getMaxRowCountForPageSizeCheck() + '\n'
251293
+ "Truncate length for column indexes is: " + getColumnIndexTruncateLength() + '\n'
252294
+ "Truncate length for statistics min/max is: " + getStatisticsTruncateLength() + '\n'
295+
+ "Bloom filter enabled column names are: " + getBloomFilterColumns() + '\n'
296+
+ "Max Bloom filter size for a column is " + getMaxBloomFilterBytes() + '\n'
297+
+ "Bloom filter enabled column expected number of distinct values are: " + getBloomFilterColumnExpectedNDVs().values() + '\n'
253298
+ "Page row count limit to " + getPageRowCountLimit() + '\n'
254299
+ "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off");
255300
}
@@ -266,6 +311,9 @@ public static class Builder {
266311
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
267312
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
268313
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
314+
private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
315+
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
316+
private List<String> bloomFilterColumns = new ArrayList<>();
269317
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
270318
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
271319
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
@@ -286,6 +334,9 @@ private Builder(ParquetProperties toCopy) {
286334
this.allocator = toCopy.allocator;
287335
this.pageRowCountLimit = toCopy.pageRowCountLimit;
288336
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
337+
this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
338+
this.bloomFilterColumns = toCopy.bloomFilterColumns;
339+
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
289340
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
290341
}
291342

@@ -396,6 +447,41 @@ public Builder withStatisticsTruncateLength(int length) {
396447
return this;
397448
}
398449

450+
/**
451+
* Set max Bloom filter bytes for related columns.
452+
*
453+
* @param maxBloomFilterBytes the max bytes of a Bloom filter bitset for a column.
454+
* @return this builder for method chaining
455+
*/
456+
public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
457+
this.maxBloomFilterBytes = maxBloomFilterBytes;
458+
return this;
459+
}
460+
461+
/**
462+
* Set Bloom filter column names and expected NDVs.
463+
*
464+
* @param columnToNDVMap the columns which has bloom filter enabled.
465+
*
466+
* @return this builder for method chaining
467+
*/
468+
public Builder withBloomFilterColumnToNDVMap(Map<String, Long> columnToNDVMap) {
469+
this.bloomFilterColumnExpectedNDVs = columnToNDVMap;
470+
return this;
471+
}
472+
473+
/**
474+
* Set Bloom filter column names.
475+
*
476+
* @param columns the columns which has bloom filter enabled.
477+
*
478+
* @return this builder for method chaining
479+
*/
480+
public Builder withBloomFilterColumnNames(List<String> columns) {
481+
this.bloomFilterColumns = columns;
482+
return this;
483+
}
484+
399485
public Builder withPageRowCountLimit(int rowCount) {
400486
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
401487
pageRowCountLimit = rowCount;

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.parquet.column.ParquetProperties;
3535
import org.apache.parquet.column.page.PageWriteStore;
3636
import org.apache.parquet.column.page.PageWriter;
37+
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
38+
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
3739
import org.apache.parquet.schema.MessageType;
3840

3941
/**
@@ -74,7 +76,7 @@ private interface ColumnWriterProvider {
7476
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
7577
ColumnWriterBase column = columns.get(path);
7678
if (column == null) {
77-
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
79+
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
7880
columns.put(path, column);
7981
}
8082
return column;
@@ -91,7 +93,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
9193
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
9294
for (ColumnDescriptor path : schema.getColumns()) {
9395
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
94-
mcolumns.put(path, createColumnWriter(path, pageWriter, props));
96+
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
9597
}
9698
this.columns = unmodifiableMap(mcolumns);
9799

@@ -105,7 +107,38 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
105107
};
106108
}
107109

108-
abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
110+
// The Bloom filter is written to a specified bitset instead of pages, so it needs a separate write store abstract.
111+
ColumnWriteStoreBase(
112+
MessageType schema,
113+
PageWriteStore pageWriteStore,
114+
BloomFilterWriteStore bloomFilterWriteStore,
115+
ParquetProperties props) {
116+
this.props = props;
117+
this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
118+
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
119+
for (ColumnDescriptor path : schema.getColumns()) {
120+
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
121+
if (props.getBloomFilterColumns() != null && props.getBloomFilterColumns().size() > 0) {
122+
BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
123+
mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
124+
} else {
125+
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
126+
}
127+
}
128+
this.columns = unmodifiableMap(mcolumns);
129+
130+
this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
131+
132+
columnWriterProvider = new ColumnWriterProvider() {
133+
@Override
134+
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
135+
return columns.get(path);
136+
}
137+
};
138+
}
139+
140+
abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
141+
BloomFilterWriter bloomFilterWriter, ParquetProperties props);
109142

110143
@Override
111144
public ColumnWriter getColumnWriter(ColumnDescriptor path) {

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.parquet.column.ParquetProperties;
2323
import org.apache.parquet.column.page.PageWriteStore;
2424
import org.apache.parquet.column.page.PageWriter;
25+
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
26+
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
2527
import org.apache.parquet.schema.MessageType;
2628

2729
public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
@@ -36,8 +38,15 @@ public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
3638
super(pageWriteStore, props);
3739
}
3840

41+
public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
42+
BloomFilterWriteStore bloomFilterWriteStore,
43+
ParquetProperties props) {
44+
super(schema, pageWriteStore, bloomFilterWriteStore, props);
45+
}
46+
3947
@Override
40-
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
41-
return new ColumnWriterV1(path, pageWriter, props);
48+
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
49+
BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
50+
return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
4251
}
4352
}

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.parquet.column.ParquetProperties;
2323
import org.apache.parquet.column.page.PageWriteStore;
2424
import org.apache.parquet.column.page.PageWriter;
25+
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
26+
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
2527
import org.apache.parquet.schema.MessageType;
2628

2729
public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
@@ -30,8 +32,15 @@ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, Par
3032
super(schema, pageWriteStore, props);
3133
}
3234

35+
public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
36+
BloomFilterWriteStore bloomFilterWriteStore,
37+
ParquetProperties props) {
38+
super(schema, pageWriteStore, bloomFilterWriteStore, props);
39+
}
40+
3341
@Override
34-
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
35-
return new ColumnWriterV2(path, pageWriter, props);
42+
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
43+
BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
44+
return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
3645
}
3746
}

0 commit comments

Comments
 (0)