diff --git a/README.md b/README.md index c56bdc7..7e0a576 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ and display-and-wait. ## Features - **Echo Bridge Functionality**: Acts as a simple echo bridge for testing. +- **Per-Message Echo Controls**: Specific outbound messages can suppress or delay the remote echo. - **Login Flows**: Supports multiple login flows such as password, cookies, local storage, and display-and-wait. - **Automation**: Configurable automation options for management rooms, login, portals, @@ -48,6 +49,14 @@ automation: timelimit: 0s # Duration for the initial startup infinite backfill, e.g. 10s, 1m, 1h ``` +To test slow or missing remote echoes, send trigger phrases in the message body: + +- `remote-echo none` keeps that send pending forever. +- `remote-echo fail` makes that send fail immediately with an error. +- `remote-echo delay 5s` delays the remote echo for the parsed Go duration. + +Other messages keep the existing immediate-success behavior. + ## Running To run DummyBridge using Docker, execute the following command: diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 21ada6b..38f3d05 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "regexp" + "strings" "sync" "time" @@ -14,12 +16,15 @@ import ( "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/bridgev2/simplevent" "maunium.net/go/mautrix/bridgev2/status" "maunium.net/go/mautrix/event" ) type DummyClient struct { - wg sync.WaitGroup + wg sync.WaitGroup + ctx context.Context + stop context.CancelFunc UserLogin *bridgev2.UserLogin Connector *DummyConnector @@ -31,6 +36,8 @@ var _ bridgev2.BackfillingNetworkAPI = (*DummyClient)(nil) var _ bridgev2.DeleteChatHandlingNetworkAPI = (*DummyClient)(nil) var _ bridgev2.MessageRequestAcceptingNetworkAPI = (*DummyClient)(nil) +var delayedRemoteEchoPattern = regexp.MustCompile(`(?i)^remote-echo\s+delay\s+([0-9]+(?:ms|s|m|h))$`) + var dummyRoomCaps = &event.RoomFeatures{ ID: "com.beeper.dummy.capabilities", @@ -67,6 +74,8 @@ var dummyRoomCaps = &event.RoomFeatures{ } func (dc *DummyClient) Connect(ctx context.Context) { + dc.ctx, dc.stop = context.WithCancel(ctx) + state := status.BridgeState{ UserID: dc.UserLogin.UserMXID, RemoteName: dc.UserLogin.RemoteName, @@ -81,7 +90,7 @@ func (dc *DummyClient) Connect(ctx context.Context) { log.Info().Int("portals", dc.Connector.Config.Automation.Portals.Count).Msg("Generating portals after login") for range dc.Connector.Config.Automation.Portals.Count { if _, err := generatePortal( - ctx, + dc.ctx, dc.Connector.br, dc.UserLogin, dc.Connector.Config.Automation.Portals.Members, @@ -95,6 +104,9 @@ func (dc *DummyClient) Connect(ctx context.Context) { } func (dc *DummyClient) Disconnect() { + if dc.stop != nil { + dc.stop() + } dc.wg.Wait() } @@ -157,16 +169,35 @@ func (dc *DummyClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.Ma _ = msg.Portal.Save(ctx) } - messageID := randomMessageID() - if msg.Event != nil && msg.Event.Unsigned.TransactionID != "" { - messageID = networkid.MessageID(msg.Event.Unsigned.TransactionID) - } - timestamp := time.Now() if msg.Event != nil && msg.Event.Timestamp != 0 { timestamp = time.UnixMilli(msg.Event.Timestamp) } + behavior := getRemoteEchoBehavior(msg.Content) + if behavior.fail { + return nil, errors.New("dummy remote echo failure") + } + if behavior.pending { + transactionID := getTransactionID(msg) + dbMessage := &database.Message{ + ID: randomMessageID(), + SenderID: networkid.UserID(dc.UserLogin.ID), + Timestamp: timestamp, + } + msg.AddPendingToSave(dbMessage, transactionID, nil) + dc.queueRemoteEcho(msg, transactionID, timestamp, behavior.delay) + return &bridgev2.MatrixMessageResponse{ + DB: dbMessage, + Pending: true, + }, nil + } + + messageID := randomMessageID() + if msg.Event != nil && msg.Event.Unsigned.TransactionID != "" { + messageID = networkid.MessageID(msg.Event.Unsigned.TransactionID) + } + return &bridgev2.MatrixMessageResponse{ DB: &database.Message{ ID: messageID, @@ -177,6 +208,88 @@ func (dc *DummyClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.Ma }, nil } +func getTransactionID(msg *bridgev2.MatrixMessage) networkid.TransactionID { + if msg.Event != nil && msg.Event.Unsigned.TransactionID != "" { + return networkid.TransactionID(msg.Event.Unsigned.TransactionID) + } + return networkid.TransactionID(randomMessageID()) +} + +type remoteEchoBehavior struct { + pending bool + delay time.Duration + fail bool +} + +func getRemoteEchoBehavior(content *event.MessageEventContent) remoteEchoBehavior { + if content == nil { + return remoteEchoBehavior{} + } + body := strings.TrimSpace(content.Body) + if strings.EqualFold(body, "remote-echo none") { + return remoteEchoBehavior{pending: true} + } else if strings.EqualFold(body, "remote-echo fail") { + return remoteEchoBehavior{fail: true} + } + matches := delayedRemoteEchoPattern.FindStringSubmatch(body) + if len(matches) != 2 { + return remoteEchoBehavior{} + } + delay, err := time.ParseDuration(matches[1]) + if err != nil { + return remoteEchoBehavior{} + } + return remoteEchoBehavior{pending: true, delay: delay} +} + +func (dc *DummyClient) queueRemoteEcho(msg *bridgev2.MatrixMessage, transactionID networkid.TransactionID, timestamp time.Time, delay time.Duration) { + if delay <= 0 || msg.Portal == nil { + return + } + + dc.wg.Add(1) + go func() { + defer dc.wg.Done() + + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-dc.ctx.Done(): + return + case <-timer.C: + } + + dc.UserLogin.QueueRemoteEvent(&simplevent.PreConvertedMessage{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventMessage, + PortalKey: msg.Portal.PortalKey, + Sender: bridgev2.EventSender{ + IsFromMe: true, + SenderLogin: dc.UserLogin.ID, + Sender: networkid.UserID(dc.UserLogin.ID), + }, + Timestamp: timestamp, + StreamOrder: time.Now().UnixNano(), + }, + Data: &bridgev2.ConvertedMessage{Parts: []*bridgev2.ConvertedMessagePart{{ + Type: event.EventMessage, + Content: cloneMessageContent(msg.Content), + }}}, + ID: randomMessageID(), + TransactionID: transactionID, + }) + }() +} + +func cloneMessageContent(content *event.MessageEventContent) *event.MessageEventContent { + if content == nil { + return nil + } + cloned := *content + return &cloned +} + func (dc *DummyClient) HandleMatrixDeleteChat(ctx context.Context, msg *bridgev2.MatrixDeleteChat) error { // bridgev2 will delete the portal + Matrix room after this returns nil. // For dummybridge, there's no separate remote-side deletion to do. diff --git a/pkg/connector/client_test.go b/pkg/connector/client_test.go new file mode 100644 index 0000000..ed2a445 --- /dev/null +++ b/pkg/connector/client_test.go @@ -0,0 +1,39 @@ +package connector + +import ( + "testing" + "time" + + "maunium.net/go/mautrix/event" +) + +func TestGetRemoteEchoBehavior(t *testing.T) { + tests := []struct { + name string + body string + pending bool + delay time.Duration + fail bool + }{ + {name: "normal message", body: "hello", pending: false}, + {name: "no echo trigger", body: "remote-echo none", pending: true}, + {name: "fail trigger", body: "remote-echo fail", fail: true}, + {name: "delay trigger", body: "remote-echo delay 5s", pending: true, delay: 5 * time.Second}, + {name: "case insensitive", body: "REMOTE-ECHO DELAY 2m", pending: true, delay: 2 * time.Minute}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := getRemoteEchoBehavior(&event.MessageEventContent{Body: tc.body}) + if got.pending != tc.pending { + t.Fatalf("pending = %v, want %v", got.pending, tc.pending) + } + if got.delay != tc.delay { + t.Fatalf("delay = %s, want %s", got.delay, tc.delay) + } + if got.fail != tc.fail { + t.Fatalf("fail = %v, want %v", got.fail, tc.fail) + } + }) + } +}