Skip to content

Commit 57890af

Browse files
net: fd_xdp_ring helpers
Prompted by a bug reported by @colinlyguo
1 parent 6bea4d5 commit 57890af

File tree

5 files changed

+307
-67
lines changed

5 files changed

+307
-67
lines changed

src/disco/net/xdp/fd_xdp_tile.c

Lines changed: 31 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -397,27 +397,19 @@ net_check_gre_interface_exists( fd_net_ctx_t * ctx ) {
397397
}
398398

399399

400-
/* net_tx_ready returns 1 if the current XSK is ready to submit a TX send
401-
job. If the XSK is blocked for sends, returns 0. Reasons for block
402-
include:
403-
- No XSK TX buffer is available
404-
- XSK TX ring is full */
400+
/* net_tx_ready returns 1 if we can submit a job to this TX ring, and 0 otherwise.
401+
Reasons for block include:
402+
- No TX buffer is available (free ring empty)
403+
- TX ring is full
404+
405+
tx_ring: pointer to the XDP TX ring
406+
free_ring: pointer to the free TX ring */
405407

406408
static int
407-
net_tx_ready( fd_net_ctx_t * ctx,
408-
uint xsk_idx ) {
409-
fd_xsk_t * xsk = &ctx->xsk[ xsk_idx ];
410-
fd_xdp_ring_t * tx_ring = &xsk->ring_tx;
411-
fd_net_free_ring_t * free = &ctx->free_tx;
412-
if( free->prod == free->cons ) return 0; /* drop */
413-
414-
/* If potentially stale cached_cons says there is space,
415-
there is definitely space */
416-
if( tx_ring->cached_prod - tx_ring->cached_cons >= tx_ring->depth ) return 1;
417-
418-
/* read the fseq, and update our cache */
419-
tx_ring->cached_cons = FD_VOLATILE_CONST( *tx_ring->cons );
420-
if( tx_ring->cached_prod - tx_ring->cached_cons >= tx_ring->depth ) return 0; /* drop */
409+
net_tx_ready( fd_xdp_ring_t * tx_ring,
410+
fd_net_free_ring_t * free_ring ) {
411+
if( FD_UNLIKELY( free_ring->prod == free_ring->cons ) ) return 0; /* drop - no free buffers */
412+
if( FD_UNLIKELY( fd_xdp_ring_full( tx_ring ) ) ) return 0; /* drop - tx ring full */
421413
return 1;
422414
}
423415

@@ -483,17 +475,7 @@ net_tx_periodic_wakeup( fd_net_ctx_t * ctx,
483475
long now,
484476
int * charge_busy ) {
485477
fd_xdp_ring_t * tx_ring = &ctx->xsk[ xsk_idx ].ring_tx;
486-
uint tx_prod = tx_ring->cached_prod;
487-
uint tx_cons = tx_ring->cached_cons;
488-
489-
int tx_ring_empty = tx_prod==tx_cons;
490-
/* If we already think tx_ring_empty, it's definitely empty.
491-
But if not, we should update our view of what kernel has consumed. */
492-
if( FD_LIKELY( !tx_ring_empty ) ) {
493-
tx_cons = tx_ring->cached_cons = FD_VOLATILE_CONST( *tx_ring->cons );
494-
tx_ring_empty = tx_prod==tx_cons;
495-
}
496-
478+
int tx_ring_empty = fd_xdp_ring_empty( tx_ring, FD_XDP_RING_ROLE_PROD );
497479
if( fd_net_flusher_check( ctx->tx_flusher+xsk_idx, now, tx_ring_empty ) ) {
498480
net_tx_wakeup( ctx, &ctx->xsk[ xsk_idx ], charge_busy );
499481
fd_net_flusher_wakeup( ctx->tx_flusher+xsk_idx, now );
@@ -712,14 +694,14 @@ before_frag( fd_net_ctx_t * ctx,
712694

713695
/* Skip if TX is blocked */
714696

715-
if( FD_UNLIKELY( !net_tx_ready( ctx, xsk_idx ) ) ) {
697+
fd_xsk_t * xsk = &ctx->xsk[ xsk_idx ];
698+
fd_net_free_ring_t * free = &ctx->free_tx;
699+
if( FD_UNLIKELY( !net_tx_ready( &xsk->ring_tx, free ) ) ) {
716700
ctx->metrics.tx_full_fail_cnt++;
717701
return 1;
718702
}
719703

720704
/* Allocate buffer for receive */
721-
722-
fd_net_free_ring_t * free = &ctx->free_tx;
723705
ulong alloc_seq = free->cons;
724706
void * frame = (void *)free->queue[ alloc_seq % free->depth ];
725707
free->cons = fd_seq_inc( alloc_seq, 1UL );
@@ -1057,6 +1039,7 @@ net_comp_event( fd_net_ctx_t * ctx,
10571039
uint comp_mask = comp_ring->depth - 1U;
10581040
ulong frame = FD_VOLATILE_CONST( comp_ring->frame_ring[ comp_seq&comp_mask ] );
10591041
ulong const frame_mask = FD_NET_MTU - 1UL;
1042+
FD_STATIC_ASSERT( FD_ULONG_IS_POW2( FD_NET_MTU ), "FD_NET_MTU must be a power of two" );
10601043
if( FD_UNLIKELY( frame+FD_NET_MTU > ctx->umem_sz ) ) {
10611044
FD_LOG_ERR(( "Bounds check failed: frame=0x%lx umem_sz=0x%lx",
10621045
frame, (ulong)ctx->umem_sz ));
@@ -1101,28 +1084,15 @@ net_rx_event( fd_net_ctx_t * ctx,
11011084
/* Check if we have space in the fill ring to free the frame */
11021085

11031086
fd_xdp_ring_t * fill_ring = &xsk->ring_fr;
1104-
uint fill_depth = fill_ring->depth;
1105-
uint fill_mask = fill_depth-1U;
1106-
ulong frame_mask = FD_NET_MTU - 1UL;
1107-
uint fill_prod = fill_ring->cached_prod;
1108-
uint fill_cons = fill_ring->cached_cons;
1109-
1110-
/* If cached_cons suggests there may not be space in the fill ring,
1111-
refresh from fseq and check again. Else, skip the fseq access */
1112-
1113-
if( FD_UNLIKELY( fill_prod-fill_cons >= fill_depth ) ) {
1114-
fill_cons = fill_ring->cached_cons = FD_VOLATILE_CONST( *fill_ring->cons );
1115-
if( FD_UNLIKELY( fill_prod-fill_cons >= fill_depth ) ) {
1116-
ctx->metrics.rx_fill_blocked_cnt++;
1117-
return; /* blocked */
1118-
}
1087+
if( FD_UNLIKELY( fd_xdp_ring_full( fill_ring ) ) ) {
1088+
ctx->metrics.rx_fill_blocked_cnt++;
1089+
return; /* blocked */
11191090
}
11201091

11211092
/* Pass it to the receive handler */
11221093

11231094
uint freed_chunk = (uint)( ctx->umem_chunk0 + (frame.addr>>FD_CHUNK_LG_SZ) );
11241095
net_rx_packet( ctx, frame.addr, frame.len, &freed_chunk );
1125-
11261096
FD_COMPILER_MFENCE();
11271097
rx_ring->cached_cons = rx_seq+1U;
11281098

@@ -1135,7 +1105,12 @@ net_rx_event( fd_net_ctx_t * ctx,
11351105
FD_LOG_CRIT(( "mcache corruption detected: chunk=%u chunk0=%u wmark=%u",
11361106
freed_chunk, ctx->umem_chunk0, ctx->umem_wmark ));
11371107
}
1138-
ulong freed_off = (freed_chunk - ctx->umem_chunk0)<<FD_CHUNK_LG_SZ;
1108+
1109+
FD_STATIC_ASSERT( FD_ULONG_IS_POW2( FD_NET_MTU ), "FD_NET_MTU must be a power of two" );
1110+
uint fill_prod = fill_ring->cached_prod;
1111+
uint fill_mask = (fill_ring->depth)-1U;
1112+
ulong frame_mask = FD_NET_MTU - 1UL;
1113+
ulong freed_off = (freed_chunk - ctx->umem_chunk0)<<FD_CHUNK_LG_SZ;
11391114
fill_ring->frame_ring[ fill_prod&fill_mask ] = freed_off & (~frame_mask);
11401115
fill_ring->cached_prod = fill_prod+1U;
11411116
}
@@ -1168,31 +1143,20 @@ before_credit( fd_net_ctx_t * ctx,
11681143

11691144
net_tx_periodic_wakeup( ctx, rr_idx, fd_tickcount(), charge_busy );
11701145

1171-
uint rx_cons = rr_xsk->ring_rx.cached_cons;
1172-
uint rx_prod = rr_xsk->ring_rx.cached_prod; /* might be stale */
1173-
if( FD_UNLIKELY( rx_cons==rx_prod ) ) {
1174-
rx_prod = rr_xsk->ring_rx.cached_prod = FD_VOLATILE_CONST( *rr_xsk->ring_rx.prod );
1175-
}
1176-
1177-
if( rx_cons!=rx_prod ) {
1146+
/* Fire RX event if we have RX desc avail */
1147+
if( !fd_xdp_ring_empty( &rr_xsk->ring_rx, FD_XDP_RING_ROLE_CONS ) ) {
11781148
*charge_busy = 1;
1179-
net_rx_event( ctx, rr_xsk, rx_cons );
1149+
net_rx_event( ctx, rr_xsk, rr_xsk->ring_rx.cached_cons );
11801150
} else {
11811151
net_rx_wakeup( ctx, rr_xsk, charge_busy );
11821152
ctx->rr_idx++;
11831153
ctx->rr_idx = fd_uint_if( ctx->rr_idx>=ctx->xsk_cnt, 0, ctx->rr_idx );
11841154
}
11851155

1186-
uint comp_cons = rr_xsk->ring_cr.cached_cons;
1187-
uint comp_prod = rr_xsk->ring_cr.cached_prod; /* might be stale */
1188-
if( FD_UNLIKELY( comp_cons==comp_prod ) ) {
1189-
comp_prod = rr_xsk->ring_cr.cached_prod = FD_VOLATILE_CONST( *rr_xsk->ring_cr.prod );
1190-
}
1191-
1192-
if( comp_cons!=comp_prod ) {
1156+
/* Fire comp event if we have comp desc avail */
1157+
if( !fd_xdp_ring_empty( &rr_xsk->ring_cr, FD_XDP_RING_ROLE_CONS ) ) {
11931158
*charge_busy = 1;
1194-
rr_xsk->ring_cr.cached_prod = comp_prod;
1195-
net_comp_event( ctx, rr_xsk, comp_cons );
1159+
net_comp_event( ctx, rr_xsk, rr_xsk->ring_cr.cached_cons );
11961160
}
11971161
}
11981162

src/disco/net/xdp/test_xdp_tile.c

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,89 @@ append_netdev( fd_net_ctx_t * ctx,
147147
ctx->netdev_tbl.hdr->dev_cnt = (ushort)(idx + (ushort)1);
148148
}
149149

150+
/* Test net_tx_ready for all code paths */
151+
static void
152+
test_net_tx_ready( void ) {
153+
fd_xdp_ring_t tx_ring;
154+
uint tx_prod_val = 0;
155+
uint tx_cons_val = 0;
156+
157+
fd_net_free_ring_t free_ring;
158+
ulong free_queue[128];
159+
160+
/* Setup tx_ring */
161+
tx_ring.prod = &tx_prod_val;
162+
tx_ring.cons = &tx_cons_val;
163+
tx_ring.depth = 128;
164+
tx_ring.cached_prod = 0;
165+
tx_ring.cached_cons = 0;
166+
167+
/* Setup free_ring */
168+
free_ring.queue = free_queue;
169+
free_ring.depth = 128;
170+
free_ring.prod = 0;
171+
free_ring.cons = 0;
172+
173+
/* Both ready - should return 1 */
174+
tx_prod_val = tx_ring.cached_prod = 100;
175+
tx_cons_val = tx_ring.cached_cons = 90;
176+
free_ring.prod = 10;
177+
free_ring.cons = 5;
178+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 1 );
179+
180+
/* Free ring empty (prod == cons) - should return 0 */
181+
tx_prod_val = tx_ring.cached_prod = 100;
182+
tx_cons_val = tx_ring.cached_cons = 90;
183+
free_ring.prod = free_ring.cons = 20; /* empty */
184+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 0 );
185+
186+
/* TX ring full - should return 0 */
187+
tx_prod_val = tx_ring.cached_prod = 200;
188+
tx_cons_val = tx_ring.cached_cons = 72; /* 200 - 72 = 128, exactly depth */
189+
free_ring.prod = 30;
190+
free_ring.cons = 25;
191+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 0 );
192+
193+
/* TX ring appears full with stale cache, but actually not full */
194+
tx_ring.cached_prod = 300;
195+
tx_ring.cached_cons = 172; /* 300 - 172 = 128, appears full */
196+
tx_prod_val = 300;
197+
tx_cons_val = 200; /* consumer made progress, 300 - 200 = 100 */
198+
free_ring.prod = 40;
199+
free_ring.cons = 35;
200+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 1 );
201+
FD_TEST( tx_ring.cached_cons == 200 ); /* verify cache was refreshed */
202+
203+
/* Both blocking conditions (free empty AND tx full) */
204+
tx_prod_val = tx_ring.cached_prod = 500;
205+
tx_cons_val = tx_ring.cached_cons = 372; /* 500 - 372 = 128, full */
206+
free_ring.prod = free_ring.cons = 60; /* empty*/
207+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 0 );
208+
209+
/* Edge case - tx ring with one slot available */
210+
tx_prod_val = tx_ring.cached_prod = 600;
211+
tx_cons_val = tx_ring.cached_cons = 473; /* 600 - 473 = 127, one slot */
212+
free_ring.prod = 70;
213+
free_ring.cons = 69;
214+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 1 );
215+
216+
/* Edge case - free ring with one buffer available */
217+
tx_prod_val = tx_ring.cached_prod = 700;
218+
tx_cons_val = tx_ring.cached_cons = 690;
219+
free_ring.prod = 81;
220+
free_ring.cons = 80; /* one buffer */
221+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 1 );
222+
223+
/* TX ring empty (should be ready) */
224+
tx_prod_val = tx_ring.cached_prod = 800;
225+
tx_cons_val = tx_ring.cached_cons = 800;
226+
free_ring.prod = 110;
227+
free_ring.cons = 105;
228+
FD_TEST( net_tx_ready( &tx_ring, &free_ring ) == 1 );
229+
230+
FD_LOG_NOTICE(( "test_net_tx_ready: pass" ));
231+
}
232+
150233
static void
151234
setup_netdev_table( fd_net_ctx_t * ctx ) {
152235
ctx->netdev_tbl.hdr->dev_cnt = 0;
@@ -191,6 +274,8 @@ main( int argc,
191274
char ** argv ) {
192275
fd_boot( &argc, &argv );
193276

277+
test_net_tx_ready();
278+
194279
ulong cpu_idx = fd_tile_cpu_id( fd_tile_idx() );
195280
if( cpu_idx>fd_shmem_cpu_cnt() ) cpu_idx = 0UL;
196281
ulong part_max = fd_wksp_part_max_est( SCRATCH_MAX, 64UL );

src/waltz/xdp/Local.mk

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ ifdef FD_HAS_HOSTED
44
ifdef FD_HAS_LINUX
55
$(call add-objs,fd_xsk fd_xdp1 fd_xdp_redirect_user,fd_waltz)
66

7+
$(call make-unit-test,test_xsk,test_xsk,fd_waltz fd_util)
8+
$(call run-unit-test,test_xsk)
9+
710
$(call make-unit-test,test_xdp_ebpf,test_xdp_ebpf,fd_waltz fd_util)
811
$(call run-unit-test,test_xdp_ebpf)
912
endif # FD_HAS_LINUX

src/waltz/xdp/fd_xsk.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,40 @@ struct __attribute__((aligned(64UL))) fd_xdp_ring {
153153
};
154154
typedef struct fd_xdp_ring fd_xdp_ring_t;
155155

156+
#define FD_XDP_RING_ROLE_PROD 0
157+
#define FD_XDP_RING_ROLE_CONS 1
158+
159+
/* fd_xdp_ring_empty returns 1 if the ring is empty, 0 otherwise.
160+
'role' is FD_XDP_RING_ROLE_PROD if userspace is the producer (fill, tx),
161+
and FD_XDP_RING_ROLE_CONS if userspace is the consumer (rx, completion). */
162+
163+
static inline int
164+
fd_xdp_ring_empty( fd_xdp_ring_t * ring, uint role ) {
165+
if( role == FD_XDP_RING_ROLE_PROD ) {
166+
/* If potentially stale cached_cons says everything consumed,
167+
it's definitely empty. Else, refresh cached seq. */
168+
if( FD_UNLIKELY( ring->cached_prod == ring->cached_cons ) ) return 1;
169+
ring->cached_cons = FD_VOLATILE_CONST( *ring->cons );
170+
} else {
171+
/* If potentially stale cached_prod says we have more to read,
172+
it's definitely non-empty. Else, refresh cached seq. */
173+
if( FD_LIKELY( ring->cached_cons < ring->cached_prod ) ) return 0;
174+
ring->cached_prod = FD_VOLATILE_CONST( *ring->prod );
175+
}
176+
return ring->cached_prod == ring->cached_cons;
177+
}
178+
179+
/* fd_xdp_ring_full returns 1 if the ring is full, 0 otherwise.
180+
Assumes caller is the producer to this ring (fill, tx). */
181+
static inline int
182+
fd_xdp_ring_full( fd_xdp_ring_t * ring ) {
183+
/* If potentially stale cached_cons says we have more space,
184+
it's definitely not full. Else, refresh cached seq. */
185+
if( FD_LIKELY( ring->cached_prod - ring->cached_cons < ring->depth ) ) return 0;
186+
ring->cached_cons = FD_VOLATILE_CONST( *ring->cons );
187+
return ring->cached_prod - ring->cached_cons >= ring->depth;
188+
}
189+
156190
/* fd_xsk_params_t: Memory layout parameters of XSK.
157191
Can be retrieved using fd_xsk_get_params() */
158192

0 commit comments

Comments
 (0)