Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test-and-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ jobs:
distribution: 'temurin'
java-version: '11'
cache: 'maven'
- name: Run Spotless Apply
run: mvn spotless:apply
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: mvn --file pom.xml clean install
env:
Expand Down
18 changes: 18 additions & 0 deletions eclipse-java-formatter.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<profiles version="13">
<profile kind="CodeFormatterProfile" name="Custom-4spaces-NoCommentSplit" version="13">

<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>

<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>

<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="9999"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="false"/>

</profile>
</profiles>
34 changes: 31 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<common.version>2.0.2.0</common.version>
<common.version>2.0.5</common.version>
<jackson.version>2.12.7</jackson.version>
<spotlessMavenPlugin.version>2.43.0</spotlessMavenPlugin.version>
<googleJavaFormat.version>1.17.0</googleJavaFormat.version>
</properties>

<repositories>
Expand Down Expand Up @@ -67,7 +69,7 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.0</version>
<version>42.6.1</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -175,7 +177,7 @@
<dependency>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
<version>3.8.0</version>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down Expand Up @@ -226,6 +228,32 @@
<useUnlimitedThreads>true</useUnlimitedThreads>
</configuration>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotlessMavenPlugin.version}</version>
<configuration>
<java>
<eclipse>
<file>${project.basedir}/eclipse-java-formatter.xml</file>
</eclipse>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>spotless-apply</id>
<goals>
<goal>apply</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

</build>
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/fi/hsl/transitlog/hfp/HfpDataParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import org.springframework.stereotype.Component;

