Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,20 @@ public class Config extends HashMap<String, Object> {
@IsInteger
@IsPositiveNumber(includeZero = true)
public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
/**
 * The maximum size, in megabytes, that the active event logger file may reach before it is rotated.
 * The value is validated as an integer and as a positive number.
 * If not specified, a default of 100 MB is used.
 */
@IsInteger
@IsPositiveNumber
public static final String TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB = "topology.eventlogger.rotation.size.mb";
/**
 * The maximum number of rotated event logger files that are retained; when a rotation would exceed
 * this number, the oldest rotated files are deleted. The file currently being written is not counted.
 * The value is validated as an integer and as a positive number.
 * If not specified, a default of 5 is used.
 */
@IsInteger
@IsPositiveNumber
public static final String TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES = "topology.eventlogger.max.retained.files";
/**
* The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within
* this time frame, Storm will fail the message on the spout. Some spouts implementations will then replay the message at a later time.
Expand Down
116 changes: 92 additions & 24 deletions storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,88 +19,110 @@
package org.apache.storm.metric;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.storm.Config;
import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileBasedEventLogger implements IEventLogger {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);

// Period (and initial delay) of the background flush task, in milliseconds.
private static final int FLUSH_INTERVAL_MILLIS = 1000;
// Multiplier used to convert the configured rotation size from MB to bytes.
private static final long BYTES_PER_MB = 1024L * 1024L;
// Fallbacks used when the corresponding topology configs are absent.
private static final int DEFAULT_ROTATION_SIZE_MB = 100;
private static final int DEFAULT_MAX_RETAINED_FILES = 5;

// Path of the active log file; rotated files are named by appending ".1", ".2", ... to it.
private Path eventLogPath;
// Writer for the active log file; set to null once the logger has been closed.
private BufferedWriter eventLogWriter;
// Single-threaded scheduler that runs the periodic flush task.
private ScheduledExecutorService flushScheduler;
// True when something has been written since the last flush.
private volatile boolean dirty = false;
// Held while writing, rotating, flushing, or closing eventLogWriter.
private final Object writeLock = new Object();

// File rotation configs
// Rotation threshold: the configured size in MB multiplied by BYTES_PER_MB.
private long maxFileSize;
// Upper bound on the number of rotated files kept next to the active file.
private int maxRetainedFiles;
// Running size of the active log file, compared against maxFileSize before each write.
private long currentFileSize = 0;

private void initLogWriter(Path logFilePath) {
try {
LOG.info("logFilePath {}", logFilePath);
eventLogPath = logFilePath;

currentFileSize = Files.exists(eventLogPath) ? Files.size(eventLogPath) : 0L;

eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
StandardOpenOption.WRITE, StandardOpenOption.APPEND);
StandardOpenOption.WRITE, StandardOpenOption.APPEND);
} catch (IOException e) {
LOG.error("Error setting up FileBasedEventLogger.", e);
throw new RuntimeException(e);
}
}


private void setUpFlushTask() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("event-logger-flush-%d")
.setDaemon(true)
.build();
.setNameFormat("event-logger-flush-%d")
.setDaemon(true)
.build();

flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
if (dirty) {
eventLogWriter.flush();
dirty = false;
synchronized (writeLock) {
if (dirty && eventLogWriter != null) {
eventLogWriter.flush();
dirty = false;
}
}
} catch (IOException ex) {
} catch (Exception ex) {
LOG.error("Error flushing " + eventLogPath, ex);
throw new RuntimeException(ex);
}
}
};

flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
}


@Override
public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
String stormId = context.getStormId();
int port = context.getThisWorkerPort();

int rotationSizeMb = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB),
DEFAULT_ROTATION_SIZE_MB);
this.maxFileSize = rotationSizeMb * BYTES_PER_MB;
this.maxRetainedFiles = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES),
DEFAULT_MAX_RETAINED_FILES);

/*
* Include the topology name & worker port in the file name so that
* multiple event loggers can log independently.
*/
String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);

