feat: Add file cleanup for expire snapshots#592
feat: Add file cleanup for expire snapshots#592shangxinli wants to merge 2 commits intoapache:mainfrom
Conversation
0bb8356 to
596c5d2
Compare
Implement the file cleanup logic that was missing from the expire
snapshots feature (the original PR noted "TODO: File recycling will
be added in a followup PR").
Port the "reachable file cleanup" strategy from Java's
ReachableFileCleanup, following the same phased approach:
Phase 1: Collect manifest paths from expired and retained snapshots
Phase 2: Prune manifests still referenced by retained snapshots
Phase 3: Find data files only in manifests being deleted, subtract
files still reachable from retained manifests (kAll only)
Phase 4: Delete orphaned manifest files
Phase 5: Delete manifest lists from expired snapshots
Phase 6: Delete expired statistics and partition statistics files
Key design decisions matching Java parity:
- Best-effort deletion: suppress errors on individual file deletions
to avoid blocking metadata updates (Java suppressFailureWhenFinished)
- Branch/tag awareness: retained snapshot set includes all snapshots
reachable from any ref (branch or tag), preventing false-positive
deletions of files still referenced by non-main branches
- Data file safety: only delete data files from manifests that are
themselves being deleted, then subtract any files still reachable
from retained manifests (two-pass approach from ReachableFileCleanup)
- Respect CleanupLevel: kNone skips all, kMetadataOnly skips data
files, kAll cleans everything
- FileIO abstraction: uses FileIO::DeleteFile for filesystem
compatibility (S3, HDFS, local), with custom DeleteWith() override
- Statistics cleanup via snapshot ID membership in retained set
TODOs for follow-up:
- Multi-threaded file deletion (Java uses Tasks.foreach with executor)
- IncrementalFileCleanup strategy for linear ancestry optimization
(Java uses this when no branches/cherry-picks involved)
596c5d2 to
d3df6e3
Compare
1a1c501 to
4f36084
Compare
- Fix O(M*S) I/O: Pre-cache ManifestFile objects in manifest_cache_ during Phase 1 (ReadManifestsForSnapshot), eliminating repeated manifest list reads in FindDataFilesToDelete. - Fix storage leak: Use LiveEntries() instead of Entries() to match Java's ManifestFiles.readPaths behavior (only ADDED/EXISTING entries). - Fix data loss risk: When reading a retained manifest fails, abort data file deletion entirely instead of silently continuing. Java retries and throws on failure here. - Fix statistics file deletion: Use path-based set difference instead of snapshot_id-only check, preventing erroneous deletion of statistics files shared across snapshots. - Remove goto anti-pattern: Extract ManifestFile lookup into MakeManifestReader() helper and use manifest_cache_ for direct lookup. - Improve API: FindDataFilesToDelete now returns Result<unordered_set<string>> instead of using a mutable out-parameter. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4f36084 to
1ebc015
Compare
|
@wgtmac, made the change. Can you have a look? |
|
I'm on the way :) |
| #include <vector> | ||
|
|
||
| #include "iceberg/iceberg_export.h" | ||
| #include "iceberg/manifest/manifest_list.h" |
There was a problem hiding this comment.
Can we remove these includes by using forward declaration? iceberg/type_fwd.h has already been included.
| /// Cache of manifest path -> ManifestFile, built during ReadManifestsForSnapshot | ||
| /// to avoid O(M*S) repeated I/O from re-reading manifest lists in | ||
| /// FindDataFilesToDelete. | ||
| std::unordered_map<std::string, ManifestFile> manifest_cache_; |
There was a problem hiding this comment.
This is the internal and ephemeral state used when cleaning up files. Can we remove it from here and move to the source file (or concrete FileCleanupStrategy class)?
|
|
||
| /// \brief Read manifest paths from a single snapshot. | ||
| /// Best-effort: returns OK even if the snapshot or its manifests can't be read. | ||
| Status ReadManifestsForSnapshot(int64_t snapshot_id, |
There was a problem hiding this comment.
It seems worth adding a similar abstraction like the Java FileCleanupStrategy if we will add IncrementalFileCleanup or more strategies in the future. We can add them only in the expire_snapshots.cc for now as it is unlikely to be used elsewhere.
I'm also thinking if it is valuable to enable users to customize any strategy by registering their own implementations but that's a separate topic.
| /// deleted, then subtracting files still reachable from retained manifests. | ||
| /// If a retained manifest cannot be read, returns an empty set to prevent | ||
| /// accidental data loss. | ||
| Result<std::unordered_set<std::string>> FindDataFilesToDelete( |
There was a problem hiding this comment.
This function seems to be related to ReachableFileCleanup only.
| const std::unordered_set<std::string>& retained_manifests); | ||
|
|
||
| /// \brief Create a ManifestReader for the given ManifestFile. | ||
| Result<std::shared_ptr<ManifestReader>> MakeManifestReader( |
There was a problem hiding this comment.
This function doesn't justify to be a member function.
| return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove); | ||
| } | ||
|
|
||
| void ExpireSnapshots::DeleteFilePath(const std::string& path) { |
There was a problem hiding this comment.
| void ExpireSnapshots::DeleteFilePath(const std::string& path) { | |
| void ExpireSnapshots::DeleteFile(const std::string& path) { |
| auto status = ctx_->table->io()->DeleteFile(path); | ||
| // Best-effort: ignore NotFound (file already deleted) and other errors. | ||
| // Java uses suppressFailureWhenFinished + onFailure logging. | ||
| std::ignore = status; |
There was a problem hiding this comment.
| auto status = ctx_->table->io()->DeleteFile(path); | |
| // Best-effort: ignore NotFound (file already deleted) and other errors. | |
| // Java uses suppressFailureWhenFinished + onFailure logging. | |
| std::ignore = status; | |
| std::ignore = ctx_->table->io()->DeleteFile(path); |
| } | ||
| } | ||
|
|
||
| Result<std::shared_ptr<ManifestReader>> ExpireSnapshots::MakeManifestReader( |
There was a problem hiding this comment.
Make it a local function in the anonymous namespace?
| auto schema_result = metadata.Schema(); | ||
| if (!schema_result.has_value()) return std::unexpected<Error>(schema_result.error()); | ||
| auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); | ||
| if (!spec_result.has_value()) return std::unexpected<Error>(spec_result.error()); | ||
| return ManifestReader::Make(manifest, file_io, schema_result.value(), | ||
| spec_result.value()); |
There was a problem hiding this comment.
| auto schema_result = metadata.Schema(); | |
| if (!schema_result.has_value()) return std::unexpected<Error>(schema_result.error()); | |
| auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); | |
| if (!spec_result.has_value()) return std::unexpected<Error>(spec_result.error()); | |
| return ManifestReader::Make(manifest, file_io, schema_result.value(), | |
| spec_result.value()); | |
| ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); | |
| ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(manifest.partition_spec_id)); | |
| return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec)); |
| // TODO(shangxinli): Parallelize manifest collection with a thread pool. | ||
| std::unordered_set<std::string> expired_manifest_paths; | ||
| for (int64_t snapshot_id : expired_snapshot_ids) { | ||
| std::ignore = ReadManifestsForSnapshot(snapshot_id, expired_manifest_paths); |
There was a problem hiding this comment.
Why we can ignore the error? Isn't it safe to abort entire operation?
Summary
ReachableFileCleanup