Skip to content

Commit 70adbf5

Browse files
committed
feat: build at-least once semantics into event hook delivery by (1) persisting events into a partitioned outbox table and (2) sourcing the events from this table thanks the new OutboxDrain class. To keep the implementation simple, the first failed event of an event hook is retried every so often: OutboxDrain will not progress onto the next event until the failed event is delivered (TODO: figure out what to do if the event backlog grows too big).
replace thread-per-request model with non-blocking event loop deprecate all event hook targets except web given the absence of test coverage
1 parent bb82f5c commit 70adbf5

File tree

30 files changed

+800
-846
lines changed

30 files changed

+800
-846
lines changed

dhis-2/dhis-api/src/main/java/org/hisp/dhis/eventhook/ReloadEventHookListeners.java renamed to dhis-2/dhis-api/src/main/java/org/hisp/dhis/eventhook/OnEventHookChange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,4 @@
3535
*
3636
* @author Morten Olav Hansen
3737
*/
38-
public class ReloadEventHookListeners {}
38+
public class OnEventHookChange {}

dhis-2/dhis-api/src/main/java/org/hisp/dhis/eventhook/targets/ConsoleTarget.java renamed to dhis-2/dhis-api/src/main/java/org/hisp/dhis/eventhook/OutboxLog.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2004-2022, University of Oslo
2+
* Copyright (c) 2004-2026, University of Oslo
33
* All rights reserved.
44
*
55
* Redistribution and use in source and binary forms, with or without
@@ -27,30 +27,27 @@
2727
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
2828
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2929
*/
30-
package org.hisp.dhis.eventhook.targets;
30+
package org.hisp.dhis.eventhook;
3131

32-
import com.fasterxml.jackson.annotation.JsonProperty;
33-
import lombok.EqualsAndHashCode;
32+
import jakarta.persistence.Column;
33+
import jakarta.persistence.Entity;
34+
import jakarta.persistence.Id;
35+
import jakarta.persistence.Table;
3436
import lombok.Getter;
37+
import lombok.NoArgsConstructor;
3538
import lombok.Setter;
36-
import lombok.experimental.Accessors;
37-
import org.hisp.dhis.common.CodeGenerator;
38-
import org.hisp.dhis.eventhook.Target;
3939

