Skip to content

Commit 620fce3

Browse files
committed
simplify repeated properties and organize some property loads.
1 parent 3e5e491 commit 620fce3

File tree

15 files changed

+79
-97
lines changed

15 files changed

+79
-97
lines changed

.devcontainer/docker-compose-alt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ services:
218218
- regserver.server.port=8970
219219
- email.based.monitor.address=CHANGEME
220220
- email.based.monitor.password=CHANGEME
221-
- job.monitor.broker.url=kafka:9092
221+
- kafka.broker.url=kafka:9092
222222
command: ["/tmp/wait-for-it.sh", "zookeeper:2181", "--", "/tmp/wait-for-it.sh", "apiserver:8970", "--" , "/tmp/wait-for-it.sh", "kafka:9092", "--", "/opt/apache-airavata-email-monitor/bin/email-monitor.sh"]
223223

224224
db:

airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ private void createProducer() throws ApplicationSettingsException {
4545

4646
if (producer == null) {
4747
Properties props = new Properties();
48-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.url"));
49-
props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
48+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
49+
props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("data.parser.broker.publisher.id"));
5050
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
5151
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
5252
producer = new KafkaProducer<String, ProcessCompletionMessage>(props);
@@ -56,7 +56,7 @@ private void createProducer() throws ApplicationSettingsException {
5656
public void submitMessageToParserEngine(ProcessCompletionMessage completionMessage)
5757
throws ExecutionException, InterruptedException, ApplicationSettingsException {
5858
final ProducerRecord<String, ProcessCompletionMessage> record = new ProducerRecord<>(
59-
ServerSettings.getSetting("kafka.parser.topic"),
59+
ServerSettings.getSetting("data.parser.topic"),
6060
completionMessage.getExperimentId(),
6161
completionMessage);
6262
RecordMetadata recordMetadata = producer.send(record).get();

airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class ParserWorkflowManager extends WorkflowManager {
5656
private static final Logger logger = LoggerFactory.getLogger(ParserWorkflowManager.class);
5757
private static final CountMonitor parserwfCounter = new CountMonitor("parser_wf_counter");
5858

59-
private String parserStorageResourceId = ServerSettings.getSetting("parser.storage.resource.id");
59+
private String parserStorageResourceId = ServerSettings.getSetting("data.parser.storage.resource.id");
6060

6161
public ParserWorkflowManager() throws ApplicationSettingsException {
6262
super(
@@ -440,14 +440,14 @@ private void runConsumer() throws ApplicationSettingsException {
440440

441441
final Properties props = new Properties();
442442

443-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.url"));
444-
props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
443+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
444+
props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("data.parser.broker.consumer.group"));
445445
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
446446
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProcessCompletionMessageDeserializer.class.getName());
447447
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
448448
final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
449449

450-
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.parser.topic")));
450+
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("data.parser.topic")));
451451

452452
logger.info("Starting the kafka consumer..");
453453

airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,15 @@ private void init() throws Exception {
9292
private Consumer<String, JobStatusResult> createConsumer() throws ApplicationSettingsException {
9393
final Properties props = new Properties();
9494
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
95-
props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.broker.consumer.group"));
95+
props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("job.monitor.broker.consumer.group"));
9696
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
9797
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JobStatusResultDeserializer.class.getName());
9898
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
9999
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
100100
// Create the consumer using props.
101101
final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<>(props);
102102
// Subscribe to the topic.
103-
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.broker.topic")));
103+
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("job.monitor.broker.topic")));
104104
return consumer;
105105
}
106106

airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
6262
private Message[] flushUnseenMessages;
6363
private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new HashMap<>();
6464
private long emailExpirationTimeMinutes;
65+
private String publisherId;
6566

