Skip to content

Commit 39080d7

Browse files
author
Chris Rossi
authored
fix: use multiple batches of limited size for large operations (#321)
Fixes #318.
1 parent 799dd77 commit 39080d7

File tree

8 files changed

+121
-13
lines changed

8 files changed

+121
-13
lines changed

packages/google-cloud-ndb/google/cloud/ndb/_batch.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,17 @@ def get_batch(batch_cls, options=None):
5656
options_key = ()
5757

5858
batch = batches.get(options_key)
59-
if batch is not None:
59+
if batch is not None and not batch.full():
6060
return batch
6161

62-
def idle():
63-
batch = batches.pop(options_key)
64-
batch.idle_callback()
62+
def idler(batch):
63+
def idle():
64+
if batches.get(options_key) is batch:
65+
del batches[options_key]
66+
batch.idle_callback()
67+
68+
return idle
6569

6670
batches[options_key] = batch = batch_cls(options)
67-
_eventloop.add_idle(idle)
71+
_eventloop.add_idle(idler(batch))
6872
return batch

packages/google-cloud-ndb/google/cloud/ndb/_cache.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ class _GlobalCacheBatch(object):
6565
"""Abstract base for classes used to batch operations for the global cache.
6666
"""
6767

68+
def full(self):
69+
"""Indicates whether more work can be added to this batch.
70+
71+
Returns:
72+
boolean: `False`, always.
73+
"""
74+
return False
75+
6876
def idle_callback(self):
6977
"""Call the cache operation.
7078

packages/google-cloud-ndb/google/cloud/ndb/_datastore_api.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,16 @@ def __init__(self, options):
196196
self.options = options
197197
self.todo = {}
198198

199+
def full(self):
200+
201+
"""Indicates whether more work can be added to this batch.
202+
203+
Returns:
204+
boolean: `True` if number of keys to be looked up has reached 1000,
205+
else `False`.
206+
"""
207+
return len(self.todo) >= 1000
208+
199209
def add(self, key):
200210
"""Add a key to the batch to look up.
201211
@@ -477,6 +487,15 @@ def __init__(self, options):
477487
self.mutations = []
478488
self.futures = []
479489

490+
def full(self):
491+
"""Indicates whether more work can be added to this batch.
492+
493+
Returns:
494+
boolean: `True` if number of mutations has reached 500, else
495+
`False`.
496+
"""
497+
return len(self.mutations) >= 500
498+
480499
def put(self, entity_pb):
481500
"""Add an entity to batch to be stored.
482501
@@ -854,8 +873,15 @@ def allocate(keys, options):
854873
Returns:
855874
tasklets.Future: A future for the key completed with the allocated id.
856875
"""
857-
batch = _batch.get_batch(_AllocateIdsBatch, options)
858-
return batch.add(keys)
876+
futures = []
877+
while keys:
878+
batch = _batch.get_batch(_AllocateIdsBatch, options)
879+
room_left = batch.room_left()
880+
batch_keys = keys[:room_left]
881+
futures.extend(batch.add(batch_keys))
882+
keys = keys[room_left:]
883+
884+
return tasklets._MultiFuture(futures)
859885

860886

861887
class _AllocateIdsBatch(object):
@@ -875,6 +901,22 @@ def __init__(self, options):
875901
self.keys = []
876902
self.futures = []
877903

904+
def full(self):
905+
"""Indicates whether more work can be added to this batch.
906+
907+
Returns:
908+
boolean: `True` if number of keys has reached 500, else `False`.
909+
"""
910+
return len(self.keys) >= 500
911+
912+
def room_left(self):
913+
"""Get how many more keys can be added to this batch.
914+
915+
Returns:
916+
int: 500 - number of keys already in batch
917+
"""
918+
return 500 - len(self.keys)
919+
878920
def add(self, keys):
879921
"""Add incomplete keys to batch to allocate.
880922
@@ -892,7 +934,7 @@ def add(self, keys):
892934
self.keys.append(key)
893935

894936
self.futures.extend(futures)
895-
return tasklets._MultiFuture(futures)
937+
return futures
896938

897939
def idle_callback(self):
898940
"""Perform a Datastore AllocateIds request on all batched keys."""

packages/google-cloud-ndb/tests/system/conftest.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@ def with_ds_client(ds_client, to_delete, deleted_keys):
6666

6767
yield ds_client
6868

69-
if to_delete:
70-
ds_client.delete_multi(to_delete)
71-
deleted_keys.update(to_delete)
69+
while to_delete:
70+
batch = to_delete[:500]
71+
ds_client.delete_multi(batch)
72+
deleted_keys.update(batch)
73+
to_delete = to_delete[500:]
7274

7375
not_deleted = [
7476
entity

packages/google-cloud-ndb/tests/system/test_crud.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,3 +1131,39 @@ class SomeKind(ndb.Model):
11311131
assert len(keys_upd) == len(keys)
11321132
assert len(set(keys_upd)) == len(set(keys))
11331133
assert set(keys_upd) == set(keys)
1134+
1135+
1136+
@pytest.mark.usefixtures("client_context")
1137+
def test_multi_with_lots_of_keys(dispose_of):
1138+
"""Regression test for issue #318.
1139+
1140+
https://github.com/googleapis/python-ndb/issues/318
1141+
"""
1142+
N = 1001
1143+
1144+
class SomeKind(ndb.Model):
1145+
foo = ndb.IntegerProperty()
1146+
1147+
foos = list(range(N))
1148+
entities = [SomeKind(foo=foo) for foo in foos]
1149+
keys = ndb.put_multi(entities)
1150+
dispose_of(*(key._key for key in keys))
1151+
assert len(keys) == N
1152+
1153+
entities = ndb.get_multi(keys)
1154+
assert [entity.foo for entity in entities] == foos
1155+
1156+
ndb.delete_multi(keys)
1157+
entities = ndb.get_multi(keys)
1158+
assert entities == [None] * N
1159+
1160+
1161+
@pytest.mark.usefixtures("client_context")
1162+
def test_allocate_a_lot_of_keys():
1163+
N = 1001
1164+
1165+
class SomeKind(ndb.Model):
1166+
pass
1167+
1168+
keys = SomeKind.allocate_ids(N)
1169+
assert len(keys) == N

packages/google-cloud-ndb/tests/unit/test__batch.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,25 @@ def test_it(self):
3333

3434
assert _batch.get_batch(MockBatch, options) is batch
3535

36+
batch._full = True
37+
batch2 = _batch.get_batch(MockBatch, options)
38+
assert batch2 is not batch
39+
assert not batch2.idle_called
40+
3641
_eventloop.run()
3742
assert batch.idle_called
43+
assert batch2.idle_called
3844

3945

4046
class MockBatch:
47+
_full = False
48+
4149
def __init__(self, options):
4250
self.options = options
4351
self.idle_called = False
4452

4553
def idle_callback(self):
4654
self.idle_called = True
55+
56+
def full(self):
57+
return self._full

packages/google-cloud-ndb/tests/unit/test__cache.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ def test_add_and_idle_and_done_callbacks_w_error(in_context):
143143
assert future1.exception() is error
144144
assert future2.exception() is error
145145

146+
@staticmethod
147+
def test_full():
148+
batch = _cache._GlobalCacheGetBatch(None)
149+
assert batch.full() is False
150+
146151

147152
class Test_global_set:
148153
@staticmethod

packages/google-cloud-ndb/tests/unit/test__datastore_api.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,9 +1207,9 @@ def test_constructor():
12071207
def test_add():
12081208
options = _options.Options()
12091209
batch = _api._AllocateIdsBatch(options)
1210-
future = batch.add(["key1", "key2"])
1210+
futures = batch.add(["key1", "key2"])
12111211
assert batch.keys == ["key1", "key2"]
1212-
assert batch.futures == future._dependencies
1212+
assert batch.futures == futures
12131213

12141214
@staticmethod
12151215
@mock.patch("google.cloud.ndb._datastore_api._datastore_allocate_ids")

0 commit comments

Comments
 (0)