@@ -627,11 +627,15 @@ struct ExecutorKit {
627627
628628struct PaymentEvent {
629629 absolute_time : SystemTime ,
630+ executor : ExecutorKit ,
631+ }
632+
633+ #[ derive( Clone ) ]
634+ struct PaymentEventPayload {
630635 wait_time : Duration ,
631636 destination : NodeInfo ,
632- current_count : u64 ,
633637 capacity : Option < u64 > ,
634- executor : ExecutorKit ,
638+ current_count : u64 ,
635639}
636640
637641impl Ord for PaymentEvent {
@@ -1106,14 +1110,26 @@ impl<C: Clock + 'static> Simulation<C> {
11061110 tasks : & TaskTracker ,
11071111 ) -> Result < ( ) , SimulationError > {
11081112 let mut heap: BinaryHeap < Reverse < PaymentEvent > > = BinaryHeap :: new ( ) ;
1113+ let payment_event_payloads: Arc < Mutex < HashMap < PublicKey , PaymentEventPayload > > > =
1114+ Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
11091115 for executor in executors {
1110- generate_payment ( & mut heap, executor, 0 ) ?;
1116+ let payload = PaymentEventPayload {
1117+ wait_time : Duration :: from_secs ( 0 ) ,
1118+ destination : executor. source_info . clone ( ) ,
1119+ capacity : None ,
1120+ current_count : 0 ,
1121+ } ;
1122+ payment_event_payloads
1123+ . lock ( )
1124+ . await
1125+ . insert ( executor. source_info . pubkey , payload) ;
1126+ generate_payment ( & mut heap, executor, 0 , payment_event_payloads. clone ( ) ) . await ?;
11111127 }
11121128
11131129 let listener = self . shutdown_listener . clone ( ) ;
11141130 let shutdown = self . shutdown_trigger . clone ( ) ;
11151131 let clock = self . clock . clone ( ) ;
1116- let t = tasks. clone ( ) ;
1132+ let task_clone = tasks. clone ( ) ;
11171133
11181134 tasks. spawn ( async move {
11191135 loop {
@@ -1126,14 +1142,16 @@ impl<C: Clock + 'static> Simulation<C> {
11261142 match heap. pop( ) {
11271143 Some ( Reverse ( PaymentEvent {
11281144 absolute_time: _,
1129- wait_time,
1130- destination,
1131- current_count,
1132- capacity,
11331145 executor,
11341146 } ) ) => {
1147+ let payload = payment_event_payloads
1148+ . lock( )
1149+ . await
1150+ . get( & executor. source_info. pubkey)
1151+ . ok_or( SimulationError :: PaymentGenerationError ( PaymentGenerationError ( format!( "executor {} not found" , executor. source_info) ) ) ) ?
1152+ . clone( ) ;
11351153 if let Some ( c) = executor. payment_generator. payment_count( ) {
1136- if c == current_count {
1154+ if c == payload . current_count {
11371155 log:: info!(
11381156 "Payment count has been met for {}: {c} payments. Stopping the activity."
11391157 , executor. source_info) ;
@@ -1157,14 +1175,14 @@ impl<C: Clock + 'static> Simulation<C> {
11571175 // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
11581176 // a payment amount something has gone wrong (because we should have validated that we can always
11591177 // generate amounts), so we exit.
1160- let payment_amount = executor. payment_generator. payment_amount( capacity) ;
1178+ let payment_amount = executor. payment_generator. payment_amount( payload . capacity) ;
11611179 let amount = match payment_amount {
11621180 Ok ( amt) => {
11631181 if amt == 0 {
11641182 log:: debug!(
1165- "Skipping zero amount payment for {source} -> {destination }."
1183+ "Skipping zero amount payment for {source} -> {}." , payload . destination
11661184 ) ;
1167- generate_payment( & mut heap, executor, current_count) ?;
1185+ generate_payment( & mut heap, executor, payload . current_count, payment_event_payloads . clone ( ) ) . await ?;
11681186 continue ;
11691187 }
11701188 amt
@@ -1173,19 +1191,19 @@ impl<C: Clock + 'static> Simulation<C> {
11731191 return Err ( SimulationError :: PaymentGenerationError ( e) ) ;
11741192 } ,
11751193 } ;
1176- generate_payment( & mut heap, executor, current_count + 1 ) ?;
1194+ generate_payment( & mut heap, executor, payload . current_count + 1 , payment_event_payloads . clone ( ) ) . await ?;
11771195
11781196 // Wait until our time to next payment has elapsed then execute a random amount payment to a random
11791197 // destination.
1180- pe_clock. sleep( wait_time) . await ;
1181- t . spawn( async move {
1182- log:: debug!( "Generated payment: {source} -> {}: {amount} msat." , destination) ;
1198+ pe_clock. sleep( payload . wait_time) . await ;
1199+ task_clone . spawn( async move {
1200+ log:: debug!( "Generated payment: {source} -> {}: {amount} msat." , payload . destination) ;
11831201
11841202 // Send the payment, exiting if we can no longer send to the consumer.
1185- let event = SimulationEvent :: SendPayment ( destination. clone( ) , amount) ;
1203+ let event = SimulationEvent :: SendPayment ( payload . destination. clone( ) , amount) ;
11861204 if let Err ( e) = pe_sender. send( event. clone( ) ) . await {
11871205 pe_shutdown. trigger( ) ;
1188- log:: debug!( "Not able to send event payment for {amount}: {source} -> {destination }. Exited with error {e}." ) ;
1206+ log:: debug!( "Not able to send event payment for {amount}: {source} -> {}. Exited with error {e}." , payload . destination ) ;
11891207 } else {
11901208
11911209 log:: debug!( "Send event payment for {source} completed successfully." ) ;
@@ -1540,10 +1558,11 @@ async fn track_payment_result(
15401558 Ok ( ( ) )
15411559}
15421560
1543- fn generate_payment (
1561+ async fn generate_payment (
15441562 heap : & mut BinaryHeap < Reverse < PaymentEvent > > ,
15451563 executor : ExecutorKit ,
15461564 current_count : u64 ,
1565+ counters : Arc < Mutex < HashMap < PublicKey , PaymentEventPayload > > > ,
15471566) -> Result < ( ) , SimulationError > {
15481567 let now = SystemTime :: now ( ) ;
15491568 let wait_time = get_payment_delay (
@@ -1560,12 +1579,24 @@ fn generate_payment(
15601579 . network_generator
15611580 . choose_destination ( executor. source_info . pubkey )
15621581 . map_err ( SimulationError :: DestinationGenerationError ) ?;
1582+ let mut payload = counters
1583+ . lock ( )
1584+ . await
1585+ . get ( & executor. source_info . pubkey )
1586+ . ok_or ( SimulationError :: PaymentGenerationError (
1587+ PaymentGenerationError ( format ! ( "executor {} not found" , executor. source_info) ) ,
1588+ ) ) ?
1589+ . clone ( ) ;
1590+ payload. wait_time = wait_time;
1591+ payload. destination = destination;
1592+ payload. capacity = capacity;
1593+ payload. current_count = current_count;
1594+ counters
1595+ . lock ( )
1596+ . await
1597+ . insert ( executor. source_info . pubkey , payload) ;
15631598 let payment_event = PaymentEvent {
15641599 absolute_time,
1565- wait_time,
1566- destination,
1567- current_count,
1568- capacity,
15691600 executor,
15701601 } ;
15711602 heap. push ( Reverse ( payment_event) ) ;
@@ -2038,18 +2069,18 @@ mod tests {
20382069 source : node_1. clone ( ) ,
20392070 destination : node_2. clone ( ) ,
20402071 start_secs : None ,
2041- count : Some ( 10 ) , // No limit
2042- interval_secs : crate :: ValueOrRange :: Value ( 5 ) , // 1 second interval
2072+ count : Some ( 10 ) , // 10 payments
2073+ interval_secs : crate :: ValueOrRange :: Value ( 2 ) , // 2 second interval
20432074 amount_msat : crate :: ValueOrRange :: Value ( 2000 ) , // 2000 msats
20442075 } ;
20452076
2046- // Activity 2: From node_2 to node_1 - This would normally succeed
2077+ // Activity 2: From node_3 to node_4
20472078 let activity_2 = crate :: ActivityDefinition {
20482079 source : node_3. clone ( ) ,
20492080 destination : node_4. clone ( ) ,
2050- start_secs : None , // Start 2 seconds after the first activity
2051- count : Some ( 10 ) , // 10 payments if no error
2052- interval_secs : crate :: ValueOrRange :: Value ( 10 ) , // 1 second interval
2081+ start_secs : None ,
2082+ count : Some ( 10 ) , // 10 payments
2083+ interval_secs : crate :: ValueOrRange :: Value ( 4 ) , // 4 second interval
20532084 amount_msat : crate :: ValueOrRange :: Value ( 3000 ) , // 3000 msats
20542085 } ;
20552086
@@ -2058,7 +2089,7 @@ mod tests {
20582089 // Create simulation with a long timeout that we don't expect to be reached
20592090 let simulation = Simulation :: new (
20602091 SimulationCfg :: new (
2061- None , // 30 second timeout (shouldn't matter)
2092+ None , // without timeout
20622093 100 , // Expected payment size
20632094 2.0 , // Activity multiplier
20642095 None , // No result writing
@@ -2076,12 +2107,12 @@ mod tests {
20762107 let _ = simulation. run ( & vec ! [ activity_1, activity_2] ) . await ;
20772108 let elapsed = start. elapsed ( ) ;
20782109
2079- // Check that simulation ran 150 because
2080- // from activity_1 there are 10 payments with a wait_time of 5s
2081- // from activity_2 there are 10 payments with a wait_time of 10s
2110+ // Check that simulation ran 60 because
2111+ // from activity_1 there are 10 payments with a wait_time of 2s -> 20s
2112+ // from activity_2 there are 10 payments with a wait_time of 4s -> 40s
20822113 assert ! (
2083- elapsed >= Duration :: from_secs( 150 ) ,
2084- "Simulation should have run at least for 150s , took {:?}" ,
2114+ elapsed >= Duration :: from_secs( 60 ) ,
2115+ "Simulation should have run at least for 54s , took {:?}" ,
20852116 elapsed
20862117 ) ;
20872118
0 commit comments