hdfs读文件过程分析:获取文件对应的block列表-亚博电竞手机版

在使用java读取一个文件系统中的一个文件时,我们会首先构造一个datainputstream对象,然后就能够从文件中读取数据。对于存储在hdfs上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用dfsclient.dfsdatainputstream类来读取hdfs上一个文件的一段代码来看,如下所示:

package org.shirdrn.hadoop.hdfs; import java.io.bufferedreader; import java.io.ioexception; import java.io.inputstreamreader;  import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.fsdatainputstream; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path;  public class hdfsfilereader {       public static void main(string[] args) {           string file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log";           path path = new path(file);            configuration conf = new configuration();           filesystem fs;           fsdatainputstream in;           bufferedreader reader = null;           try {                fs = filesystem.get(conf);                in = fs.open(path); // 打开文件path,返回一个fsdatainputstream流对象                reader = new bufferedreader(new inputstreamreader(in));                string line = null;                while((line = reader.readline()) != null) { // 读取文件行内容                     system.out.println("record: "   line);                }           } catch (ioexception e) {                e.printstacktrace();           } finally {                try {                     if(reader != null) reader.close();                } catch (ioexception e) {                     e.printstacktrace();                }           }      }  }

基于上面代码,我们可以看到,通过一个filesystem对象可以打开一个path文件,返回一个fsdatainputstream文件输入流对象,然后从该fsdatainputstream对象就能够读取出文件的内容。所以,我们从fsdatainputstream入手,详细分析从hdfs读取文件内容的过程,在实际地读取物理数据块之前,首先要获取到文件对应的block列表元数据信息,整体流程如下图所示:

下面,详细说明整个流程:

创建fsdatainputstream流对象

从一个path路径对象,能够获取到一个filesystem对象,然后通过调用filesystem的open方法打开一个文件流:

public fsdatainputstream open(path f) throws ioexception {   return open(f, getconf().getint("io.file.buffer.size", 4096)); }

由于filesystem是抽象类,将具体的打开操作留给具体子类实现,例如ftpfilesystem、harfilesystem、webhdfsfilesystem等,不同的文件系统具有不同打开文件的行为,我们以distributedfilesystem为例,open方法实现,代码如下所示:

public fsdatainputstream open(path f, int buffersize) throws ioexception {   statistics.incrementreadops(1);   return new dfsclient.dfsdatainputstream(         dfs.open(getpathname(f), buffersize, verifychecksum, statistics)); }

statistics对象用来收集文件系统操作的统计数据,这里使读取文件操作的计数器加1。然后创建了一个dfsclient.dfsdatainputstream对象,该对象的参数是通过dfsclient dfs客户端对象打开一个这个文件从而返回一个dfsinputstream对象,下面,我们看dfsclient的open方法实现,代码如下所示:

public dfsinputstream open(string src, int buffersize, boolean verifychecksum,                     filesystem.statistics stats) throws ioexception {   checkopen();   //    get block info from namenode   return new dfsinputstream(src, buffersize, verifychecksum); }

checkopen方法就是检查一个标志位clientrunning,表示当前的dfs客户端对象是否已经创建并初始化,在dfs客户端创建的时候该标志就为true,表示客户端正在运行状态。我们知道,当客户端dfsclient连接到namenode的时候,实际上是创建了一个到namenode的rpc连接,namenode作为server角色,dfsclient作为client角色,它们之间建立起socket连接。只有显式调用dfsclient的close方法时,才会修改clientrunning的值为false,实际上真正地关闭了已经建立的rpc连接。

我们看一下创建dfsinputstream的构造方法实现:

dfsinputstream(string src, int buffersize, boolean verifychecksum) throws ioexception {   this.verifychecksum = verifychecksum;   this.buffersize = buffersize;   this.src = src;   prefetchsize = conf.getlong("dfs.read.prefetch.size", prefetchsize);   openinfo(); }

先设置了几个与读取文件相关的参数值,这里有一个预先读取文件的block字节数的参数prefetchsize,它的值设置如下:

public static final long default_block_size = dfsconfigkeys.dfs_block_size_default; public static final long    dfs_block_size_default = 64*1024*1024;    defaultblocksize = conf.getlong("dfs.block.size", default_block_size);   private long prefetchsize = 10 * defaultblocksize;

这个prefetchsize的值默认为10*64*1024*1024=671088640,也就是说,默认预读取一个文件的10个块,即671088640b=640m,如果想要修改这个值,设置dfs.block.size即可覆盖默认值。

然后调用了openinfo方法,从namenode获取到该打开文件的信息,在openinfo方法中,具体实现如下所示:

synchronized void openinfo() throws ioexception {   for (int retries = 3; retries > 0; retries--) {     if (fetchlocatedblocks()) { // fetch block success. 如果成功获取到待读取文件对应的block列表,则直接返回       return;     } else {       // last block location unavailable. when a cluster restarts,       // dns may not report immediately. at this time partial block       // locations will not be available with nn for getting the length.       // lets retry a few times to get the length.       dfsclient.log.warn("last block locations unavailable. "             "datanodes might not have reported blocks completely."             " will retry for "   retries   " times");       waitfor(4000);     }   }   throw new ioexception("could not obtain the last block locations."); }

上述代码中,有一个for循环用来获取block列表。如果成功获取到待读取文件的block列表,则直接返回,否则,最多执行3次等待重试操作(最多花费时间大于12秒)。未能成功读取文件的block列表信息,是因为namenode无法获取到文件对应的块列表的信息,当整个集群启动的时候,datanode会主动向nnamenode上报对应的block信息,只有block report完成之后,namenode就能够知道组成文件的block及其所在datanode列表的信息。openinfo方法方法中调用了fetchlocatedblocks方法,用来与namenode进行rpc通信调用,实际获取对应的block列表,实现代码如下所示:

private boolean fetchlocatedblocks() throws ioexception,     filenotfoundexception {   locatedblocks newinfo = callgetblocklocations(namenode, src, 0, prefetchsize);   if (newinfo == null) {     throw new filenotfoundexception("file does not exist: "   src);   }    if (locatedblocks != null && !locatedblocks.isunderconstruction() && !newinfo.isunderconstruction()) {     iterator olditer = locatedblocks.getlocatedblocks().iterator();     iterator newiter = newinfo.getlocatedblocks().iterator();     while (olditer.hasnext() && newiter.hasnext()) {       if (!olditer.next().getblock().equals(newiter.next().getblock())) {         throw new ioexception("blocklist for "   src   " has changed!");       }     }   }   boolean isblkinfoupdated = updateblockinfo(newinfo);   this.locatedblocks = newinfo;   this.currentnode = null;   return isblkinfoupdated; }

调用callgetblocklocations方法,实际上是根据创建rpc连接以后得到的namenode的代理对象,调用namenode来获取到指定文件的block的位置信息(位于哪些datanode节点上):namenode.getblocklocations(src, start, length)。调用callgetblocklocations方法返回一个locatedblocks对象,该对象包含了文件长度信息、list blocks列表对象,其中locatedblock包含了一个block的基本信息:

private block b; private long offset;  // offset of the first byte of the block in the file private datanodeinfo[] locs; private boolean corrupt;

有了这些文件的信息(文件长度、文件包含的block的位置等信息),dfsclient就能够执行后续读取文件数据的操作了,详细过程我们在后面分析说明。

通过namenode获取文件信息

上面,我们提到获取一个文件的基本信息,是通过namenode来得到的,这里详细分析namenode是如何获取到这些文件信息的,实现方法getblocklocations的代码,如下所示:

public locatedblocks getblocklocations(string src, long offset, long length) throws ioexception {   mymetrics.incrnumgetblocklocations();   return namesystem.getblocklocations(getclientmachine(), src, offset, length); }

可以看到,namenode又委托管理hdfs name元数据的fsnamesystem的getblocklocations方法实现:

locatedblocks getblocklocations(string clientmachine, string src, long offset, long length) throws ioexception {   locatedblocks blocks = getblocklocations(src, offset, length, true, true, true);   if (blocks != null) {     //sort the blocks     // in some deployment cases, cluster is with separation of task tracker     // and datanode which means client machines will not always be recognized     // as known data nodes, so here we should try to get node (but not     // datanode only) for locality based sort.     node client = host2datanodemap.getdatanodebyhost(clientmachine);     if (client == null) {       list hosts = new arraylist (1);       hosts.add(clientmachine);       string rname = dnstoswitchmapping.resolve(hosts).get(0);       if (rname != null)         client = new nodebase(clientmachine, rname);     }        dfsutil.stalecomparator comparator = null;     if (avoidstaledatanodesforread) {       comparator = new dfsutil.stalecomparator(staleinterval);     }     // note: the last block is also included and sorted     for (locatedblock b : blocks.getlocatedblocks()) {       clustermap.pseudosortbydistance(client, b.getlocations());       if (avoidstaledatanodesforread) {         arrays.sort(b.getlocations(), comparator);       }     }   }   return blocks; }

跟踪代码,最终会在下面的方法中实现了,如何获取到待读取文件的block的元数据列表,以及如何取出该文件的各个block的数据,方法实现代码,这里我做了详细的注释,可以参考,如下所示:

private synchronized locatedblocks getblocklocationsinternal(string src,                                                      long offset,                                                      long length,                                                      int nrblockstoreturn,                                                      boolean doaccesstime,                                                      boolean needblocktoken)                                                      throws ioexception {         inodefile inode = dir.getfileinode(src);  // 获取到与待读取文件相关的inode数据         if (inode == null) {              return null;         }         if (doaccesstime && isaccesstimesupported()) {              dir.settimes(src, inode, -1, now(), false);         }         block[] blocks = inode.getblocks(); // 获取到文件src所包含的block的元数据列表信息         if (blocks == null) {              return null;         }         if (blocks.length == 0) { // 获取到文件src的block数,这里=0,该文件的block数据还没创建,可能正在创建              return inode.createlocatedblocks(new arraylist(blocks.length));         }         list results;         results = new arraylist(blocks.length);          int curblk = 0; // 当前block在block[] blocks数组中的索引位置         long curpos = 0, blksize = 0; // curpos表示某个block在文件中的字节偏移量,blksize为block的大小(字节数)         int nrblocks = (blocks[0].getnumbytes() == 0) ? 0 : blocks.length; // 获取到文件src的block数,实际上一定>0,但是第一个block大小可能为0,这种情况认为nrblocks=0         for (curblk = 0; curblk < nrblocks; curblk  ) {  // 根据前面代码,我们知道offset=0,所以这个循环第一次进来肯定就break出去了(正常的话,blksize>0,所以我觉得这段代码写的稍微有点晦涩)              blksize = blocks[curblk].getnumbytes();              assert blksize > 0 : "block of size 0";              if (curpos   blksize > offset) {                   break;              }              curpos  = blksize;         }          if (nrblocks > 0 && curblk == nrblocks) // offset >= end of file, 到这里curblk=0,如果从文件src的第一个block的字节数累加计算,知道所有的block的字节数都累加上了,总字节数仍然<=请求的offset,说明即使到了文件尾部,仍然没有达到offset的值。从前面fetchlocatedblocks()方法中调用我们知道,offset=0,所以执行该分支表示文件src没有可用的block数据块可读              return null;          long endoff = offset   length; //           do {              // 获取block所在位置(datanode节点)              int numnodes = blocksmap.numnodes(blocks[curblk]); // 计算文件src中第curblk个block存储在哪些datanode节点上              int numcorruptnodes = countnodes(blocks[curblk]).corruptreplicas(); // 计算存储文件src中第curblk个block但无法读取该block的datanode节点数              int numcorruptreplicas = corruptreplicas.numcorruptreplicas(blocks[curblk]); // 计算fsnamesystem在内存中维护的block=>datanode映射的列表中,无法读取该block的datanode节点数              if (numcorruptnodes != numcorruptreplicas) {                   log.warn("inconsistent number of corrupt replicas for "                               blocks[curblk]   "blockmap has "   numcorruptnodes                               " but corrupt replicas map has "   numcorruptreplicas);              }              datanodedescriptor[] machineset = null;  // 下面的if...else用来获取一个block所在的datanode节点              boolean blockcorrupt = false;              if (inode.isunderconstruction() && curblk == blocks.length - 1                        && blocksmap.numnodes(blocks[curblk]) == 0) { // 如果文件正在创建,当前blocks[curblk]还没有创建成功(即没有可用的datanode可以提供该block的服务),仍然返回待创建block所在的datanode节点列表。数据块是在datanode上存储的,只要datanode完成数据块的存储后,通过heartbeat将数据块的信息上报给namenode后,这些信息才会存储到blocksmap中                   // get unfinished block locations                   inodefileunderconstruction cons = (inodefileunderconstruction) inode;                   machineset = cons.gettargets();                   blockcorrupt = false;              } else { // 文件已经创建完成                   blockcorrupt = (numcorruptnodes == numnodes); // 是否当前的block在所有datanode节点上的副本都坏掉,无法提供服务                   int nummachineset = blockcorrupt ? numnodes : (numnodes - numcorruptnodes); // 如果是,则返回所有datanode节点,否则,只返回可用的block副本所在的datanode节点                   machineset = new datanodedescriptor[nummachineset];                   if (nummachineset > 0) { // 获取到当前block所有副本所在的datanode节点列表                        numnodes = 0;                        for (iterator it = blocksmap.nodeiterator(blocks[curblk]); it.hasnext();) {                             datanodedescriptor dn = it.next();                             boolean replicacorrupt = corruptreplicas.isreplicacorrupt(blocks[curblk], dn);                             if (blockcorrupt || (!blockcorrupt && !replicacorrupt))                                  machineset[numnodes  ] = dn;                        }                   }              }              locatedblock b = new locatedblock(blocks[curblk], machineset, curpos, blockcorrupt); // 创建一个包含block的元数据对象、所在datanode节点列表、起始索引位置(字节数)、健康状况的locatedblock对象              if (isaccesstokenenabled && needblocktoken) { // 如果启用block级的令牌(token)访问,则为当前用户生成读模式的令牌信息,一同封装到返回的locatedblock对象中                   b.setblocktoken(accesstokenhandler.generatetoken(b.getblock(), enumset.of(blocktokensecretmanager.accessmode.read)));              }               results.add(b); // 收集待返回给读取文件的客户端需要的locatedblock列表              curpos  = blocks[curblk].getnumbytes();              curblk  ;         } while (curpos < endoff && curblk < blocks.length && results.size() < nrblockstoreturn);          return inode.createlocatedblocks(results); // 将收集的locatedblock列表数据封装到一个locatedblocks对象中返回    }

我们可以看一下,最后的调用inode.createlocatedblocks(results)生成locatedblocks对象的实现,代码如下所示:

locatedblocks createlocatedblocks(list blocks) {   return new locatedblocks(computecontentsummary().getlength(), blocks, isunderconstruction()); // 通过contentsummary对象获取到文件的长度 }

客户端通过rpc调用,获取到了文件对应的block以及所在datanode列表的信息,然后就可以根据locatedblocks来进一步获取到对应的block对应的物理数据块。

对block列表进行排序

我们再回到fsnamesystem类,调用getblocklocationsinternal方法的getblocklocations方法中,在返回文件block列表locatedblocks之后,会对每一个block所在的datanode进行的一个排序,排序的基本规则有如下2点:

  • client到block所在的datanode的距离最近,这个是通过网络拓扑关系来进行计算,例如client的网络路径为/dc1/r1/c1,那么路径为/dc1/r1/dn1的datanode就比路径为/dc1/r2/dn2的距离小,/dc1/r1/dn1对应的block就会排在前面
  • 从上面一点可以推出,如果client就是某个datanode,恰好某个block的datanode列表中包括该datanode,则该datanode对应的block排在前面
  • block所在的datanode列表中,如果其中某个datanode在指定的时间内没有向namenode发送heartbeat(默认由常量dfsconfigkeys.dfs_namenode_stale_datanode_interval_default定义,默认值为30s),则该datanode的状态即为stale,具有该状态的datanode对应的block排在后面

基于上述规则排序后,block列表返回到client。

client与datanode交互更新文件block列表

我们要回到前面分析的dfsclient.dfsinputstream.fetchlocatedblocks()方法中,查看在调用该方法之后,是如何执行实际处理逻辑的:

private boolean fetchlocatedblocks() throws ioexception,     filenotfoundexception {   locatedblocks newinfo = callgetblocklocations(namenode, src, 0, prefetchsize); // rpc调用向namenode获取待读取文件对应的block及其位置信息locatedblocks对象   if (newinfo == null) {     throw new filenotfoundexception("file does not exist: "   src);   }    if (locatedblocks != null && !locatedblocks.isunderconstruction() && !newinfo.isunderconstruction()) { // 这里面locatedblocks!=null是和后面调用updateblockinfo方法返回的状态有关的     iterator olditer = locatedblocks.getlocatedblocks().iterator();     iterator newiter = newinfo.getlocatedblocks().iterator();     while (olditer.hasnext() && newiter.hasnext()) { // 检查2次获取到的locatedblock列表:第2次得到newinfo包含的block列表,在第2次得到的locatedblocks中是否发生变化,如果发生了变化,则不允许读取,抛出异常       if (!olditer.next().getblock().equals(newiter.next().getblock())) {         throw new ioexception("blocklist for "   src   " has changed!");       }     }   }   boolean isblkinfoupdated = updateblockinfo(newinfo);   this.locatedblocks = newinfo;   this.currentnode = null;   return isblkinfoupdated; }

如果第一次读取该文件时,已经获取到了对应的block列表,缓存在客户端;如果客户端第二次又读取了该文件,仍然获取到一个block列表对象。在两次读取之间,可能存在原文件完全被重写的情况,所以新得到的block列表与原列表完全不同了,存在这种情况,客户端直接抛出io异常,如果原文件对应的block列表没有变化,则更新客户端缓存的对应block列表信息。

当集群重启的时候(如果允许安全模式下读文件),或者当一个文件正在创建的时候,datanode向namenode进行block report,这个过程中可能namenode还没有完全重建好block到datanode的映射关系信息,所以即使在这种情况下,仍然会返回对应的正在创建的block所在的datanode列表信息,可以从前面getblocklocationsinternal方法中看到,inode的对应underconstruction状态为true。这时,一个block对应的所有副本中的某些可能还在创建过程中。

上面方法中,调用updateblockinfo来更新文件的block元数据列表信息,对于文件的某些block可能没有创建完成,所以namenode所保存的关于文件的block的的元数据信息可能没有及时更新(datanode可能还没有完成block的报告),代码实现如下所示:

private boolean updateblockinfo(locatedblocks newinfo) throws ioexception {   if (!serversupportshdfs200 || !newinfo.isunderconstruction() || !(newinfo.locatedblockcount() > 0)) { // 如果获取到的newinfo可以读取文件对应的block信息,则返回true     return true;   }    locatedblock last = newinfo.get(newinfo.locatedblockcount() - 1); // 从namenode获取文件的最后一个block的元数据对象locatedblock   boolean lastblockinfile = (last.getstartoffset()   last.getblocksize() == newinfo.getfilelength());    if (!lastblockinfile) { // 如果“文件长度 != 最后一个块起始偏移量   最后一个块长度”,说明文件对应block的元数据信息还没有更新,但是仍然返回给读取文件的该客户端     return true;   }   // 这时,已经确定last是该文件的最后一个bolck,检查最后个block的存储位置信息   if (last.getlocations().length == 0) {     return false;   }    clientdatanodeprotocol primary = null;   block newblock = null;   for (int i = 0; i < last.getlocations().length && newblock == null; i  ) { // 根据从namenode获取到的locatedblock last中对应的datanode列表信息,client与datanode建立rpc连接,获取最后一个block的元数据     datanodeinfo datanode = last.getlocations()[i];     try {       primary = createclientdatanodeprotocolproxy(datanode, conf, last .getblock(), last.getblocktoken(), sockettimeout, connecttodnviahostname);       newblock = primary.getblockinfo(last.getblock());     } catch (ioexception e) {       if (e.getmessage().startswith(           "java.io.ioexception: java.lang.nosuchmethodexception: "                 "org.apache.hadoop.hdfs.protocol"                 ".clientdatanodeprotocol.getblockinfo")) {         // we're talking to a server that doesn't implement hdfs-200.         serversupportshdfs200 = false;       } else {         log.info("failed to get block info from "               datanode.gethostname()   " probably does not have "               last.getblock(), e);       }     } finally {       if (primary != null) {         rpc.stopproxy(primary);       }     }   }    if (newblock == null) { // datanode上不存在最后一个block对应的元数据信息,直接返回     if (!serversupportshdfs200) {       return true;     }     throw new ioexception("failed to get block info from any of the dn in pipeline: "   arrays.tostring(last.getlocations()));   }    long newblocksize = newblock.getnumbytes();   long delta = newblocksize - last.getblocksize();   // 对于文件的最后一个block,如果从namenode获取到的元数据,与从datanode实际获取到的元数据不同,则以datanode获取的为准,因为可能datanode还没有及时将block的变化信息向namenode汇报   last.getblock().setnumbytes(newblocksize);   long newlength = newinfo.getfilelength()   delta;   newinfo.setfilelength(newlength); // 修改文件block和位置元数据列表信息   log.debug("dfsclient setting last block "   last   " to length "   newblocksize   " filesize is now "   newinfo.getfilelength());   return true; }

我们看一下,在updateblockinfo方法中,返回false的情况:client向namenode发起的rpc请求,已经获取到了组成该文件的数据块的元数据信息列表,但是,文件的最后一个数据块的存储位置信息无法获取到,说明datanode还没有及时通过block report将数据块的存储位置信息报告给namenode。通过在openinfo()方法中可以看到,获取文件的block列表信息有3次重试机会,也就是调用updateblockinfo方法返回false,可以有12秒的时间,等待datanode向namenode汇报文件的最后一个块的位置信息,以及namenode更新内存中保存的文件对应的数据块列表元数据信息。

我们再看一下,在updateblockinfo方法中,返回true的情况:

  • 文件已经创建完成,文件对应的block列表元数据信息可用
  • 文件正在创建中,但是当前能够读取到的已经完成的最后一个块(非组成文件的最后一个block)的元数据信息可用
  • 文件正在创建中,文件的最后一个block的元数据部分可读:从namenode无法获取到该block对应的位置信息,这时client会与datanode直接进行rpc通信,获取到该文件最后一个block的位置信息

上面client会与datanode直接进行rpc通信,获取文件最后一个block的元数据,这时可能由于网络问题等等,无法得到文件最后一个block的元数据,所以也会返回true,也就是说,client仍然可以读取该文件,只是无法读取到最后一个block的数据。

这样,在client从namenode/datanode获取到的文件的block列表元数据已经是可用的信息,可以根据这些信息读取到各个block的物理数据块内容了,准确地说,应该是文件处于打开状态了,已经准备好后续进行的读操作了。

展开全文
内容来源于互联网和用户投稿,文章中一旦含有亚博电竞手机版的联系方式务必识别真假,本站仅做信息展示不承担任何相关责任,如有侵权或涉及法律问题请联系亚博电竞手机版删除

最新文章

网站地图