@Component
public
class HfpDataParser {
public class HfpDataParser {
public Hfp.Data parseFrom(byte[] data) throws InvalidProtocolBufferException {
return Hfp.Data.parseFrom(data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import org.apache.pulsar.client.api.Message;
import org.springframework.stereotype.Component;


@Component
public
class TransitDataSchemaWrapper {
public class TransitDataSchemaWrapper {
public boolean hasProtobufSchema(Message message) {
return TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.HfpData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

@Configuration
@PropertySource({"classpath:application.properties"})
@EnableJpaRepositories(
basePackages = "fi.hsl.transitlog.hfp.domain.repositories",
entityManagerFactoryRef = "devWriteEntityManager",
transactionManagerRef = "devWriteTransactionManager"
)
@EnableJpaRepositories(basePackages = "fi.hsl.transitlog.hfp.domain.repositories", entityManagerFactoryRef = "devWriteEntityManager", transactionManagerRef = "devWriteTransactionManager")
@Profile(value = {"default", "dev"})
@EnableTransactionManagement
@EnableAsync
Expand All @@ -32,28 +28,22 @@ public class DatabaseConfiguration {
@Bean
@Primary
public PlatformTransactionManager devWriteTransactionManager() {
JpaTransactionManager transactionManager
= new JpaTransactionManager();
transactionManager.setEntityManagerFactory(
devWriteEntityManager().getObject());
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(devWriteEntityManager().getObject());
return transactionManager;
}

@Bean
@Primary
public LocalContainerEntityManagerFactoryBean devWriteEntityManager() {
LocalContainerEntityManagerFactoryBean em
= new LocalContainerEntityManagerFactoryBean();
LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setDataSource(devWriteDataSource());
em.setPackagesToScan(
"fi.hsl.transitlog.hfp.domain");
em.setPackagesToScan("fi.hsl.transitlog.hfp.domain");

HibernateJpaVendorAdapter vendorAdapter
= new HibernateJpaVendorAdapter();
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
em.setJpaVendorAdapter(vendorAdapter);
HashMap<String, Object> properties = new HashMap<>();
properties.put("hibernate.dialect",
env.getProperty("hibernate.dialect"));
properties.put("hibernate.dialect", env.getProperty("hibernate.dialect"));
properties.put("hibernate.order_inserts", true);
properties.put("hibernate.jdbc.batch_size", 5000);
properties.put("hibernate.order_updates", true);
Expand All @@ -70,10 +60,8 @@ public LocalContainerEntityManagerFactoryBean devWriteEntityManager() {
@Bean
@Primary
public DataSource devWriteDataSource() {
HikariDataSource dataSource
= new HikariDataSource();
dataSource.setDriverClassName(
env.getProperty("jdbc.driverClassName"));
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName(env.getProperty("jdbc.driverClassName"));
dataSource.setJdbcUrl(env.getProperty("jdbc.url"));
dataSource.setUsername(env.getProperty("jdbc.user"));
dataSource.setPassword(env.getProperty("jdbc.pass"));
Expand Down
21 changes: 12 additions & 9 deletions src/main/java/fi/hsl/transitlog/hfp/domain/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

@MappedSuperclass
@EqualsAndHashCode
@JsonSubTypes({
@JsonSubTypes.Type(value = VehiclePosition.class, name = "vehicleposition"),
@JsonSubTypes({@JsonSubTypes.Type(value = VehiclePosition.class, name = "vehicleposition"),
@JsonSubTypes.Type(value = LightPriorityEvent.class, name = "lightpriorityevent"),
@JsonSubTypes.Type(value = OtherEvent.class, name = "otherevent"),
@JsonSubTypes.Type(value = StopEvent.class, name = "stopevent"),
@JsonSubTypes.Type(value = UnsignedEvent.class, name = "unsignedevent")
})
@JsonSubTypes.Type(value = UnsignedEvent.class, name = "unsignedevent")})
@Data
@Slf4j
public abstract class Event {
Expand Down Expand Up @@ -91,7 +89,8 @@ public Event(Hfp.Topic topic, Hfp.Payload payload) {
this.route_id = topic.hasRouteId() ? topic.getRouteId() : null;
this.direction_id = topic.hasDirectionId() ? topic.getDirectionId() : null;
this.headsign = topic.hasHeadsign() ? topic.getHeadsign() : null;
Optional<Time> maybeStartTime = wrapToOptional(topic::hasStartTime, topic::getStartTime).flatMap(HfpParser::safeParseTime);
Optional<Time> maybeStartTime = wrapToOptional(topic::hasStartTime, topic::getStartTime)
.flatMap(HfpParser::safeParseTime);
this.journey_start_time = maybeStartTime.orElse(null);
this.next_stop_id = topic.hasNextStop() ? topic.getNextStop() : null;
this.geohash_level = topic.hasGeohashLevel() ? topic.getGeohashLevel() : null;
Expand All @@ -100,7 +99,8 @@ public Event(Hfp.Topic topic, Hfp.Payload payload) {
this.lat = payload.hasLat() ? payload.getLat() : null;
this.longitude = payload.hasLong() ? payload.getLong() : null;
this.desi = payload.hasDesi() ? payload.getDesi() : null;
Optional<Integer> maybeDirection = wrapToOptional(payload::hasDir, payload::getDir).flatMap(HfpParser::safeParseInt);
Optional<Integer> maybeDirection = wrapToOptional(payload::hasDir, payload::getDir)
.flatMap(HfpParser::safeParseInt);
this.dir = maybeDirection.orElse(null);
this.oper = payload.hasOper() ? payload.getOper() : null;
this.veh = payload.hasVeh() ? payload.getVeh() : null;
Expand All @@ -110,13 +110,16 @@ public Event(Hfp.Topic topic, Hfp.Payload payload) {
this.acc = payload.hasAcc() ? payload.getAcc() : null;
this.dl = payload.hasDl() ? payload.getDl() : null;
this.odo = payload.hasOdo() ? payload.getOdo() : null;
Optional<Boolean> maybeDoors = wrapToOptional(payload::hasDrst, payload::getDrst).flatMap(HfpParser::safeParseBoolean);
Optional<Boolean> maybeDoors = wrapToOptional(payload::hasDrst, payload::getDrst)
.flatMap(HfpParser::safeParseBoolean);
this.drst = maybeDoors.orElse(null);
Optional<java.sql.Date> maybeOperatingDay = wrapToOptional(payload::hasOday, payload::getOday).flatMap(HfpParser::safeParseDate);
Optional<java.sql.Date> maybeOperatingDay = wrapToOptional(payload::hasOday, payload::getOday)
.flatMap(HfpParser::safeParseDate);
this.oday = maybeOperatingDay.orElse(null);
this.jrn = payload.hasJrn() ? payload.getJrn() : null;
this.line = payload.hasLine() ? payload.getLine() : null;
Optional<Time> maybeStart = wrapToOptional(payload::hasStart, payload::getStart).flatMap(HfpParser::safeParseTime);
Optional<Time> maybeStart = wrapToOptional(payload::hasStart, payload::getStart)
.flatMap(HfpParser::safeParseTime);
this.start = maybeStart.orElse(null);
this.location_quality_method = payload.hasLoc() ? payload.getLoc().toString() : null;
this.stop = payload.hasStop() ? payload.getStop() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public LightPriorityEvent(Hfp.Topic topic, Hfp.Payload payload) {
this.tlp_line_configid = payload.hasTlpLineConfigid() ? payload.getTlpLineConfigid() : null;
this.tlp_point_configid = payload.hasTlpPointConfigid() ? payload.getTlpPointConfigid() : null;
this.tlp_frequency = payload.hasTlpFrequency() ? payload.getTlpFrequency() : null;
this.tlp_protocol = payload.hasTlpProtocol () ? payload.getTlpProtocol() : null;
this.tlp_protocol = payload.hasTlpProtocol() ? payload.getTlpProtocol() : null;
}

public LightPriorityEvent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,9 @@ public class DomainMappingWriter {
};

@Autowired
DomainMappingWriter(DWUpload dwUpload,
PulsarApplication pulsarApplication,
EntityManager entityManager,
DumpService dumpTask,
EventFactory eventFactory,
@Value(value = "${hfp.tstMaxPast}") Integer hfpTstMaxPast,
@Value(value = "${hfp.tstMaxFuture}") Integer hfpTstMaxFuture) {
DomainMappingWriter(DWUpload dwUpload, PulsarApplication pulsarApplication, EntityManager entityManager,
DumpService dumpTask, EventFactory eventFactory, @Value(value = "${hfp.tstMaxPast}") Integer hfpTstMaxPast,
@Value(value = "${hfp.tstMaxFuture}") Integer hfpTstMaxFuture) {
this.DWUpload = dwUpload;
eventQueue = new ConcurrentHashMap<>();
this.dumpTask = dumpTask;
Expand All @@ -76,55 +72,58 @@ public class DomainMappingWriter {
void process(MessageId msgId, Hfp.Data data) throws IOException, ParseException {
Event event = null;
switch (data.getTopic().getEventType()) {
case VP:
case VP :
switch (data.getTopic().getJourneyType()) {
case journey:
case journey :
event = eventFactory.createVehiclePositionEvent(data.getTopic(), data.getPayload());
break;
case deadrun:
case deadrun :
event = eventFactory.createUnsignedEvent(data.getTopic(), data.getPayload());
break;
default:
default :
if (data.getTopic().getJourneyType() != Hfp.Topic.JourneyType.signoff) {
log.warn("Received unknown journey type {}", data.getTopic().getJourneyType());
}
}
break;
case DUE:
case ARR:
case ARS:
case PDE:
case DEP:
case PAS:
case WAIT:
case DUE :
case ARR :
case ARS :
case PDE :
case DEP :
case PAS :
case WAIT :
event = eventFactory.createStopEvent(data.getTopic(), data.getPayload());
break;
case TLR:
case TLA:
case TLR :
case TLA :
event = eventFactory.createLightPriorityEvent(data.getTopic(), data.getPayload());
break;
case DOO:
case DOC:
case DA:
case DOUT:
case BA:
case BOUT:
case VJA:
case VJOUT:
case DOO :
case DOC :
case DA :
case DOUT :
case BA :
case BOUT :
case VJA :
case VJOUT :
event = eventFactory.createOtherEvent(data.getTopic(), data.getPayload());
break;
default:
default :
log.warn("Received HFP message with unknown event type: {}", data.getTopic().getEventType());
}

if (event != null) {
//Do not insert data where the timestamp (tst) is too far away from the time when the message was received (received_at)
if (event.getTst() != null
&& Duration.between(event.getTst().toInstant(), event.getReceived_at().toInstant()).compareTo(hfpTstMaxPast) < 0
&& Duration.between(event.getReceived_at().toInstant(), event.getTst().toInstant()).compareTo(hfpTstMaxFuture) < 0) {
&& Duration.between(event.getTst().toInstant(), event.getReceived_at().toInstant())
.compareTo(hfpTstMaxPast) < 0
&& Duration.between(event.getReceived_at().toInstant(), event.getTst().toInstant())
.compareTo(hfpTstMaxFuture) < 0) {
eventQueue.put(msgId, event);
} else {
log.warn("tst for {} was outside accepted range of -{} to +{} days", event, hfpTstMaxPast, hfpTstMaxFuture);
log.warn("tst for {} was outside accepted range of -{} to +{} days", event, hfpTstMaxPast,
hfpTstMaxFuture);
ack(msgId);
}
DWUpload.uploadBlob(event);
Expand Down Expand Up @@ -163,12 +162,10 @@ void close(boolean closePulsar) {
}

public void ack(MessageId received) {
consumer.acknowledgeAsync(received)
.exceptionally(throwable -> {
log.error("Failed to ack Pulsar message", throwable);
return null;
})
.thenRun(() -> {
});
consumer.acknowledgeAsync(received).exceptionally(throwable -> {
log.error("Failed to ack Pulsar message", throwable);
return null;
}).thenRun(() -> {
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public class MessageProcessor implements IMessageHandler {
private final HfpDataParser hfpDataParser;
private DomainMappingWriter domainMappingWriter;

public MessageProcessor(DomainMappingWriter writer, TransitDataSchemaWrapper transitdataSchemaWrapper, HfpDataParser hfpDataParser) {
public MessageProcessor(DomainMappingWriter writer, TransitDataSchemaWrapper transitdataSchemaWrapper,
HfpDataParser hfpDataParser) {
this.domainMappingWriter = writer;
this.transitdataSchemaWrapper = transitdataSchemaWrapper;
this.hfpDataParser = hfpDataParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public class PulsarListenerService {
private MessageProcessor messageProcessor;

@Autowired
public PulsarListenerService(DomainMappingWriter domainMappingWriter, PulsarApplication pulsarApplication, MessageProcessor messageProcessor) {
public PulsarListenerService(DomainMappingWriter domainMappingWriter, PulsarApplication pulsarApplication,
MessageProcessor messageProcessor) {
this.domainMappingWriter = domainMappingWriter;
this.pulsarApplication = pulsarApplication;
this.messageProcessor = messageProcessor;
Expand Down
Loading
Loading