Skip to content

Commit 3f881b9

Browse files
authored
Implement replay_event_log_async (#1266)
2 parents 4edff40 + 06f3314 commit 3f881b9

File tree

4 files changed

+301
-106
lines changed

4 files changed

+301
-106
lines changed

payjoin/src/core/receive/v2/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ pub(crate) use error::InternalSessionError;
3535
pub use error::SessionError;
3636
use serde::de::Deserializer;
3737
use serde::{Deserialize, Serialize};
38-
pub use session::{replay_event_log, SessionEvent, SessionHistory, SessionOutcome, SessionStatus};
38+
pub use session::{
39+
replay_event_log, replay_event_log_async, SessionEvent, SessionHistory, SessionOutcome,
40+
SessionStatus,
41+
};
3942
use url::Url;
4043
#[cfg(target_arch = "wasm32")]
4144
use web_time::Duration;

payjoin/src/core/receive/v2/session.rs

Lines changed: 165 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,38 @@ use serde::{Deserialize, Serialize};
33
use super::{ReceiveSession, SessionContext};
44
use crate::error::{InternalReplayError, ReplayError};
55
use crate::output_substitution::OutputSubstitution;
6-
use crate::persist::SessionPersister;
6+
use crate::persist::{AsyncSessionPersister, SessionPersister};
77
use crate::receive::{InputPair, JsonReply, OriginalPayload, PsbtContext};
88
use crate::{ImplementationError, PjUri};
99

10+
fn replay_events(
11+
mut logs: impl Iterator<Item = SessionEvent>,
12+
) -> Result<(ReceiveSession, Vec<SessionEvent>), ReplayError<ReceiveSession, SessionEvent>> {
13+
let first_event = logs.next().ok_or(InternalReplayError::NoEvents)?;
14+
let mut session_events = vec![first_event.clone()];
15+
let mut receiver = match first_event {
16+
SessionEvent::Created(context) => ReceiveSession::new(context),
17+
_ => return Err(InternalReplayError::InvalidEvent(Box::new(first_event), None).into()),
18+
};
19+
20+
for event in logs {
21+
session_events.push(event.clone());
22+
receiver = receiver.process_event(event)?;
23+
}
24+
Ok((receiver, session_events))
25+
}
26+
27+
fn construct_history(
28+
session_events: Vec<SessionEvent>,
29+
) -> Result<SessionHistory, ReplayError<ReceiveSession, SessionEvent>> {
30+
let history = SessionHistory::new(session_events);
31+
let ctx = history.session_context();
32+
if ctx.expiration.elapsed() {
33+
return Err(InternalReplayError::Expired(ctx.expiration).into());
34+
}
35+
Ok(history)
36+
}
37+
1038
/// Replay a receiver event log to get the receiver in its current state [ReceiveSession]
1139
/// and a session history [SessionHistory]
1240
pub fn replay_event_log<P>(
@@ -17,35 +45,49 @@ where
1745
P::SessionEvent: Into<SessionEvent> + Clone,
1846
P::SessionEvent: From<SessionEvent>,
1947
{
20-
let mut logs = persister
48+
let logs = persister
2149
.load()
2250
.map_err(|e| InternalReplayError::PersistenceFailure(ImplementationError::new(e)))?;
2351

24-
let first_event = logs.next().ok_or(InternalReplayError::NoEvents)?.into();
25-
let mut session_events = vec![first_event.clone()];
26-
let mut receiver = match first_event {
27-
SessionEvent::Created(context) => ReceiveSession::new(context),
28-
_ => return Err(InternalReplayError::InvalidEvent(Box::new(first_event), None).into()),
52+
let (receiver, session_events) = match replay_events(logs.map(|e| e.into())) {
53+
Ok(r) => r,
54+
Err(e) => {
55+
persister.close().map_err(|ce| {
56+
InternalReplayError::PersistenceFailure(ImplementationError::new(ce))
57+
})?;
58+
return Err(e);
59+
}
2960
};
30-
for event in logs {
31-
session_events.push(event.clone().into());
32-
receiver = receiver.process_event(event.into()).map_err(|e| {
33-
if let Err(storage_err) = persister.close() {
34-
return InternalReplayError::PersistenceFailure(ImplementationError::new(
35-
storage_err,
36-
))
37-
.into();
38-
}
39-
e
40-
})?;
41-
}
4261

43-
let history = SessionHistory::new(session_events.clone());
44-
let ctx = history.session_context();
45-
if ctx.expiration.elapsed() {
46-
return Err(InternalReplayError::Expired(ctx.expiration).into());
47-
}
62+
let history = construct_history(session_events)?;
63+
Ok((receiver, history))
64+
}
65+
66+
/// Async version of [replay_event_log]
67+
pub async fn replay_event_log_async<P>(
68+
persister: &P,
69+
) -> Result<(ReceiveSession, SessionHistory), ReplayError<ReceiveSession, SessionEvent>>
70+
where
71+
P: AsyncSessionPersister,
72+
P::SessionEvent: Into<SessionEvent> + Clone,
73+
P::SessionEvent: From<SessionEvent>,
74+
{
75+
let logs = persister
76+
.load()
77+
.await
78+
.map_err(|e| InternalReplayError::PersistenceFailure(ImplementationError::new(e)))?;
4879

80+
let (receiver, session_events) = match replay_events(logs.map(|e| e.into())) {
81+
Ok(r) => r,
82+
Err(e) => {
83+
persister.close().await.map_err(|ce| {
84+
InternalReplayError::PersistenceFailure(ImplementationError::new(ce))
85+
})?;
86+
return Err(e);
87+
}
88+
};
89+
90+
let history = construct_history(session_events)?;
4991
Ok((receiver, history))
5092
}
5193

@@ -183,7 +225,7 @@ mod tests {
183225
use payjoin_test_utils::{BoxError, EXAMPLE_URL};
184226

185227
use super::*;
186-
use crate::persist::test_utils::InMemoryTestPersister;
228+
use crate::persist::test_utils::{InMemoryAsyncTestPersister, InMemoryTestPersister};
187229
use crate::persist::NoopSessionPersister;
188230
use crate::receive::tests::original_from_test_vector;
189231
use crate::receive::v2::test::{mock_err, SHARED_CONTEXT};
@@ -268,39 +310,55 @@ mod tests {
268310
}
269311
}
270312

313+
#[derive(Clone)]
271314
struct SessionHistoryExpectedOutcome {
272315
fallback_tx: Option<bitcoin::Transaction>,
273316
expected_status: SessionStatus,
274317
}
275318

319+
#[derive(Clone)]
276320
struct SessionHistoryTest {
277321
events: Vec<SessionEvent>,
278322
expected_session_history: SessionHistoryExpectedOutcome,
279323
expected_receiver_state: ReceiveSession,
280324
}
281325

282-
fn run_session_history_test(test: SessionHistoryTest) -> Result<(), BoxError> {
283-
let persister = InMemoryTestPersister::<SessionEvent>::default();
284-
for event in test.events.clone() {
285-
persister.save_event(event)?;
286-
}
287-
288-
let (receiver, session_history) = replay_event_log(&persister)?;
326+
fn verify_session_result(
327+
session_result: Result<
328+
(ReceiveSession, SessionHistory),
329+
crate::error::ReplayError<ReceiveSession, SessionEvent>,
330+
>,
331+
test: &SessionHistoryTest,
332+
) {
333+
let (receiver, session_history) = session_result.expect("replay should succeed");
289334
assert_eq!(receiver, test.expected_receiver_state);
290335
assert_eq!(session_history.fallback_tx(), test.expected_session_history.fallback_tx);
291336
assert_eq!(session_history.status(), test.expected_session_history.expected_status);
292337
let expected_reply_key = test.events.iter().find_map(|event| match event {
293338
SessionEvent::RetrievedOriginalPayload { reply_key, .. } => reply_key.clone(),
294339
_ => None,
295340
});
296-
297341
assert_eq!(session_history.session_context().reply_key, expected_reply_key);
342+
}
298343

299-
Ok(())
344+
fn run_session_history_test(test: &SessionHistoryTest) {
345+
let persister = InMemoryTestPersister::<SessionEvent>::default();
346+
for event in test.events.clone() {
347+
persister.save_event(event).expect("In memory persister shouldn't fail");
348+
}
349+
verify_session_result(replay_event_log(&persister), test);
300350
}
301351

302-
#[test]
303-
fn test_replaying_session_creation() -> Result<(), BoxError> {
352+
async fn run_session_history_test_async(test: &SessionHistoryTest) {
353+
let persister = InMemoryAsyncTestPersister::<SessionEvent>::default();
354+
for event in test.events.clone() {
355+
persister.save_event(event).await.expect("In memory persister shouldn't fail");
356+
}
357+
verify_session_result(replay_event_log_async(&persister).await, test);
358+
}
359+
360+
#[tokio::test]
361+
async fn test_replaying_session_creation() {
304362
let session_context = SHARED_CONTEXT.clone();
305363
let test = SessionHistoryTest {
306364
events: vec![SessionEvent::Created(session_context.clone())],
@@ -313,25 +371,62 @@ mod tests {
313371
session_context,
314372
}),
315373
};
316-
run_session_history_test(test)
374+
run_session_history_test(&test);
375+
run_session_history_test_async(&test).await;
317376
}
318377

319-
#[test]
320-
fn test_replaying_session_creation_with_expired_session() -> Result<(), BoxError> {
378+
#[tokio::test]
379+
async fn test_replaying_session_creation_with_expired_session() {
321380
let expiration = (SystemTime::now() - Duration::from_secs(1)).try_into().unwrap();
322381
let session_context = SessionContext { expiration, ..SHARED_CONTEXT.clone() };
323-
let persister = InMemoryTestPersister::<SessionEvent>::default();
324-
persister.save_event(SessionEvent::Created(session_context))?;
325382

383+
let persister = InMemoryTestPersister::<SessionEvent>::default();
384+
persister.save_event(SessionEvent::Created(session_context.clone()));
326385
let err = replay_event_log(&persister).expect_err("session should be expired");
327386
let expected_err: ReplayError<ReceiveSession, SessionEvent> =
328387
InternalReplayError::Expired(expiration).into();
329388
assert_eq!(err.to_string(), expected_err.to_string());
330-
Ok(())
389+
390+
let persister = InMemoryAsyncTestPersister::<SessionEvent>::default();
391+
persister.save_event(SessionEvent::Created(session_context)).await;
392+
let err = replay_event_log_async(&persister).await.expect_err("session should be expired");
393+
let expected_err: ReplayError<ReceiveSession, SessionEvent> =
394+
InternalReplayError::Expired(expiration).into();
395+
assert_eq!(err.to_string(), expected_err.to_string());
331396
}
332397

333-
#[test]
334-
fn test_replaying_unchecked_proposal() -> Result<(), BoxError> {
398+
#[tokio::test]
399+
async fn test_replaying_session_with_missing_created_event() {
400+
let persister = InMemoryTestPersister::<SessionEvent>::default();
401+
persister.save_event(SessionEvent::CheckedBroadcastSuitability());
402+
assert!(!persister.inner.read().expect("session read should succeed").is_closed);
403+
let err = replay_event_log(&persister).expect_err("session replay should be fail");
404+
let expected_err: ReplayError<ReceiveSession, SessionEvent> =
405+
InternalReplayError::InvalidEvent(
406+
Box::new(SessionEvent::CheckedBroadcastSuitability()),
407+
None,
408+
)
409+
.into();
410+
assert_eq!(err.to_string(), expected_err.to_string());
411+
assert!(persister.inner.read().expect("lock should not be poisoned").is_closed);
412+
413+
let persister = InMemoryAsyncTestPersister::<SessionEvent>::default();
414+
persister.save_event(SessionEvent::CheckedBroadcastSuitability()).await;
415+
assert!(!persister.inner.read().await.is_closed);
416+
let err =
417+
replay_event_log_async(&persister).await.expect_err("session replay should be fail");
418+
let expected_err: ReplayError<ReceiveSession, SessionEvent> =
419+
InternalReplayError::InvalidEvent(
420+
Box::new(SessionEvent::CheckedBroadcastSuitability()),
421+
None,
422+
)
423+
.into();
424+
assert_eq!(err.to_string(), expected_err.to_string());
425+
assert!(persister.inner.read().await.is_closed);
426+
}
427+
428+
#[tokio::test]
429+
async fn test_replaying_unchecked_proposal() {
335430
let session_context = SHARED_CONTEXT.clone();
336431
let original = original_from_test_vector();
337432
let reply_key = Some(crate::HpkeKeyPair::gen_keypair().1);
@@ -353,11 +448,12 @@ mod tests {
353448
session_context: SessionContext { reply_key, ..session_context },
354449
}),
355450
};
356-
run_session_history_test(test)
451+
run_session_history_test(&test);
452+
run_session_history_test_async(&test).await;
357453
}
358454

359-
#[test]
360-
fn test_replaying_unchecked_proposal_with_reply_key() -> Result<(), BoxError> {
455+
#[tokio::test]
456+
async fn test_replaying_unchecked_proposal_with_reply_key() {
361457
let session_context = SHARED_CONTEXT.clone();
362458
let original = original_from_test_vector();
363459
let reply_key = Some(crate::HpkeKeyPair::gen_keypair().1);
@@ -379,11 +475,12 @@ mod tests {
379475
session_context: SessionContext { reply_key, ..session_context },
380476
}),
381477
};
382-
run_session_history_test(test)
478+
run_session_history_test(&test);
479+
run_session_history_test_async(&test).await;
383480
}
384481

385-
#[test]
386-
fn getting_fallback_tx() -> Result<(), BoxError> {
482+
#[tokio::test]
483+
async fn getting_fallback_tx() {
387484
let persister = NoopSessionPersister::<SessionEvent>::default();
388485
let session_context = SHARED_CONTEXT.clone();
389486
let mut events = vec![];
@@ -413,11 +510,12 @@ mod tests {
413510
session_context: SessionContext { reply_key, ..session_context },
414511
}),
415512
};
416-
run_session_history_test(test)
513+
run_session_history_test(&test);
514+
run_session_history_test_async(&test).await;
417515
}
418516

419-
#[test]
420-
fn test_contributed_inputs() -> Result<(), BoxError> {
517+
#[tokio::test]
518+
async fn test_contributed_inputs() {
421519
let persister = InMemoryTestPersister::<SessionEvent>::default();
422520
let session_context = SHARED_CONTEXT.clone();
423521
let mut events = vec![];
@@ -486,11 +584,12 @@ mod tests {
486584
session_context: SessionContext { reply_key, ..session_context },
487585
}),
488586
};
489-
run_session_history_test(test)
587+
run_session_history_test(&test);
588+
run_session_history_test_async(&test).await;
490589
}
491590

492-
#[test]
493-
fn test_payjoin_proposal() -> Result<(), BoxError> {
591+
#[tokio::test]
592+
async fn test_payjoin_proposal() {
494593
let persister = NoopSessionPersister::<SessionEvent>::default();
495594
let session_context = SHARED_CONTEXT.clone();
496595
let mut events = vec![];
@@ -561,11 +660,12 @@ mod tests {
561660
},
562661
expected_receiver_state: ReceiveSession::Closed(SessionOutcome::Success(vec![])),
563662
};
564-
run_session_history_test(test)
663+
run_session_history_test(&test);
664+
run_session_history_test_async(&test).await;
565665
}
566666

567-
#[test]
568-
fn test_session_fatal_error() -> Result<(), BoxError> {
667+
#[tokio::test]
668+
async fn test_session_fatal_error() {
569669
let persister = NoopSessionPersister::<SessionEvent>::default();
570670
let session_context = SHARED_CONTEXT.clone();
571671
let mut events = vec![];
@@ -596,11 +696,12 @@ mod tests {
596696
},
597697
expected_receiver_state: ReceiveSession::Closed(SessionOutcome::Failure),
598698
};
599-
run_session_history_test(test)
699+
run_session_history_test(&test);
700+
run_session_history_test_async(&test).await;
600701
}
601702

602-
#[test]
603-
fn test_session_transient_error() -> Result<(), BoxError> {
703+
#[tokio::test]
704+
async fn test_session_transient_error() {
604705
let persister = NoopSessionPersister::<SessionEvent>::default();
605706
let session_context = SHARED_CONTEXT.clone();
606707
let mut events = vec![];
@@ -632,7 +733,8 @@ mod tests {
632733
session_context: SessionContext { reply_key, ..session_context },
633734
}),
634735
};
635-
run_session_history_test(test)
736+
run_session_history_test(&test);
737+
run_session_history_test_async(&test).await;
636738
}
637739

638740
#[test]

payjoin/src/core/send/v2/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ pub use error::{CreateRequestError, EncapsulationError};
3434
use error::{InternalCreateRequestError, InternalEncapsulationError};
3535
use ohttp::ClientResponse;
3636
use serde::{Deserialize, Serialize};
37-
pub use session::{replay_event_log, SessionEvent, SessionHistory, SessionOutcome, SessionStatus};
37+
pub use session::{
38+
replay_event_log, replay_event_log_async, SessionEvent, SessionHistory, SessionOutcome,
39+
SessionStatus,
40+
};
3841
use url::Url;
3942

4043
use super::error::BuildSenderError;

0 commit comments

Comments
 (0)