-
Notifications
You must be signed in to change notification settings - Fork 29
[INFRA-9517] handle auth failures with immediate reconnection #105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package com.sproutsocial.nsq; | ||
|
|
||
| /** | ||
| * Exception thrown when NSQ server returns E_AUTH_FAILED or E_UNAUTHORIZED errors. | ||
| * This typically indicates that the server-side auth session has expired while the | ||
| * TCP connection remained open. | ||
| */ | ||
| public class AuthFailedException extends NSQException { | ||
|
|
||
| public AuthFailedException(String message) { | ||
| super(message); | ||
| } | ||
|
|
||
| public AuthFailedException(String message, Throwable cause) { | ||
| super(message, cause); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -194,6 +194,12 @@ public void run() { | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected void handleAuthFailure() { | ||
| // Trigger immediate reconnection when auth session expires | ||
| subscription.getSubscriber().immediateCheckConnections(topic); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to make reconnect behavior pluggable. A few different ways a caller might want to handle authentication failures:
For the case of actual bad credentials, won't an "immediate reconnect" bombard the auth server with infinite retries as fast as it can? At a minimum, we should probably give up after a certain number of authentication failures, since this would happen in the case of legitimate bad credentials.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i considered this early on but is not an issue because
|
||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| super.close(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| package com.sproutsocial.nsq; | ||
|
|
||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
|
|
||
| /** | ||
| * Unit tests for AuthFailedException | ||
| */ | ||
| public class AuthFailedExceptionTest { | ||
|
|
||
| @Test | ||
| public void testAuthFailedExceptionWithMessage() { | ||
| String errorMessage = "E_AUTH_FAILED auth session expired"; | ||
| AuthFailedException exception = new AuthFailedException(errorMessage); | ||
|
|
||
| Assert.assertEquals(errorMessage, exception.getMessage()); | ||
| Assert.assertNull(exception.getCause()); | ||
| Assert.assertTrue(exception instanceof NSQException); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAuthFailedExceptionWithMessageAndCause() { | ||
| String errorMessage = "E_UNAUTHORIZED unauthorized access"; | ||
| RuntimeException cause = new RuntimeException("Connection error"); | ||
| AuthFailedException exception = new AuthFailedException(errorMessage, cause); | ||
|
|
||
| Assert.assertEquals(errorMessage, exception.getMessage()); | ||
| Assert.assertEquals(cause, exception.getCause()); | ||
| Assert.assertTrue(exception instanceof NSQException); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAuthFailedExceptionInheritance() { | ||
| AuthFailedException exception = new AuthFailedException("test"); | ||
|
|
||
| // Verify it's properly extending NSQException | ||
| Assert.assertTrue("AuthFailedException should extend NSQException", | ||
| exception instanceof NSQException); | ||
| Assert.assertTrue("AuthFailedException should extend RuntimeException", | ||
| exception instanceof RuntimeException); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,192 @@ | ||
| package com.sproutsocial.nsq; | ||
|
|
||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.util.List; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| /** | ||
| * Integration test for immediate reconnection behavior. | ||
| * | ||
| * These tests verify that the immediateCheckConnections() method (which is called | ||
| * when auth failures occur) works correctly to trigger immediate reconnection | ||
| * rather than waiting up to 60 seconds for the periodic checkConnections() call. | ||
| * | ||
| * Note: Actual E_AUTH_FAILED/E_UNAUTHORIZED error handling is tested in | ||
| * ConnectionAuthFailureTest, which verifies those errors throw AuthFailedException | ||
| * and trigger the handleAuthFailure() -> immediateCheckConnections() code path. | ||
| * These integration tests verify the reconnection mechanism itself works correctly. | ||
| */ | ||
| public class AuthFailureRecoveryDockerTestIT extends BaseDockerTestIT { | ||
| private static final Logger logger = LoggerFactory.getLogger(AuthFailureRecoveryDockerTestIT.class); | ||
| private Publisher publisher; | ||
| private Subscriber subscriber; | ||
|
|
||
| @Override | ||
| public void setup() { | ||
| super.setup(); | ||
| publisher = this.backupPublisher(); | ||
| } | ||
|
|
||
| /** | ||
| * Tests that the subscriber can successfully establish connections and receive messages. | ||
| * This baseline test ensures the immediate reconnection mechanism doesn't break | ||
| * normal operation. | ||
| */ | ||
| @Test | ||
| public void testNormalSubscriberOperation() { | ||
| TestMessageHandler handler = new TestMessageHandler(); | ||
| subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString()); | ||
| subscriber.setDefaultMaxInFlight(10); | ||
| subscriber.subscribe(topic, "channel", handler); | ||
|
|
||
| // Send messages and verify they're received | ||
| List<String> messages = messages(20, 40); | ||
| send(topic, messages, 0, 0, publisher); | ||
|
|
||
| // Verify all messages received | ||
| List<NSQMessage> receivedMessages = handler.drainMessagesOrTimeOut(20); | ||
| validateReceivedAllMessages(messages, receivedMessages, false); | ||
| } | ||
|
|
||
| /** | ||
| * Tests that subscriber can recover from connection drops. | ||
| * This simulates a scenario similar to auth failure where connection is lost | ||
| * and needs to be re-established. | ||
| */ | ||
| @Test | ||
| public void testSubscriberReconnectionAfterNetworkDisruption() { | ||
| TestMessageHandler handler = new TestMessageHandler(); | ||
| subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString()); | ||
| subscriber.setDefaultMaxInFlight(10); | ||
| subscriber.subscribe(topic, "channel", handler); | ||
|
|
||
| // Send first batch of messages | ||
| List<String> batch1 = messages(10, 40); | ||
| send(topic, batch1, 0, 0, publisher); | ||
| List<NSQMessage> received1 = handler.drainMessagesOrTimeOut(10); | ||
| validateReceivedAllMessages(batch1, received1, false); | ||
|
|
||
| // Simulate connection issue by disconnecting and reconnecting network | ||
| logger.info("Disconnecting network for nsqd node to simulate connection drop"); | ||
| cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(0)); | ||
| Util.sleepQuietly(2000); | ||
|
|
||
| cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(0)); | ||
| Util.sleepQuietly(2000); | ||
|
|
||
| // Send second batch and verify recovery | ||
| List<String> batch2 = messages(10, 40); | ||
| send(topic, batch2, 0, 0, publisher); | ||
| List<NSQMessage> received2 = handler.drainMessagesOrTimeOut(10, 20000); | ||
| validateReceivedAllMessages(batch2, received2, false); | ||
|
|
||
| logger.info("Successfully received messages after connection recovery"); | ||
| } | ||
|
|
||
| /** | ||
| * Tests that the immediateCheckConnections method is accessible and functional. | ||
| * This verifies the immediate reconnection path exists. | ||
| */ | ||
| @Test | ||
| public void testImmediateCheckConnectionsMethod() { | ||
| TestMessageHandler handler = new TestMessageHandler(); | ||
| subscriber = new Subscriber(client, 60, 10, cluster.getLookupNode().getHttpHostAndPort().toString()); | ||
| subscriber.setDefaultMaxInFlight(10); | ||
| subscriber.subscribe(topic, "channel", handler); | ||
|
|
||
| // Send initial messages to establish connection | ||
| List<String> messages = messages(5, 40); | ||
| send(topic, messages, 0, 0, publisher); | ||
| handler.drainMessagesOrTimeOut(5); | ||
|
|
||
| // Call immediateCheckConnections (this is what gets called on auth failure) | ||
| subscriber.immediateCheckConnections(topic); | ||
|
|
||
| // Verify subscriber still works after immediate check | ||
| List<String> moreMessages = messages(5, 40); | ||
| send(topic, moreMessages, 0, 0, publisher); | ||
| List<NSQMessage> received = handler.drainMessagesOrTimeOut(5); | ||
| validateReceivedAllMessages(moreMessages, received, false); | ||
|
|
||
| logger.info("immediateCheckConnections method works correctly"); | ||
| } | ||
|
|
||
| /** | ||
| * Tests that multiple rapid connection checks don't cause issues. | ||
| * This simulates what might happen if multiple connections experience auth failures. | ||
| */ | ||
| @Test | ||
| public void testMultipleRapidConnectionChecks() { | ||
| TestMessageHandler handler = new TestMessageHandler(); | ||
| subscriber = new Subscriber(client, 60, 10, cluster.getLookupNode().getHttpHostAndPort().toString()); | ||
| subscriber.setDefaultMaxInFlight(10); | ||
| subscriber.subscribe(topic, "channel", handler); | ||
|
|
||
| // Send initial messages | ||
| List<String> messages = messages(10, 40); | ||
| send(topic, messages, 0, 0, publisher); | ||
| handler.drainMessagesOrTimeOut(10); | ||
|
|
||
| // Call immediateCheckConnections multiple times rapidly | ||
| // (simulating multiple auth failures in quick succession) | ||
| for (int i = 0; i < 5; i++) { | ||
| subscriber.immediateCheckConnections(topic); | ||
| Util.sleepQuietly(100); | ||
| } | ||
|
|
||
| // Verify subscriber still works correctly | ||
| List<String> moreMessages = messages(10, 40); | ||
| send(topic, moreMessages, 0, 0, publisher); | ||
| List<NSQMessage> received = handler.drainMessagesOrTimeOut(10); | ||
| validateReceivedAllMessages(moreMessages, received, false); | ||
|
|
||
| logger.info("Multiple rapid connection checks handled correctly"); | ||
| } | ||
|
|
||
| /** | ||
| * Verifies that the subscriber's connection count updates correctly after | ||
| * immediate connection checks (which would happen during auth failure recovery). | ||
| */ | ||
| @Test | ||
| public void testConnectionCountAfterImmediateReconnect() { | ||
| TestMessageHandler handler = new TestMessageHandler(); | ||
| subscriber = new Subscriber(client, 60, 10, cluster.getLookupNode().getHttpHostAndPort().toString()); | ||
| subscriber.setDefaultMaxInFlight(10); | ||
| subscriber.subscribe(topic, "channel", handler); | ||
|
|
||
| // Wait for initial connections to be established | ||
| Util.sleepQuietly(2000); | ||
| int initialConnectionCount = subscriber.getConnectionCount(); | ||
| Assert.assertTrue("Should have at least one connection", initialConnectionCount > 0); | ||
| logger.info("Initial connection count: {}", initialConnectionCount); | ||
|
|
||
| // Trigger immediate check (simulating auth failure recovery) | ||
| subscriber.immediateCheckConnections(topic); | ||
| Util.sleepQuietly(2000); | ||
|
|
||
| int afterCheckCount = subscriber.getConnectionCount(); | ||
| Assert.assertTrue("Should still have connections after immediate check", afterCheckCount > 0); | ||
| logger.info("Connection count after immediate check: {}", afterCheckCount); | ||
|
|
||
| // Verify messages can still be processed | ||
| List<String> messages = messages(10, 40); | ||
| send(topic, messages, 0, 0, publisher); | ||
| List<NSQMessage> received = handler.drainMessagesOrTimeOut(10); | ||
| Assert.assertEquals("Should receive all messages after reconnect", 10, received.size()); | ||
| } | ||
|
|
||
| @Override | ||
| public void teardown() throws InterruptedException { | ||
| if (publisher != null) { | ||
| publisher.stop(); | ||
| } | ||
| if (subscriber != null) { | ||
| subscriber.stop(); | ||
| } | ||
| super.teardown(); | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.