6667
public EmailBasedMonitor() throws Exception {
6768
init();
@@ -76,6 +77,7 @@ private void init() throws Exception {
7677
storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
7778
folderName = ServerSettings.getEmailBasedMonitorFolderName();
7879
emailExpirationTimeMinutes = Long.parseLong(ServerSettings.getSetting("email.expiration.minutes"));
80+
publisherId = ServerSettings.getSetting("job.monitor.email.publisher.id");
7981
if (!(storeProtocol.equals(IMAPS) || storeProtocol.equals(POP3))) {
8082
throw new AiravataException(
8183
"Unsupported store protocol , expected " + IMAPS + " or " + POP3 + " but found " + storeProtocol);
@@ -143,7 +145,7 @@ public void monitor(String jobId) {
143145
log.info("[EJM]: Added monitor Id : {} to email based monitor map", jobId);
144146
}
145147

146-
private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
148+
private JobStatusResult parse(Message message, String publisherId) throws MessagingException, AiravataException {
147149
Address fromAddress = message.getFrom()[0];
148150
String addressStr = fromAddress.toString();
149151
ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
@@ -156,7 +158,11 @@ private JobStatusResult parse(Message message) throws MessagingException, Airava
156158

157159
try {
158160
JobStatusResult jobStatusResult = emailParser.parseEmail(message, regClient);
159-
jobStatusResult.setPublisherName(ServerSettings.getSetting("job.monitor.email.publisher.id"));
161+
jobStatusResult.setPublisherName(publisherId);
162+
var jobId = jobStatusResult.getJobId();
163+
var jobName = jobStatusResult.getJobName();
164+
var jobStatus = jobStatusResult.getState().getValue();
165+
log.info("Parsed Job Status: From=[{}], Id={}, Name={}, State={}", publisherId, jobId, jobName, jobStatus);
160166
return jobStatusResult;
161167
} catch (Exception e) {
162168
getRegistryClientPool().returnBrokenResource(regClient);
@@ -252,11 +258,8 @@ private void processMessages(Message[] searchMessages) throws MessagingException
252258
List<Message> unreadMessages = new ArrayList<>();
253259
for (Message message : searchMessages) {
254260
try {
255-
log.info("Parsing the job status message");
256-
JobStatusResult jobStatusResult = parse(message);
257-
log.info("Job message parsed. Job Id " + jobStatusResult.getJobId() + ", Job Name "
258-
+ jobStatusResult.getJobName() + ", Job State "
259-
+ jobStatusResult.getState().getValue());
261+
log.info("Received Job Status [{}]: {}", publisherId, message);
262+
JobStatusResult jobStatusResult = parse(message, publisherId);
260263
submitJobStatus(jobStatusResult);
261264
log.info("Submitted the job {} status to queue", jobStatusResult.getJobId());
262265
processedMessages.add(message);

airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,25 @@
3030
public class MessageProducer {
3131

3232
final Producer<String, JobStatusResult> producer;
33+
final String jobMonitorQueue;
3334

3435
public MessageProducer() throws ApplicationSettingsException {
3536
producer = createProducer();
37+
jobMonitorQueue = ServerSettings.getSetting("job.monitor.broker.topic");
3638
}
3739

3840
private Producer<String, JobStatusResult> createProducer() throws ApplicationSettingsException {
3941
Properties props = new Properties();
40-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("job.monitor.broker.url"));
42+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
4143
props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("job.monitor.broker.publisher.id"));
4244
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4345
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JobStatusResultSerializer.class.getName());
4446
return new KafkaProducer<String, JobStatusResult>(props);
4547
}
4648

47-
public void submitMessageToQueue(JobStatusResult jobStatusResult)
48-
throws ExecutionException, InterruptedException, ApplicationSettingsException {
49-
final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(
50-
ServerSettings.getSetting("job.monitor.broker.topic"), jobStatusResult.getJobId(), jobStatusResult);
49+
public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
50+
var jobId = jobStatusResult.getJobId();
51+
final var record = new ProducerRecord<>(jobMonitorQueue, jobId, jobStatusResult);
5152
RecordMetadata recordMetadata = producer.send(record).get();
5253
producer.flush();
5354
}

airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,26 @@ public class RealtimeMonitor extends AbstractMonitor {
4040

4141
private static final Logger logger = LoggerFactory.getLogger(RealtimeMonitor.class);
4242

43-
private RealtimeJobStatusParser parser;
43+
private final RealtimeJobStatusParser parser;
44+
private final String publisherId;
45+
private final String brokerTopic;
4446

4547
public RealtimeMonitor() throws ApplicationSettingsException {
4648
parser = new RealtimeJobStatusParser();
49+
publisherId = ServerSettings.getSetting("job.monitor.realtime.publisher.id");
50+
brokerTopic = ServerSettings.getSetting("realtime.monitor.broker.topic");
4751
}
4852

4953
private Consumer<String, String> createConsumer() throws ApplicationSettingsException {
5054
final Properties props = new Properties();
51-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("realtime.monitor.broker.url"));
55+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
5256
props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("realtime.monitor.broker.consumer.group"));
5357
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
5458
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
5559
// Create the consumer using props.
5660
final Consumer<String, String> consumer = new KafkaConsumer<>(props);
5761
// Subscribe to the topic.
58-
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("realtime.monitor.broker.topic")));
62+
consumer.subscribe(Collections.singletonList(brokerTopic));
5963
return consumer;
6064
}
6165

@@ -81,8 +85,8 @@ private void runConsumer() throws ApplicationSettingsException {
8185
}
8286

8387
private void process(String value, RegistryService.Client registryClient) throws MonitoringException {
84-
logger.info("Received data " + value);
85-
JobStatusResult statusResult = parser.parse(value, registryClient);
88+
logger.info("Received Job Status [{}]: {}", publisherId, value);
89+
JobStatusResult statusResult = parser.parse(value, publisherId, registryClient);
8690
if (statusResult != null) {
8791
logger.info("Submitting message to job monitor queue");
8892
submitJobStatus(statusResult);

airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java

Lines changed: 24 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Optional;
27-
import org.apache.airavata.common.utils.ServerSettings;
2827
import org.apache.airavata.model.job.JobModel;
2928
import org.apache.airavata.model.status.JobState;
3029
import org.apache.airavata.monitor.JobStatusResult;
@@ -41,9 +40,9 @@ private String getJobIdIdByJobNameWithRetry(String jobName, String taskId, Regis
4140
for (int i = 0; i < 3; i++) {
4241

4342
List<JobModel> jobsOfTask = registryClient.getJobs("taskId", taskId);
44-
if (jobsOfTask == null || jobsOfTask.size() == 0) {
43+
if (jobsOfTask == null || jobsOfTask.isEmpty()) {
4544
// Retry after 2s
46-
logger.warn("No jobs for task " + taskId + ". Retrying in 2 seconds");
45+
logger.warn("No jobs for task {}. Retrying in 2 seconds", taskId);
4746
Thread.sleep(2000);
4847
} else {
4948
Optional<JobModel> filtered = jobsOfTask.stream()
@@ -52,15 +51,15 @@ private String getJobIdIdByJobNameWithRetry(String jobName, String taskId, Regis
5251
if (filtered.isPresent()) {
5352
return filtered.get().getJobId();
5453
} else {
55-
logger.warn("No job for job name " + jobName + " and task " + taskId + ". Retrying in 2 seconds");
54+
logger.warn("No job for job name {} and task {}. Retrying in 2 seconds", jobName, taskId);
5655
Thread.sleep(2000);
5756
}
5857
}
5958
}
6059
return null;
6160
}
6261

63-
public JobStatusResult parse(String rawMessage, RegistryService.Client registryClient) {
62+
public JobStatusResult parse(String rawMessage, String publisherId, RegistryService.Client registryClient) {
6463

6564
try {
6665
Map asMap = new Gson().fromJson(rawMessage, Map.class);
@@ -74,68 +73,49 @@ public JobStatusResult parse(String rawMessage, RegistryService.Client registryC
7473
try {
7574
String jobId = getJobIdIdByJobNameWithRetry(jobName, taskId, registryClient);
7675
if (jobId == null) {
77-
logger.error("No job id for job name " + jobName);
76+
logger.error("No job id for job name {}", jobName);
7877
return null;
7978
}
8079

81-
JobState jobState = null;
82-
83-
switch (status) {
84-
case "RUNNING":
85-
jobState = JobState.ACTIVE;
86-
break;
87-
case "COMPLETED":
88-
jobState = JobState.COMPLETE;
89-
break;
90-
case "FAILED":
91-
jobState = JobState.FAILED;
92-
break;
93-
case "SUBMITTED":
94-
jobState = JobState.SUBMITTED;
95-
break;
96-
case "QUEUED":
97-
jobState = JobState.QUEUED;
98-
break;
99-
case "CANCELED":
100-
jobState = JobState.CANCELED;
101-
break;
102-
case "SUSPENDED":
103-
jobState = JobState.SUSPENDED;
104-
break;
105-
case "UNKNOWN":
106-
jobState = JobState.UNKNOWN;
107-
break;
108-
case "NON_CRITICAL_FAIL":
109-
jobState = JobState.NON_CRITICAL_FAIL;
110-
break;
111-
}
80+
JobState jobState =
81+
switch (status) {
82+
case "RUNNING" -> JobState.ACTIVE;
83+
case "COMPLETED" -> JobState.COMPLETE;
84+
case "FAILED" -> JobState.FAILED;
85+
case "SUBMITTED" -> JobState.SUBMITTED;
86+
case "QUEUED" -> JobState.QUEUED;
87+
case "CANCELED" -> JobState.CANCELED;
88+
case "SUSPENDED" -> JobState.SUSPENDED;
89+
case "UNKNOWN" -> JobState.UNKNOWN;
90+
case "NON_CRITICAL_FAIL" -> JobState.NON_CRITICAL_FAIL;
91+
default -> null;
92+
};
11293

11394
if (jobState == null) {
114-
logger.error("Invalid job state " + status);
95+
logger.error("Invalid job state {}", status);
11596
return null;
11697
}
11798

11899
JobStatusResult jobStatusResult = new JobStatusResult();
119100
jobStatusResult.setJobId(jobId);
120101
jobStatusResult.setJobName(jobName);
121102
jobStatusResult.setState(jobState);
122-
jobStatusResult.setPublisherName(
123-
ServerSettings.getSetting("job.monitor.realtime.publisher.id"));
103+
jobStatusResult.setPublisherName(publisherId);
124104
return jobStatusResult;
125105
} catch (Exception e) {
126-
logger.error("Failed to fetch job id for job name " + jobName);
106+
logger.error("Failed to fetch job id for job name {}", jobName);
127107
return null;
128108
}
129109
} else {
130-
logger.error("Job name, taskId or status is null in message " + rawMessage);
110+
logger.error("Job name, taskId or status is null in message {}", rawMessage);
131111
return null;
132112
}
133113
} else {
134-
logger.error("Data structure of message " + rawMessage + " is not correct");
114+
logger.error("Data structure of message {} is not correct", rawMessage);
135115
return null;
136116
}
137117
} catch (JsonSyntaxException e) {
138-
logger.error("Failed to parse raw data " + rawMessage + " to type Map", e);
118+
logger.error("Failed to parse raw data {} to type Map", rawMessage, e);
139119
return null;
140120
}
141121
}

0 commit comments

Comments
 (0)