
Commit 1d482fe

refine monitor codes
1 parent 8303f03 commit 1d482fe

File tree

3 files changed: +110 -164 lines

airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java

Lines changed: 73 additions & 120 deletions
@@ -19,19 +19,8 @@
  */
 package org.apache.airavata.helix.impl.workflow;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.*;
+import java.util.concurrent.*;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -61,12 +50,7 @@
 import org.apache.airavata.patform.monitoring.CountMonitor;
 import org.apache.airavata.patform.monitoring.MonitoringServer;
 import org.apache.airavata.registry.api.RegistryService;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
@@ -77,14 +61,29 @@ public class PostWorkflowManager extends WorkflowManager {
     private static final Logger logger = LoggerFactory.getLogger(PostWorkflowManager.class);
     private static final CountMonitor postwfCounter = new CountMonitor("post_wf_counter");
 
-    private ExecutorService processingPool = Executors.newFixedThreadPool(10);
+    private final ExecutorService processingPool = Executors.newFixedThreadPool(10);
 
     public PostWorkflowManager() throws ApplicationSettingsException {
         super(
                 ServerSettings.getSetting("post.workflow.manager.name"),
                 Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
     }
 
+    public static void main(String[] args) throws Exception {
+
+        if (ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
+            MonitoringServer monitoringServer = new MonitoringServer(
+                    ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
+                    ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
+            monitoringServer.start();
+
+            Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop));
+        }
+
+        PostWorkflowManager postManager = new PostWorkflowManager();
+        postManager.startServer();
+    }
+
     private void init() throws Exception {
         super.initComponents();
     }
@@ -113,115 +112,89 @@ private boolean process(JobStatusResult jobStatusResult) {
 
         RegistryService.Client registryClient = getRegistryClientPool().getResource();
 
-        try {
-            logger.info("Processing job result of job id " + jobStatusResult.getJobId() + " sent by "
-                    + jobStatusResult.getPublisherName());
-
-            List<JobModel> jobs = registryClient.getJobs("jobId", jobStatusResult.getJobId());
+        var jobId = jobStatusResult.getJobId();
+        var jobName = jobStatusResult.getJobName();
+        var jobState = jobStatusResult.getState();
+        var publisherId = jobStatusResult.getPublisherName();
+        logger.info("processing JobStatusUpdate<{}> from {}: {}", jobId, publisherId, jobStatusResult);
 
-            if (jobs.size() > 0) {
-                logger.info("Filtering total " + jobs.size() + " with target job name " + jobStatusResult.getJobName());
+        try {
+            List<JobModel> jobs = registryClient.getJobs("jobId", jobId);
+            logger.info("Found {} jobs in registry with id={}", jobs.size(), jobId);
+            if (!jobs.isEmpty()) {
                 jobs = jobs.stream()
-                        .filter(jm -> jm.getJobName().equals(jobStatusResult.getJobName()))
-                        .collect(Collectors.toList());
+                        .filter(jm -> jm.getJobName().equals(jobName))
+                        .toList();
+                logger.info("Found {} jobs in registry with id={} and name={}", jobs.size(), jobId, jobName);
             }
-
             if (jobs.size() != 1) {
-                logger.error("Couldn't find exactly one job with id " + jobStatusResult.getJobId() + " and name "
-                        + jobStatusResult.getJobName() + " in the registry. Count " + jobs.size());
+                logger.error("Found {} job(s) in registry with id={} and name={}", jobs.size(), jobId, jobName);
                 getRegistryClientPool().returnResource(registryClient);
                 return false;
             }
-
             JobModel jobModel = jobs.get(0);
             ProcessModel processModel = registryClient.getProcess(jobModel.getProcessId());
             ExperimentModel experimentModel = registryClient.getExperiment(processModel.getExperimentId());
             ProcessStatus processStatus = registryClient.getProcessStatus(processModel.getProcessId());
 
+            var processState = processStatus.getState();
             getRegistryClientPool().returnResource(registryClient);
 
-            if (processModel != null && experimentModel != null) {
-
-                jobModel.getJobStatuses()
-                        .sort(Comparator.comparingLong(JobStatus::getTimeOfStateChange)
-                                .reversed());
+            if (experimentModel != null) {
+                jobModel.getJobStatuses().sort(Comparator.comparingLong(JobStatus::getTimeOfStateChange).reversed());
                 JobState currentJobStatus = jobModel.getJobStatuses().get(0).getJobState();
+                logger.info("Last known state of job {} is {}", jobId, jobName);
 
-                logger.info("Last known state of job " + jobModel.getJobId() + " is " + currentJobStatus.name());
-
-                if (!JobStateValidator.isValid(currentJobStatus, jobStatusResult.getState())) {
-                    logger.warn("Job state of " + jobStatusResult.getJobId() + " is not valid. Previous state "
-                            + currentJobStatus + ", new state " + jobStatusResult.getState());
+                if (!JobStateValidator.isValid(currentJobStatus, jobState)) {
+                    logger.warn("JobStatusUpdate<{}> invalid. prev={} -> new={}", jobId, currentJobStatus, jobState);
                     return true;
                 }
 
-                String gateway = experimentModel.getGatewayId();
+                String task = jobModel.getTaskId();
                 String processId = processModel.getProcessId();
+                String gateway = experimentModel.getGatewayId();
                 String experimentId = experimentModel.getExperimentId();
-                String task = jobModel.getTaskId();
-
-                logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
-                        + processId + ", exp id " + experimentId + ", gateway " + gateway + " and status "
-                        + jobStatusResult.getState().name());
 
-                saveAndPublishJobStatus(
-                        jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
+                logger.info("saving JobStatusUpdate<{}>: pid={}, eid={}, gw={}, state={}",
+                        jobId, processId, experimentId, gateway, jobState);
+                saveAndPublishJobStatus(jobId, task, processId, experimentId, gateway, jobState);
 
                 // TODO get cluster lock before that
-                if (ProcessState.CANCELLING.equals(processStatus.getState())
-                        || ProcessState.CANCELED.equals(processStatus.getState())) {
-                    logger.info("Cancelled post workflow for process " + processId + " in experiment " + experimentId);
-                    // This will mark an cancelling Experiment into a cancelled status for a set of valid job statuses
+                if (ProcessState.CANCELLING.equals(processState) || ProcessState.CANCELED.equals(processState)) {
+                    logger.info("Cancelled post workflow for process {} in experiment {}", processId, experimentId);
+                    // This will mark a canceling Experiment with CANCELED status for a set of valid job statuses
                     // This is a safety check. Cancellation is originally handled in Job Cancellation Workflow
-                    switch (jobStatusResult.getState()) {
+                    switch (jobState) {
                         case FAILED:
                         case SUSPENDED:
                         case CANCELED:
                         case COMPLETE:
-                            logger.info("Job " + jobStatusResult.getJobId() + " status is " + jobStatusResult.getState()
-                                    + " so marking experiment " + experimentId + " as cancelled");
+                            logger.info("canceled job={}: eid={}, state={}", jobId, experimentId, jobState);
                             publishProcessStatus(processId, experimentId, gateway, ProcessState.CANCELED);
                             break;
                         default:
-                            logger.warn("Job " + jobStatusResult.getJobId() + " status " + jobStatusResult.getState()
-                                    + " is invalid to mark experiment " + experimentId + " as cancelled");
+                            logger.warn("skipping job={}: eid={}, state={}", jobId, experimentId, jobState);
                     }
                 } else {
-
-                    if (jobStatusResult.getState() == JobState.COMPLETE
-                            || jobStatusResult.getState() == JobState.FAILED) {
-                        // if the job is FAILED, still run output staging tasks to debug the reason for failure. And
-                        // update
-                        // the experiment status as COMPLETED as this job failure is not related to Airavata scope.
-                        logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId()
-                                + " with process id " + processId + ", gateway " + gateway + " and status "
-                                + jobStatusResult.getState().name());
-
-                        logger.info("Job " + jobStatusResult.getJobId() + " was completed");
-
+                    logger.info("Job {} is in state={}", jobId, jobState);
+                    if (jobState == JobState.COMPLETE || jobState == JobState.FAILED) {
+                        // If Job has FAILED, still run output staging tasks to debug the reason for failure. And
+                        // update the experiment status as COMPLETED as job failures are unrelated to Airavata scope.
+                        logger.info("running PostWorkflow for process {} of experiment {}", processId, experimentId);
                         executePostWorkflow(processId, gateway, false);
 
                     } else if (jobStatusResult.getState() == JobState.CANCELED) {
-                        logger.info("Job " + jobStatusResult.getJobId()
-                                + " was externally cancelled but process is not marked as cancelled yet");
+                        logger.info("Setting process {} of experiment {} to state=CANCELED", processId, experimentId);
                         publishProcessStatus(processId, experimentId, gateway, ProcessState.CANCELED);
-                        logger.info("Marked process " + processId + " of experiment " + experimentId
-                                + " as cancelled as job is already being cancelled");
-
-                    } else if (jobStatusResult.getState() == JobState.SUBMITTED) {
-                        logger.info("Job " + jobStatusResult.getJobId() + " was submitted");
                     }
                 }
                 return true;
             } else {
-                logger.warn("Could not find a monitoring register for job id " + jobStatusResult.getJobId());
+                logger.warn("Could not find a monitoring register for job id {}", jobId);
                 return false;
             }
         } catch (Exception e) {
-            logger.error(
-                    "Failed to process job : " + jobStatusResult.getJobId() + ", with status : "
-                            + jobStatusResult.getState().name(),
-                    e);
+            logger.error("Failed to process job: {}, with status : {}", jobStatusResult.getJobId(), jobStatusResult.getState().name(), e);
             getRegistryClientPool().returnBrokenResource(registryClient);
             return false;
         }
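Most of this hunk swaps string-concatenated log statements for SLF4J's parameterized form. As a side note for readers less familiar with that idiom, here is a minimal, self-contained sketch (hypothetical class and messages, not part of the commit) of the two properties the new calls rely on: arguments are only formatted when the level is enabled, and a trailing Throwable is attached as the exception rather than consumed by a placeholder.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical illustration of the SLF4J parameterized-logging style adopted above.
class ParameterizedLoggingExample {
    private static final Logger logger = LoggerFactory.getLogger(ParameterizedLoggingExample.class);

    void demo(String jobId, String state, Exception cause) {
        // Arguments are substituted only if INFO is enabled, so no string
        // concatenation cost is paid for suppressed log levels.
        logger.info("processing JobStatusUpdate<{}> with state={}", jobId, state);

        // A trailing Throwable is treated as the exception to log (with stack trace),
        // not as a value for a '{}' placeholder; this is why the refactored catch
        // block can pass both format arguments and the exception in one call.
        logger.error("Failed to process job: {}, with status : {}", jobId, state, cause);
    }
}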
@@ -237,21 +210,21 @@ private void executePostWorkflow(String processId, String gateway, boolean force
         HelixTaskFactory taskFactory;
         try {
             processModel = registryClient.getProcess(processId);
-            experimentModel = registryClient.getExperiment(processModel.getExperimentId());
-            getRegistryClientPool().returnResource(registryClient);
-            ResourceType resourceType = registryClient
-                    .getGroupComputeResourcePreference(
-                            processModel.getComputeResourceId(), processModel.getGroupResourceProfileId())
-                    .getResourceType();
+            var experimentId = processModel.getExperimentId();
+            var crId = processModel.getComputeResourceId();
+            var grpId = processModel.getGroupResourceProfileId();
+
+            experimentModel = registryClient.getExperiment(experimentId);
+            ResourceType resourceType = registryClient.getGroupComputeResourcePreference(crId, grpId).getResourceType();
+
             taskFactory = TaskFactory.getFactory(resourceType);
             logger.info("Initialized task factory for resource type {} for process {}", resourceType, processId);
 
         } catch (Exception e) {
-            logger.error(
-                    "Failed to fetch experiment or process from registry associated with process id " + processId, e);
+            logger.error("Failed to fetch experiment/process from registry for pid={}", processId, e);
+            throw new Exception("Failed to fetch experiment/process from registry for pid=" + processId, e);
+        } finally {
             getRegistryClientPool().returnResource(registryClient);
-            throw new Exception(
-                    "Failed to fetch experiment or process from registry associated with process id " + processId, e);
         }
 
         String taskDag = processModel.getTaskDag();
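The hunk above moves the registry client release into a finally block, so the pooled client is returned exactly once whether the lookups succeed or throw. A minimal sketch of that shape, assuming hypothetical Pool and Client interfaces rather than the Airavata types:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the try/catch/finally resource-return pattern; Pool and Client are
// hypothetical stand-ins for the registry client pool used in the commit.
class PooledClientExample {
    interface Client { String lookup(String processId) throws Exception; }
    interface Pool { Client getResource(); void returnResource(Client client); }

    private static final Logger logger = LoggerFactory.getLogger(PooledClientExample.class);

    String fetch(Pool pool, String processId) throws Exception {
        Client client = pool.getResource();
        try {
            return client.lookup(processId); // may throw
        } catch (Exception e) {
            logger.error("Failed to fetch process for pid={}", processId, e);
            throw new Exception("Failed to fetch process for pid=" + processId, e);
        } finally {
            pool.returnResource(client); // runs on both the success and failure paths
        }
    }
}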
@@ -264,8 +237,7 @@ private void executePostWorkflow(String processId, String gateway, boolean force
         jobVerificationTask.setGatewayId(experimentModel.getGatewayId());
         jobVerificationTask.setExperimentId(experimentModel.getExperimentId());
         jobVerificationTask.setProcessId(processModel.getProcessId());
-        jobVerificationTask.setTaskId(
-                "Job-Verification-Task-" + UUID.randomUUID().toString() + "-");
+        jobVerificationTask.setTaskId("Job-Verification-Task-" + UUID.randomUUID() + "-");
         jobVerificationTask.setForceRunTask(forceRun);
         jobVerificationTask.setSkipAllStatusPublish(true);
 
@@ -320,7 +292,7 @@ private void executePostWorkflow(String processId, String gateway, boolean force
         completingTask.setGatewayId(experimentModel.getGatewayId());
         completingTask.setExperimentId(experimentModel.getExperimentId());
         completingTask.setProcessId(processModel.getProcessId());
-        completingTask.setTaskId("Completing-Task-" + UUID.randomUUID().toString() + "-");
+        completingTask.setTaskId("Completing-Task-" + UUID.randomUUID() + "-");
         completingTask.setForceRunTask(forceRun);
         completingTask.setSkipAllStatusPublish(true);
         if (allTasks.size() > 0) {
@@ -341,8 +313,7 @@ private void executePostWorkflow(String processId, String gateway, boolean force
         allTasks.add(parsingTriggeringTask);
 
         String workflowName = getWorkflowOperator()
-                .launchWorkflow(
-                        processId + "-POST-" + UUID.randomUUID().toString(), new ArrayList<>(allTasks), true, false);
+                .launchWorkflow(processId + "-POST-" + UUID.randomUUID(), new ArrayList<>(allTasks), true, false);
 
         registerWorkflowForProcess(processId, workflowName, "POST");
     }
@@ -353,7 +324,6 @@ public void startServer() throws Exception {
         final Consumer<String, JobStatusResult> consumer = createConsumer();
         new Thread(() -> {
                     while (true) {
-
                         final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Long.MAX_VALUE);
                         CompletionService<Boolean> executorCompletionService =
                                 new ExecutorCompletionService<>(processingPool);
@@ -370,13 +340,11 @@ public void startServer() throws Exception {
                                     record.value().getJobId());
 
                             // This avoids kafka read thread to wait until processing is completed before committing
-                            // There is a risk of missing 20 messages in case of a restart but this improves the
-                            // robustness
-                            // of the kafka read thread by avoiding wait timeouts
+                            // There is a risk of missing 20 messages in case of a restart, but this improves the
+                            // robustness of the kafka read thread by avoiding wait timeouts
                             processingFutures.add(executorCompletionService.submit(() -> {
                                 boolean success = process(record.value());
-                                logger.info("Status of processing "
-                                        + record.value().getJobId() + " : " + success);
+                                logger.info("Status of processing {} : {}", record.value().getJobId(), success);
                                 return success;
                             }));
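The comment retained in this hunk describes the design choice in startServer(): each record is handed to a worker pool and the Kafka read thread commits without waiting for the results, trading a small window of messages that may be lost on restart for a read loop that never blocks on slow processing. A rough, self-contained sketch of that pattern follows; the consumer value type, pool size, and process() body are hypothetical stand-ins, not the commit's actual wiring.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Sketch only: records are submitted to an ExecutorCompletionService so the
// poll/commit thread does not wait for processing to finish.
class AsyncRecordProcessingSketch {
    private final ExecutorService processingPool = Executors.newFixedThreadPool(10);

    void run(Consumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            CompletionService<Boolean> completionService = new ExecutorCompletionService<>(processingPool);
            List<Future<Boolean>> futures = new ArrayList<>();

            for (ConsumerRecord<String, String> record : records) {
                // Submit and move on; the read thread never blocks on process().
                futures.add(completionService.submit(() -> process(record.value())));
            }

            // Committing here, before the workers finish, is what creates the
            // restart risk mentioned in the comment above.
            consumer.commitSync();

            // Results can be drained from completionService/futures later without
            // holding up the next poll.
        }
    }

    private boolean process(String value) {
        return true; // placeholder for the real record handling
    }
}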

@@ -440,19 +408,4 @@ private void saveAndPublishJobStatus(
     }
 
     public void stopServer() {}
-
-    public static void main(String[] args) throws Exception {
-
-        if (ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
-            MonitoringServer monitoringServer = new MonitoringServer(
-                    ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
-                    ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
-            monitoringServer.start();
-
-            Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop));
-        }
-
-        PostWorkflowManager postManager = new PostWorkflowManager();
-        postManager.startServer();
-    }
 }
