1616
1717package com .google .cloud .pubsub ;
1818
19+ import static com .google .api .client .util .Preconditions .checkArgument ;
1920import static com .google .cloud .pubsub .PubSub .ListOption .OptionType .PAGE_SIZE ;
2021import static com .google .cloud .pubsub .PubSub .ListOption .OptionType .PAGE_TOKEN ;
21- import static com .google .common .base .Preconditions .checkArgument ;
2222import static com .google .common .util .concurrent .Futures .lazyTransform ;
2323
2424import com .google .cloud .AsyncPage ;
2929import com .google .cloud .pubsub .spi .PubSubRpc ;
3030import com .google .cloud .pubsub .spi .v1 .PublisherApi ;
3131import com .google .cloud .pubsub .spi .v1 .SubscriberApi ;
32+ import com .google .common .annotations .VisibleForTesting ;
3233import com .google .common .base .Function ;
3334import com .google .common .base .Throwables ;
3435import com .google .common .collect .ImmutableList ;
3536import com .google .common .collect .Iterables ;
37+ import com .google .common .collect .Iterators ;
3638import com .google .common .collect .Lists ;
3739import com .google .common .collect .Maps ;
3840import com .google .common .util .concurrent .Uninterruptibles ;
5254import com .google .pubsub .v1 .ModifyPushConfigRequest ;
5355import com .google .pubsub .v1 .PublishRequest ;
5456import com .google .pubsub .v1 .PublishResponse ;
57+ import com .google .pubsub .v1 .PullRequest ;
58+ import com .google .pubsub .v1 .PullResponse ;
5559
5660import java .util .Collections ;
5761import java .util .Iterator ;
6468class PubSubImpl extends BaseService <PubSubOptions > implements PubSub {
6569
6670 private final PubSubRpc rpc ;
71+ private final AckDeadlineRenewer ackDeadlineRenewer ;
72+ private boolean closed ;
6773
6874 private static final Function <Empty , Void > EMPTY_TO_VOID_FUNCTION = new Function <Empty , Void >() {
6975 @ Override
@@ -78,10 +84,25 @@ public Boolean apply(Empty input) {
7884 return input != null ;
7985 }
8086 };
87+ private static final Function <com .google .pubsub .v1 .ReceivedMessage , String >
88+ MESSAGE_TO_ACK_ID_FUNCTION = new Function <com .google .pubsub .v1 .ReceivedMessage , String >() {
89+ @ Override
90+ public String apply (com .google .pubsub .v1 .ReceivedMessage message ) {
91+ return message .getAckId ();
92+ }
93+ };
8194
8295 PubSubImpl (PubSubOptions options ) {
8396 super (options );
8497 rpc = options .rpc ();
98+ ackDeadlineRenewer = new AckDeadlineRenewer (this );
99+ }
100+
101+ @ VisibleForTesting
102+ PubSubImpl (PubSubOptions options , AckDeadlineRenewer ackDeadlineRenewer ) {
103+ super (options );
104+ rpc = options .rpc ();
105+ this .ackDeadlineRenewer = ackDeadlineRenewer ;
85106 }
86107
87108 private abstract static class BasePageFetcher <T > implements AsyncPageImpl .NextPageFetcher <T > {
@@ -445,17 +466,35 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
445466
446467 @ Override
447468 public Iterator <ReceivedMessage > pull (String subscription , int maxMessages ) {
448- // this should set return_immediately to true
449- return null ;
469+ return get (pullAsync (subscription , maxMessages ));
450470 }
451471
452472 @ Override
453- public Future <Iterator <ReceivedMessage >> pullAsync (String subscription , int maxMessages ) {
454- // though this method can set return_immediately to false (as future can be canceled) I
455- // suggest to keep it false so sync could delegate to asyc and use the same options
456- // this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
457- // messages
458- return null ;
473+ public Future <Iterator <ReceivedMessage >> pullAsync (final String subscription , int maxMessages ) {
474+ PullRequest request = PullRequest .newBuilder ().setReturnImmediately (true )
475+ .setSubscription (SubscriberApi .formatSubscriptionName (options ().projectId (), subscription ))
476+ .setMaxMessages (maxMessages )
477+ .setReturnImmediately (true )
478+ .build ();
479+ Future <PullResponse > response = rpc .pull (request );
480+ return lazyTransform (response , new Function <PullResponse , Iterator <ReceivedMessage >>() {
481+ @ Override
482+ public Iterator <ReceivedMessage > apply (PullResponse pullResponse ) {
483+ // Add all received messages to the automatic ack deadline renewer
484+ List <String > ackIds = Lists .transform (pullResponse .getReceivedMessagesList (),
485+ MESSAGE_TO_ACK_ID_FUNCTION );
486+ ackDeadlineRenewer .add (subscription , ackIds );
487+ return Iterators .transform (pullResponse .getReceivedMessagesList ().iterator (),
488+ new Function <com .google .pubsub .v1 .ReceivedMessage , ReceivedMessage >() {
489+ @ Override
490+ public ReceivedMessage apply (com .google .pubsub .v1 .ReceivedMessage receivedMessage ) {
491+ // Remove consumed message from automatic ack deadline renewer
492+ ackDeadlineRenewer .remove (subscription , receivedMessage .getAckId ());
493+ return ReceivedMessage .fromPb (PubSubImpl .this , subscription , receivedMessage );
494+ }
495+ });
496+ }
497+ });
459498 }
460499
461500 @ Override
@@ -549,6 +588,13 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti
549588
550589 @ Override
551590 public void close () throws Exception {
591+ if (closed ) {
592+ return ;
593+ }
594+ closed = true ;
552595 rpc .close ();
596+ if (ackDeadlineRenewer != null ) {
597+ ackDeadlineRenewer .close ();
598+ }
553599 }
554600}
0 commit comments