Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,25 @@ public void deleteConnectorConfig(String connectorName) {
configManagementService.deleteConnectorConfig(connectorName);
}

/**
* Restart the connector with the specified connector name in the cluster.
*
* @param connectorName
*/
public void restartConnector(String connectorName) {
configManagementService.restartConnector(connectorName);
}

/**
* Restart the task with the specified task name in the cluster.
*
* @param connectorName
* @param task
*/
public void restartTask(String connectorName, Integer task) {
configManagementService.restartTask(connectorName, task);
}

/**
* Pause the connector. This call will asynchronously suspend processing by the connector and all
* of its tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public RestHandler(AbstractConnectController connectController) {
app.get("/connectors/{connectorName}/stop", this::handleStopConnector);
app.get("/connectors/stop/all", this::handleStopAllConnector);

// restart connector
app.get("/connectors/{connectorName}/restart", this::handleRestartConnector);
app.get("/connectors/{connectorName}/tasks/{task}/restart", this::handleRestartTask);

// pause & resume
app.get("/connectors/{connectorName}/pause", this::handlePauseConnector);
app.get("/connectors/{connectorName}/resume", this::handleResumeConnector);
Expand Down Expand Up @@ -236,6 +240,28 @@ private void handleStopAllConnector(Context context) {
}
}

private void handleRestartConnector(Context context) {
try {
String connectorName = context.pathParam(CONNECTOR_NAME);
connectController.restartConnector(connectorName);
context.json(new HttpResponse<>(context.status(), "Connector [" + connectorName + "] restarted successfully"));
} catch (Exception e) {
log.error("Restart connector failed .", e);
context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, e.getMessage()));
}
}

public void handleRestartTask(Context context) {
try {
String connectorName = context.pathParam(CONNECTOR_NAME);
Integer task = Integer.valueOf(context.pathParam(TASK_NAME));
connectController.restartTask(connectorName, task);
context.json(new HttpResponse<>(context.status(), "Task [" + connectorName + "/ " + task + "] restarted successfully"));
} catch (Exception ex) {
log.error("Restart task failed .", ex);
context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, ex.getMessage()));
}
}

private void handlePauseConnector(Context context) {
String connectorName = context.pathParam(CONNECTOR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ default void configure(WorkerConfig config) {
*/
void deleteConnectorConfig(String connectorName);

/**
* restart the connector with the specified connector name in the cluster.
*
* @param connectorName
*/
void restartConnector(String connectorName);

/**
* restart the task with the specified task in the cluster.
*
* @param connectorName
* @param task
*/
void restartTask(String connectorName, Integer task);

/**
* pause connector
*
Expand Down
Loading