Skip to content

Commit 45e76f3

Browse files
authored
Spanner: migrate all unary call methods to gapic and inject headers (#3112)
* inject headers to grpcCallContext before making calls * Suppress retry in gapic
1 parent 75d1d0b commit 45e76f3

File tree

5 files changed

+133
-51
lines changed

5 files changed

+133
-51
lines changed

google-cloud-bom/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@
170170
<testing.version>0.43.1-alpha-SNAPSHOT</testing.version><!-- {x-version-update:google-cloud-testing:current} -->
171171

172172
<api-common.version>1.5.0</api-common.version>
173-
<gax.version>1.23.0</gax.version>
174-
<gax-grpc.version>1.23.0</gax-grpc.version>
175-
<gax-httpjson.version>0.40.0</gax-httpjson.version>
173+
<gax.version>1.24.0</gax.version>
174+
<gax-grpc.version>1.24.0</gax-grpc.version>
175+
<gax-httpjson.version>0.41.0</gax-httpjson.version>
176176
<generated-proto-beta.version>0.8.0</generated-proto-beta.version>
177177
<generated-proto-ga.version>1.7.0</generated-proto-ga.version>
178178
</properties>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ Session createSession(final DatabaseId db) throws SpannerException {
266266
new Callable<com.google.spanner.v1.Session>() {
267267
@Override
268268
public com.google.spanner.v1.Session call() throws Exception {
269-
return rawGrpcRpc.createSession(
269+
return gapicRpc.createSession(
270270
db.getName(), getOptions().getSessionLabels(), options);
271271
}
272272
});
@@ -806,7 +806,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
806806
new Callable<CommitResponse>() {
807807
@Override
808808
public CommitResponse call() throws Exception {
809-
return rawGrpcRpc.commit(request, options);
809+
return gapicRpc.commit(request, options);
810810
}
811811
});
812812
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
@@ -872,7 +872,7 @@ public void close() {
872872
new Callable<Void>() {
873873
@Override
874874
public Void call() throws Exception {
875-
rawGrpcRpc.deleteSession(name, options);
875+
gapicRpc.deleteSession(name, options);
876876
return null;
877877
}
878878
});
@@ -898,7 +898,7 @@ ByteString beginTransaction() {
898898
new Callable<Transaction>() {
899899
@Override
900900
public Transaction call() throws Exception {
901-
return rawGrpcRpc.beginTransaction(request, options);
901+
return gapicRpc.beginTransaction(request, options);
902902
}
903903
});
904904
if (txn.getId().isEmpty()) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 110 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
2020

21+
import com.google.api.core.ApiFunction;
2122
import com.google.api.gax.core.CredentialsProvider;
2223
import com.google.api.gax.core.GaxProperties;
2324
import com.google.api.gax.grpc.GaxGrpcProperties;
25+
import com.google.api.gax.grpc.GrpcCallContext;
2426
import com.google.api.gax.grpc.GrpcTransportChannel;
2527
import com.google.api.gax.rpc.ApiClientHeaderProvider;
2628
import com.google.api.gax.rpc.FixedTransportChannelProvider;
2729
import com.google.api.gax.rpc.HeaderProvider;
30+
import com.google.api.gax.rpc.StatusCode;
2831
import com.google.api.gax.rpc.TransportChannelProvider;
32+
import com.google.api.gax.rpc.UnaryCallSettings;
2933
import com.google.api.pathtemplate.PathTemplate;
3034
import com.google.cloud.ServiceOptions;
3135
import com.google.cloud.grpc.GrpcTransportOptions;
@@ -43,6 +47,7 @@
4347
import com.google.cloud.spanner.v1.stub.SpannerStub;
4448
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
4549
import com.google.common.base.MoreObjects;
50+
import com.google.common.collect.ImmutableSet;
4651
import com.google.longrunning.GetOperationRequest;
4752
import com.google.longrunning.Operation;
4853
import com.google.protobuf.FieldMask;
@@ -68,6 +73,8 @@
6873
import com.google.spanner.v1.BeginTransactionRequest;
6974
import com.google.spanner.v1.CommitRequest;
7075
import com.google.spanner.v1.CommitResponse;
76+
import com.google.spanner.v1.CreateSessionRequest;
77+
import com.google.spanner.v1.DeleteSessionRequest;
7178
import com.google.spanner.v1.ExecuteSqlRequest;
7279
import com.google.spanner.v1.PartitionQueryRequest;
7380
import com.google.spanner.v1.PartitionReadRequest;
@@ -140,25 +147,59 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException {
140147
.build());
141148
CredentialsProvider credentialsProvider =
142149
GrpcTransportOptions.setUpCredentialsProvider(options);
143-
144-
this.stub =
150+
151+
// Disabling retry for now because spanner handles retry in SpannerImpl.
152+
// We will finally want to improve gax but for smooth transitioning we
153+
// preserve the retry in SpannerImpl
154+
try {
155+
// TODO: bump the version of gax and remove this try-catch block
156+
// applyToAllUnaryMethods does not throw exception in the latest version
157+
this.stub =
145158
GrpcSpannerStub.create(
146159
SpannerStubSettings.newBuilder()
147160
.setTransportChannelProvider(channelProvider)
148161
.setCredentialsProvider(credentialsProvider)
162+
.applyToAllUnaryMethods(
163+
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
164+
@Override
165+
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
166+
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
167+
return null;
168+
}
169+
})
149170
.build());
171+
150172
this.instanceStub =
151173
GrpcInstanceAdminStub.create(
152174
InstanceAdminStubSettings.newBuilder()
153175
.setTransportChannelProvider(channelProvider)
154176
.setCredentialsProvider(credentialsProvider)
177+
.applyToAllUnaryMethods(
178+
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
179+
@Override
180+
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
181+
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
182+
return null;
183+
}
184+
})
155185
.build());
156186
this.databaseStub =
157187
GrpcDatabaseAdminStub.create(
158188
DatabaseAdminStubSettings.newBuilder()
159189
.setTransportChannelProvider(channelProvider)
160190
.setCredentialsProvider(credentialsProvider)
191+
.applyToAllUnaryMethods(
192+
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
193+
@Override
194+
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
195+
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
196+
return null;
197+
}
198+
})
161199
.build());
200+
} catch (Exception e) {
201+
throw SpannerExceptionFactory.newSpannerException(e);
202+
}
162203
}
163204

