DFSConfigKeys.java
@@ -2110,4 +2110,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_LEASE_HARDLIMIT_DEFAULT =
       HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT;
 
+  public static final String DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_KEY =
+      "dfs.namenode.datanode.list.cache.expiration.ms";
+  public static final long DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_DEFAULT = 0;
 }
BlockManager.java
@@ -862,9 +862,9 @@ public void refreshBlockPlacementPolicy(Configuration conf) {
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
     assert namesystem.hasReadLock(RwLockMode.BM);
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    datanodeManager.fetchDatanodes(live, dead, false);
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    final List<DatanodeDescriptor> dead = new ArrayList<>();
+    datanodeManager.fetchDatanodes(live, dead, false, false);
     out.println("Live Datanodes: " + live.size());
     out.println("Dead Datanodes: " + dead.size());
 
@@ -1722,6 +1722,9 @@ public void verifyReplication(String src,
   public boolean isSufficientlyReplicated(BlockInfo b) {
     // Compare against the lesser of the minReplication and number of live DNs.
     final int liveReplicas = countNodes(b).liveReplicas();
+    if (liveReplicas == 0) {
+      return false;
+    }
     if (hasMinStorage(b, liveReplicas)) {
       return true;
     }
DatanodeManager.java
@@ -24,6 +24,8 @@
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.Preconditions;
 
+import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
 
@@ -70,6 +72,7 @@
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -230,6 +233,9 @@ public class DatanodeManager {
 
   private final boolean randomNodeOrderEnabled;
 
+  /** Cached map of DatanodeReportType -> list of DatanodeDescriptor, for metrics purposes. */
+  private volatile Cache<DatanodeReportType, List<DatanodeDescriptor>> datanodeListSnapshots = null;
+
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -364,6 +370,18 @@ public class DatanodeManager {
     this.randomNodeOrderEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_RANDOM_NODE_ORDER_ENABLED,
         DFSConfigKeys.DFS_NAMENODE_RANDOM_NODE_ORDER_ENABLED_DEFAULT);
+
+    long datanodeListCacheExpirationMs =
+        conf.getLong(DFSConfigKeys.DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_KEY,
+            DFSConfigKeys.DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_DEFAULT);
+    if (datanodeListCacheExpirationMs > 0) {
+      LOG.info("Using cached DN list for metrics, expiration time = {} ms.",
+          datanodeListCacheExpirationMs);
+      datanodeListSnapshots = CacheBuilder.newBuilder()
+          .expireAfterWrite(datanodeListCacheExpirationMs, TimeUnit.MILLISECONDS)
+          .build();
+    }
+
   }
 
   /**
@@ -945,6 +963,11 @@ void addDatanode(final DatanodeDescriptor node) {
     synchronized(this) {
       host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
     }
+    Cache<DatanodeReportType, List<DatanodeDescriptor>> tmpDatanodeListSnapshots =
+        datanodeListSnapshots;
+    if (tmpDatanodeListSnapshots != null) {
+      tmpDatanodeListSnapshots.invalidateAll();
+    }
 
     networktopology.add(node); // may throw InvalidTopologyException
     host2DatanodeMap.add(node);
@@ -963,6 +986,11 @@ private void wipeDatanode(final DatanodeID node) {
     synchronized (this) {
       host2DatanodeMap.remove(datanodeMap.remove(key));
     }
+    Cache<DatanodeReportType, List<DatanodeDescriptor>> tmpDatanodeListSnapshots =
+        datanodeListSnapshots;
+    if (tmpDatanodeListSnapshots != null) {
+      tmpDatanodeListSnapshots.invalidateAll();
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}.wipeDatanode({}): storage {} is removed from datanodeMap.",
           getClass().getSimpleName(), node, key);
@@ -1438,7 +1466,7 @@ public int getNumLiveDataNodes() {
 
   /** @return the number of dead datanodes. */
   public int getNumDeadDataNodes() {
-    return getDatanodeListForReport(DatanodeReportType.DEAD).size();
+    return getDatanodeListSnapshotForReport(DatanodeReportType.DEAD).size();
   }
 
   /** @return the number of datanodes. */
@@ -1453,12 +1481,12 @@ public List<DatanodeDescriptor> getDecommissioningNodes() {
     // There is no need to take namesystem reader lock as
     // getDatanodeListForReport will synchronize on datanodeMap
     // A decommissioning DN may be "alive" or "dead".
-    return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING);
+    return getDatanodeListSnapshotForReport(DatanodeReportType.DECOMMISSIONING);
   }
 
   /** @return list of datanodes that are entering maintenance. */
   public List<DatanodeDescriptor> getEnteringMaintenanceNodes() {
-    return getDatanodeListForReport(DatanodeReportType.ENTERING_MAINTENANCE);
+    return getDatanodeListSnapshotForReport(DatanodeReportType.ENTERING_MAINTENANCE);
   }
 
   /* Getter and Setter for stale DataNodes related attributes */
@@ -1534,15 +1562,19 @@ void setNumStaleStorages(int numStaleStorages) {
 
   /** Fetch live and dead datanodes. */
   public void fetchDatanodes(final List<DatanodeDescriptor> live,
-      final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
+      final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode, boolean useCache) {
     if (live == null && dead == null) {
       throw new HadoopIllegalArgumentException("Both live and dead lists are null");
     }
 
-    // There is no need to take namesystem reader lock as
-    // getDatanodeListForReport will synchronize on datanodeMap
-    final List<DatanodeDescriptor> results =
-        getDatanodeListForReport(DatanodeReportType.ALL);
+    List<DatanodeDescriptor> results;
+    if (useCache) {
+      results = getDatanodeListSnapshotForReport(DatanodeReportType.ALL);
+    } else {
+      // There is no need to take namesystem reader lock as
+      // getDatanodeListForReport will synchronize on datanodeMap
+      results = getDatanodeListForReport(DatanodeReportType.ALL);
+    }
     for(DatanodeDescriptor node : results) {
       if (isDatanodeDead(node)) {
         if (dead != null) {
@@ -1635,6 +1667,37 @@ private DatanodeID parseDNFromHostsEntry(String hostLine) {
     return dnId;
   }
 
+  public void refreshDatanodeListSnapshot(long newExpirationMs) {
+    if (newExpirationMs <= 0) {
+      LOG.info("New config is non-positive ({}), disabling DN list cache", newExpirationMs);
+      datanodeListSnapshots = null;
+    } else {
+      LOG.info("Resetting DN list cache with new expiration time {} ms", newExpirationMs);
+      datanodeListSnapshots = CacheBuilder.newBuilder()
+          .expireAfterWrite(newExpirationMs, TimeUnit.MILLISECONDS)
+          .build();
+    }
+  }
+
+  /**
+   * Low-impact version of {@link #getDatanodeListForReport} that may return
+   * stale data; intended for low-impact callers such as metrics.
+   */
+  public List<DatanodeDescriptor> getDatanodeListSnapshotForReport(
+      final DatanodeReportType type) {
+    Cache<DatanodeReportType, List<DatanodeDescriptor>> tmpDatanodeListSnapshots =
+        datanodeListSnapshots;
+    if (tmpDatanodeListSnapshots == null) {
+      return getDatanodeListForReport(type);
+    }
+    try {
+      return tmpDatanodeListSnapshots.get(type, () -> getDatanodeListForReport(type));
+    } catch (ExecutionException e) {
+      // Fall back to a direct read if the cache load fails.
+      return getDatanodeListForReport(type);
+    }
+  }
+
   /** For generating datanode reports */
   public List<DatanodeDescriptor> getDatanodeListForReport(
       final DatanodeReportType type) {
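Taken together, the DatanodeManager changes follow one small pattern: a volatile Guava cache that readers copy into a local before use, that membership changes invalidate, and that is bypassed entirely when disabled. A standalone sketch of that pattern follows; the SnapshotCache class is hypothetical (not HDFS code), and it uses plain com.google.common imports rather than Hadoop's shaded org.apache.hadoop.thirdparty packages.

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/** Minimal sketch of the snapshot-cache pattern above; illustrative only. */
class SnapshotCache<K, V> {
  // volatile: a refresh or disable can swap the cache while readers are
  // active, so every reader copies the reference once and uses that copy.
  private volatile Cache<K, List<V>> snapshots;
  private final Function<K, List<V>> loader;

  SnapshotCache(long expirationMs, Function<K, List<V>> loader) {
    this.loader = loader;
    if (expirationMs > 0) {
      this.snapshots = CacheBuilder.newBuilder()
          .expireAfterWrite(expirationMs, TimeUnit.MILLISECONDS)
          .build();
    }
  }

  /** Cached read; falls through to a direct load when disabled or on failure. */
  List<V> get(K key) {
    Cache<K, List<V>> tmp = snapshots;  // single read of the volatile field
    if (tmp == null) {
      return loader.apply(key);         // cache disabled
    }
    try {
      return tmp.get(key, () -> loader.apply(key));
    } catch (ExecutionException e) {
      return loader.apply(key);         // cache load failed; read directly
    }
  }

  /** Drop every snapshot when the underlying membership changes. */
  void invalidate() {
    Cache<K, List<V>> tmp = snapshots;
    if (tmp != null) {
      tmp.invalidateAll();
    }
  }
}

Copying the volatile field into a local is what makes refreshDatanodeListSnapshot(0) safe to call concurrently: a reader that has already taken its copy cannot observe the reference turning null between its null check and its get.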
FSNamesystem.java
@@ -5788,8 +5788,8 @@ public int getNumDeadDataNodes() {
   @Metric({"NumDecomLiveDataNodes",
       "Number of datanodes which have been decommissioned and are now live"})
   public int getNumDecomLiveDataNodes() {
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false, true);
     int liveDecommissioned = 0;
     for (DatanodeDescriptor node : live) {
       liveDecommissioned += node.isDecommissioned() ? 1 : 0;
@@ -5801,8 +5801,8 @@ public int getNumDecomLiveDataNodes() {
   @Metric({"NumDecomDeadDataNodes",
       "Number of datanodes which have been decommissioned and are now dead"})
   public int getNumDecomDeadDataNodes() {
-    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, false);
+    final List<DatanodeDescriptor> dead = new ArrayList<>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, false, true);
     int deadDecommissioned = 0;
     for (DatanodeDescriptor node : dead) {
       deadDecommissioned += node.isDecommissioned() ? 1 : 0;
@@ -5814,8 +5814,8 @@ public int getNumDecomDeadDataNodes() {
   @Metric({"NumInServiceLiveDataNodes",
       "Number of live datanodes which are currently in service"})
   public int getNumInServiceLiveDataNodes() {
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true, true);
     int liveInService = live.size();
     for (DatanodeDescriptor node : live) {
       liveInService -= node.isInMaintenance() ? 1 : 0;
@@ -5827,8 +5827,8 @@ public int getNumInServiceLiveDataNodes() {
   @Metric({"VolumeFailuresTotal",
       "Total number of volume failures across all Datanodes"})
   public int getVolumeFailuresTotal() {
-    List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
+    List<DatanodeDescriptor> live = new ArrayList<>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false, true);
     int volumeFailuresTotal = 0;
     for (DatanodeDescriptor node: live) {
       volumeFailuresTotal += node.getVolumeFailures();
@@ -5840,8 +5840,8 @@ public int getVolumeFailuresTotal() {
   @Metric({"EstimatedCapacityLostTotal",
       "An estimate of the total capacity lost due to volume failures"})
   public long getEstimatedCapacityLostTotal() {
-    List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
+    List<DatanodeDescriptor> live = new ArrayList<>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false, true);
     long estimatedCapacityLostTotal = 0;
     for (DatanodeDescriptor node: live) {
       VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
@@ -6730,10 +6730,9 @@ public int getThreads() {
    */
   @Override // NameNodeMXBean
   public String getLiveNodes() {
-    final Map<String, Map<String,Object>> info =
-        new HashMap<String, Map<String,Object>>();
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    blockManager.getDatanodeManager().fetchDatanodes(live, null, false);
+    final Map<String, Map<String, Object>> info = new HashMap<>();
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    blockManager.getDatanodeManager().fetchDatanodes(live, null, false, true);
     for (DatanodeDescriptor node : live) {
       ImmutableMap.Builder<String, Object> innerinfo =
           ImmutableMap.<String,Object>builder();
@@ -6785,10 +6784,9 @@ public String getLiveNodes() {
    */
   @Override // NameNodeMXBean
   public String getDeadNodes() {
-    final Map<String, Map<String, Object>> info =
-        new HashMap<String, Map<String, Object>>();
-    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    blockManager.getDatanodeManager().fetchDatanodes(null, dead, false);
+    final Map<String, Map<String, Object>> info = new HashMap<>();
+    final List<DatanodeDescriptor> dead = new ArrayList<>();
+    blockManager.getDatanodeManager().fetchDatanodes(null, dead, false, true);
     for (DatanodeDescriptor node : dead) {
       Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
           .put("lastContact", getLastContact(node))
@@ -6916,10 +6914,9 @@ public String getNodeUsage() {
     float min = 0;
     float dev = 0;
 
-    final Map<String, Map<String,Object>> info =
-        new HashMap<String, Map<String,Object>>();
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
+    final Map<String, Map<String, Object>> info = new HashMap<>();
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    blockManager.getDatanodeManager().fetchDatanodes(live, null, true, true);
     for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) {
       DatanodeDescriptor node = it.next();
       if (!node.isInService()) {
@@ -9094,8 +9091,8 @@ public long getBytesInFuture() {
   @Metric({"NumInMaintenanceLiveDataNodes",
       "Number of live Datanodes which are in maintenance state"})
   public int getNumInMaintenanceLiveDataNodes() {
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true, true);
     int liveInMaintenance = 0;
     for (DatanodeDescriptor node : live) {
       liveInMaintenance += node.isInMaintenance() ? 1 : 0;
@@ -9107,8 +9104,8 @@ public int getNumInMaintenanceLiveDataNodes() {
   @Metric({"NumInMaintenanceDeadDataNodes",
       "Number of dead Datanodes which are in maintenance state"})
   public int getNumInMaintenanceDeadDataNodes() {
-    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
+    final List<DatanodeDescriptor> dead = new ArrayList<>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true, true);
     int deadInMaintenance = 0;
     for (DatanodeDescriptor node : dead) {
       deadInMaintenance += node.isInMaintenance() ? 1 : 0;
hdfs-default.xml
@@ -6712,4 +6712,14 @@
     Enables observer reads for clients. This should only be enabled when clients are using routers.
   </description>
 </property>
+<property>
+  <name>dfs.namenode.datanode.list.cache.expiration.ms</name>
+  <value>0</value>
+  <description>
+    Set to a positive number of milliseconds to cache the result of
+    DatanodeManager.getDatanodeListForReport for performance. Entries expire
+    that many milliseconds after insertion; a value of 0 or less disables the
+    cache. Non-metrics callers (fsck, datanodeReport, etc.) bypass the cache.
+  </description>
+</property>
 </configuration>
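Operators would turn the cache on with an ordinary hdfs-site.xml override; a sketch, where the 30000 ms value is an arbitrary illustration rather than a recommendation:

<property>
  <name>dfs.namenode.datanode.list.cache.expiration.ms</name>
  <value>30000</value>
</property>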
DFSTestUtil.java
@@ -703,7 +703,7 @@ public static int firstDnWithBlock(MiniDFSCluster cluster, ExtendedBlock b)
    */
   public static long getLiveDatanodeCapacity(DatanodeManager dm) {
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    dm.fetchDatanodes(live, null, false);
+    dm.fetchDatanodes(live, null, false, false);
     long capacity = 0;
     for (final DatanodeDescriptor dn : live) {
       capacity += dn.getCapacity();
@@ -715,8 +715,8 @@ public static long getLiveDatanodeCapacity(DatanodeManager dm) {
    * Return the capacity of the given live DN.
    */
   public static long getDatanodeCapacity(DatanodeManager dm, int index) {
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    dm.fetchDatanodes(live, null, false);
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    dm.fetchDatanodes(live, null, false, false);
     return live.get(index).getCapacity();
   }
 
@@ -737,7 +737,7 @@ public static void waitForDatanodeStatus(DatanodeManager dm, int expectedLive,
       Thread.sleep(timeout);
       live.clear();
       dead.clear();
-      dm.fetchDatanodes(live, dead, false);
+      dm.fetchDatanodes(live, dead, false, false);
       currTotalCapacity = 0;
       volFails = 0;
       for (final DatanodeDescriptor dd : live) {
TestBlocksScheduledCounter.java
@@ -78,7 +78,7 @@ public void testBlocksScheduledCounter() throws IOException {
     ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
     final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
         ).getDatanodeManager();
-    dm.fetchDatanodes(dnList, dnList, false);
+    dm.fetchDatanodes(dnList, dnList, false, false);
     DatanodeDescriptor dn = dnList.get(0);
 
     assertEquals(1, dn.getBlocksScheduled());
@@ -103,7 +103,7 @@ public void testScheduledBlocksCounterShouldDecrementOnAbandonBlock()
     DatanodeManager datanodeManager = cluster.getNamesystem().getBlockManager()
         .getDatanodeManager();
     ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
-    datanodeManager.fetchDatanodes(dnList, dnList, false);
+    datanodeManager.fetchDatanodes(dnList, dnList, false, false);
     for (DatanodeDescriptor descriptor : dnList) {
       assertEquals(0, descriptor.getBlocksScheduled(),
           "Blocks scheduled should be 0 for " + descriptor.getName());
@@ -169,7 +169,7 @@ public void testScheduledBlocksCounterDecrementOnDeletedBlock()
         cluster.getNamesystem().getBlockManager().getDatanodeManager();
     ArrayList<DatanodeDescriptor> dnList =
         new ArrayList<DatanodeDescriptor>();
-    datanodeManager.fetchDatanodes(dnList, dnList, false);
+    datanodeManager.fetchDatanodes(dnList, dnList, false, false);
 
     // 3. mark a couple of blocks as corrupt
     LocatedBlock block = NameNodeAdapter
@@ -230,7 +230,7 @@ public void testBlocksScheduledCounterOnTruncate() throws Exception {
         cluster.getNamesystem().getBlockManager().getDatanodeManager();
     ArrayList<DatanodeDescriptor> dnList =
         new ArrayList<DatanodeDescriptor>();
-    datanodeManager.fetchDatanodes(dnList, dnList, false);
+    datanodeManager.fetchDatanodes(dnList, dnList, false, false);
 
     // 3. restart the stopped datanode
     cluster.restartDataNode(0);
TestClientProtocolForPipelineRecovery.java
@@ -231,7 +231,7 @@ public void testNullCheckSumWhenDNRestarted()
     // fetch live DN
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     cluster.getNameNode().getNamesystem().getBlockManager()
-        .getDatanodeManager().fetchDatanodes(live, null, false);
+        .getDatanodeManager().fetchDatanodes(live, null, false, false);
     assertTrue(live.size() == 2,
         "DN start should be success and live dn should be 2");
     assertTrue(fs.getFileStatus(file).getLen() == chunkSize,
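The diff adds no test for the cache itself. A hypothetical JUnit sketch of one, built only from APIs that appear above plus the usual MiniDFSCluster scaffolding (the class name, expiration value, and assertions are illustrative):

// Hypothetical test sketch -- not part of this change.
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.junit.jupiter.api.Test;

public class TestDatanodeListSnapshot {
  @Test
  public void testDatanodeListSnapshotForReport() throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_KEY, 60000);
    try (MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(2).build()) {
      cluster.waitActive();
      DatanodeManager dm =
          cluster.getNamesystem().getBlockManager().getDatanodeManager();

      // The cached read sees both live DNs.
      List<DatanodeDescriptor> cached =
          dm.getDatanodeListSnapshotForReport(DatanodeReportType.ALL);
      assertEquals(2, cached.size());

      // Within the expiration window the same snapshot instance is served,
      // whereas getDatanodeListForReport builds a fresh list on every call.
      assertSame(cached,
          dm.getDatanodeListSnapshotForReport(DatanodeReportType.ALL));

      // A non-positive refresh disables the cache; reads fall back to direct.
      dm.refreshDatanodeListSnapshot(0);
      assertEquals(2,
          dm.getDatanodeListSnapshotForReport(DatanodeReportType.ALL).size());
    }
  }
}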