Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.sproutsocial</groupId>
<artifactId>nsq-j</artifactId>
<version>1.6.0</version>
<version>1.6.1</version>
<packaging>jar</packaging>

<name>nsq-j</name>
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/sproutsocial/nsq/AuthFailedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.sproutsocial.nsq;

/**
* Exception thrown when NSQ server returns E_AUTH_FAILED or E_UNAUTHORIZED errors.
* This typically indicates that the server-side auth session has expired while the
* TCP connection remained open.
*/
public class AuthFailedException extends NSQException {

public AuthFailedException(String message) {
super(message);
}

public AuthFailedException(String message, Throwable cause) {
super(message, cause);
}

}
19 changes: 19 additions & 0 deletions src/main/java/com/sproutsocial/nsq/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ abstract class Connection extends BasePubSub implements Closeable {
private static final ThreadFactory readThreadFactory = Util.threadFactory("nsq-read");
private static final Set<String> nonFatalErrors = Collections.unmodifiableSet(new HashSet<String>(
Arrays.asList("E_FIN_FAILED", "E_REQ_FAILED", "E_TOUCH_FAILED")));
private static final Set<String> authErrors = Collections.unmodifiableSet(new HashSet<String>(
Arrays.asList("E_AUTH_FAILED", "E_UNAUTHORIZED")));

private static final Logger logger = LoggerFactory.getLogger(Connection.class);

Expand Down Expand Up @@ -242,6 +244,9 @@ else if (frameType == 1) { //error
if (nonFatalErrors.contains(errorCode)) {
logger.warn("non fatal nsqd error:{} probably due to message timeout", error);
}
else if (authErrors.contains(errorCode)) {
throw new AuthFailedException("auth session expired on nsqd:" + error);
}
else {
throw new NSQException("error from nsqd:" + error);
}
Expand Down Expand Up @@ -279,6 +284,13 @@ else if (response != null) {
close();
}
}
catch (AuthFailedException e) {
if (isReading) {
logger.warn("auth session expired, triggering immediate reconnect. con:{}", toString());
close();
handleAuthFailure();
}
}
catch (Exception e) {
if (isReading) {
respQueue.offer(e.toString());
Expand All @@ -289,6 +301,13 @@ else if (response != null) {
logger.debug("read loop done {}", toString());
}

/**
* Called when auth session expires. Override in subclasses to trigger immediate reconnection.
*/
protected void handleAuthFailure() {
// Base implementation does nothing. Overridden by SubConnection.
}

private synchronized void receivedHeartbeat() {
try {
out.write("NOP\n".getBytes(Util.US_ASCII));
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/sproutsocial/nsq/SubConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public void run() {
}
}

@Override
protected void handleAuthFailure() {
// Trigger immediate reconnection when auth session expires
subscription.getSubscriber().immediateCheckConnections(topic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make reconnect behavior pluggable. A few different ways a caller might want to handle authentication failures:

  1. Keep existing behavior, let auth failures bubble out of the client.
  2. Exponential / linear retry backoff.

For the case of actual bad credentials, won't an "immediate reconnect" bombard the auth server with infinite retries as fast as it can? At a minimum, we should probably give up after a certain number of authentication failures, since this would happen in the case of legitimate bad credentials.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i considered this early on but is not an issue because checkConnections() :

  1. attempts to create a new connection
  2. via read() will encounter the exception
  3. never adds the connection to Subscription.connectionMap
  4. while (isReading) is not running so there is not infinite loop.
    instead, we will see logger.error("error connecting to:{}", activeHost, e); every minute - this is current behavior.

}

@Override
public void close() {
super.close();
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/sproutsocial/nsq/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,22 @@ private synchronized void lookup() {
}
}

/**
* Immediately checks connections for a specific topic. This bypasses the normal periodic
* lookup interval and is used for immediate reconnection when auth failures occur.
*/
synchronized void immediateCheckConnections(String topic) {
if (isStopping) {
return;
}
Set<HostAndPort> activeHosts = lookupTopic(topic);
for (Subscription sub : subscriptions) {
if (sub.getTopic().equals(topic)) {
sub.checkConnections(activeHosts);
}
}
}

@GuardedBy("this")
protected Set<HostAndPort> lookupTopic(String topic) {
Set<HostAndPort> nsqds = new HashSet<HostAndPort>();
Expand Down
42 changes: 42 additions & 0 deletions src/test/java/com/sproutsocial/nsq/AuthFailedExceptionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.sproutsocial.nsq;

import org.junit.Assert;
import org.junit.Test;

/**
* Unit tests for AuthFailedException
*/
public class AuthFailedExceptionTest {

@Test
public void testAuthFailedExceptionWithMessage() {
String errorMessage = "E_AUTH_FAILED auth session expired";
AuthFailedException exception = new AuthFailedException(errorMessage);

Assert.assertEquals(errorMessage, exception.getMessage());
Assert.assertNull(exception.getCause());
Assert.assertTrue(exception instanceof NSQException);
}

@Test
public void testAuthFailedExceptionWithMessageAndCause() {
String errorMessage = "E_UNAUTHORIZED unauthorized access";
RuntimeException cause = new RuntimeException("Connection error");
AuthFailedException exception = new AuthFailedException(errorMessage, cause);

Assert.assertEquals(errorMessage, exception.getMessage());
Assert.assertEquals(cause, exception.getCause());
Assert.assertTrue(exception instanceof NSQException);
}

@Test
public void testAuthFailedExceptionInheritance() {
AuthFailedException exception = new AuthFailedException("test");

// Verify it's properly extending NSQException
Assert.assertTrue("AuthFailedException should extend NSQException",
exception instanceof NSQException);
Assert.assertTrue("AuthFailedException should extend RuntimeException",
exception instanceof RuntimeException);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package com.sproutsocial.nsq;

import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Integration test for immediate reconnection behavior.
*
* These tests verify that the immediateCheckConnections() method (which is called
* when auth failures occur) works correctly to trigger immediate reconnection
* rather than waiting up to 60 seconds for the periodic checkConnections() call.
*
* Note: Actual E_AUTH_FAILED/E_UNAUTHORIZED error handling is tested in
* ConnectionAuthFailureTest, which verifies those errors throw AuthFailedException
* and trigger the handleAuthFailure() -> immediateCheckConnections() code path.
* These integration tests verify the reconnection mechanism itself works correctly.
*/
public class AuthFailureRecoveryDockerTestIT extends BaseDockerTestIT {
private static final Logger logger = LoggerFactory.getLogger(AuthFailureRecoveryDockerTestIT.class);
private Publisher publisher;
private Subscriber subscriber;

@Override
public void setup() {
super.setup();
publisher = this.backupPublisher();
}

/**
* Tests that the subscriber can successfully establish connections and receive messages.
* This baseline test ensures the immediate reconnection mechanism doesn't break
* normal operation.
*/
@Test
public void testNormalSubscriberOperation() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send messages and verify they're received
List<String> messages = messages(20, 40);
send(topic, messages, 0, 0, publisher);

// Verify all messages received
List<NSQMessage> receivedMessages = handler.drainMessagesOrTimeOut(20);
validateReceivedAllMessages(messages, receivedMessages, false);
}

/**
* Tests that subscriber can recover from connection drops.
* This simulates a scenario similar to auth failure where connection is lost
* and needs to be re-established.
*/
@Test
public void testSubscriberReconnectionAfterNetworkDisruption() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send first batch of messages
List<String> batch1 = messages(10, 40);
send(topic, batch1, 0, 0, publisher);
List<NSQMessage> received1 = handler.drainMessagesOrTimeOut(10);
validateReceivedAllMessages(batch1, received1, false);

// Simulate connection issue by disconnecting and reconnecting network
logger.info("Disconnecting network for nsqd node to simulate connection drop");
cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(0));
Util.sleepQuietly(2000);

cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(0));
Util.sleepQuietly(2000);

// Send second batch and verify recovery
List<String> batch2 = messages(10, 40);
send(topic, batch2, 0, 0, publisher);
List<NSQMessage> received2 = handler.drainMessagesOrTimeOut(10, 20000);
validateReceivedAllMessages(batch2, received2, false);

logger.info("Successfully received messages after connection recovery");
}

/**
* Tests that the immediateCheckConnections method is accessible and functional.
* This verifies the immediate reconnection path exists.
*/
@Test
public void testImmediateCheckConnectionsMethod() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 60, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send initial messages to establish connection
List<String> messages = messages(5, 40);
send(topic, messages, 0, 0, publisher);
handler.drainMessagesOrTimeOut(5);

// Call immediateCheckConnections (this is what gets called on auth failure)
subscriber.immediateCheckConnections(topic);

// Verify subscriber still works after immediate check
List<String> moreMessages = messages(5, 40);
send(topic, moreMessages, 0, 0, publisher);
List<NSQMessage> received = handler.drainMessagesOrTimeOut(5);
validateReceivedAllMessages(moreMessages, received, false);

logger.info("immediateCheckConnections method works correctly");
}

/**
* Tests that multiple rapid connection checks don't cause issues.
* This simulates what might happen if multiple connections experience auth failures.
*/
@Test
public void testMultipleRapidConnectionChecks() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 60, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send initial messages
List<String> messages = messages(10, 40);
send(topic, messages, 0, 0, publisher);
handler.drainMessagesOrTimeOut(10);

// Call immediateCheckConnections multiple times rapidly
// (simulating multiple auth failures in quick succession)
for (int i = 0; i < 5; i++) {
subscriber.immediateCheckConnections(topic);
Util.sleepQuietly(100);
}

// Verify subscriber still works correctly
List<String> moreMessages = messages(10, 40);
send(topic, moreMessages, 0, 0, publisher);
List<NSQMessage> received = handler.drainMessagesOrTimeOut(10);
validateReceivedAllMessages(moreMessages, received, false);

logger.info("Multiple rapid connection checks handled correctly");
}

/**
* Verifies that the subscriber's connection count updates correctly after
* immediate connection checks (which would happen during auth failure recovery).
*/
@Test
public void testConnectionCountAfterImmediateReconnect() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 60, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Wait for initial connections to be established
Util.sleepQuietly(2000);
int initialConnectionCount = subscriber.getConnectionCount();
Assert.assertTrue("Should have at least one connection", initialConnectionCount > 0);
logger.info("Initial connection count: {}", initialConnectionCount);

// Trigger immediate check (simulating auth failure recovery)
subscriber.immediateCheckConnections(topic);
Util.sleepQuietly(2000);

int afterCheckCount = subscriber.getConnectionCount();
Assert.assertTrue("Should still have connections after immediate check", afterCheckCount > 0);
logger.info("Connection count after immediate check: {}", afterCheckCount);

// Verify messages can still be processed
List<String> messages = messages(10, 40);
send(topic, messages, 0, 0, publisher);
List<NSQMessage> received = handler.drainMessagesOrTimeOut(10);
Assert.assertEquals("Should receive all messages after reconnect", 10, received.size());
}

@Override
public void teardown() throws InterruptedException {
if (publisher != null) {
publisher.stop();
}
if (subscriber != null) {
subscriber.stop();
}
super.teardown();
}
}
Loading