Path path = Paths.get(workersArtifactRoot, "events.log");
File dir = path.toFile().getParentFile();
if (!dir.exists()) {
dir.mkdirs();
try {
Files.createDirectories(path.getParent());
} catch (IOException e) {
LOG.error("Failed to create directories for event logger", e);
throw new RuntimeException(e);
}
initLogWriter(path);
setUpFlushTask();
Expand All @@ -109,30 +131,76 @@ public void prepare(Map<String, Object> conf, Map<String, Object> arguments, Top
/**
 * Appends one event, followed by a line separator, to the active event log file.
 *
 * <p>If the write would push the file past the configured maximum size, the existing files
 * are rotated first. Events logged after {@link #close()} (or before the writer has been
 * opened) are silently dropped.
 *
 * @param event the event to log
 * @throws RuntimeException wrapping the {@link IOException} if rotating or writing fails
 */
@Override
public void log(EventInfo event) {
    try {
        String logMessage = buildLogMessage(event);
        // The file is written as UTF-8 and the limit is expressed in bytes, so measure the
        // encoded size; String.length() would under-count any non-ASCII content.
        long writeLength = (long) logMessage.getBytes(StandardCharsets.UTF_8).length
                + System.lineSeparator().length();

        synchronized (writeLock) {
            if (eventLogWriter == null) {
                // Check this before rotating: rotation re-opens the writer, which would
                // resurrect (and leak) a logger that has already been closed.
                return;
            }

            // Never rotate an empty file: a single event larger than the limit would
            // otherwise push real history out of the retained set for nothing.
            if (currentFileSize > 0 && currentFileSize + writeLength > maxFileSize) {
                rotateFiles();
            }

            eventLogWriter.write(logMessage);
            eventLogWriter.newLine();
            currentFileSize += writeLength;
            dirty = true;
        }
    } catch (IOException ex) {
        LOG.error("Error logging event {}", event, ex);
        throw new RuntimeException(ex);
    }
}

/**
 * Closes the active log file, shifts the rotated files up by one index, renames the active
 * file to {@code <log>.1} and opens a fresh, empty active file.
 *
 * <p>Rotated files are named by appending {@code .<index>} to the active file's path, with
 * index 1 being the most recent. Callers are expected to hold {@code writeLock}.
 *
 * @throws IOException if closing the writer, or deleting or moving a file, fails
 */
private void rotateFiles() throws IOException {
    eventLogWriter.close();

    final String basePath = eventLogPath.toString();

    // Drop the file that is about to fall off the end, plus any higher-numbered leftovers
    // (these exist when the retention limit was lowered since they were created).
    for (int index = maxRetainedFiles; ; index++) {
        Path stale = Paths.get(basePath + "." + index);
        if (!Files.exists(stale)) {
            break;
        }
        Files.delete(stale);
    }

    // Walk from the oldest surviving file to the newest so that no move overwrites a file
    // that still has to be shifted.
    int index = maxRetainedFiles - 1;
    while (index >= 1) {
        Path older = Paths.get(basePath + "." + (index + 1));
        Path newer = Paths.get(basePath + "." + index);
        if (Files.exists(newer)) {
            Files.move(newer, older, StandardCopyOption.REPLACE_EXISTING);
        }
        index--;
    }

    // The file that was being written becomes the most recent rotated file.
    if (Files.exists(eventLogPath)) {
        Files.move(eventLogPath, Paths.get(basePath + ".1"), StandardCopyOption.REPLACE_EXISTING);
    }

    // Start over with a new, empty active file.
    initLogWriter(eventLogPath);
    currentFileSize = 0;
}

/**
 * Builds the line written to the log for an event. This implementation returns the event's
 * {@code toString()} value; being {@code protected}, it can be overridden by subclasses.
 *
 * @param event the event to render
 * @return the text to write for {@code event}
 */
protected String buildLogMessage(EventInfo event) {
    final String message = event.toString();
    return message;
}

@Override
public void close() {
try {
eventLogWriter.close();
closeFlushScheduler();

try {
synchronized (writeLock) {
if (eventLogWriter != null) {
eventLogWriter.close();
eventLogWriter = null;
}
}
} catch (IOException ex) {
LOG.error("Error closing event log.", ex);
}

closeFlushScheduler();
}

private void closeFlushScheduler() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.metric;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link FileBasedEventLogger}, covering size-based rotation of the event log and
 * enforcement of the retained-file limit.
 */
public class FileBasedEventLoggerTest {

    /** Number of characters in the payload of each test event (roughly 100 KB once logged). */
    private static final int EVENT_PAYLOAD_CHARS = 100_000;

    private Path tempDir;
    private FileBasedEventLogger eventLogger;

    @BeforeEach
    public void setUp() throws IOException {
        tempDir = Files.createTempDirectory("storm-eventlogger-test");
        eventLogger = new FileBasedEventLogger();
    }

    @AfterEach
    public void tearDown() throws IOException {
        if (eventLogger != null) {
            eventLogger.close();
        }
        if (tempDir != null) {
            // Files.walk lists a directory before its contents, so delete in reverse order to
            // remove children before their parents; the stream must be closed after use.
            try (Stream<Path> walk = Files.walk(tempDir)) {
                walk.sorted(Comparator.reverseOrder())
                    .map(Path::toFile)
                    .forEach(File::delete);
            }
        }
    }

    private TopologyContext mockTopologyContext() {
        TopologyContext context = mock(TopologyContext.class);
        when(context.getStormId()).thenReturn("test-topology-1");
        when(context.getThisWorkerPort()).thenReturn(6700);
        return context;
    }

    /** Logs the same event {@code count} times. */
    private void logRepeatedly(IEventLogger.EventInfo eventInfo, int count) {
        for (int i = 0; i < count; i++) {
            eventLogger.log(eventInfo);
        }
    }

    @Test
    public void testFileRotation() throws IOException {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, tempDir.toAbsolutePath().toString());
        // The rotation size is configured in whole megabytes, so 1 MB (1,048,576 bytes) is the
        // smallest usable threshold; the test reaches it by logging ~100 KB events.
        conf.put(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB, 1);
        conf.put(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES, 2);

        eventLogger.prepare(conf, new HashMap<>(), mockTopologyContext());

        StringBuilder sb = new StringBuilder(EVENT_PAYLOAD_CHARS);
        for (int i = 0; i < EVENT_PAYLOAD_CHARS; i++) {
            sb.append("A"); // 1 byte in UTF-8
        }
        List<Object> values = new ArrayList<>();
        values.add(sb.toString());

        // toString() adds a little overhead, so each logged event is slightly over 100 KB.
        IEventLogger.EventInfo eventInfo = new IEventLogger.EventInfo(
            System.currentTimeMillis(), "test-component", 1, "msgId", values);

        Path expectedLogDir = tempDir.resolve("test-topology-1").resolve("6700");
        Path logFile = expectedLogDir.resolve("events.log");
        Path logFile1 = expectedLogDir.resolve("events.log.1");
        Path logFile2 = expectedLogDir.resolve("events.log.2");
        Path logFile3 = expectedLogDir.resolve("events.log.3");

        // Ten events stay just under 1 MB, so they all land in the active file. Rotation happens
        // synchronously inside log(), so the file system can be inspected right away.
        logRepeatedly(eventInfo, 10);
        assertTrue(Files.exists(logFile), "Active file events.log should exist");
        assertFalse(Files.exists(logFile1), "No rotation expected before the size limit is reached");

        // Two more events cross the limit: the first rotation.
        logRepeatedly(eventInfo, 2);
        assertTrue(Files.exists(logFile1), "Rotated file events.log.1 should exist");

        // Twelve more events cross the limit again: the second rotation.
        logRepeatedly(eventInfo, 12);
        assertTrue(Files.exists(logFile2), "Rotated file events.log.2 should exist");

        // A third rotation must respect the retention limit of 2: the oldest file is dropped
        // instead of an events.log.3 being created.
        logRepeatedly(eventInfo, 12);
        assertFalse(Files.exists(logFile3), "Rotated file events.log.3 should not exist");
        assertTrue(Files.exists(logFile2), "Rotated file events.log.2 should exist");
        assertTrue(Files.exists(logFile1), "Rotated file events.log.1 should exist");
        assertTrue(Files.exists(logFile), "Active file events.log should exist");
    }
}