hdfs读文件过程分析:获取文件对应的block列表-亚博电竞手机版
在使用java读取一个文件系统中的一个文件时,我们会首先构造一个datainputstream对象,然后就能够从文件中读取数据。对于存储在hdfs上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用dfsclient.dfsdatainputstream类来读取hdfs上一个文件的一段代码来看,如下所示:
package org.shirdrn.hadoop.hdfs; import java.io.bufferedreader; import java.io.ioexception; import java.io.inputstreamreader; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.fsdatainputstream; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; public class hdfsfilereader { public static void main(string[] args) { string file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log"; path path = new path(file); configuration conf = new configuration(); filesystem fs; fsdatainputstream in; bufferedreader reader = null; try { fs = filesystem.get(conf); in = fs.open(path); // 打开文件path,返回一个fsdatainputstream流对象 reader = new bufferedreader(new inputstreamreader(in)); string line = null; while((line = reader.readline()) != null) { // 读取文件行内容 system.out.println("record: " line); } } catch (ioexception e) { e.printstacktrace(); } finally { try { if(reader != null) reader.close(); } catch (ioexception e) { e.printstacktrace(); } } } }
基于上面代码,我们可以看到,通过一个filesystem对象可以打开一个path文件,返回一个fsdatainputstream文件输入流对象,然后从该fsdatainputstream对象就能够读取出文件的内容。所以,我们从fsdatainputstream入手,详细分析从hdfs读取文件内容的过程,在实际地读取物理数据块之前,首先要获取到文件对应的block列表元数据信息,整体流程如下图所示:
下面,详细说明整个流程:
创建fsdatainputstream流对象
从一个path路径对象,能够获取到一个filesystem对象,然后通过调用filesystem的open方法打开一个文件流:
public fsdatainputstream open(path f) throws ioexception { return open(f, getconf().getint("io.file.buffer.size", 4096)); }
由于filesystem是抽象类,将具体的打开操作留给具体子类实现,例如ftpfilesystem、harfilesystem、webhdfsfilesystem等,不同的文件系统具有不同打开文件的行为,我们以distributedfilesystem为例,open方法实现,代码如下所示:
public fsdatainputstream open(path f, int buffersize) throws ioexception { statistics.incrementreadops(1); return new dfsclient.dfsdatainputstream( dfs.open(getpathname(f), buffersize, verifychecksum, statistics)); }
statistics对象用来收集文件系统操作的统计数据,这里使读取文件操作的计数器加1。然后创建了一个dfsclient.dfsdatainputstream对象,该对象的参数是通过dfsclient dfs客户端对象打开一个这个文件从而返回一个dfsinputstream对象,下面,我们看dfsclient的open方法实现,代码如下所示:
public dfsinputstream open(string src, int buffersize, boolean verifychecksum, filesystem.statistics stats) throws ioexception { checkopen(); // get block info from namenode return new dfsinputstream(src, buffersize, verifychecksum); }
checkopen方法就是检查一个标志位clientrunning,表示当前的dfs客户端对象是否已经创建并初始化,在dfs客户端创建的时候该标志就为true,表示客户端正在运行状态。我们知道,当客户端dfsclient连接到namenode的时候,实际上是创建了一个到namenode的rpc连接,namenode作为server角色,dfsclient作为client角色,它们之间建立起socket连接。只有显式调用dfsclient的close方法时,才会修改clientrunning的值为false,实际上真正地关闭了已经建立的rpc连接。
我们看一下创建dfsinputstream的构造方法实现:
dfsinputstream(string src, int buffersize, boolean verifychecksum) throws ioexception { this.verifychecksum = verifychecksum; this.buffersize = buffersize; this.src = src; prefetchsize = conf.getlong("dfs.read.prefetch.size", prefetchsize); openinfo(); }
先设置了几个与读取文件相关的参数值,这里有一个预先读取文件的block字节数的参数prefetchsize,它的值设置如下:
public static final long default_block_size = dfsconfigkeys.dfs_block_size_default; public static final long dfs_block_size_default = 64*1024*1024; defaultblocksize = conf.getlong("dfs.block.size", default_block_size); private long prefetchsize = 10 * defaultblocksize;
这个prefetchsize的值默认为10*64*1024*1024=671088640,也就是说,默认预读取一个文件的10个块,即671088640b=640m,如果想要修改这个值,设置dfs.block.size即可覆盖默认值。
然后调用了openinfo方法,从namenode获取到该打开文件的信息,在openinfo方法中,具体实现如下所示:
synchronized void openinfo() throws ioexception { for (int retries = 3; retries > 0; retries--) { if (fetchlocatedblocks()) { // fetch block success. 如果成功获取到待读取文件对应的block列表,则直接返回 return; } else { // last block location unavailable. when a cluster restarts, // dns may not report immediately. at this time partial block // locations will not be available with nn for getting the length. // lets retry a few times to get the length. dfsclient.log.warn("last block locations unavailable. " "datanodes might not have reported blocks completely." " will retry for " retries " times"); waitfor(4000); } } throw new ioexception("could not obtain the last block locations."); }
上述代码中,有一个for循环用来获取block列表。如果成功获取到待读取文件的block列表,则直接返回,否则,最多执行3次等待重试操作(最多花费时间大于12秒)。未能成功读取文件的block列表信息,是因为namenode无法获取到文件对应的块列表的信息,当整个集群启动的时候,datanode会主动向nnamenode上报对应的block信息,只有block report完成之后,namenode就能够知道组成文件的block及其所在datanode列表的信息。openinfo方法方法中调用了fetchlocatedblocks方法,用来与namenode进行rpc通信调用,实际获取对应的block列表,实现代码如下所示:
private boolean fetchlocatedblocks() throws ioexception, filenotfoundexception { locatedblocks newinfo = callgetblocklocations(namenode, src, 0, prefetchsize); if (newinfo == null) { throw new filenotfoundexception("file does not exist: " src); } if (locatedblocks != null && !locatedblocks.isunderconstruction() && !newinfo.isunderconstruction()) { iteratorolditer = locatedblocks.getlocatedblocks().iterator(); iterator newiter = newinfo.getlocatedblocks().iterator(); while (olditer.hasnext() && newiter.hasnext()) { if (!olditer.next().getblock().equals(newiter.next().getblock())) { throw new ioexception("blocklist for " src " has changed!"); } } } boolean isblkinfoupdated = updateblockinfo(newinfo); this.locatedblocks = newinfo; this.currentnode = null; return isblkinfoupdated; }
调用callgetblocklocations方法,实际上是根据创建rpc连接以后得到的namenode的代理对象,调用namenode来获取到指定文件的block的位置信息(位于哪些datanode节点上):namenode.getblocklocations(src, start, length)。调用callgetblocklocations方法返回一个locatedblocks对象,该对象包含了文件长度信息、list blocks列表对象,其中locatedblock包含了一个block的基本信息:
private block b; private long offset; // offset of the first byte of the block in the file private datanodeinfo[] locs; private boolean corrupt;
有了这些文件的信息(文件长度、文件包含的block的位置等信息),dfsclient就能够执行后续读取文件数据的操作了,详细过程我们在后面分析说明。
通过namenode获取文件信息
上面,我们提到获取一个文件的基本信息,是通过namenode来得到的,这里详细分析namenode是如何获取到这些文件信息的,实现方法getblocklocations的代码,如下所示:
public locatedblocks getblocklocations(string src, long offset, long length) throws ioexception { mymetrics.incrnumgetblocklocations(); return namesystem.getblocklocations(getclientmachine(), src, offset, length); }
可以看到,namenode又委托管理hdfs name元数据的fsnamesystem的getblocklocations方法实现:
locatedblocks getblocklocations(string clientmachine, string src, long offset, long length) throws ioexception { locatedblocks blocks = getblocklocations(src, offset, length, true, true, true); if (blocks != null) { //sort the blocks // in some deployment cases, cluster is with separation of task tracker // and datanode which means client machines will not always be recognized // as known data nodes, so here we should try to get node (but not // datanode only) for locality based sort. node client = host2datanodemap.getdatanodebyhost(clientmachine); if (client == null) { listhosts = new arraylist (1); hosts.add(clientmachine); string rname = dnstoswitchmapping.resolve(hosts).get(0); if (rname != null) client = new nodebase(clientmachine, rname); } dfsutil.stalecomparator comparator = null; if (avoidstaledatanodesforread) { comparator = new dfsutil.stalecomparator(staleinterval); } // note: the last block is also included and sorted for (locatedblock b : blocks.getlocatedblocks()) { clustermap.pseudosortbydistance(client, b.getlocations()); if (avoidstaledatanodesforread) { arrays.sort(b.getlocations(), comparator); } } } return blocks; }
跟踪代码,最终会在下面的方法中实现了,如何获取到待读取文件的block的元数据列表,以及如何取出该文件的各个block的数据,方法实现代码,这里我做了详细的注释,可以参考,如下所示:
private synchronized locatedblocks getblocklocationsinternal(string src, long offset, long length, int nrblockstoreturn, boolean doaccesstime, boolean needblocktoken) throws ioexception { inodefile inode = dir.getfileinode(src); // 获取到与待读取文件相关的inode数据 if (inode == null) { return null; } if (doaccesstime && isaccesstimesupported()) { dir.settimes(src, inode, -1, now(), false); } block[] blocks = inode.getblocks(); // 获取到文件src所包含的block的元数据列表信息 if (blocks == null) { return null; } if (blocks.length == 0) { // 获取到文件src的block数,这里=0,该文件的block数据还没创建,可能正在创建 return inode.createlocatedblocks(new arraylist(blocks.length)); } list results; results = new arraylist (blocks.length); int curblk = 0; // 当前block在block[] blocks数组中的索引位置 long curpos = 0, blksize = 0; // curpos表示某个block在文件中的字节偏移量,blksize为block的大小(字节数) int nrblocks = (blocks[0].getnumbytes() == 0) ? 0 : blocks.length; // 获取到文件src的block数,实际上一定>0,但是第一个block大小可能为0,这种情况认为nrblocks=0 for (curblk = 0; curblk < nrblocks; curblk ) { // 根据前面代码,我们知道offset=0,所以这个循环第一次进来肯定就break出去了(正常的话,blksize>0,所以我觉得这段代码写的稍微有点晦涩) blksize = blocks[curblk].getnumbytes(); assert blksize > 0 : "block of size 0"; if (curpos blksize > offset) { break; } curpos = blksize; } if (nrblocks > 0 && curblk == nrblocks) // offset >= end of file, 到这里curblk=0,如果从文件src的第一个block的字节数累加计算,知道所有的block的字节数都累加上了,总字节数仍然<=请求的offset,说明即使到了文件尾部,仍然没有达到offset的值。从前面fetchlocatedblocks()方法中调用我们知道,offset=0,所以执行该分支表示文件src没有可用的block数据块可读 return null; long endoff = offset length; // do { // 获取block所在位置(datanode节点) int numnodes = blocksmap.numnodes(blocks[curblk]); // 计算文件src中第curblk个block存储在哪些datanode节点上 int numcorruptnodes = countnodes(blocks[curblk]).corruptreplicas(); // 计算存储文件src中第curblk个block但无法读取该block的datanode节点数 int numcorruptreplicas = corruptreplicas.numcorruptreplicas(blocks[curblk]); // 计算fsnamesystem在内存中维护的block=>datanode映射的列表中,无法读取该block的datanode节点数 if (numcorruptnodes != numcorruptreplicas) { log.warn("inconsistent number of corrupt replicas for " blocks[curblk] "blockmap has " numcorruptnodes " but corrupt replicas map has " numcorruptreplicas); } datanodedescriptor[] machineset = null; // 下面的if...else用来获取一个block所在的datanode节点 boolean blockcorrupt = false; if (inode.isunderconstruction() && curblk == blocks.length - 1 && blocksmap.numnodes(blocks[curblk]) == 0) { // 如果文件正在创建,当前blocks[curblk]还没有创建成功(即没有可用的datanode可以提供该block的服务),仍然返回待创建block所在的datanode节点列表。数据块是在datanode上存储的,只要datanode完成数据块的存储后,通过heartbeat将数据块的信息上报给namenode后,这些信息才会存储到blocksmap中 // get unfinished block locations inodefileunderconstruction cons = (inodefileunderconstruction) inode; machineset = cons.gettargets(); blockcorrupt = false; } else { // 文件已经创建完成 blockcorrupt = (numcorruptnodes == numnodes); // 是否当前的block在所有datanode节点上的副本都坏掉,无法提供服务 int nummachineset = blockcorrupt ? numnodes : (numnodes - numcorruptnodes); // 如果是,则返回所有datanode节点,否则,只返回可用的block副本所在的datanode节点 machineset = new datanodedescriptor[nummachineset]; if (nummachineset > 0) { // 获取到当前block所有副本所在的datanode节点列表 numnodes = 0; for (iterator it = blocksmap.nodeiterator(blocks[curblk]); it.hasnext();) { datanodedescriptor dn = it.next(); boolean replicacorrupt = corruptreplicas.isreplicacorrupt(blocks[curblk], dn); if (blockcorrupt || (!blockcorrupt && !replicacorrupt)) machineset[numnodes ] = dn; } } } locatedblock b = new locatedblock(blocks[curblk], machineset, curpos, blockcorrupt); // 创建一个包含block的元数据对象、所在datanode节点列表、起始索引位置(字节数)、健康状况的locatedblock对象 if (isaccesstokenenabled && needblocktoken) { // 如果启用block级的令牌(token)访问,则为当前用户生成读模式的令牌信息,一同封装到返回的locatedblock对象中 b.setblocktoken(accesstokenhandler.generatetoken(b.getblock(), enumset.of(blocktokensecretmanager.accessmode.read))); } results.add(b); // 收集待返回给读取文件的客户端需要的locatedblock列表 curpos = blocks[curblk].getnumbytes(); curblk ; } while (curpos < endoff && curblk < blocks.length && results.size() < nrblockstoreturn); return inode.createlocatedblocks(results); // 将收集的locatedblock列表数据封装到一个locatedblocks对象中返回 }
我们可以看一下,最后的调用inode.createlocatedblocks(results)生成locatedblocks对象的实现,代码如下所示:
locatedblocks createlocatedblocks(listblocks) { return new locatedblocks(computecontentsummary().getlength(), blocks, isunderconstruction()); // 通过contentsummary对象获取到文件的长度 }
客户端通过rpc调用,获取到了文件对应的block以及所在datanode列表的信息,然后就可以根据locatedblocks来进一步获取到对应的block对应的物理数据块。
对block列表进行排序
我们再回到fsnamesystem类,调用getblocklocationsinternal方法的getblocklocations方法中,在返回文件block列表locatedblocks之后,会对每一个block所在的datanode进行的一个排序,排序的基本规则有如下2点:
- client到block所在的datanode的距离最近,这个是通过网络拓扑关系来进行计算,例如client的网络路径为/dc1/r1/c1,那么路径为/dc1/r1/dn1的datanode就比路径为/dc1/r2/dn2的距离小,/dc1/r1/dn1对应的block就会排在前面
- 从上面一点可以推出,如果client就是某个datanode,恰好某个block的datanode列表中包括该datanode,则该datanode对应的block排在前面
- block所在的datanode列表中,如果其中某个datanode在指定的时间内没有向namenode发送heartbeat(默认由常量dfsconfigkeys.dfs_namenode_stale_datanode_interval_default定义,默认值为30s),则该datanode的状态即为stale,具有该状态的datanode对应的block排在后面
基于上述规则排序后,block列表返回到client。
client与datanode交互更新文件block列表
我们要回到前面分析的dfsclient.dfsinputstream.fetchlocatedblocks()方法中,查看在调用该方法之后,是如何执行实际处理逻辑的:
private boolean fetchlocatedblocks() throws ioexception, filenotfoundexception { locatedblocks newinfo = callgetblocklocations(namenode, src, 0, prefetchsize); // rpc调用向namenode获取待读取文件对应的block及其位置信息locatedblocks对象 if (newinfo == null) { throw new filenotfoundexception("file does not exist: " src); } if (locatedblocks != null && !locatedblocks.isunderconstruction() && !newinfo.isunderconstruction()) { // 这里面locatedblocks!=null是和后面调用updateblockinfo方法返回的状态有关的 iteratorolditer = locatedblocks.getlocatedblocks().iterator(); iterator newiter = newinfo.getlocatedblocks().iterator(); while (olditer.hasnext() && newiter.hasnext()) { // 检查2次获取到的locatedblock列表:第2次得到newinfo包含的block列表,在第2次得到的locatedblocks中是否发生变化,如果发生了变化,则不允许读取,抛出异常 if (!olditer.next().getblock().equals(newiter.next().getblock())) { throw new ioexception("blocklist for " src " has changed!"); } } } boolean isblkinfoupdated = updateblockinfo(newinfo); this.locatedblocks = newinfo; this.currentnode = null; return isblkinfoupdated; }
如果第一次读取该文件时,已经获取到了对应的block列表,缓存在客户端;如果客户端第二次又读取了该文件,仍然获取到一个block列表对象。在两次读取之间,可能存在原文件完全被重写的情况,所以新得到的block列表与原列表完全不同了,存在这种情况,客户端直接抛出io异常,如果原文件对应的block列表没有变化,则更新客户端缓存的对应block列表信息。
当集群重启的时候(如果允许安全模式下读文件),或者当一个文件正在创建的时候,datanode向namenode进行block report,这个过程中可能namenode还没有完全重建好block到datanode的映射关系信息,所以即使在这种情况下,仍然会返回对应的正在创建的block所在的datanode列表信息,可以从前面getblocklocationsinternal方法中看到,inode的对应underconstruction状态为true。这时,一个block对应的所有副本中的某些可能还在创建过程中。
上面方法中,调用updateblockinfo来更新文件的block元数据列表信息,对于文件的某些block可能没有创建完成,所以namenode所保存的关于文件的block的的元数据信息可能没有及时更新(datanode可能还没有完成block的报告),代码实现如下所示:
private boolean updateblockinfo(locatedblocks newinfo) throws ioexception { if (!serversupportshdfs200 || !newinfo.isunderconstruction() || !(newinfo.locatedblockcount() > 0)) { // 如果获取到的newinfo可以读取文件对应的block信息,则返回true return true; } locatedblock last = newinfo.get(newinfo.locatedblockcount() - 1); // 从namenode获取文件的最后一个block的元数据对象locatedblock boolean lastblockinfile = (last.getstartoffset() last.getblocksize() == newinfo.getfilelength()); if (!lastblockinfile) { // 如果“文件长度 != 最后一个块起始偏移量 最后一个块长度”,说明文件对应block的元数据信息还没有更新,但是仍然返回给读取文件的该客户端 return true; } // 这时,已经确定last是该文件的最后一个bolck,检查最后个block的存储位置信息 if (last.getlocations().length == 0) { return false; } clientdatanodeprotocol primary = null; block newblock = null; for (int i = 0; i < last.getlocations().length && newblock == null; i ) { // 根据从namenode获取到的locatedblock last中对应的datanode列表信息,client与datanode建立rpc连接,获取最后一个block的元数据 datanodeinfo datanode = last.getlocations()[i]; try { primary = createclientdatanodeprotocolproxy(datanode, conf, last .getblock(), last.getblocktoken(), sockettimeout, connecttodnviahostname); newblock = primary.getblockinfo(last.getblock()); } catch (ioexception e) { if (e.getmessage().startswith( "java.io.ioexception: java.lang.nosuchmethodexception: " "org.apache.hadoop.hdfs.protocol" ".clientdatanodeprotocol.getblockinfo")) { // we're talking to a server that doesn't implement hdfs-200. serversupportshdfs200 = false; } else { log.info("failed to get block info from " datanode.gethostname() " probably does not have " last.getblock(), e); } } finally { if (primary != null) { rpc.stopproxy(primary); } } } if (newblock == null) { // datanode上不存在最后一个block对应的元数据信息,直接返回 if (!serversupportshdfs200) { return true; } throw new ioexception("failed to get block info from any of the dn in pipeline: " arrays.tostring(last.getlocations())); } long newblocksize = newblock.getnumbytes(); long delta = newblocksize - last.getblocksize(); // 对于文件的最后一个block,如果从namenode获取到的元数据,与从datanode实际获取到的元数据不同,则以datanode获取的为准,因为可能datanode还没有及时将block的变化信息向namenode汇报 last.getblock().setnumbytes(newblocksize); long newlength = newinfo.getfilelength() delta; newinfo.setfilelength(newlength); // 修改文件block和位置元数据列表信息 log.debug("dfsclient setting last block " last " to length " newblocksize " filesize is now " newinfo.getfilelength()); return true; }
我们看一下,在updateblockinfo方法中,返回false的情况:client向namenode发起的rpc请求,已经获取到了组成该文件的数据块的元数据信息列表,但是,文件的最后一个数据块的存储位置信息无法获取到,说明datanode还没有及时通过block report将数据块的存储位置信息报告给namenode。通过在openinfo()方法中可以看到,获取文件的block列表信息有3次重试机会,也就是调用updateblockinfo方法返回false,可以有12秒的时间,等待datanode向namenode汇报文件的最后一个块的位置信息,以及namenode更新内存中保存的文件对应的数据块列表元数据信息。
我们再看一下,在updateblockinfo方法中,返回true的情况:
- 文件已经创建完成,文件对应的block列表元数据信息可用
- 文件正在创建中,但是当前能够读取到的已经完成的最后一个块(非组成文件的最后一个block)的元数据信息可用
- 文件正在创建中,文件的最后一个block的元数据部分可读:从namenode无法获取到该block对应的位置信息,这时client会与datanode直接进行rpc通信,获取到该文件最后一个block的位置信息
上面client会与datanode直接进行rpc通信,获取文件最后一个block的元数据,这时可能由于网络问题等等,无法得到文件最后一个block的元数据,所以也会返回true,也就是说,client仍然可以读取该文件,只是无法读取到最后一个block的数据。
这样,在client从namenode/datanode获取到的文件的block列表元数据已经是可用的信息,可以根据这些信息读取到各个block的物理数据块内容了,准确地说,应该是文件处于打开状态了,已经准备好后续进行的读操作了。