164205
@Override
@@ -171,9 +212,9 @@ public Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable Str
171212
}
172213
ListInstanceConfigsRequest request = requestBuilder.build();
173214

174-
// TODO: put projectName in metadata
215+
GrpcCallContext context = newCallContext(null, projectName);
175216
ListInstanceConfigsResponse response =
176-
get(instanceStub.listInstanceConfigsCallable().futureCall(request));
217+
get(instanceStub.listInstanceConfigsCallable().futureCall(request, context));
177218
return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken());
178219
}
179220

@@ -182,8 +223,8 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne
182223
GetInstanceConfigRequest request =
183224
GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build();
184225

185-
// TODO: put projectName in metadata
186-
return get(instanceStub.getInstanceConfigCallable().futureCall(request));
226+
GrpcCallContext context = newCallContext(null, projectName);
227+
return get(instanceStub.getInstanceConfigCallable().futureCall(request, context));
187228
}
188229

189230
@Override
@@ -199,9 +240,9 @@ public Paginated<Instance> listInstances(
199240
}
200241
ListInstancesRequest request = requestBuilder.build();
201242

202-
// TODO: put projectName in metadata
243+
GrpcCallContext context = newCallContext(null, projectName);
203244
ListInstancesResponse response =
204-
get(instanceStub.listInstancesCallable().futureCall(request));
245+
get(instanceStub.listInstancesCallable().futureCall(request, context));
205246
return new Paginated<>(response.getInstancesList(), response.getNextPageToken());
206247
}
207248

@@ -214,34 +255,36 @@ public Operation createInstance(String parent, String instanceId, Instance insta
214255
.setInstanceId(instanceId)
215256
.setInstance(instance)
216257
.build();
217-
// TODO: put parent in metadata
218-
return get(instanceStub.createInstanceCallable().futureCall(request));
258+
259+
GrpcCallContext context = newCallContext(null, parent);
260+
return get(instanceStub.createInstanceCallable().futureCall(request, context));
219261
}
220262

