-
Notifications
You must be signed in to change notification settings - Fork 391
snapshots: fix pipeline state machine edge cases #7570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,7 +82,7 @@ metrics_write( fd_snapdc_tile_t * ctx ) { | |
| FD_MGAUGE_SET( SNAPDC, INCREMENTAL_COMPRESSED_BYTES_READ, ctx->metrics.incremental.compressed_bytes_read ); | ||
| FD_MGAUGE_SET( SNAPDC, INCREMENTAL_DECOMPRESSED_BYTES_WRITTEN, ctx->metrics.incremental.decompressed_bytes_written ); | ||
|
|
||
| FD_MGAUGE_SET( SNAPDC, STATE, (ulong)(ctx->state) ); | ||
| FD_MGAUGE_SET( SNAPDC, STATE, (ulong)(ctx->state) ); | ||
| } | ||
|
|
||
| static inline void | ||
|
|
@@ -96,7 +96,6 @@ handle_control_frag( fd_snapdc_tile_t * ctx, | |
| /* All control messages cause us to want to reset the decompression stream */ | ||
| ulong error = ZSTD_DCtx_reset( ctx->zstd, ZSTD_reset_session_only ); | ||
| if( FD_UNLIKELY( ZSTD_isError( error ) ) ) FD_LOG_ERR(( "ZSTD_DCtx_reset failed (%lu-%s)", error, ZSTD_getErrorName( error ) )); | ||
| ctx->dirty = 0; | ||
|
|
||
| switch( sig ) { | ||
| case FD_SNAPSHOT_MSG_CTRL_INIT_FULL: { | ||
|
|
@@ -106,6 +105,7 @@ handle_control_frag( fd_snapdc_tile_t * ctx, | |
| ctx->state = FD_SNAPSHOT_STATE_PROCESSING; | ||
| ctx->full = 1; | ||
| ctx->is_zstd = !!msg->zstd; | ||
| ctx->dirty = 0; | ||
| ctx->in.frag_pos = 0UL; | ||
| ctx->metrics.full.compressed_bytes_read = 0UL; | ||
| ctx->metrics.full.decompressed_bytes_written = 0UL; | ||
|
|
@@ -118,27 +118,26 @@ handle_control_frag( fd_snapdc_tile_t * ctx, | |
| ctx->state = FD_SNAPSHOT_STATE_PROCESSING; | ||
| ctx->full = 0; | ||
| ctx->is_zstd = !!msg->zstd; | ||
| ctx->dirty = 0; | ||
| ctx->in.frag_pos = 0UL; | ||
| ctx->metrics.incremental.compressed_bytes_read = 0UL; | ||
| ctx->metrics.incremental.decompressed_bytes_written = 0UL; | ||
| break; | ||
| } | ||
| case FD_SNAPSHOT_MSG_CTRL_FAIL: | ||
| FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING || | ||
| ctx->state==FD_SNAPSHOT_STATE_ERROR ); | ||
| ctx->state = FD_SNAPSHOT_STATE_IDLE; | ||
| break; | ||
| case FD_SNAPSHOT_MSG_CTRL_NEXT: | ||
| case FD_SNAPSHOT_MSG_CTRL_DONE: | ||
| FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING || | ||
| ctx->state==FD_SNAPSHOT_STATE_ERROR ); | ||
| if( FD_UNLIKELY( ctx->is_zstd && ctx->dirty ) ) { | ||
| FD_LOG_WARNING(( "encountered end-of-file in the middle of a compressed frame" )); | ||
| ctx->state = FD_SNAPSHOT_STATE_ERROR; | ||
| fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL ); | ||
| return; | ||
| } | ||
|
Comment on lines
-135
to
-140
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This check could never trigger before, because dirty was always reset to 0 above. In addition, returning here deadlocks the pipeline. |
||
| ctx->state = FD_SNAPSHOT_STATE_IDLE; | ||
| if( FD_LIKELY( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ) ) { | ||
| if( FD_UNLIKELY( ctx->is_zstd && ctx->dirty ) ) { | ||
| FD_LOG_WARNING(( "encountered end-of-file in the middle of a compressed frame" )); | ||
| ctx->state = FD_SNAPSHOT_STATE_ERROR; | ||
| fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL ); | ||
| } else { | ||
| ctx->state = FD_SNAPSHOT_STATE_IDLE; | ||
| } | ||
| } else FD_TEST( ctx->state==FD_SNAPSHOT_STATE_ERROR ); | ||
| break; | ||
| case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: | ||
| FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE ); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,7 +87,7 @@ metrics_write( fd_snapin_tile_t * ctx ) { | |
| FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read ); | ||
| FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read ); | ||
| FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED, ctx->metrics.accounts_inserted ); | ||
| FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state ); | ||
| FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state ); | ||
| } | ||
|
|
||
| /* verify_slot_deltas_with_slot_history verifies the 'SlotHistory' | ||
|
|
@@ -161,6 +161,7 @@ verify_slot_deltas_with_bank_slot( fd_snapin_tile_t * ctx, | |
| static void | ||
| transition_malformed( fd_snapin_tile_t * ctx, | ||
| fd_stem_context_t * stem ) { /* Puts the tile into the ERROR state and publishes a CTRL_ERROR control message. */ | ||
| if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return; /* already in ERROR: do not publish a duplicate CTRL_ERROR */ | ||
| ctx->state = FD_SNAPSHOT_STATE_ERROR; | ||
| fd_stem_publish( stem, ctx->out_ct_idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL ); /* published on the output indexed by ctx->out_ct_idx; remaining arguments are all zero */ | ||
| } | ||
|
Comment on lines
162
to
167
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not a bug, but there is no need to generate extra ERROR messages when we are already in the ERROR state. |
||
|
|
@@ -593,22 +594,21 @@ handle_control_frag( fd_snapin_tile_t * ctx, | |
| break; | ||
|
|
||
| case FD_SNAPSHOT_MSG_CTRL_FAIL: | ||
| FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING || | ||
| ctx->state==FD_SNAPSHOT_STATE_FINISHING || | ||
| ctx->state==FD_SNAPSHOT_STATE_ERROR ); | ||
| ctx->state = FD_SNAPSHOT_STATE_IDLE; | ||
| if( ctx->state!=FD_SNAPSHOT_STATE_IDLE ) { | ||
| ctx->state = FD_SNAPSHOT_STATE_IDLE; | ||
|
|
||
| if( ctx->use_vinyl ) { | ||
| fd_snapin_vinyl_wd_fini( ctx ); | ||
| if( ctx->vinyl.txn_active ) { | ||
| fd_snapin_vinyl_txn_cancel( ctx ); | ||
| } | ||
| } else { | ||
| if( ctx->full ) { | ||
| fd_accdb_clear( ctx->accdb_admin ); | ||
| if( ctx->use_vinyl ) { | ||
| fd_snapin_vinyl_wd_fini( ctx ); | ||
| if( ctx->vinyl.txn_active ) { | ||
| fd_snapin_vinyl_txn_cancel( ctx ); | ||
| } | ||
| } else { | ||
| fd_accdb_cancel( ctx->accdb_admin, ctx->xid ); | ||
| fd_funk_txn_xid_copy( ctx->xid, fd_funk_last_publish( ctx->accdb_admin->funk ) ); | ||
| if( ctx->full ) { | ||
| fd_accdb_clear( ctx->accdb_admin ); | ||
| } else { | ||
| fd_accdb_cancel( ctx->accdb_admin, ctx->xid ); | ||
| fd_funk_txn_xid_copy( ctx->xid, fd_funk_last_publish( ctx->accdb_admin->funk ) ); | ||
| } | ||
| } | ||
| } | ||
| break; | ||
|
|
@@ -619,7 +619,7 @@ handle_control_frag( fd_snapin_tile_t * ctx, | |
| ctx->state==FD_SNAPSHOT_STATE_ERROR ); | ||
| if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) { | ||
| transition_malformed( ctx, stem ); | ||
| return; | ||
| break; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is there a break here instead of a return? Do we still need to forward the control message if we are generating an error?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, every control message needs to be either forwarded down the pipeline immediately or deferred for later (as snapls does). We cannot drop a control message outright, or the pipeline will wait forever for that message to be flushed: each control message is generated only once, and no new control messages are generated until it has been flushed. |
||
| } | ||
| ctx->state = FD_SNAPSHOT_STATE_IDLE; | ||
|
|
||
|
|
@@ -642,7 +642,7 @@ handle_control_frag( fd_snapin_tile_t * ctx, | |
| ctx->state==FD_SNAPSHOT_STATE_ERROR ); | ||
| if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) { | ||
| transition_malformed( ctx, stem ); | ||
| return; | ||
| break; | ||
| } | ||
| ctx->state = FD_SNAPSHOT_STATE_IDLE; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also possible for tiles to receive the FAIL control message while in the IDLE state. This is the cause of several of the FD_TEST assertion failures we've seen.
This happens when snapct sends out a DONE and an early tile immediately handles it and goes to IDLE. A later tile may then fail and generate ERROR, which causes snapct to send out a FAIL control message that is looped back through the pipeline, reaching the early tile while it is already IDLE.