Skip to content

Commit d0e2d03

Browse files
Change MessageDispatcher to be synchronous instead of asynchronous.
This removes the failure mode described in googleapis#2452, which can occur when MaxOutstandingElementCount is low and there is more than one connection. In that situation, an individual MessageDispatcher can have no outstanding in-flight messages and yet be blocked by flow control while a whole new batch is outstanding. It will then never make progress on that batch: it will never receive another batch, and because the queue was made non-shared in googleapis#4590, no other MessageDispatcher will ever pull the batch off the queue. Switching to a blocking flow controller prevents this, because each batch synchronously waits until flow control allows it before being processed.
1 parent 80924d0 commit d0e2d03

File tree

4 files changed

+45
-104
lines changed

4 files changed

+45
-104
lines changed

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 21 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.api.gax.batching.FlowController;
2525
import com.google.api.gax.batching.FlowController.FlowControlException;
2626
import com.google.api.gax.core.Distribution;
27-
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
2827
import com.google.common.primitives.Ints;
2928
import com.google.common.util.concurrent.MoreExecutors;
3029
import com.google.pubsub.v1.PubsubMessage;
@@ -91,9 +90,6 @@ class MessageDispatcher {
9190
private final Lock jobLock;
9291
private ScheduledFuture<?> backgroundJob;
9392

94-
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95-
new LinkedBlockingDeque<>();
96-
9793
// To keep track of number of seconds the receiver takes to process messages.
9894
private final Distribution ackLatencyDistribution;
9995

@@ -155,7 +151,6 @@ private void forget() {
155151
}
156152
flowController.release(1, outstandingBytes);
157153
messagesWaiter.incrementPendingMessages(-1);
158-
processOutstandingBatches();
159154
}
160155

161156
@Override
@@ -296,50 +291,19 @@ int getMessageDeadlineSeconds() {
296291
return messageDeadlineSeconds.get();
297292
}
298293

299-
static class OutstandingMessageBatch {
300-
private final Deque<OutstandingMessage> messages;
301-
private final Runnable doneCallback;
302-
303-
static class OutstandingMessage {
304-
private final ReceivedMessage receivedMessage;
305-
private final AckHandler ackHandler;
306-
307-
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
308-
this.receivedMessage = receivedMessage;
309-
this.ackHandler = ackHandler;
310-
}
311-
312-
public ReceivedMessage receivedMessage() {
313-
return receivedMessage;
314-
}
315-
316-
public AckHandler ackHandler() {
317-
return ackHandler;
318-
}
319-
}
294+
static class OutstandingMessage {
295+
private final ReceivedMessage receivedMessage;
296+
private final AckHandler ackHandler;
320297

321-
public OutstandingMessageBatch(Runnable doneCallback) {
322-
this.messages = new LinkedList<>();
323-
this.doneCallback = doneCallback;
324-
}
325-
326-
public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
327-
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
328-
}
329-
330-
public Deque<OutstandingMessage> messages() {
331-
return messages;
298+
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
299+
this.receivedMessage = receivedMessage;
300+
this.ackHandler = ackHandler;
332301
}
333302
}
334303

335-
public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
336-
if (messages.isEmpty()) {
337-
doneCallback.run();
338-
return;
339-
}
340-
304+
public void processReceivedMessages(List<ReceivedMessage> messages) {
341305
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
342-
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
306+
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
343307
for (ReceivedMessage message : messages) {
344308
AckHandler ackHandler =
345309
new AckHandler(
@@ -355,42 +319,25 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
355319
// totally expire so that pubsub service sends us the message again.
356320
continue;
357321
}
358-
outstandingBatch.addMessage(message, ackHandler);
322+
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
359323
pendingReceipts.add(message.getAckId());
360324
}
361325

362-
if (outstandingBatch.messages.isEmpty()) {
363-
doneCallback.run();
364-
return;
365-
}
366-
367-
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
outstandingMessageBatches.add(outstandingBatch);
369-
processOutstandingBatches();
326+
processBatch(outstandingBatch);
370327
}
371328