221263
@Override
222264
public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException {
223265
UpdateInstanceRequest request =
224266
UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build();
225-
// TODO: put instance.getName() in metadata
226-
return get(instanceStub.updateInstanceCallable().futureCall(request));
267+
268+
GrpcCallContext context = newCallContext(null, instance.getName());
269+
return get(instanceStub.updateInstanceCallable().futureCall(request, context));
227270
}
228271

229272
@Override
230273
public Instance getInstance(String instanceName) throws SpannerException {
231274
GetInstanceRequest request =
232275
GetInstanceRequest.newBuilder().setName(instanceName).build();
233276

234-
// TODO: put instanceName in metadata
235-
return get(instanceStub.getInstanceCallable().futureCall(request));
277+
GrpcCallContext context = newCallContext(null, instanceName);
278+
return get(instanceStub.getInstanceCallable().futureCall(request, context));
236279
}
237280

238281
@Override
239282
public void deleteInstance(String instanceName) throws SpannerException {
240283
DeleteInstanceRequest request =
241284
DeleteInstanceRequest.newBuilder().setName(instanceName).build();
242285

243-
// TODO: put instanceName in metadata
244-
get(instanceStub.deleteInstanceCallable().futureCall(request));
286+
GrpcCallContext context = newCallContext(null, instanceName);
287+
get(instanceStub.deleteInstanceCallable().futureCall(request, context));
245288
}
246289

247290
@Override
@@ -254,8 +297,9 @@ public Paginated<Database> listDatabases(
254297
}
255298
ListDatabasesRequest request = requestBuilder.build();
256299

257-
// TODO: put instanceName in metadata
258-
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable().futureCall(request));
300+
GrpcCallContext context = newCallContext(null, instanceName);
301+
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable()
302+
.futureCall(request, context));
259303
return new Paginated<>(response.getDatabasesList(), response.getNextPageToken());
260304
}
261305

@@ -268,8 +312,8 @@ public Operation createDatabase(String instanceName, String createDatabaseStatem
268312
.setCreateStatement(createDatabaseStatement)
269313
.addAllExtraStatements(additionalStatements)
270314
.build();
271-
// TODO: put instanceName in metadata
272-
return get(databaseStub.createDatabaseCallable().futureCall(request));
315+
GrpcCallContext context = newCallContext(null, instanceName);
316+
return get(databaseStub.createDatabaseCallable().futureCall(request, context));
273317
}
274318

275319
@Override
@@ -281,17 +325,17 @@ public Operation updateDatabaseDdl(String databaseName, Iterable<String> updateD
281325
.addAllStatements(updateDatabaseStatements)
282326
.setOperationId(MoreObjects.firstNonNull(updateId, ""))
283327
.build();
284-
// TODO: put databaseName in metadata
285-
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request));
328+
GrpcCallContext context = newCallContext(null, databaseName);
329+
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request, context));
286330
}
287331

288332
@Override
289333
public void dropDatabase(String databaseName) throws SpannerException {
290334
DropDatabaseRequest request =
291335
DropDatabaseRequest.newBuilder().setDatabase(databaseName).build();
292336

293-
// TODO: put databaseName in metadata
294-
get(databaseStub.dropDatabaseCallable().futureCall(request));
337+
GrpcCallContext context = newCallContext(null, databaseName);
338+
get(databaseStub.dropDatabaseCallable().futureCall(request, context));
295339
}
296340

297341
@Override
@@ -301,43 +345,56 @@ public Database getDatabase(String databaseName) throws SpannerException {
301345
.setName(databaseName)
302346
.build();
303347

304-
// TODO: put databaseName in metadata
305-
return get(databaseStub.getDatabaseCallable().futureCall(request));
348+
GrpcCallContext context = newCallContext(null, databaseName);
349+
return get(databaseStub.getDatabaseCallable().futureCall(request, context));
306350
}
307351

308352
@Override
309353
public List<String> getDatabaseDdl(String databaseName) throws SpannerException {
310354
GetDatabaseDdlRequest request =
311355
GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build();
312356

313-
// TODO: put databaseName in metadata
314-
return get(databaseStub.getDatabaseDdlCallable().futureCall(request))
357+
GrpcCallContext context = newCallContext(null, databaseName);
358+
return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context))
315359
.getStatementsList();
316360
}
317361

