Skip to content

Commit 05e4618

Browse files
fix: Retry multiplexed session failures
1 parent 3d585cf commit 05e4618

File tree

2 files changed

+198
-23
lines changed

2 files changed

+198
-23
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.concurrent.atomic.AtomicInteger;
5454
import java.util.concurrent.atomic.AtomicLong;
5555
import java.util.concurrent.atomic.AtomicReference;
56+
import java.util.concurrent.locks.ReentrantLock;
5657

5758
/**
5859
* {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
@@ -315,6 +316,18 @@ public void close() {
315316
*/
316317
private final AtomicBoolean unimplemented = new AtomicBoolean(false);
317318

319+
/**
320+
* This flag is set to true if create session RPC is in progress. This flag prevents application
321+
* from firing two requests concurrently
322+
*/
323+
private final AtomicBoolean retryingSessionCreation = new AtomicBoolean(false);
324+
325+
/**
326+
* This lock is used to prevent two threads from retrying createSession RPC requests in
327+
* concurrently.
328+
*/
329+
private final ReentrantLock sessionCreationLock = new ReentrantLock();
330+
318331
/**
319332
* This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
320333
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
@@ -358,11 +371,20 @@ public void close() {
358371
SettableApiFuture.create();
359372
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
360373
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
374+
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
375+
maybeWaitForSessionCreation(
376+
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
377+
initialSessionReferenceFuture);
378+
}
379+
380+
private void asyncCreateMultiplexedSession(
381+
SettableApiFuture<SessionReference> sessionReferenceFuture) {
361382
this.sessionClient.asyncCreateMultiplexedSession(
362383
new SessionConsumer() {
363384
@Override
364385
public void onSessionReady(SessionImpl session) {
365-
initialSessionReferenceFuture.set(session.getSessionReference());
386+
retryingSessionCreation.set(false);
387+
sessionReferenceFuture.set(session.getSessionReference());
366388
// only start the maintainer if we actually managed to create a session in the first
367389
// place.
368390
maintainer.start();
@@ -394,13 +416,11 @@ public void onSessionReady(SessionImpl session) {
394416
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
395417
// Mark multiplexes sessions as unimplemented and fall back to regular sessions if
396418
// UNIMPLEMENTED is returned.
419+
retryingSessionCreation.set(false);
397420
maybeMarkUnimplemented(t);
398-
initialSessionReferenceFuture.setException(t);
421+
sessionReferenceFuture.setException(t);
399422
}
400423
});
401-
maybeWaitForSessionCreation(
402-
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
403-
initialSessionReferenceFuture);
404424
}
405425

406426
void setPool(SessionPool pool) {
@@ -546,8 +566,30 @@ MultiplexedSessionMaintainer getMaintainer() {
546566
return this.maintainer;
547567
}
548568

569+
ApiFuture<SessionReference> getCurrentSessionReferenceFuture() {
570+
return ApiFutures.immediateFuture(getCurrentSessionReference());
571+
}
572+
549573
@VisibleForTesting
550574
SessionReference getCurrentSessionReference() {
575+
try {
576+
return this.multiplexedSessionReference.get().get();
577+
} catch (ExecutionException | InterruptedException exception) {
578+
return maybeRetrySessionCreation();
579+
}
580+
}
581+
582+
private SessionReference maybeRetrySessionCreation() {
583+
sessionCreationLock.lock();
584+
try {
585+
if (isMultiplexedSessionsSupported() && retryingSessionCreation.compareAndSet(false, true)) {
586+
SettableApiFuture<SessionReference> settableApiFuture = SettableApiFuture.create();
587+
asyncCreateMultiplexedSession(settableApiFuture);
588+
multiplexedSessionReference.set(settableApiFuture);
589+
}
590+
} finally {
591+
sessionCreationLock.unlock();
592+
}
551593
try {
552594
return this.multiplexedSessionReference.get().get();
553595
} catch (ExecutionException executionException) {
@@ -587,28 +629,22 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) {
587629

588630
private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
589631
boolean singleUse) {
590-
try {
591-
return new MultiplexedSessionTransaction(
592-
this,
593-
tracer.getCurrentSpan(),
594-
// Getting the result of the SettableApiFuture that contains the multiplexed session will
595-
// also automatically propagate any error that happened during the creation of the
596-
// session, such as for example a DatabaseNotFound exception. We therefore do not need
597-
// any special handling of such errors.
598-
multiplexedSessionReference.get().get(),
599-
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
600-
singleUse,
601-
this.pool);
602-
} catch (ExecutionException executionException) {
603-
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
604-
} catch (InterruptedException interruptedException) {
605-
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
606-
}
632+
return new MultiplexedSessionTransaction(
633+
this,
634+
tracer.getCurrentSpan(),
635+
// Getting the result of the SettableApiFuture that contains the multiplexed session will
636+
// also automatically propagate any error that happened during the creation of the
637+
// session, such as for example a DatabaseNotFound exception. We therefore do not need
638+
// any special handling of such errors.
639+
getCurrentSessionReference(),
640+
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
641+
singleUse,
642+
this.pool);
607643
}
608644

609645
private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() {
610646
return new DelayedMultiplexedSessionTransaction(
611-
this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool);
647+
this, tracer.getCurrentSpan(), getCurrentSessionReferenceFuture(), this.pool);
612648
}
613649

614650
private int getSingleUseChannelHint() {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import java.time.Duration;
5555
import java.util.*;
5656
import java.util.concurrent.CountDownLatch;
57+
import java.util.concurrent.ExecutorService;
58+
import java.util.concurrent.Executors;
5759
import java.util.concurrent.TimeUnit;
5860
import java.util.concurrent.atomic.AtomicInteger;
5961
import java.util.concurrent.atomic.AtomicReference;
@@ -245,6 +247,143 @@ public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() {
245247
assertEquals(0L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
246248
}
247249

250+
@Test
251+
public void testDeadlineExceededErrorWithOneRetry() {
252+
// Setting up two exceptions
253+
mockSpanner.setCreateSessionExecutionTime(
254+
SimulatedExecutionTime.ofExceptions(
255+
Arrays.asList(
256+
Status.DEADLINE_EXCEEDED
257+
.withDescription(
258+
"CallOptions deadline exceeded after 22.986872393s. "
259+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
260+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
261+
.asRuntimeException(),
262+
Status.DEADLINE_EXCEEDED
263+
.withDescription(
264+
"CallOptions deadline exceeded after 22.986872393s. "
265+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
266+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
267+
.asRuntimeException())));
268+
DatabaseClientImpl client =
269+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
270+
assertNotNull(client.multiplexedSessionDatabaseClient);
271+
272+
// initial fetch call fails with exception
273+
// this call will try to fetch it again which again throws an exception
274+
assertThrows(
275+
SpannerException.class,
276+
() -> {
277+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
278+
//noinspection StatementWithEmptyBody
279+
while (resultSet.next()) {
280+
// ignore
281+
}
282+
}
283+
});
284+
285+
// When third request comes it should succeed
286+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
287+
//noinspection StatementWithEmptyBody
288+
while (resultSet.next()) {
289+
// ignore
290+
}
291+
}
292+
293+
// Verify that we received one ExecuteSqlRequest, and that it used a multiplexed session.
294+
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
295+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
296+
297+
Session session = mockSpanner.getSession(requests.get(0).getSession());
298+
assertNotNull(session);
299+
assertTrue(session.getMultiplexed());
300+
301+
assertNotNull(client.multiplexedSessionDatabaseClient);
302+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
303+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
304+
}
305+
306+
@Test
307+
public void testDeadlineExceededErrorWithOneRetryWithParallelRequests()
308+
throws InterruptedException {
309+
mockSpanner.setCreateSessionExecutionTime(
310+
SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions(
311+
2000,
312+
0,
313+
Arrays.asList(
314+
Status.DEADLINE_EXCEEDED
315+
.withDescription(
316+
"CallOptions deadline exceeded after 22.986872393s. "
317+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
318+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
319+
.asRuntimeException(),
320+
Status.DEADLINE_EXCEEDED
321+
.withDescription(
322+
"CallOptions deadline exceeded after 22.986872393s. "
323+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
324+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
325+
.asRuntimeException())));
326+
DatabaseClientImpl client =
327+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
328+
assertNotNull(client.multiplexedSessionDatabaseClient);
329+
330+
ExecutorService executor = Executors.newCachedThreadPool();
331+
332+
// First set of request should fail with an error
333+
CountDownLatch failureCountDownLatch = new CountDownLatch(3);
334+
for (int i = 0; i < 3; i++) {
335+
executor.submit(
336+
() -> {
337+
try {
338+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
339+
//noinspection StatementWithEmptyBody
340+
while (resultSet.next()) {
341+
// ignore
342+
}
343+
}
344+
} catch (SpannerException e) {
345+
failureCountDownLatch.countDown();
346+
}
347+
});
348+
}
349+
350+
assertTrue(failureCountDownLatch.await(2, TimeUnit.SECONDS));
351+
assertEquals(0, failureCountDownLatch.getCount());
352+
353+
// Second set of requests should pass
354+
CountDownLatch countDownLatch = new CountDownLatch(3);
355+
for (int i = 0; i < 3; i++) {
356+
executor.submit(
357+
() -> {
358+
try {
359+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
360+
//noinspection StatementWithEmptyBody
361+
while (resultSet.next()) {
362+
// ignore
363+
}
364+
}
365+
} catch (SpannerException e) {
366+
countDownLatch.countDown();
367+
}
368+
});
369+
}
370+
371+
assertFalse(countDownLatch.await(3, TimeUnit.SECONDS));
372+
assertEquals(3, countDownLatch.getCount());
373+
374+
// Verify that we received 3 ExecuteSqlRequest, and that it used a multiplexed session.
375+
assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
376+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
377+
378+
Session session = mockSpanner.getSession(requests.get(0).getSession());
379+
assertNotNull(session);
380+
assertTrue(session.getMultiplexed());
381+
382+
assertNotNull(client.multiplexedSessionDatabaseClient);
383+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
384+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
385+
}
386+
248387
@Test
249388
public void
250389
testUnimplementedErrorOnCreation_firstReceivesError_secondFallsBackToRegularSessions() {

0 commit comments

Comments
 (0)