Skip to content

Commit 56a79f1

Browse files
authored
feat(storage): Use raw proto access for read resumption strategy (#1764)
The proto-plus library provides protocol buffer message classes that behave like native Python types. While convenient, my profiling confirms this abstraction comes with a significant performance penalty in our hot paths. Every time we access a field on a proto-plus object, the library triggers dynamic lookups and wrapper instantiation. In our hot data ingestion loop, this overhead accumulates rapidly over multiple chunks. My benchmarking shows that accessing the underlying C++ Protobuf directly is ~2x faster than going through the proto-plus wrapper (measured over 30,000 iterations). While 30,000 operations might sound high, it corresponds to downloading just 60GB of data (assuming 2MB chunks) for accessing just one attribute. For a high-performance workload (e.g., downloading at 1GB/s), that entire process takes only 60s to complete. Additionally, this wrapper overhead introduces measurable latency not just for data access, but for every metadata check and state update that repeats per chunk. This PR bypasses proto-plus in our critical IO loops and interacts directly with the underlying C++ Protobuf structures. This eliminates the "wrapper tax" without changing the external behavior of the application.
1 parent 90e92d1 commit 56a79f1

File tree

5 files changed

+35
-29
lines changed

5 files changed

+35
-29
lines changed

packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,23 +81,28 @@ def update_state_from_response(
8181
self, response: storage_v2.BidiReadObjectResponse, state: Dict[str, Any]
8282
) -> None:
8383
"""Processes a server response, performs integrity checks, and updates state."""
84+
proto = getattr(response, "_pb", response)
8485

8586
# Capture read_handle if provided.
86-
if response.read_handle:
87-
state["read_handle"] = response.read_handle
87+
if proto.HasField("read_handle"):
88+
state["read_handle"] = storage_v2.BidiReadHandle(
89+
handle=proto.read_handle.handle
90+
)
8891

8992
download_states = state["download_states"]
9093

91-
for object_data_range in response.object_data_ranges:
94+
for object_data_range in proto.object_data_ranges:
9295
# Ignore empty ranges or ranges for IDs not in our state
9396
# (e.g., from a previously cancelled request on the same stream).
94-
if not object_data_range.read_range:
97+
if not object_data_range.HasField("read_range"):
9598
logger.warning(
9699
"Received response with missing read_range field; ignoring."
97100
)
98101
continue
99102

100-
read_id = object_data_range.read_range.read_id
103+
read_range_pb = object_data_range.read_range
104+
read_id = read_range_pb.read_id
105+
101106
if read_id not in download_states:
102107
logger.warning(
103108
f"Received data for unknown or stale read_id {read_id}; ignoring."
@@ -107,7 +112,8 @@ def update_state_from_response(
107112
read_state = download_states[read_id]
108113

109114
# Offset Verification
110-
chunk_offset = object_data_range.read_range.read_offset
115+
# We must validate data before updating state or writing to buffer.
116+
chunk_offset = read_range_pb.read_offset
111117
if chunk_offset != read_state.next_expected_offset:
112118
raise DataCorruption(
113119
response,
@@ -116,11 +122,11 @@ def update_state_from_response(
116122
)
117123

118124
# Checksum Verification
119-
# We must validate data before updating state or writing to buffer.
120-
data = object_data_range.checksummed_data.content
121-
server_checksum = object_data_range.checksummed_data.crc32c
125+
checksummed_data = object_data_range.checksummed_data
126+
data = checksummed_data.content
122127

123-
if server_checksum is not None:
128+
if checksummed_data.HasField("crc32c"):
129+
server_checksum = checksummed_data.crc32c
124130
client_checksum = int.from_bytes(Checksum(data).digest(), "big")
125131
if server_checksum != client_checksum:
126132
raise DataCorruption(

packages/google-cloud-storage/tests/perf/microbenchmarks/_utils.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
import socket
1919
import psutil
2020

21-
_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show
21+
_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show
22+
2223

2324
def publish_benchmark_extra_info(
2425
benchmark: Any,
@@ -28,7 +29,6 @@ def publish_benchmark_extra_info(
2829
download_bytes_list: Optional[List[int]] = None,
2930
duration: Optional[int] = None,
3031
) -> None:
31-
3232
"""
3333
Helper function to publish benchmark parameters to the extra_info property.
3434
"""
@@ -48,14 +48,15 @@ def publish_benchmark_extra_info(
4848
benchmark.group = benchmark_group
4949

5050
if download_bytes_list is not None:
51-
assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
51+
assert (
52+
duration is not None
53+
), "Duration must be provided if total_bytes_transferred is provided."
5254
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
5355
min_throughput = min(throughputs_list)
5456
max_throughput = max(throughputs_list)
5557
mean_throughput = statistics.mean(throughputs_list)
5658
median_throughput = statistics.median(throughputs_list)
5759

58-
5960
else:
6061
object_size = params.file_size_bytes
6162
num_files = params.num_files
@@ -211,13 +212,13 @@ def get_affinity(irq):
211212

212213
def get_primary_interface_name():
213214
primary_ip = None
214-
215+
215216
# 1. Determine the Local IP used for internet access
216217
# We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet
217218
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
218219
try:
219220
# connect() to a public IP (Google DNS) to force route resolution
220-
s.connect(('8.8.8.8', 80))
221+
s.connect(("8.8.8.8", 80))
221222
primary_ip = s.getsockname()[0]
222223
except Exception:
223224
# Fallback if no internet
@@ -248,7 +249,7 @@ def get_irq_affinity():
248249
for irq in irqs:
249250
affinity_str = get_affinity(irq)
250251
if affinity_str != "N/A":
251-
for part in affinity_str.split(','):
252-
if '-' not in part:
252+
for part in affinity_str.split(","):
253+
if "-" not in part:
253254
cpus.add(int(part))
254255
return cpus

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@
1717
@pytest.fixture
1818
def workload_params(request):
1919
params = request.param
20-
files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)]
20+
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
2121
return params, files_names

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/test_reads.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params):
159159

160160

161161
def _download_files_worker(process_idx, filename, params, bucket_type):
162-
163162
if bucket_type == "zonal":
164163
return worker_loop.run_until_complete(
165164
_download_time_based_async(worker_client, filename, params)

packages/google-cloud-storage/tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,9 @@ async def test_state_lookup(self, mock_appendable_writer):
175175
writer._is_stream_open = True
176176
writer.write_obj_stream = mock_appendable_writer["mock_stream"]
177177

178-
mock_appendable_writer["mock_stream"].recv.return_value = (
179-
storage_type.BidiWriteObjectResponse(persisted_size=100)
180-
)
178+
mock_appendable_writer[
179+
"mock_stream"
180+
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=100)
181181

182182
size = await writer.state_lookup()
183183

@@ -388,9 +388,9 @@ async def test_flush_resets_counters(self, mock_appendable_writer):
388388
writer.write_obj_stream = mock_appendable_writer["mock_stream"]
389389
writer.bytes_appended_since_last_flush = 100
390390

391-
mock_appendable_writer["mock_stream"].recv.return_value = (
392-
storage_type.BidiWriteObjectResponse(persisted_size=200)
393-
)
391+
mock_appendable_writer[
392+
"mock_stream"
393+
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=200)
394394

395395
await writer.flush()
396396

@@ -431,9 +431,9 @@ async def test_finalize_lifecycle(self, mock_appendable_writer):
431431
writer.write_obj_stream = mock_appendable_writer["mock_stream"]
432432

433433
resource = storage_type.Object(size=999)
434-
mock_appendable_writer["mock_stream"].recv.return_value = (
435-
storage_type.BidiWriteObjectResponse(resource=resource)
436-
)
434+
mock_appendable_writer[
435+
"mock_stream"
436+
].recv.return_value = storage_type.BidiWriteObjectResponse(resource=resource)
437437

438438
res = await writer.finalize()
439439

0 commit comments

Comments
 (0)