318362
@Override
319363
public Operation getOperation(String name) throws SpannerException {
320364
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
321-
// TODO: put name in metadata
322-
return get(databaseStub.getOperationsStub().getOperationCallable().futureCall(request));
365+
GrpcCallContext context = newCallContext(null, name);
366+
return get(databaseStub.getOperationsStub().getOperationCallable()
367+
.futureCall(request, context));
323368
}
324369

325370
@Override
326371
public Session createSession(String databaseName, @Nullable Map<String, String> labels,
327372
@Nullable Map<Option, ?> options) throws SpannerException {
328-
throw new UnsupportedOperationException("Not implemented yet.");
373+
CreateSessionRequest.Builder requestBuilder =
374+
CreateSessionRequest.newBuilder().setDatabase(databaseName);
375+
if (labels != null && !labels.isEmpty()) {
376+
Session.Builder session = Session.newBuilder().putAllLabels(labels);
377+
requestBuilder.setSession(session);
378+
}
379+
CreateSessionRequest request = requestBuilder.build();
380+
GrpcCallContext context = newCallContext(options, databaseName);
381+
return get(stub.createSessionCallable().futureCall(request, context));
329382
}
330383

331384
@Override
332385
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
333386
throws SpannerException {
334-
throw new UnsupportedOperationException("Not implemented yet.");
387+
DeleteSessionRequest request =
388+
DeleteSessionRequest.newBuilder().setName(sessionName).build();
389+
GrpcCallContext context = newCallContext(options, sessionName);
390+
get(stub.deleteSessionCallable().futureCall(request, context));
335391
}
336392

337393
@Override
338394
public StreamingCall read(
339395
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
340-
throw new UnsupportedOperationException("Not implemented yet.");
396+
GrpcCallContext context = newCallContext(options, request.getSession());
397+
throw new UnsupportedOperationException("not implemented yet");
341398
}
342399

343400
@Override
@@ -349,33 +406,36 @@ public StreamingCall executeQuery(
349406
@Override
350407
public Transaction beginTransaction(
351408
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
352-
throw new UnsupportedOperationException("Not implemented yet.");
409+
GrpcCallContext context = newCallContext(options, request.getSession());
410+
return get(stub.beginTransactionCallable().futureCall(request, context));
353411
}
354412

355413
@Override
356414
public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> options)
357415
throws SpannerException {
358-
throw new UnsupportedOperationException("Not implemented yet.");
416+
GrpcCallContext context = newCallContext(options, commitRequest.getSession());
417+
return get(stub.commitCallable().futureCall(commitRequest, context));
359418
}
360419

361420
@Override
362421
public void rollback(RollbackRequest request, @Nullable Map<Option, ?> options)
363422
throws SpannerException {
364-
throw new UnsupportedOperationException("Not implemented yet.");
423+
GrpcCallContext context = newCallContext(options, request.getSession());
424+
get(stub.rollbackCallable().futureCall(request, context));
365425
}
366426

367427
@Override
368428
public PartitionResponse partitionQuery(
369429
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
370-
throw new UnsupportedOperationException("Not implemented yet.");
430+
GrpcCallContext context = newCallContext(options, request.getSession());
431+
return get(stub.partitionQueryCallable().futureCall(request, context));
371432
}
372433

373434
@Override
374435
public PartitionResponse partitionRead(
375436
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
376-
// TODO(pongad): Figure out metadata
377-
// TODO(pongad): Figure out channel affinity
378-
return get(stub.partitionReadCallable().futureCall(request));
437+
GrpcCallContext context = newCallContext(options, request.getSession());
438+
return get(stub.partitionReadCallable().futureCall(request, context));
379439
}
380440

381441
/** Gets the result of an async RPC call, handling any exceptions encountered. */
@@ -391,4 +451,14 @@ private static <T> T get(final Future<T> future) throws SpannerException {
391451
throw newSpannerException(context, e);
392452
}
393453
}
454+
455+
private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String resource) {
456+
GrpcCallContext context = GrpcCallContext.createDefault();
457+
if (options != null) {
458+
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
459+
}
460+
context = context.withExtraHeaders(
461+
metadataProvider.newExtraHeaders(resource, projectName));
462+
return context;
463+
}
394464
}

0 commit comments

Comments
 (0)