Source Code Analysis and Limitations of Hedged Read in HDFS



Introduction

  I have been reading the HDFS source code recently. This post walks through the implementation of Hedged Read and some of its limitations.
  Most of the relevant code lives in DFSInputStream.java and DFSClient.java; all source in this post comes from Hadoop 2.6.0-cdh5.4.4.

Hedged Read

  As we discussed in an earlier post, restarting a speculative task does not help with reads from a slow node: the task may well read the same node again, because the namenode is likely to return the datanode replicas in the same order, and those nodes are not bad, just slow. There are two fairly simple ways to attack this problem:

  1. First, make the speculative task read a different replica by choosing replicas at random instead of in order. This, however, discards the namenode's replica ordering, which is meant to put the best replica first; in cross-datacenter deployments the performance loss can outweigh the gain.
  2. Second, set a timeout: if reading one replica takes longer than the timeout, trigger a read of another replica.

  Hadoop has shipped the second approach since 2.4 as the DFSClient Hedged Read feature: if the read of a block is going slowly, the DFSClient starts a hedged read of the same range from another replica, takes whichever read finishes first, and cancels the other. This helps rein in outliers, such as reads that take unusually long because they hit a bad disk. Let's analyze the implementation.

Hedged Read Thread Pool Initialization

  The DFSClient Hedged Read feature is disabled by default. To enable it, configure the following:

// size of the thread pool for concurrent hedged reads
dfs.client.hedged.read.threadpool.size
// how long to wait (in milliseconds) before spawning a hedged read
dfs.client.hedged.read.threshold.millis
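
  For illustration, here is a minimal sketch of enabling the feature programmatically; the same keys can equally go into hdfs-site.xml (the values below are arbitrary examples, not recommendations):

Configuration conf = new Configuration();
// a pool of 10 hedged read threads (the default of 0 leaves the feature off)
conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
// spawn a hedged read if the first read has not returned within 500 ms
conf.setLong("dfs.client.hedged.read.threshold.millis", 500);
FileSystem fs = FileSystem.get(conf);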

  At its core, Hedged Read is a static thread pool whose threads read an extra replica when needed. It is declared and initialized in DFSClient.

// the declaration: a static ThreadPoolExecutor
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;

  The pool is initialized from the DFSClient constructor, which decides whether to create it based on whether a pool size was configured:

this.hedgedReadThresholdMillis = conf.getLong(
    DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
    DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
int numThreads = conf.getInt(
    DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
    DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
if (numThreads > 0) {
  this.initThreadsNumForHedgedReads(numThreads);
}

  The actual initialization happens in initThreadsNumForHedgedReads, which caps the pool's maximum size at the configured value.

/**
 * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
 * it does not already exist.
 * @param num Number of threads for hedged reads thread pool.
 * If zero, skip hedged reads thread pool creation.
 */
private synchronized void initThreadsNumForHedgedReads(int num) {
  if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
  HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
      new Daemon.DaemonFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
          Thread t = super.newThread(r);
          t.setName("hedgedRead-" + threadIndex.getAndIncrement());
          return t;
        }
      },
      new ThreadPoolExecutor.CallerRunsPolicy() {
        @Override
        public void rejectedExecution(Runnable runnable,
            ThreadPoolExecutor e) {
          LOG.info("Execution rejected, Executing in current thread");
          HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
          // will run in the current thread
          super.rejectedExecution(runnable, e);
        }
      });
  HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Using hedged reads; pool threads=" + num);
  }
}

  The work queue is a SynchronousQueue, a zero-capacity hand-off queue: an element offered to it must be taken by another thread before the next offer can succeed. The ThreadFactory is Hadoop's own daemon-thread factory, and a custom RejectedExecutionHandler is installed: when a submission is rejected, it calls HEDGED_READ_METRIC.incHedgedReadOpsInCurThread() to increment the counter of hedged reads that fall back to the calling thread, then defers to CallerRunsPolicy, which runs the task in that thread.
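
  To see this hand-off behaviour in isolation, here is a small standalone demo (plain java.util.concurrent, not HDFS code): with a SynchronousQueue nothing is ever buffered, so once both pool threads are busy the next submission is rejected immediately and CallerRunsPolicy executes it in the submitting thread, which is exactly the fallback the custom handler above instruments.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(String[] args) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 4; i++) {
      final int id = i;
      // while both pool threads are busy, the SynchronousQueue rejects the
      // offer and CallerRunsPolicy runs the task on the main thread instead
      pool.execute(() -> {
        System.out.println("task " + id + " on "
            + Thread.currentThread().getName());
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    pool.shutdown();
  }
}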

The DFSInputStream.read Implementation

  The hedged read logic hangs off one particular read method in DFSInputStream. As we can see it is the positional (random-access) read; it checks whether Hedged Read is enabled and dispatches to the corresponding fetch method.

/**
 * Read bytes starting from the specified position.
 *
 * @param position start read from this position
 * @param buffer read buffer
 * @param offset offset into buffer
 * @param length number of bytes to read
 *
 * @return actual number of bytes read
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  // sanity checks
  dfsClient.checkOpen();
  if (closed.get()) {
    throw new IOException("Stream closed");
  }
  failures = 0;
  long filelen = getFileLength();
  if ((position < 0) || (position >= filelen)) {
    return -1;
  }
  int realLen = length;
  if ((position + length) > filelen) {
    realLen = (int)(filelen - position);
  }

  // determine the block and byte range within the block
  // corresponding to position and realLen
  List<LocatedBlock> blockRange = getBlockRange(position, realLen);
  int remaining = realLen;
  Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
  for (LocatedBlock blk : blockRange) {
    long targetStart = position - blk.getStartOffset();
    long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
    try {
      // check whether Hedged Read is enabled
      if (dfsClient.isHedgedReadsEnabled()) {
        DFSClient.LOG.debug("Enter hedgedFetchBlockByteRange");
        hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
            - 1, buffer, offset, corruptedBlockMap);
      } else {
        fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
            buffer, offset, corruptedBlockMap);
      }
    } finally {
      // Check and report if any block replicas are corrupted.
      // BlockMissingException may be caught if all block replicas are
      // corrupted.
      reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
    }

    remaining -= bytesToRead;
    position += bytesToRead;
    offset += bytesToRead;
  }
  assert remaining == 0 : "Wrong number of bytes read.";
  if (dfsClient.stats != null) {
    dfsClient.stats.incrementBytesRead(realLen);
  }
  return realLen;
}

  The ordinary read obtains its LocatedBlocks through the DFSClient, whereas the positional read API fetches the blocks it needs via getBlockRange:

/**
 * Get blocks in the specified range.
 * Fetch them from the namenode if not cached. This function
 * will not get a read request beyond the EOF.
 * @param offset starting offset in file
 * @param length length of data
 * @return consequent segment of located blocks
 * @throws IOException
 */
private List<LocatedBlock> getBlockRange(long offset,
    long length) throws IOException {
  // getFileLength(): returns total file length
  // locatedBlocks.getFileLength(): returns length of completed blocks
  if (offset >= getFileLength()) {
    throw new IOException("Offset: " + offset +
        " exceeds file length: " + getFileLength());
  }
  synchronized(infoLock) {
    final List<LocatedBlock> blocks;
    final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
    final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
    final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;

    if (readOffsetWithinCompleteBlk) {
      // get the blocks of finalized (completed) block range
      blocks = getFinalizedBlockRange(offset,
          Math.min(length, lengthOfCompleteBlk - offset));
    } else {
      blocks = new ArrayList<LocatedBlock>(1);
    }

    // get the blocks from incomplete block range
    if (readLengthPastCompleteBlk) {
      blocks.add(locatedBlocks.getLastLocatedBlock());
    }

    return blocks;
  }
}

  As seen above, when Hedged Read is enabled the read goes through hedgedFetchBlockByteRange, and otherwise through fetchBlockByteRange. Let's look at each in turn.

fetchBlockByteRange

private void fetchBlockByteRange(LocatedBlock block, long start, long end,
    byte[] buf, int offset,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  // updatePosition is false: do not touch the stream's global position state
  block = getBlockAt(block.getStartOffset(), false);
  while (true) {
    DNAddrPair addressPair = chooseDataNode(block, null);
    try {
      actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
          corruptedBlockMap);
      return;
    } catch (IOException e) {
      // Ignore. Already processed inside the function.
      // Loop through to try the next node.
    }
  }
}

  We analyzed getBlockAt and chooseDataNode in the earlier post on the HDFS read path and slow-node reads. The difference here is that getBlockAt is called with updatePosition set to false: since this read is positional, there is no need to update the stream's global pos and related state.
  Let's look at the actualGetFromOneDataNode function.

actualGetFromOneDataNode

private void actualGetFromOneDataNode(final DNAddrPair datanode,
    LocatedBlock block, final long start, final long end, byte[] buf,
    int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  DFSClientFaultInjector.get().startFetchFromDatanode();
  int refetchToken = 1; // only need to get a new access token once
  int refetchEncryptionKey = 1; // only need to get a new encryption key once

  while (true) {
    // cached block locations may have been updated by chooseDataNode()
    // or fetchBlockAt(). Always get the latest list of locations at the
    // start of the loop.
    CachingStrategy curCachingStrategy;
    boolean allowShortCircuitLocalReads;
    block = getBlockAt(block.getStartOffset(), false);
    synchronized(infoLock) {
      curCachingStrategy = cachingStrategy;
      allowShortCircuitLocalReads = !shortCircuitForbidden();
    }
    DatanodeInfo chosenNode = datanode.info;
    InetSocketAddress targetAddr = datanode.addr;
    StorageType storageType = datanode.storageType;
    BlockReader reader = null;

    allowShortCircuitLocalReads &= !datanode.replicaPipeliner;
    try {
      DFSClientFaultInjector.get().fetchFromDatanodeException();
      Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
      int len = (int) (end - start + 1);
      // build the BlockReader
      BlockReaderFactory builder = new BlockReaderFactory(dfsClient.getConf()).
          setInetSocketAddress(targetAddr).
          setRemotePeerFactory(dfsClient).
          setDatanodeInfo(chosenNode).
          setStorageType(storageType).
          setFileName(src).
          setBlock(block.getBlock()).
          setBlockToken(blockToken).
          setStartOffset(start).
          setVerifyChecksum(verifyChecksum).
          setClientName(dfsClient.clientName).
          setLength(len).
          setCachingStrategy(curCachingStrategy).
          setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
          setClientCacheContext(dfsClient.getClientContext()).
          setUserGroupInformation(dfsClient.ugi).
          setConfiguration(dfsClient.getConfiguration());
      if (datanode.replicaPipeliner) {
        builder.setUseUpstream(true);
        builder.setUpstreamDatanode(datanode.replicaUpstream.info);
        builder.setUpstreamAddr(datanode.replicaUpstream.addr);
        builder.setUpstreamStorageType(datanode.replicaUpstream.storageType);
      }
      reader = builder.build();
      // read the whole range via readAll
      int nread = reader.readAll(buf, offset, len);
      updateReadStatistics(readStatistics, nread, reader);

      if (nread != len) {
        throw new IOException("truncated return from reader.read(): " +
            "excpected " + len + ", got " + nread);
      }
      DFSClientFaultInjector.get().readFromDatanodeDelay();
      return;
    } catch (ChecksumException e) {
      String msg = "fetchBlockByteRange(). Got a checksum exception for "
          + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
          + chosenNode;
      DFSClient.LOG.warn(msg);
      // we want to remember what we have tried
      addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
      addToDeadNodes(chosenNode);
      throw new IOException(msg);
    } catch (IOException e) {
      if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to " + targetAddr
            + " : " + e);
        // The encryption key used is invalid.
        refetchEncryptionKey--;
        dfsClient.clearDataEncryptionKey();
        continue;
      } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
        refetchToken--;
        try {
          fetchBlockAt(block.getStartOffset());
        } catch (IOException fbae) {
          // ignore IOE, since we can retry it later in a loop
        }
        continue;
      } else {
        String msg = "Failed to connect to " + targetAddr + " for file "
            + src + " for block " + block.getBlock() + ":" + e;
        DFSClient.LOG.warn("Connection failure: " + msg, e);
        addToDeadNodes(chosenNode);
        throw new IOException(msg);
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
}

  This method builds a BlockReader and calls reader.readAll to do the actual reading. It is worth contrasting BlockReader.read with BlockReader.readAll:

/* same interface as inputStream java.io.InputStream#read()
 * used by DFSInputStream#read()
 * This violates one rule when there is a checksum error:
 * "Read should not modify user buffer before successful read"
 * because it first reads the data to user buffer and then checks
 * the checksum.
 * Note: this must return -1 on EOF, even in the case of a 0-byte read.
 * See HDFS-5762 for details.
 */
int read(byte[] buf, int off, int len) throws IOException;

/**
 * Similar to {@link #readFully(byte[], int, int)} except that it will
 * not throw an exception on EOF. However, it differs from the simple
 * {@link #read(byte[], int, int)} call in that it is guaranteed to
 * read the data if it is available. In other words, if this call
 * does not throw an exception, then either the buffer has been
 * filled or the next call will return EOF.
 */
int readAll(byte[] buf, int offset, int len) throws IOException;
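
  So readAll keeps calling read until the requested range is filled or EOF is hit. A simplified sketch of that contract, expressed in terms of read (my paraphrase of the contract, not the actual Hadoop helper):

int readAll(byte[] buf, int offset, int len) throws IOException {
  int n = 0;
  while (n < len) {
    int nread = read(buf, offset + n, len - n);
    if (nread <= 0) {
      return n; // hit EOF before the buffer was filled
    }
    n += nread;
  }
  return n;
}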

hedgedFetchBlockByteRange

  hedgedFetchBlockByteRange implements the hedged read with an ExecutorCompletionService and a list of Futures.

/**
 * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
 * int, Map)} except we start up a second, parallel, 'hedged' read
 * if the first read is taking longer than configured amount of
 * time. We then wait on which ever read returns first.
 */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
    long end, byte[] buf, int offset,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  // one future per submitted read
  ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
  // an ExecutorCompletionService backed by the hedged read thread pool
  CompletionService<ByteBuffer> hedgedService =
      new ExecutorCompletionService<ByteBuffer>(
          dfsClient.getHedgedReadsThreadPool());
  ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
  ByteBuffer bb = null;
  int len = (int) (end - start + 1);
  block = getBlockAt(block.getStartOffset(), false);
  while (true) {
    // see HDFS-6591, this metric is used to verify/catch unnecessary loops
    hedgedReadOpsLoopNumForTesting++;
    DNAddrPair chosenNode = null;
    // there is no request already executing.
    if (futures.isEmpty()) {
      // chooseDataNode is a commitment. If no node, we go to
      // the NN to reget block locations. Only go here on first read.
      chosenNode = chooseDataNode(block, ignored);
      bb = ByteBuffer.allocate(len);
      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
          chosenNode, block, start, end, bb, corruptedBlockMap);
      Future<ByteBuffer> firstRequest = hedgedService
          .submit(getFromDataNodeCallable);
      futures.add(firstRequest);
      try {
        // first attempt: wait at most the hedged read threshold for a result
        Future<ByteBuffer> future = hedgedService.poll(
            dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
        // non-null means the first read finished within the threshold
        if (future != null) {
          ByteBuffer result = future.get();
          System.arraycopy(result.array(), result.position(), buf, offset,
              len);
          return;
        }
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
              + "ms to read from " + chosenNode.info
              + "; spawning hedged read");
        }
        // Ignore this node on next go around.
        ignored.add(chosenNode.info);
        dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        continue; // no need to refresh block locations
      } catch (InterruptedException e) {
        // Ignore
      } catch (ExecutionException e) {
        // Ignore already logged in the call.
      }
    } else {
      // We are starting up a 'hedged' read. We have a read already
      // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
      // If no nodes to do hedged reads against, pass.
      try {
        // pick a fresh replica to hedge against
        try {
          chosenNode = getBestNodeDNAddrPair(block, ignored);
        } catch (IOException ioe) {
          chosenNode = chooseDataNode(block, ignored);
        }
        bb = ByteBuffer.allocate(len);
        // wrap actualGetFromOneDataNode into a Callable
        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
            chosenNode, block, start, end, bb, corruptedBlockMap);
        // submit the read of the new replica
        Future<ByteBuffer> oneMoreRequest = hedgedService
            .submit(getFromDataNodeCallable);
        futures.add(oneMoreRequest);
      } catch (IOException ioe) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Failed getting node for hedged read: "
              + ioe.getMessage());
        }
      }
      // if not succeeded. Submit callables for each datanode in a loop, wait
      // for a fixed interval and get the result from the fastest one.
      try {
        // take the first result to complete (blocking take() internally)
        ByteBuffer result = getFirstToComplete(hedgedService, futures);
        // cancel the rest.
        cancelAll(futures);
        dfsClient.getHedgedReadMetrics().incHedgedReadWins();
        System.arraycopy(result.array(), result.position(), buf, offset,
            len);
        return;
      } catch (InterruptedException ie) {
        // Ignore and retry
      }
      // We got here if exception. Ignore this node on next go around IFF
      // we found a chosenNode to hedge read against.
      if (chosenNode != null && chosenNode.info != null) {
        ignored.add(chosenNode.info);
      }
    }
  }
}

  The key point is that getFromOneDataNode wraps actualGetFromOneDataNode into a Callable, which is then submitted to the completion service.
  On the first read, the client polls for the result with a timeout equal to the hedged read threshold. If the future completes in time, the result is returned immediately; otherwise the node is added to ignored so subsequent attempts skip it.
  On the second read, a DataNode is chosen via getBestNodeDNAddrPair (falling back to chooseDataNode), wrapped into a Callable, and submitted to hedgedService. getFirstToComplete, which internally uses the blocking hedgedService.take(), returns the first result to finish; the winner triggers cancelAll on the remaining futures and increments the win counter. On failure, the metrics are still updated and the chosen datanode is added to the ignore list.
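
  Stripped of the HDFS-specific details, this is the classic hedged-request pattern over a CompletionService. A condensed sketch of the idea (primaryRead, backupRead, pool and thresholdMillis are hypothetical placeholders, not HDFS names):

CompletionService<ByteBuffer> svc = new ExecutorCompletionService<ByteBuffer>(pool);
List<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
futures.add(svc.submit(primaryRead));
// wait up to the threshold for the primary read to finish
Future<ByteBuffer> done = svc.poll(thresholdMillis, TimeUnit.MILLISECONDS);
if (done == null) {
  // the primary is slow: hedge with a second replica, then take the winner
  futures.add(svc.submit(backupRead));
  done = svc.take();
}
ByteBuffer result = done.get(); // may throw ExecutionException
for (Future<ByteBuffer> f : futures) {
  f.cancel(false); // best effort; see cancelAll below
}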

cancelAll

private void cancelAll(List<Future<ByteBuffer>> futures) {
  for (Future<ByteBuffer> future : futures) {
    // Unfortunately, hdfs reads do not take kindly to interruption.
    // Threads return a variety of interrupted-type exceptions but
    // also complaints about invalid pbs -- likely because read
    // is interrupted before gets whole pb. Also verbose WARN
    // logging. So, for now, do not interrupt running read.
    future.cancel(false);
  }
}

  Looking at cancelAll, there is a trap here: future.cancel(false) passes mayInterruptIfRunning=false, so a task that is already running is never interrupted. The consequence is that if the thread reading the slow replica has already started, it is not killed even after another replica finishes; it keeps occupying a slot in the thread pool. With enough such slow-replica threads the pool fills up, and new hedged reads have to wait for the slow ones to drain.
  Judging from the comment, false was chosen because the internal read code does not handle interruption gracefully. I tried setting it to true and ran a few rounds of tests; it did not seem to cause any correctness problems, so I am going with that for now.
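
  A tiny standalone demo (plain JDK, not HDFS code) confirms this behaviour: cancel(false) has no effect on a task that is already running, so a slow read keeps holding its pool slot until it finishes on its own.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<?> f = pool.submit(() -> {
      try {
        Thread.sleep(3000); // stand-in for a slow replica read
      } catch (InterruptedException e) {
        return; // would only be reached with cancel(true)
      }
      System.out.println("slow task still ran to completion");
    });
    Thread.sleep(100); // make sure the task has started
    // prints false: the task is running and we asked not to interrupt it
    System.out.println("cancelled? " + f.cancel(false));
    pool.shutdown();
  }
}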

Limitations

  Hedged Read is well designed, but after enabling it in practice we found it simply did not kick in, because the readers Spark and Hadoop use to read HDFS, such as LineReader, all go through this read API in DFSInputStream:

/**
 * Read the entire buffer.
 */
@Override
public synchronized int read(final byte buf[], int off, int len) throws IOException {

  That is the read we covered in the post on the HDFS read path and slow-node reads, not the read API that Hedged Read hooks into:

/**
 * Read bytes starting from the specified position.
 *
 * @param position start read from this position
 * @param buffer read buffer
 * @param offset offset into buffer
 * @param length number of bytes to read
 *
 * @return actual number of bytes read
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)

  Clearly, Hedged Read is wired into the positional read API only, so it never takes effect for sequential reads. Random reads are common in HBase, which is why Hedged Read sees most of its use there.
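
  To make the distinction concrete, here is a minimal sketch of the two entry points (assuming fs is an open FileSystem and path an existing file):

FSDataInputStream in = fs.open(path);
byte[] buf = new byte[4096];
// sequential read: dispatches to readWithStrategy(), never hedged
int n1 = in.read(buf, 0, buf.length);
// positional read: dispatches to read(position, ...), hedged when enabled
int n2 = in.read(0L, buf, 0, buf.length);
in.close();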

Improvement

  To apply Hedged Read to the first read API, we can replace readWithStrategy: maintain pos by hand and route every call through the positional read API:

private synchronized int readWithHedged(byte[] buf, int off, int len) throws IOException {
  dfsClient.checkOpen();
  if (closed.get()) {
    throw new IOException("Stream closed");
  }
  if (pos < getFileLength()) {
    int retries = 2;
    while (retries > 0) {
      try {
        int result = read(pos, buf, off, len);

        if (result >= 0) {
          pos += result;
        } else {
          // got a EOS from reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }

        return result;
      } catch (IOException e) {
        if (retries == 1) {
          DFSClient.LOG.warn("DFS Read", e);
        }
        if (--retries == 0) {
          throw e;
        }
      }
    }
  }
  return -1;
}

  Such a change should be made with care, because it raises the cost of reading. The sequential read maintains a single BlockReader and only builds a new one when the current block is exhausted or fails, whereas the hedged path establishes a brand-new BlockReader connection for every single positional read, which is comparatively expensive.

Summary

  We analyzed the implementation of Hedged Read: it relies on a static thread pool to read multiple replicas in parallel. We also saw its limitation: the read API it instruments is used by positional reads only, not sequential reads, so enabling it for MR and Spark jobs achieves next to nothing; its real home is random-read-heavy workloads such as HBase. We also sketched a possible improvement, applying Hedged Read to sequential reads, at the cost of noticeably higher overhead.




