HDFS Hedged Read: Source Code Analysis and Limitations



Introduction

  I've been reading the HDFS source code recently. This post walks through the implementation of Hedged Read and points out some of its limitations.
  Most of the relevant code lives in DFSInputStream.java and DFSClient.java; all snippets here are from Hadoop 2.6.0-cdh5.4.4.

Hedged Read

  As we discussed in HDFS里read operation源码解析以及读慢节点问题探究, restarting a speculative task does not solve the slow-datanode problem: the new task may well read from the same node, because the NameNode is likely to return the replica list in the same order, and the node in question is not broken, just slow. There are two straightforward ways to address this:

  1. Make the speculative task read a different replica, e.g. by picking replicas at random rather than in order. But this throws away the NameNode's replica ordering, which puts the best replica first; in cross-datacenter deployments the performance loss may outweigh the gain.
  2. Set a timeout: if reading one replica takes longer than the timeout, trigger a read of another replica.

  Hadoop has shipped the second approach since 2.4 as the DFSClient Hedged Read feature: if reading a block is slow, the DFSClient spawns a hedged read against another replica, takes whichever read finishes first, and cancels the other. This helps rein in outliers, such as reads that take unusually long because they hit a bad disk. Let's walk through the implementation.

Hedged Read thread pool initialization

  The DFSClient Hedged Read feature is disabled by default. To enable it, configure the following:

// size of the thread pool used for concurrent hedged reads
dfs.client.hedged.read.threadpool.size
// how long to wait (in milliseconds) before spawning a hedged read
dfs.client.hedged.read.threshold.millis
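
  For example, these keys can be set programmatically on the client's Configuration (a minimal sketch; the values are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// allow up to 10 concurrent hedged reads per client
conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
// wait 500 ms on the first replica before spawning a hedged read
conf.setLong("dfs.client.hedged.read.threshold.millis", 500);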

  At its core, Hedged Read is a static thread pool whose threads are dispatched to read another replica when needed. It is declared and initialized in DFSClient.

// the declaration: a ThreadPoolExecutor
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;

  The pool is initialized from the DFSClient constructor, which decides whether to create it based on whether a pool size has been configured:

this.hedgedReadThresholdMillis = conf.getLong(
    DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
    DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
int numThreads = conf.getInt(
    DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
    DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
if (numThreads > 0) {
  this.initThreadsNumForHedgedReads(numThreads);
}

  The actual initialization happens in initThreadsNumForHedgedReads, which creates the pool with the configured value as its maximum size.

/**
 * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
 * it does not already exist.
 * @param num Number of threads for hedged reads thread pool.
 * If zero, skip hedged reads thread pool creation.
 */
private synchronized void initThreadsNumForHedgedReads(int num) {
  if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
  HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
      new Daemon.DaemonFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
          Thread t = super.newThread(r);
          t.setName("hedgedRead-" + threadIndex.getAndIncrement());
          return t;
        }
      },
      new ThreadPoolExecutor.CallerRunsPolicy() {
        @Override
        public void rejectedExecution(Runnable runnable,
            ThreadPoolExecutor e) {
          LOG.info("Execution rejected, Executing in current thread");
          HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
          // will run in the current thread
          super.rejectedExecution(runnable, e);
        }
      });
  HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Using hedged reads; pool threads=" + num);
  }
}

  The work queue is a SynchronousQueue, a zero-capacity handoff queue: an element can only be added when another thread is simultaneously taking it. The ThreadFactory is Hadoop's own daemon-thread factory, and a custom RejectedExecutionHandler is layered on CallerRunsPolicy: when the pool rejects a task because all threads are busy, it increments the HEDGED_READ_METRIC.incHedgedReadOpsInCurThread() counter and then runs the read in the calling thread.
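
  A minimal standalone sketch (my own demo, not HDFS code) of a pool built the same way shows the effect of this combination: with 2 pool threads and 4 tasks, the rejected tasks run on the submitting thread.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(String[] args) {
    // same shape as HEDGED_READ_THREAD_POOL: 1 core thread, 2 max,
    // zero-capacity handoff queue, caller-runs on rejection
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 4; i++) {
      final int id = i;
      pool.execute(() -> {
        // some of these lines will name "main", not a pool thread
        System.out.println("task " + id + " on "
            + Thread.currentThread().getName());
        try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
      });
    }
    pool.shutdown();
  }
}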

The DFSInputStream.read implementation

  The read path that can use Hedged Read is one particular read method in DFSInputStream. As the signature shows, it is the positional (random-access) read; it checks whether Hedged Read is enabled and dispatches to the corresponding fetch method.

/**
 * Read bytes starting from the specified position.
 *
 * @param position start read from this position
 * @param buffer read buffer
 * @param offset offset into buffer
 * @param length number of bytes to read
 *
 * @return actual number of bytes read
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  // sanity checks
  dfsClient.checkOpen();
  if (closed.get()) {
    throw new IOException("Stream closed");
  }
  failures = 0;
  long filelen = getFileLength();
  if ((position < 0) || (position >= filelen)) {
    return -1;
  }
  int realLen = length;
  if ((position + length) > filelen) {
    realLen = (int)(filelen - position);
  }

  // determine the block and byte range within the block
  // corresponding to position and realLen
  List<LocatedBlock> blockRange = getBlockRange(position, realLen);
  int remaining = realLen;
  Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
  for (LocatedBlock blk : blockRange) {
    long targetStart = position - blk.getStartOffset();
    long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
    try {
      // check whether Hedged Read is enabled
      if (dfsClient.isHedgedReadsEnabled()) {
        DFSClient.LOG.debug("Enter hedgedFetchBlockByteRange");
        hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
            - 1, buffer, offset, corruptedBlockMap);
      } else {
        fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
            buffer, offset, corruptedBlockMap);
      }
    } finally {
      // Check and report if any block replicas are corrupted.
      // BlockMissingException may be caught if all block replicas are
      // corrupted.
      reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
    }

    remaining -= bytesToRead;
    position += bytesToRead;
    offset += bytesToRead;
  }
  assert remaining == 0 : "Wrong number of bytes read.";
  if (dfsClient.stats != null) {
    dfsClient.stats.incrementBytesRead(realLen);
  }
  return realLen;
}

  A regular sequential read obtains its LocatedBlocks through the DFSClient, whereas this positional read API fetches the blocks it needs through getBlockRange:

/**
 * Get blocks in the specified range.
 * Fetch them from the namenode if not cached. This function
 * will not get a read request beyond the EOF.
 * @param offset starting offset in file
 * @param length length of data
 * @return consequent segment of located blocks
 * @throws IOException
 */
private List<LocatedBlock> getBlockRange(long offset,
    long length) throws IOException {
  // getFileLength(): returns total file length
  // locatedBlocks.getFileLength(): returns length of completed blocks
  if (offset >= getFileLength()) {
    throw new IOException("Offset: " + offset +
        " exceeds file length: " + getFileLength());
  }
  synchronized(infoLock) {
    final List<LocatedBlock> blocks;
    final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
    final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
    final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;

    if (readOffsetWithinCompleteBlk) {
      // get the blocks of finalized (completed) block range
      blocks = getFinalizedBlockRange(offset,
          Math.min(length, lengthOfCompleteBlk - offset));
    } else {
      blocks = new ArrayList<LocatedBlock>(1);
    }

    // get the blocks from incomplete block range
    if (readLengthPastCompleteBlk) {
      blocks.add(locatedBlocks.getLastLocatedBlock());
    }

    return blocks;
  }
}

  As seen above, when Hedged Read is enabled the read goes through hedgedFetchBlockByteRange, and otherwise through fetchBlockByteRange. Let's look at each in turn.

fetchBlockByteRange

private void fetchBlockByteRange(LocatedBlock block, long start, long end,
    byte[] buf, int offset,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  // pass false: do not update the stream's global position state
  block = getBlockAt(block.getStartOffset(), false);
  while (true) {
    DNAddrPair addressPair = chooseDataNode(block, null);
    try {
      actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
          corruptedBlockMap);
      return;
    } catch (IOException e) {
      // Ignore. Already processed inside the function.
      // Loop through to try the next node.
    }
  }
}

  We analyzed getBlockAt and chooseDataNode in HDFS里read operation源码解析以及读慢节点问题探究. The difference here is that getBlockAt is called with updatePosition set to false: since this read is positional, there is no need to update stream-wide state such as pos.
  Now let's look at actualGetFromOneDataNode.

actualGetFromOneDataNode

private void actualGetFromOneDataNode(final DNAddrPair datanode,
    LocatedBlock block, final long start, final long end, byte[] buf,
    int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  DFSClientFaultInjector.get().startFetchFromDatanode();
  int refetchToken = 1; // only need to get a new access token once
  int refetchEncryptionKey = 1; // only need to get a new encryption key once

  while (true) {
    // cached block locations may have been updated by chooseDataNode()
    // or fetchBlockAt(). Always get the latest list of locations at the
    // start of the loop.
    CachingStrategy curCachingStrategy;
    boolean allowShortCircuitLocalReads;
    block = getBlockAt(block.getStartOffset(), false);
    synchronized(infoLock) {
      curCachingStrategy = cachingStrategy;
      allowShortCircuitLocalReads = !shortCircuitForbidden();
    }
    DatanodeInfo chosenNode = datanode.info;
    InetSocketAddress targetAddr = datanode.addr;
    StorageType storageType = datanode.storageType;
    BlockReader reader = null;

    allowShortCircuitLocalReads &= !datanode.replicaPipeliner;
    try {
      DFSClientFaultInjector.get().fetchFromDatanodeException();
      Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
      int len = (int) (end - start + 1);
      // build the BlockReader
      BlockReaderFactory builder = new BlockReaderFactory(dfsClient.getConf()).
          setInetSocketAddress(targetAddr).
          setRemotePeerFactory(dfsClient).
          setDatanodeInfo(chosenNode).
          setStorageType(storageType).
          setFileName(src).
          setBlock(block.getBlock()).
          setBlockToken(blockToken).
          setStartOffset(start).
          setVerifyChecksum(verifyChecksum).
          setClientName(dfsClient.clientName).
          setLength(len).
          setCachingStrategy(curCachingStrategy).
          setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
          setClientCacheContext(dfsClient.getClientContext()).
          setUserGroupInformation(dfsClient.ugi).
          setConfiguration(dfsClient.getConfiguration());
      if (datanode.replicaPipeliner) {
        builder.setUseUpstream(true);
        builder.setUpstreamDatanode(datanode.replicaUpstream.info);
        builder.setUpstreamAddr(datanode.replicaUpstream.addr);
        builder.setUpstreamStorageType(datanode.replicaUpstream.storageType);
      }
      reader = builder.build();
      // read via readAll
      int nread = reader.readAll(buf, offset, len);
      updateReadStatistics(readStatistics, nread, reader);

      if (nread != len) {
        throw new IOException("truncated return from reader.read(): " +
            "excpected " + len + ", got " + nread);
      }
      DFSClientFaultInjector.get().readFromDatanodeDelay();
      return;
    } catch (ChecksumException e) {
      String msg = "fetchBlockByteRange(). Got a checksum exception for "
          + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
          + chosenNode;
      DFSClient.LOG.warn(msg);
      // we want to remember what we have tried
      addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
      addToDeadNodes(chosenNode);
      throw new IOException(msg);
    } catch (IOException e) {
      if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to " + targetAddr
            + " : " + e);
        // The encryption key used is invalid.
        refetchEncryptionKey--;
        dfsClient.clearDataEncryptionKey();
        continue;
      } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
        refetchToken--;
        try {
          fetchBlockAt(block.getStartOffset());
        } catch (IOException fbae) {
          // ignore IOE, since we can retry it later in a loop
        }
        continue;
      } else {
        String msg = "Failed to connect to " + targetAddr + " for file "
            + src + " for block " + block.getBlock() + ":" + e;
        DFSClient.LOG.warn("Connection failure: " + msg, e);
        addToDeadNodes(chosenNode);
        throw new IOException(msg);
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
}

  This method builds a BlockReader and calls reader.readAll to do the actual reading. It is worth comparing the contracts of BlockReader.read and BlockReader.readAll:

/* same interface as inputStream java.io.InputStream#read()
 * used by DFSInputStream#read()
 * This violates one rule when there is a checksum error:
 * "Read should not modify user buffer before successful read"
 * because it first reads the data to user buffer and then checks
 * the checksum.
 * Note: this must return -1 on EOF, even in the case of a 0-byte read.
 * See HDFS-5762 for details.
 */
int read(byte[] buf, int off, int len) throws IOException;

/**
 * Similar to {@link #readFully(byte[], int, int)} except that it will
 * not throw an exception on EOF. However, it differs from the simple
 * {@link #read(byte[], int, int)} call in that it is guaranteed to
 * read the data if it is available. In other words, if this call
 * does not throw an exception, then either the buffer has been
 * filled or the next call will return EOF.
 */
int readAll(byte[] buf, int offset, int len) throws IOException;
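
  So a successful readAll either fills the buffer or stops at EOF; conceptually it just loops over read. A minimal sketch of that contract (the actual implementation lives elsewhere in the client; this is only an illustration):

int readAll(byte[] buf, int offset, int len) throws IOException {
  int n = 0;
  while (n < len) {
    int nread = read(buf, offset + n, len - n);
    if (nread <= 0) {
      // EOF: report -1 only if nothing at all was read
      return (n == 0) ? nread : n;
    }
    n += nread;
  }
  return n;
}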

hedgedFetchBlockByteRange

  hedgedFetchBlockByteRange implements the Hedged Read feature with an ExecutorCompletionService and a list of Futures.

/**
 * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
 * int, Map)} except we start up a second, parallel, 'hedged' read
 * if the first read is taking longer than configured amount of
 * time. We then wait on which ever read returns first.
 */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
    long end, byte[] buf, int offset,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  // the list of in-flight futures
  ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
  // an ExecutorCompletionService backed by the hedged read thread pool
  CompletionService<ByteBuffer> hedgedService =
      new ExecutorCompletionService<ByteBuffer>(
          dfsClient.getHedgedReadsThreadPool());
  ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
  ByteBuffer bb = null;
  int len = (int) (end - start + 1);
  block = getBlockAt(block.getStartOffset(), false);
  while (true) {
    // see HDFS-6591, this metric is used to verify/catch unnecessary loops
    hedgedReadOpsLoopNumForTesting++;
    DNAddrPair chosenNode = null;
    // there is no request already executing.
    if (futures.isEmpty()) {
      // chooseDataNode is a commitment. If no node, we go to
      // the NN to reget block locations. Only go here on first read.
      chosenNode = chooseDataNode(block, ignored);
      bb = ByteBuffer.allocate(len);
      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
          chosenNode, block, start, end, bb, corruptedBlockMap);
      Future<ByteBuffer> firstRequest = hedgedService
          .submit(getFromDataNodeCallable);
      futures.add(firstRequest);
      try {
        // first attempt: wait up to the hedged read threshold for a result
        Future<ByteBuffer> future = hedgedService.poll(
            dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
        // the first read finished within the threshold
        if (future != null) {
          ByteBuffer result = future.get();
          System.arraycopy(result.array(), result.position(), buf, offset,
              len);
          return;
        }
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
              + "ms to read from " + chosenNode.info
              + "; spawning hedged read");
        }
        // Ignore this node on next go around.
        ignored.add(chosenNode.info);
        dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        continue; // no need to refresh block locations
      } catch (InterruptedException e) {
        // Ignore
      } catch (ExecutionException e) {
        // Ignore already logged in the call.
      }
    } else {
      // We are starting up a 'hedged' read. We have a read already
      // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
      // If no nodes to do hedged reads against, pass.
      try {
        // pick another replica
        try {
          chosenNode = getBestNodeDNAddrPair(block, ignored);
        } catch (IOException ioe) {
          chosenNode = chooseDataNode(block, ignored);
        }
        bb = ByteBuffer.allocate(len);
        // wrap actualGetFromOneDataNode in a Callable
        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
            chosenNode, block, start, end, bb, corruptedBlockMap);
        // start the hedged read against the new replica
        Future<ByteBuffer> oneMoreRequest = hedgedService
            .submit(getFromDataNodeCallable);
        futures.add(oneMoreRequest);
      } catch (IOException ioe) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Failed getting node for hedged read: "
              + ioe.getMessage());
        }
      }
      // if not succeeded. Submit callables for each datanode in a loop, wait
      // for a fixed interval and get the result from the fastest one.
      try {
        // take whichever read completes first (blocking take() internally)
        ByteBuffer result = getFirstToComplete(hedgedService, futures);
        // cancel the rest.
        cancelAll(futures);
        dfsClient.getHedgedReadMetrics().incHedgedReadWins();
        System.arraycopy(result.array(), result.position(), buf, offset,
            len);
        return;
      } catch (InterruptedException ie) {
        // Ignore and retry
      }
      // We got here if exception. Ignore this node on next go around IFF
      // we found a chosenNode to hedge read against.
      if (chosenNode != null && chosenNode.info != null) {
        ignored.add(chosenNode.info);
      }
    }
  }
}

  The key is that getFromOneDataNode wraps actualGetFromOneDataNode in a Callable and submits it to the completion service.
  On the first pass, the result is fetched with poll, using the hedged read threshold as the timeout. If the first read finishes within the threshold, its result is copied out and returned; otherwise the node is added to ignored so the next pass skips it.
  On subsequent passes, another DataNode is chosen via getBestNodeDNAddrPair (falling back to chooseDataNode), a new Callable is built and submitted to hedgedService, and getFirstToComplete, which blocks on hedgedService.take() internally, returns whichever read finishes first. The remaining reads are cancelled via cancelAll and the win is counted; on failure, the failure is counted and the chosen DataNode is ignored on the next pass.
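
  Stripped of the HDFS specifics, this is the classic hedged-request pattern. A self-contained sketch of the same idea (names and structure are mine, not HDFS's):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedRequest {
  // start replicas.get(0); if it has not finished within thresholdMillis,
  // also start replicas.get(1) and return whichever completes first
  static <T> T hedged(ExecutorService pool, List<Callable<T>> replicas,
      long thresholdMillis) throws Exception {
    CompletionService<T> cs = new ExecutorCompletionService<T>(pool);
    List<Future<T>> futures = new ArrayList<Future<T>>();
    futures.add(cs.submit(replicas.get(0)));
    // wait up to the threshold for the primary request
    Future<T> first = cs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
    if (first == null) {
      if (replicas.size() > 1) {
        // primary is slow: hedge against the second replica
        futures.add(cs.submit(replicas.get(1)));
      }
      first = cs.take(); // block until the fastest one completes
    }
    T result = first.get();
    for (Future<T> f : futures) {
      f.cancel(false); // same choice as cancelAll below
    }
    return result;
  }
}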

cancelAll

private void cancelAll(List<Future<ByteBuffer>> futures) {
  for (Future<ByteBuffer> future : futures) {
    // Unfortunately, hdfs reads do not take kindly to interruption.
    // Threads return a variety of interrupted-type exceptions but
    // also complaints about invalid pbs -- likely because read
    // is interrupted before gets whole pb. Also verbose WARN
    // logging. So, for now, do not interrupt running read.
    future.cancel(false);
  }
}

  There is a pitfall in cancelAll: future.cancel(false) passes mayInterruptIfRunning = false, so a task that is already running is never interrupted. If the read against the slow replica has already started when another replica wins, that slow read keeps running and keeps occupying a slot in the thread pool. With enough slow-replica reads in flight, the pool fills up, and new hedged reads can only be submitted once those slow reads finish.
  Judging from the comment, false was chosen because the internal read path does not handle interruption gracefully. I tried setting it to true and ran a few rounds of tests without observing correctness problems, so that will do for now.
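
  For reference, a tiny demo (a hypothetical task, not HDFS code) of the difference: cancel(false) leaves a running task alone, while cancel(true) interrupts it and frees the pool thread.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<?> f = pool.submit(() -> {
      try {
        Thread.sleep(60_000); // a long-running "read"
      } catch (InterruptedException e) {
        System.out.println("interrupted, pool thread freed");
      }
    });
    Thread.sleep(100); // let the task start running
    f.cancel(false);   // running task is NOT interrupted; its slot stays busy
    // f.cancel(true); // would interrupt the sleep and free the thread
    pool.shutdown();
  }
}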

Limitations

  Hedged Read is well designed, but after turning it on in practice it had no effect, because the readers Spark and Hadoop use to read HDFS, such as LineReader, all go through this read API in DFSInputStream:

/**
 * Read the entire buffer.
 */
@Override
public synchronized int read(final byte buf[], int off, int len) throws IOException {

  That is the read we covered in HDFS里read operation源码解析以及读慢节点问题探究, not the read API that Hedged Read hooks into:

/**
 * Read bytes starting from the specified position.
 *
 * @param position start read from this position
 * @param buffer read buffer
 * @param offset offset into buffer
 * @param length number of bytes to read
 *
 * @return actual number of bytes read
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)

  Hedged Read is clearly wired only into the positional read API, so for these sequential-read workloads it never kicks in.
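
  In other words, to benefit from Hedged Read today a client has to call the positional API explicitly, for example through FSDataInputStream, which delegates to this positional read (a minimal sketch; the path, buffer size, and config values are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
conf.setLong("dfs.client.hedged.read.threshold.millis", 500);
FileSystem fs = FileSystem.get(conf);
byte[] buf = new byte[4096];
try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
  // positional read: dispatches to read(position, buffer, offset, length),
  // so hedged reads can kick in; a plain in.read(buf) would not use them
  int n = in.read(0L, buf, 0, buf.length);
}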

Improvement

  One way to give the first (sequential) read API hedged reads is to replace readWithStrategy: maintain pos manually and issue a positional read on every call:

private synchronized int readWithHedged(byte[] buf, int off, int len) throws IOException {
  dfsClient.checkOpen();
  if (closed.get()) {
    throw new IOException("Stream closed");
  }
  if (pos < getFileLength()) {
    int retries = 2;
    while (retries > 0) {
      try {
        int result = read(pos, buf, off, len);

        if (result >= 0) {
          pos += result;
        } else {
          // got an EOS from the reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }

        return result;
      } catch (IOException e) {
        if (retries == 1) {
          DFSClient.LOG.warn("DFS Read", e);
        }
        if (--retries == 0) {
          throw e;
        }
      }
    }
  }
  return -1;
}

  This change should be made with care, though, because it increases overhead: the original sequential read keeps a single BlockReader and only builds a new one when the current one is exhausted or fails, whereas hedged reads establish a fresh BlockReader connection for every positional read, which is comparatively expensive.

Summary

  We analyzed the implementation of Hedged Read, which uses a static thread pool to read from multiple replicas. We also saw its limitation: the read API it hooks into is used only for positional reads, not for sequential reads, so enabling it for MR and Spark jobs achieves almost nothing; it really serves random-read workloads such as HBase. Finally, we sketched a possible improvement, applying Hedged Read to sequential reads, at the cost of noticeably higher overhead.