372-
private void processOutstandingBatches() {
373-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374-
nextBatch != null;
375-
nextBatch = outstandingMessageBatches.poll()) {
376-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377-
nextMessage != null;
378-
nextMessage = nextBatch.messages.poll()) {
379-
try {
380-
// This is a non-blocking flow controller.
381-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382-
} catch (FlowController.MaxOutstandingElementCountReachedException
383-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384-
// Unwind previous changes in the batches outstanding.
385-
nextBatch.messages.addFirst(nextMessage);
386-
outstandingMessageBatches.addFirst(nextBatch);
387-
return;
388-
} catch (FlowControlException unexpectedException) {
389-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
390-
}
391-
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
329+
private void processBatch(List<OutstandingMessage> batch) {
330+
messagesWaiter.incrementPendingMessages(batch.size());
331+
for (OutstandingMessage message : batch) {
332+
// This is a blocking flow controller. We have already incremented MessageWaiter, so
333+
// shutdown will block on processing of all these messages anyway.
334+
try {
335+
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
336+
} catch (FlowControlException unexpectedException) {
337+
// This should be a blocking flow controller and never throw an exception.
338+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
392339
}
393-
nextBatch.doneCallback.run();
340+
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
394341
}
395342
}
396343

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -151,26 +151,20 @@ public void onStart(StreamController controller) {
151151
@Override
152152
public void onResponse(StreamingPullResponse response) {
153153
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
154-
messageDispatcher.processReceivedMessages(
155-
response.getReceivedMessagesList(),
156-
new Runnable() {
157-
@Override
158-
public void run() {
159-
// Only request more if we're not shutdown.
160-
// If errorFuture is done, the stream has either failed or hung up,
161-
// and we don't need to request.
162-
if (isAlive() && !errorFuture.isDone()) {
163-
lock.lock();
164-
try {
165-
thisController.request(1);
166-
} catch (Exception e) {
167-
logger.log(Level.WARNING, "cannot request more messages", e);
168-
} finally {
169-
lock.unlock();
170-
}
171-
}
172-
}
173-
});
154+
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
155+
// Only request more if we're not shutdown.
156+
// If errorFuture is done, the stream has either failed or hung up,
157+
// and we don't need to request.
158+
if (isAlive() && !errorFuture.isDone()) {
159+
lock.lock();
160+
try {
161+
thisController.request(1);
162+
} catch (Exception e) {
163+
logger.log(Level.WARNING, "cannot request more messages", e);
164+
} finally {
165+
lock.unlock();
166+
}
167+
}
174168
}
175169

176170
@Override

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private Subscriber(Builder builder) {
130130
builder
131131
.flowControlSettings
132132
.toBuilder()
133-
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
133+
.setLimitExceededBehavior(LimitExceededBehavior.Block)
134134
.build());
135135

136136
this.numPullers = builder.parallelPullCount;

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void sendAckOperations(
105105
new FlowController(
106106
FlowControlSettings.newBuilder()
107107
.setMaxOutstandingElementCount(1L)
108-
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
108+
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
109109
.build());
110110

111111
dispatcher =
@@ -124,31 +124,31 @@ public void sendAckOperations(
124124

125125
@Test
126126
public void testReceipt() throws Exception {
127-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
127+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
128128
dispatcher.processOutstandingAckOperations();
129129
assertThat(sentModAcks)
130130
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
131131
}
132132

133133
@Test
134134
public void testAck() throws Exception {
135-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
135+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
136136
consumers.take().ack();
137137
dispatcher.processOutstandingAckOperations();
138138
assertThat(sentAcks).contains(TEST_MESSAGE.getAckId());
139139
}
140140

141141
@Test
142142
public void testNack() throws Exception {
143-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
143+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
144144
consumers.take().nack();
145145
dispatcher.processOutstandingAckOperations();
146146
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0));
147147
}
148148

149149
@Test
150150
public void testExtension() throws Exception {
151-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
151+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
152152
dispatcher.extendDeadlines();
153153
assertThat(sentModAcks)
154154
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -161,7 +161,7 @@ public void testExtension() throws Exception {
161161

162162
@Test
163163
public void testExtension_Close() throws Exception {
164-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
164+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
165165
dispatcher.extendDeadlines();
166166
assertThat(sentModAcks)
167167
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -176,7 +176,7 @@ public void testExtension_Close() throws Exception {
176176

177177
@Test
178178
public void testExtension_GiveUp() throws Exception {
179-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
179+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
180180
dispatcher.extendDeadlines();
181181
assertThat(sentModAcks)
182182
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -188,7 +188,7 @@ public void testExtension_GiveUp() throws Exception {
188188
dispatcher.extendDeadlines();
189189
assertThat(sentModAcks).isEmpty();
190190

191-
// We should be able to reserve another item in the flow controller and not block shutdown
191+
// We should be able to reserve another item in the flow controller and not block.
192192
flowController.reserve(1, 0);
193193
dispatcher.stop();
194194
}
@@ -197,7 +197,7 @@ public void testExtension_GiveUp() throws Exception {
197197
public void testDeadlineAdjustment() throws Exception {
198198
assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10);
199199

200-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
200+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
201201
clock.advance(42, TimeUnit.SECONDS);
202202
consumers.take().ack();
203203

0 commit comments

Comments
 (0)