Skip to content

Commit 7869364

Browse files
anuraagapongad
authored andcommitted
Migrate Pub/Sub client to ApiFutures. (#3700)
1 parent e310f6d commit 7869364

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import com.google.api.core.ApiClock;
20+
import com.google.api.core.ApiFutureCallback;
21+
import com.google.api.core.ApiFutures;
2022
import com.google.api.core.InternalApi;
23+
import com.google.api.core.SettableApiFuture;
2124
import com.google.api.gax.batching.FlowController;
2225
import com.google.api.gax.batching.FlowController.FlowControlException;
2326
import com.google.api.gax.core.Distribution;
2427
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
25-
import com.google.common.collect.ArrayListMultimap;
2628
import com.google.common.primitives.Ints;
27-
import com.google.common.util.concurrent.FutureCallback;
28-
import com.google.common.util.concurrent.Futures;
29-
import com.google.common.util.concurrent.SettableFuture;
29+
import com.google.common.util.concurrent.MoreExecutors;
3030
import com.google.pubsub.v1.PubsubMessage;
3131
import com.google.pubsub.v1.ReceivedMessage;
3232
import java.util.ArrayList;
@@ -129,7 +129,7 @@ public enum AckReply {
129129
}
130130

131131
/** Handles callbacks for acking/nacking messages from the {@link MessageReceiver}. */
132-
private class AckHandler implements FutureCallback<AckReply> {
132+
private class AckHandler implements ApiFutureCallback<AckReply> {
133133
private final String ackId;
134134
private final int outstandingBytes;
135135
private final long receivedTimeMillis;
@@ -379,7 +379,7 @@ public void processOutstandingBatches() {
379379

380380
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
381381
final AckHandler ackHandler = outstandingMessage.ackHandler();
382-
final SettableFuture<AckReply> response = SettableFuture.create();
382+
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
383383
final AckReplyConsumer consumer =
384384
new AckReplyConsumer() {
385385
@Override
@@ -392,7 +392,7 @@ public void nack() {
392392
response.set(AckReply.NACK);
393393
}
394394
};
395-
Futures.addCallback(response, ackHandler);
395+
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
396396
executor.execute(
397397
new Runnable() {
398398
@Override

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818

1919
import com.google.api.core.AbstractApiService;
2020
import com.google.api.core.ApiClock;
21+
import com.google.api.core.ApiFutureCallback;
22+
import com.google.api.core.ApiFutures;
2123
import com.google.api.core.InternalApi;
24+
import com.google.api.core.SettableApiFuture;
2225
import com.google.api.gax.batching.FlowController;
2326
import com.google.api.gax.core.Distribution;
2427
import com.google.api.gax.grpc.GrpcStatusCode;
@@ -27,9 +30,7 @@
2730
import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor;
2831
import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline;
2932
import com.google.common.collect.Lists;
30-
import com.google.common.util.concurrent.FutureCallback;
31-
import com.google.common.util.concurrent.Futures;
32-
import com.google.common.util.concurrent.SettableFuture;
33+
import com.google.common.util.concurrent.MoreExecutors;
3334
import com.google.protobuf.Empty;
3435
import com.google.pubsub.v1.AcknowledgeRequest;
3536
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
@@ -127,7 +128,7 @@ protected void doStop() {
127128
private class StreamingPullResponseObserver
128129
implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {
129130

130-
final SettableFuture<Void> errorFuture;
131+
final SettableApiFuture<Void> errorFuture;
131132

132133
/**
133134
* When a batch finsihes processing, we want to request one more batch from the server. But by
@@ -138,7 +139,7 @@ private class StreamingPullResponseObserver
138139
*/
139140
ClientCallStreamObserver<StreamingPullRequest> thisRequestObserver;
140141

141-
StreamingPullResponseObserver(SettableFuture<Void> errorFuture) {
142+
StreamingPullResponseObserver(SettableApiFuture<Void> errorFuture) {
142143
this.errorFuture = errorFuture;
143144
}
144145

@@ -186,7 +187,7 @@ public void onCompleted() {
186187
}
187188

188189
private void initialize() {
189-
final SettableFuture<Void> errorFuture = SettableFuture.create();
190+
final SettableApiFuture<Void> errorFuture = SettableApiFuture.create();
190191
final ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> responseObserver =
191192
new StreamingPullResponseObserver(errorFuture);
192193
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
@@ -215,9 +216,9 @@ private void initialize() {
215216
lock.unlock();
216217
}
217218

218-
Futures.addCallback(
219+
ApiFutures.addCallback(
219220
errorFuture,
220-
new FutureCallback<Void>() {
221+
new ApiFutureCallback<Void>() {
221222
@Override
222223
public void onSuccess(@Nullable Void result) {
223224
if (!isAlive()) {
@@ -260,7 +261,8 @@ public void run() {
260261
backoffMillis,
261262
TimeUnit.MILLISECONDS);
262263
}
263-
});
264+
},
265+
MoreExecutors.directExecutor());
264266
}
265267

266268
private boolean isAlive() {

0 commit comments

Comments
 (0)