40-
/**
41-
* @author Morten Olav Hansen
42-
*/
40+
@Entity
41+
@Table(name = "outboxlog")
42+
@NoArgsConstructor
4343
@Getter
4444
@Setter
45-
@EqualsAndHashCode(callSuper = true)
46-
@Accessors(chain = true)
47-
public class ConsoleTarget extends Target {
48-
public static final String TYPE = "console";
45+
public class OutboxLog {
4946

50-
@JsonProperty(required = true)
51-
private String clientId = "dhis2-console-" + CodeGenerator.generateUid();
47+
@Id
48+
@Column(name = "outboxtablename")
49+
private String outboxTableName;
5250

53-
public ConsoleTarget() {
54-
super(TYPE);
55-
}
51+
@Column(name = "lastprocessedid")
52+
private long lastProcessedId;
5653
}

dhis-2/dhis-api/src/main/java/org/hisp/dhis/eventhook/Target.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@
3939
import lombok.Setter;
4040
import lombok.ToString;
4141
import lombok.experimental.Accessors;
42-
import org.hisp.dhis.eventhook.targets.ConsoleTarget;
43-
import org.hisp.dhis.eventhook.targets.JmsTarget;
44-
import org.hisp.dhis.eventhook.targets.KafkaTarget;
4542
import org.hisp.dhis.eventhook.targets.WebhookTarget;
4643

4744
/**
@@ -58,9 +55,6 @@
5855
property = "type")
5956
@JsonSubTypes({
6057
@JsonSubTypes.Type(value = WebhookTarget.class, name = WebhookTarget.TYPE),
61-
@JsonSubTypes.Type(value = ConsoleTarget.class, name = ConsoleTarget.TYPE),
62-
@JsonSubTypes.Type(value = JmsTarget.class, name = JmsTarget.TYPE),
63-
@JsonSubTypes.Type(value = KafkaTarget.class, name = KafkaTarget.TYPE)
6458
})
6559
public abstract class Target implements Serializable {
6660
@JsonCreator

dhis-2/dhis-api/src/main/java/org/hisp/dhis/eventhook/targets/KafkaTarget.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

dhis-2/dhis-api/src/main/java/org/hisp/dhis/eventhook/targets/WebhookTarget.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import lombok.EqualsAndHashCode;
3636
import lombok.Getter;
3737
import lombok.Setter;
38-
import lombok.experimental.Accessors;
3938
import org.hisp.dhis.common.CodeGenerator;
4039
import org.hisp.dhis.common.auth.AuthScheme;
4140
import org.hisp.dhis.eventhook.Target;
@@ -46,7 +45,6 @@
4645
@Getter
4746
@Setter
4847
@EqualsAndHashCode(callSuper = true)
49-
@Accessors(chain = true)
5048
public class WebhookTarget extends Target {
5149
public static final String TYPE = "webhook";
5250

dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/JobType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ public enum JobType {
126126
REMOVE_USED_OR_EXPIRED_RESERVED_VALUES(
127127
daily2am("uwWCT2BMmlq", "Remove expired or used reserved values")),
128128
SYSTEM_VERSION_UPDATE_CHECK(
129-
dailyRandomBetween3and5("vt21671bgno", "System version update check notification"));
129+
dailyRandomBetween3and5("vt21671bgno", "System version update check notification")),
130+
OUTBOX_ROTATION(every(1800, "zHKNyGk7zHl", "Outbox rotation"));
130131

131132
/**
132133
* Any {@link JobType} which has a default will ensure that the {@link JobConfiguration} for that

dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/config/ServiceConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.beans.factory.annotation.Qualifier;
4040
import org.springframework.context.annotation.Bean;
4141
import org.springframework.context.annotation.Configuration;
42+
import org.springframework.context.annotation.Primary;
4243
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
4344

4445
/**
@@ -48,6 +49,7 @@
4849
public class ServiceConfig {
4950

5051
@Bean("taskScheduler")
52+
@Primary
5153
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
5254
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
5355
threadPoolTaskScheduler.setPoolSize(25);

dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/metadata/objectbundle/hooks/EventHookObjectBundleHook.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.hisp.dhis.dxf2.metadata.objectbundle.ObjectBundle;
3535
import org.hisp.dhis.eventhook.EventHook;
3636
import org.hisp.dhis.eventhook.EventHookSecretManager;
37-
import org.hisp.dhis.eventhook.ReloadEventHookListeners;
37+
import org.hisp.dhis.eventhook.OnEventHookChange;
3838
import org.springframework.context.ApplicationEventPublisher;
3939
import org.springframework.stereotype.Component;
4040

@@ -61,6 +61,6 @@ public void preUpdate(EventHook eventHook, EventHook persistedObject, ObjectBund
6161
@Override
6262
public <E extends EventHook> void postTypeImport(
6363
Class<E> klass, List<E> objects, ObjectBundle bundle) {
64-
publisher.publishEvent(new ReloadEventHookListeners());
64+
publisher.publishEvent(new OnEventHookChange());
6565
}
6666
}

dhis-2/dhis-services/dhis-service-event-hook/pom.xml

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,6 @@
2727
<groupId>org.hisp.dhis</groupId>
2828
<artifactId>dhis-service-field-filtering</artifactId>
2929
</dependency>
30-
<dependency>
31-
<groupId>org.hisp.dhis</groupId>
32-
<artifactId>dhis-support-system</artifactId>
33-
</dependency>
3430
<!-- Application -->
3531
<dependency>
3632
<groupId>org.springframework</groupId>
@@ -50,11 +46,30 @@
5046
</dependency>
5147
<dependency>
5248
<groupId>org.springframework</groupId>
53-
<artifactId>spring-jms</artifactId>
49+
<artifactId>spring-web</artifactId>
5450
</dependency>
5551
<dependency>
5652
<groupId>org.springframework</groupId>
57-
<artifactId>spring-web</artifactId>
53+
<artifactId>spring-jdbc</artifactId>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.springframework</groupId>
57+
<artifactId>spring-webflux</artifactId>
58+
<version>${spring.version}</version>
59+
</dependency>
60+
<dependency>
61+
<groupId>org.reactivestreams</groupId>
62+
<artifactId>reactive-streams</artifactId>
63+
<version>1.0.4</version>
64+
</dependency>
65+
<dependency>
66+
<groupId>io.projectreactor</groupId>
67+
<artifactId>reactor-core</artifactId>
68+
<version>3.7.7</version>
69+
</dependency>
70+
<dependency>
71+
<groupId>jakarta.persistence</groupId>
72+
<artifactId>jakarta.persistence-api</artifactId>
5873
</dependency>
5974
<dependency>
6075
<groupId>com.fasterxml.jackson.core</groupId>
@@ -81,26 +96,10 @@
8196
<groupId>com.google.code.findbugs</groupId>
8297
<artifactId>jsr305</artifactId>
8398
</dependency>
84-
<dependency>
85-
<groupId>org.apache.activemq</groupId>
86-
<artifactId>artemis-jakarta-client</artifactId>
87-
</dependency>
88-
<dependency>
89-
<groupId>org.apache.kafka</groupId>
90-
<artifactId>kafka-clients</artifactId>
91-
</dependency>
9299
<dependency>
93100
<groupId>org.jasypt</groupId>
94101
<artifactId>jasypt</artifactId>
95102
</dependency>
96-
<dependency>
97-
<groupId>org.apache.httpcomponents.client5</groupId>
98-
<artifactId>httpclient5</artifactId>
99-
</dependency>
100-
<dependency>
101-
<groupId>org.apache.httpcomponents.core5</groupId>
102-
<artifactId>httpcore5</artifactId>
103-
</dependency>
104103

105104
<!-- Test -->
106105
<dependency>

dhis-2/dhis-services/dhis-service-event-hook/src/main/java/org/hisp/dhis/eventhook/EventHookContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,19 @@
4242
@Data
4343
@Builder
4444
public class EventHookContext {
45-
@Builder.Default Map<String, List<Handler>> targets = new HashMap<>();
45+
@Builder.Default Map<String, List<ReactiveHandler>> targets = new HashMap<>();
4646

4747
@Builder.Default List<EventHook> eventHooks = new ArrayList<>();
4848

4949
public boolean hasTarget(String uid) {
5050
return targets.containsKey(uid) || targets.get(uid).isEmpty();
5151
}
5252

53-
public List<Handler> getTarget(String uid) {
53+
public List<ReactiveHandler> getTarget(String uid) {
5454
return targets.get(uid);
5555
}
5656

5757
public void closeTargets() {
58-
targets.values().forEach(handlers -> handlers.forEach(Handler::close));
58+
targets.values().forEach(handlers -> handlers.forEach(ReactiveHandler::close));
5959
}
6060
}

0 commit comments

Comments
 (0)