From 9f24f955be3207f2a4287ccc7ad5d9c158cccf5d Mon Sep 17 00:00:00 2001
From: Christopher Gregorian
Date: Tue, 12 Feb 2019 10:02:29 -0800
Subject: [PATCH 1/3] Create AbusiveUserMapper to replicate an abusive user's workload

---
 .../workloadgenerator/AbusiveUserMapper.java | 159 ++++++++++++++++++
 1 file changed, 159 insertions(+)
 create mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java

diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java
new file mode 100644
index 0000000000..ee9a917e8c
--- /dev/null
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java
@@ -0,0 +1,159 @@
+/**
+ * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
+ * See LICENSE in the project root for license information.
+ */
+package com.linkedin.dynamometer.workloadgenerator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * AbusiveUserMapper simulates two types of users: one type that performs a large listing operation on a directory
+ * containing many files, and another type that performs a small operation (either a single small-directory listing
+ * or a single directory creation). It uses {@link TimedInputFormat}; see its Javadoc for configuration information.
+ * It only requires the NameNode and makes no changes to file data.
+ *
+ * <pre>
+ * Configuration options available:
+ * </pre>
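+ *
+ * Each mapper impersonates the proxy user "fakeuser" + (taskID / mappers-per-user). With the defaults of 10 large
+ * jobs and 10 mappers per user, task IDs 0-9 all run as "fakeuser0" and perform the large listing, so a single
+ * simulated user is abusive while the remaining users perform only small operations.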
+ */
+public class AbusiveUserMapper extends WorkloadMapper<LongWritable, NullWritable> {
+
+  public static final String FILE_PARENT_PATH_KEY = "bigfile.file-parent-path";
+  public static final String FILE_PARENT_PATH_DEFAULT = "/tmp/createFileMapper";
+
+  public static final String NUM_LARGE_JOBS_KEY = "bigfile.num-large-jobs";
+  public static final int NUM_LARGE_JOBS_DEFAULT = 10;
+
+  public static final String NUM_FILES_KEY = "bigfile.num-files";
+  public static final int NUM_FILES_DEFAULT = 1000;
+
+  public static final String MAPPERS_PER_USER_KEY = "bigfile.mappers-per-user";
+  public static final int MAPPERS_PER_USER_DEFAULT = 10;
+
+  public static final String ENABLE_WRITE_OPS_KEY = "bigfile.enable-write-ops";
+  public static final boolean ENABLE_WRITE_OPS_DEFAULT = false;
+
+  private ConcurrentHashMap<String, FileSystem> fsMap = new ConcurrentHashMap<>();
+  private Path parentFolder;
+  private int numLargeJobs;
+  private int mappersPerUser;
+  private boolean enableWriteOps;
+
+  @Override
+  public String getDescription() {
+    return "This mapper creates 1-byte files for the specified duration.";
+  }
+
+  @Override
+  public List<String> getConfigDescriptions() {
+    List<String> baseList = TimedInputFormat.getConfigDescriptions();
+    baseList.add(FILE_PARENT_PATH_KEY + " (default: " + FILE_PARENT_PATH_DEFAULT +
+        "): The root directory for the job to create files in.");
+    baseList.add(NUM_LARGE_JOBS_KEY + " (default: " + NUM_LARGE_JOBS_DEFAULT +
+        "): Number of large listing jobs.");
+    baseList.add(NUM_FILES_KEY + " (default: " + NUM_FILES_DEFAULT +
+        "): Number of files in the directory for the large listing operations.");
+    baseList.add(MAPPERS_PER_USER_KEY + " (default: " + MAPPERS_PER_USER_DEFAULT +
+        "): Number of mappers per user.");
+    baseList.add(ENABLE_WRITE_OPS_KEY + " (default: " + ENABLE_WRITE_OPS_DEFAULT +
+        "): If true, the small operation creates and deletes a single directory instead of listing one.");
+    return baseList;
+  }
+
+  @Override
+  public boolean verifyConfigurations(Configuration conf) {
+    return TimedInputFormat.verifyConfigurations(conf);
+  }
+
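+  // Setup builds this mapper's private working directory under the configured parent path and creates a
+  // DistributedFileSystem bound to the mapper's proxy user via UserGroupInformation.doAs(), so every
+  // subsequent NameNode call in map() is attributed to that simulated user.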
new Path("big" + i))); + } + } else if (enableWriteOps) { + // Small write job: no setup required + } else { + // Small read job: set up file for single-file listing op + fs.mkdirs(new Path(parentFolder, new Path("small" + taskID))); + } + } + + @Override + public void map(LongWritable key, NullWritable value, Context mapperContext) + throws IOException { + int taskID = mapperContext.getTaskAttemptID().getTaskID().getId(); + String proxyUser = "fakeuser" + taskID / mappersPerUser; + FileSystem fs = fsMap.get(proxyUser); + + if (taskID < numLargeJobs) { + // Large job: lists + fs.listStatus(parentFolder); + } else if (enableWriteOps) { + // Small mkdir op + fs.mkdirs(new Path(parentFolder, new Path("small" + taskID))); + fs.delete(new Path(parentFolder, new Path("small" + taskID)), true); + } else { + // Small listing op + fs.listStatus(new Path(parentFolder, new Path("small" + taskID))); + } + } +} From 3718128acb71d6112a10ac22ae689aaef9f4809b Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Mon, 1 Apr 2019 14:45:06 -0700 Subject: [PATCH 2/3] Fix description --- .../dynamometer/workloadgenerator/AbusiveUserMapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java index ee9a917e8c..a3582dd522 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java @@ -64,7 +64,7 @@ public class AbusiveUserMapper extends WorkloadMapper Date: Tue, 2 Apr 2019 10:58:52 -0700 Subject: [PATCH 3/3] Extracted setup and run steps and fixed big listing --- .../workloadgenerator/AbusiveUserMapper.java | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java index a3582dd522..3ef3f93e6e 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/AbusiveUserMapper.java @@ -59,6 +59,7 @@ public class AbusiveUserMapper extends WorkloadMapper fsMap = new ConcurrentHashMap(); private Path parentFolder; private int numLargeJobs; + private int numFiles; private int mappersPerUser; private boolean enableWriteOps; @@ -94,7 +95,7 @@ public void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); int taskID = context.getTaskAttemptID().getTaskID().getId(); numLargeJobs = conf.getInt(NUM_LARGE_JOBS_KEY, NUM_LARGE_JOBS_DEFAULT); - int numFiles = conf.getInt(NUM_FILES_KEY, NUM_FILES_DEFAULT); + numFiles = conf.getInt(NUM_FILES_KEY, NUM_FILES_DEFAULT); mappersPerUser = conf.getInt(MAPPERS_PER_USER_KEY, MAPPERS_PER_USER_DEFAULT); enableWriteOps = conf.getBoolean(ENABLE_WRITE_OPS_KEY, ENABLE_WRITE_OPS_DEFAULT); @@ -124,19 +125,22 @@ public FileSystem run() { // Make job folder fs.mkdirs(parentFolder); - if (taskID < numLargeJobs) { - // Large job: set up files for large listing op - for (int i = 0; i < numFiles; i++) { - fs.mkdirs(new Path(parentFolder, new Path("big" + i))); - } - } else if (enableWriteOps) { - // Small write job: no 
+  private void setupLargeListingJob(FileSystem fs) throws IOException {
+    for (int i = 0; i < numFiles; i++) {
+      fs.mkdirs(new Path(parentFolder, new Path("big", new Path("sub" + i))));
+    }
+  }
+
+  private void setupSmallListingJob(FileSystem fs, int taskID) throws IOException {
+    fs.mkdirs(new Path(parentFolder, new Path("small" + taskID)));
+  }
+
   @Override
   public void map(LongWritable key, NullWritable value, Context mapperContext)
       throws IOException {
@@ -145,15 +149,24 @@ public void map(LongWritable key, NullWritable value, Context mapperContext)
     FileSystem fs = fsMap.get(proxyUser);
 
     if (taskID < numLargeJobs) {
-      // Large job: lists
-      fs.listStatus(parentFolder);
+      runLargeListingJob(fs);
     } else if (enableWriteOps) {
-      // Small mkdir op
-      fs.mkdirs(new Path(parentFolder, new Path("small" + taskID)));
-      fs.delete(new Path(parentFolder, new Path("small" + taskID)), true);
+      runSmallMkdirJob(fs, taskID);
     } else {
-      // Small listing op
-      fs.listStatus(new Path(parentFolder, new Path("small" + taskID)));
+      runSmallListingJob(fs, taskID);
     }
   }
+
+  private void runLargeListingJob(FileSystem fs) throws IOException {
+    fs.listStatus(new Path(parentFolder, new Path("big")));
+  }
+
+  private void runSmallMkdirJob(FileSystem fs, int taskID) throws IOException {
+    fs.mkdirs(new Path(parentFolder, new Path("small" + taskID)));
+    fs.delete(new Path(parentFolder, new Path("small" + taskID)), true);
+  }
+
+  private void runSmallListingJob(FileSystem fs, int taskID) throws IOException {
+    fs.listStatus(new Path(parentFolder, new Path("small" + taskID)));
+  }
 }
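
A possible invocation, for reference: this sketch assumes the -nn_uri, -start_time_offset, and
-mapper_class_name flags of Dynamometer's WorkloadDriver and the bin/start-workload.sh wrapper described in
the project README, so treat the exact flag names as assumptions rather than verified output:

  ./bin/start-workload.sh -nn_uri hdfs://<namenode>:8020/ \
      -start_time_offset 1m \
      -mapper_class_name AbusiveUserMapper \
      -Dbigfile.num-large-jobs=10 \
      -Dbigfile.mappers-per-user=10 \
      -Dbigfile.num-files=1000

Note that setup() impersonates users via UserGroupInformation.createProxyUser(), so the account submitting
the workload job must be allowed to proxy on the target NameNode (hadoop.proxyuser.<user>.hosts and
hadoop.proxyuser.<user>.groups).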