Skip to content

Commit 4a26abe

Browse files
authored
Adding comments to CallbackExecutor.submit (#4981)
* Adding comments to CallbackExecutor.submit * Fixing the merge
1 parent d556ce4 commit 4a26abe

File tree

1 file changed

+39
-0
lines changed

1 file changed

+39
-0
lines changed

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ private static class AutoExecutor extends SequentialExecutor {
115115
super(executor);
116116
}
117117

118+
@Override
118119
protected void execute(final String key, final Deque<Runnable> tasks) {
119120
executor.execute(
120121
new Runnable() {
@@ -146,29 +147,66 @@ private static class CallbackExecutor extends SequentialExecutor {
146147
super(executor);
147148
}
148149

150+
/**
151+
* This method does the following in a chain:
152+
*
153+
* <ol>
154+
* <li>Creates an `ApiFuture` that can be used for tracking progress.
155+
* <li>Creates a `CancellableRunnable` out of the `Callable`
156+
* <li>Adds the `CancellableRunnable` to the task queue
157+
* <li>Once the task is ready to be run, it will execute the `Callable`
158+
* <li>When the `Callable` completes one of two things happens:
159+
* <ol>
160+
* <li>On success:
161+
* <ol>
162+
* <li>Complete the `ApiFuture` by setting the return value.
163+
* <li>Call the next task.
164+
* </ol>
165+
* <li>On Failure:
166+
* <ol>
167+
* <li>Fail the `ApiFuture` by setting the exception.
168+
* <li>Cancel all tasks in the queue.
169+
* </ol>
170+
* </ol>
171+
* </ol>
172+
*
173+
* @param key The key for the task queue
174+
* @param callable The thing to run
175+
* @param <T> The return type for the `Callable`'s `ApiFuture`.
176+
* @return an `ApiFuture` for tracking.
177+
*/
149178
<T> ApiFuture<T> submit(final String key, final Callable<ApiFuture<T>> callable) {
179+
// Step 1: create a future for the user
150180
final SettableApiFuture<T> future = SettableApiFuture.create();
181+
182+
// Step 2: create the CancellableRunnable
183+
// Step 3: add the task to queue via `execute`
151184
execute(
152185
key,
153186
new CancellableRunnable() {
154187
private boolean cancelled = false;
155188

156189
@Override
157190
public void run() {
191+
// the task was cancelled
158192
if (cancelled) {
159193
return;
160194
}
195+
161196
try {
197+
// Step 4: call the `Callable`
162198
ApiFuture<T> callResult = callable.call();
163199
ApiFutures.addCallback(
164200
callResult,
165201
new ApiFutureCallback<T>() {
202+
// Step 5.1: on success
166203
@Override
167204
public void onSuccess(T msg) {
168205
future.set(msg);
169206
resume(key);
170207
}
171208

209+
// Step 5.2: on failure
172210
@Override
173211
public void onFailure(Throwable e) {
174212
future.setException(e);
@@ -193,6 +231,7 @@ public void cancel(Throwable e) {
193231
return future;
194232
}
195233

234+
@Override
196235
protected void execute(final String key, final Deque<Runnable> tasks) {
197236
executor.execute(
198237
new Runnable() {

0 commit comments

Comments
 (0)