Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -169,10 +170,10 @@ public class ConsumeKinesis extends AbstractProcessor {
private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration.ofMinutes(1);

/**
* Using a large enough value to ensure we don't wait infinitely for the initialization.
* Actual initialization shouldn't take that long.
* How long to wait for a Scheduler initialization to complete in the OnScheduled method.
* If the initialization takes longer than this, the processor will continue initialization checks in the onTrigger method.
*/
private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = Duration.ofMinutes(15);
private static final Duration KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofMinutes(3);

static final PropertyDescriptor STREAM_NAME = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -339,6 +340,9 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio

private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;

private volatile Future<InitializationResult> initializationResultFuture;
private volatile AtomicBoolean initialized;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend setting a final value to avoid null checking.

Suggested change
private volatile AtomicBoolean initialized;
private final AtomicBoolean initialized = new AtomicBoolean();


// An instance field, so that it can be read in getRelationships.
private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
PROCESSING_STRATEGY.getDefaultValue());
Expand Down Expand Up @@ -418,6 +422,8 @@ public void setup(final ProcessContext context) {
final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName);

final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger());
initialized = new AtomicBoolean(false);
initializationResultFuture = initializationListener.result();

kinesisScheduler = new Scheduler(
configsBuilder.checkpointConfig(),
Expand All @@ -435,34 +441,20 @@ public void setup(final ProcessContext context) {
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the onStopped method.

final InitializationResult result;
try {
result = initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
final InitializationResult result = initializationResultFuture.get(
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
checkInitializationResult(result);
} catch (final TimeoutException e) {
// During the first run the processor takes longer to initialize, so we return from @OnScheduled and continue waiting in the onTrigger method.
getLogger().warn("Kinesis Scheduler initialization may take up to 10 minutes on a first run, which is caused by AWS resources initialization");
} catch (final InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
cleanUpState();
throw new ProcessException("Initialization failed for stream [%s]".formatted(streamName), e);
}

switch (result) {
case InitializationResult.Success ignored ->
getLogger().info(
"Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]",
streamName, applicationName, workerId);
case InitializationResult.Failure failure -> {
cleanUpState();

final ProcessException ex = failure.error()
.map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err))
// This branch is active only when a scheduler was shutdown, but no initialization error was provided.
// This behavior isn't typical and wasn't observed.
.orElseGet(() -> new ProcessException(( "Initialization failed for stream [%s]").formatted(streamName)));

throw ex;
}
}
}

/**
Expand Down Expand Up @@ -575,6 +567,9 @@ private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
/**
 * Lifecycle hook annotated with {@code @OnStopped}: releases the processor state and
 * discards the initialization tracking fields.
 */
@OnStopped
public void onStopped() {
// Release the resources held by the processor first (delegated to cleanUpState).
cleanUpState();

// Drop the initialization flag and the pending result; setup assigns fresh instances of both.
// NOTE(review): presumably this keeps a stale result from a previous run from being observed - confirm.
initialized = null;
initializationResultFuture = null;
}

private void cleanUpState() {
Expand Down Expand Up @@ -633,6 +628,16 @@ private void shutdownScheduler() {

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (!initialized.get()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short-circuiting by checking a bool variable, so we don't have to inspect the content of the future after initialization completed.

if (!initializationResultFuture.isDone()) {
getLogger().debug("Waiting for Kinesis Scheduler to finish initialization");
context.yield();
return;
}

checkInitializationResult(initializationResultFuture.resultNow());
}

final Optional<Lease> leaseAcquired = recordBuffer.acquireBufferLease();

leaseAcquired.ifPresentOrElse(
Expand All @@ -641,6 +646,30 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
);
}

/**
 * Reacts to the outcome of the Kinesis Scheduler initialization.
 * <p>
 * On success the {@code initialized} flag is set, and the start of the scheduler is logged
 * only by the call that flips the flag from {@code false} to {@code true}.
 * On failure the processor state is cleaned up and a {@link ProcessException} is thrown.
 *
 * @param initializationResult the completed initialization result to inspect
 * @throws ProcessException if the initialization failed; the underlying error, when the
 *                          result provides one, is attached as the cause
 */
private void checkInitializationResult(final InitializationResult initializationResult) {
    switch (initializationResult) {
        case InitializationResult.Success ignored -> {
            // getAndSet returns the previous value, so the message is logged once per initialization.
            if (!initialized.getAndSet(true)) {
                getLogger().info(
                        "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]",
                        streamName, kinesisScheduler.applicationName(), kinesisScheduler.leaseManagementConfig().workerIdentifier());
            }
        }
        case InitializationResult.Failure failure -> {
            // Clean up before building the exception, keeping the original order of operations.
            cleanUpState();

            final String message = "Initialization failed for stream [%s]".formatted(streamName);

            // The cause-less variant is used only when a scheduler was shut down without
            // providing an initialization error. That isn't typical and wasn't observed.
            throw failure.error()
                    .map(cause -> new ProcessException(message, cause))
                    .orElseGet(() -> new ProcessException(message));
        }
    }
}

private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) {
try {
final List<KinesisClientRecord> records = recordBuffer.consumeRecords(lease);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
*/
package org.apache.nifi.processors.aws.kinesis;

import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.region.RegionUtil;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE;
Expand All @@ -36,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Timeout.ThreadMode.SEPARATE_THREAD;

class ConsumeKinesisTest {

Expand Down Expand Up @@ -65,14 +74,28 @@ void getRelationshipsForRecordProcessingStrategy() {
}

@Test
// It takes around 30 seconds for a scheduler to fail in this test.
@Timeout(value = 3, unit = TimeUnit.MINUTES, threadMode = SEPARATE_THREAD)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too long for a unit test. One option is moving this to an integration test. Another option is to remove the test method.

void failInitializationWithInvalidValues() {
// KCL Scheduler initialization will fail, as the runner is configured with placeholder credentials.

// Using the processor object to avoid error wrapping by testRunner.
final ConsumeKinesis consumeKinesis = (ConsumeKinesis) testRunner.getProcessor();

final ProcessContext context = testRunner.getProcessContext();
final ProcessSession session = new MockProcessSession(new SharedSessionState(consumeKinesis, new AtomicLong()), consumeKinesis);

final ProcessException ex = assertThrows(
ProcessException.class,
() -> consumeKinesis.setup(testRunner.getProcessContext()));
() -> {
// The error might occur either in @OnScheduled...
consumeKinesis.setup(context);
while (true) {
// ... or in onTrigger, if the initialization takes longer.
consumeKinesis.onTrigger(context, session);
Thread.sleep(Duration.ofSeconds(1));
}
});

assertNotNull(ex.getCause(), "The initialization exception is expected to have a cause");
}
Expand Down
Loading