-
Notifications
You must be signed in to change notification settings - Fork 2.9k
NIFI-15307 ConsumeKinesis. Wait for long initialization in onTrigger #10664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,6 +100,7 @@ | |
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| import static java.util.concurrent.TimeUnit.NANOSECONDS; | ||
| import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
|
@@ -169,10 +170,10 @@ public class ConsumeKinesis extends AbstractProcessor { | |
| private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration.ofMinutes(1); | ||
|
|
||
| /** | ||
| * Using a large enough value to ensure we don't wait infinitely for the initialization. | ||
| * Actual initialization shouldn't take that long. | ||
| * How long to wait for a Scheduler initialization to complete in the OnScheduled method. | ||
| * If the initialization takes longer than this, the processor will continue initialization checks in the onTrigger method. | ||
| */ | ||
| private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = Duration.ofMinutes(15); | ||
| private static final Duration KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT = Duration.ofSeconds(30); | ||
| private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofMinutes(3); | ||
|
|
||
| static final PropertyDescriptor STREAM_NAME = new PropertyDescriptor.Builder() | ||
|
|
@@ -339,6 +340,9 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio | |
|
|
||
| private volatile @Nullable ReaderRecordProcessor readerRecordProcessor; | ||
|
|
||
| private volatile Future<InitializationResult> initializationResultFuture; | ||
| private volatile AtomicBoolean initialized; | ||
|
|
||
| // An instance field, so that it can be read in getRelationships. | ||
| private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from( | ||
| PROCESSING_STRATEGY.getDefaultValue()); | ||
|
|
@@ -418,6 +422,8 @@ public void setup(final ProcessContext context) { | |
| final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName); | ||
|
|
||
| final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger()); | ||
| initialized = new AtomicBoolean(false); | ||
| initializationResultFuture = initializationListener.result(); | ||
|
|
||
| kinesisScheduler = new Scheduler( | ||
| configsBuilder.checkpointConfig(), | ||
|
|
@@ -435,34 +441,20 @@ public void setup(final ProcessContext context) { | |
| schedulerThread.start(); | ||
| // The thread is stopped when kinesisScheduler is shut down in the onStopped method. | ||
|
|
||
| final InitializationResult result; | ||
| try { | ||
| result = initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS); | ||
| } catch (final InterruptedException | ExecutionException | TimeoutException e) { | ||
| final InitializationResult result = initializationResultFuture.get( | ||
| KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS); | ||
| checkInitializationResult(result); | ||
| } catch (final TimeoutException e) { | ||
| // During the first run the processor will take more time to initialize. We return from OnScheduled and continue waiting in the onTrigger method. | ||
| getLogger().warn("Kinesis Scheduler initialization may take up to 10 minutes on a first run, which is caused by AWS resources initialization"); | ||
| } catch (final InterruptedException | ExecutionException e) { | ||
| if (e instanceof InterruptedException) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| cleanUpState(); | ||
| throw new ProcessException("Initialization failed for stream [%s]".formatted(streamName), e); | ||
| } | ||
|
|
||
| switch (result) { | ||
| case InitializationResult.Success ignored -> | ||
| getLogger().info( | ||
| "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]", | ||
| streamName, applicationName, workerId); | ||
| case InitializationResult.Failure failure -> { | ||
| cleanUpState(); | ||
|
|
||
| final ProcessException ex = failure.error() | ||
| .map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err)) | ||
| // This branch is active only when a scheduler was shut down, but no initialization error was provided. | ||
| // This behavior isn't typical and wasn't observed. | ||
| .orElseGet(() -> new ProcessException(( "Initialization failed for stream [%s]").formatted(streamName))); | ||
|
|
||
| throw ex; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -575,6 +567,9 @@ private static RetrievalSpecificConfig configureRetrievalSpecificConfig( | |
| @OnStopped | ||
| public void onStopped() { | ||
| cleanUpState(); | ||
|
|
||
| initialized = null; | ||
| initializationResultFuture = null; | ||
| } | ||
|
|
||
| private void cleanUpState() { | ||
|
|
@@ -633,6 +628,16 @@ private void shutdownScheduler() { | |
|
|
||
| @Override | ||
| public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { | ||
| if (!initialized.get()) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Short-circuiting by checking a bool variable, so we don't have to inspect the content of the future after initialization has completed. |
||
| if (!initializationResultFuture.isDone()) { | ||
| getLogger().debug("Waiting for Kinesis Scheduler to finish initialization"); | ||
| context.yield(); | ||
| return; | ||
| } | ||
|
|
||
| checkInitializationResult(initializationResultFuture.resultNow()); | ||
| } | ||
|
|
||
| final Optional<Lease> leaseAcquired = recordBuffer.acquireBufferLease(); | ||
|
|
||
| leaseAcquired.ifPresentOrElse( | ||
|
|
@@ -641,6 +646,30 @@ public void onTrigger(final ProcessContext context, final ProcessSession session | |
| ); | ||
| } | ||
|
|
||
| private void checkInitializationResult(final InitializationResult initializationResult) { | ||
| switch (initializationResult) { | ||
| case InitializationResult.Success ignored -> { | ||
| boolean wasInitialized = initialized.getAndSet(true); | ||
| if (!wasInitialized) { | ||
| getLogger().info( | ||
| "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]", | ||
| streamName, kinesisScheduler.applicationName(), kinesisScheduler.leaseManagementConfig().workerIdentifier()); | ||
| } | ||
| } | ||
| case InitializationResult.Failure failure -> { | ||
| cleanUpState(); | ||
|
|
||
| final ProcessException ex = failure.error() | ||
| .map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err)) | ||
| // This branch is active only when a scheduler was shut down, but no initialization error was provided. | ||
| // This behavior isn't typical and wasn't observed. | ||
| .orElseGet(() -> new ProcessException(( "Initialization failed for stream [%s]").formatted(streamName))); | ||
|
|
||
| throw ex; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) { | ||
| try { | ||
| final List<KinesisClientRecord> records = recordBuffer.consumeRecords(lease); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,17 +16,25 @@ | |
| */ | ||
| package org.apache.nifi.processors.aws.kinesis; | ||
|
|
||
| import org.apache.nifi.processor.ProcessContext; | ||
| import org.apache.nifi.processor.ProcessSession; | ||
| import org.apache.nifi.processor.Relationship; | ||
| import org.apache.nifi.processor.exception.ProcessException; | ||
| import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; | ||
| import org.apache.nifi.processors.aws.region.RegionUtil; | ||
| import org.apache.nifi.reporting.InitializationException; | ||
| import org.apache.nifi.util.MockProcessSession; | ||
| import org.apache.nifi.util.SharedSessionState; | ||
| import org.apache.nifi.util.TestRunner; | ||
| import org.apache.nifi.util.TestRunners; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.Timeout; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.Set; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY; | ||
| import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE; | ||
|
|
@@ -36,6 +44,7 @@ | |
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.junit.jupiter.api.Timeout.ThreadMode.SEPARATE_THREAD; | ||
|
|
||
| class ConsumeKinesisTest { | ||
|
|
||
|
|
@@ -65,14 +74,28 @@ void getRelationshipsForRecordProcessingStrategy() { | |
| } | ||
|
|
||
| @Test | ||
| // It takes around 30 seconds for a scheduler to fail in this test. | ||
| @Timeout(value = 3, unit = TimeUnit.MINUTES, threadMode = SEPARATE_THREAD) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is too long for a unit test. One option is moving this to an integration test. Another option is to remove the test method. |
||
| void failInitializationWithInvalidValues() { | ||
| // KCL Scheduler initialization will fail, as the runner is configured with placeholder credentials. | ||
|
|
||
| // Using the processor object to avoid error wrapping by testRunner. | ||
| final ConsumeKinesis consumeKinesis = (ConsumeKinesis) testRunner.getProcessor(); | ||
|
|
||
| final ProcessContext context = testRunner.getProcessContext(); | ||
| final ProcessSession session = new MockProcessSession(new SharedSessionState(consumeKinesis, new AtomicLong()), consumeKinesis); | ||
|
|
||
| final ProcessException ex = assertThrows( | ||
| ProcessException.class, | ||
| () -> consumeKinesis.setup(testRunner.getProcessContext())); | ||
| () -> { | ||
| // The error might occur either in @OnScheduled... | ||
| consumeKinesis.setup(context); | ||
| while (true) { | ||
| // ... or in onTrigger, if the initialization takes longer. | ||
| consumeKinesis.onTrigger(context, session); | ||
| Thread.sleep(Duration.ofSeconds(1)); | ||
| } | ||
| }); | ||
|
|
||
| assertNotNull(ex.getCause(), "The initialization exception is expected to have a cause"); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Recommend setting a final value to avoid null checking.