通过Thrift访问HDFS分布式文件系统的性能瓶颈分析

引言

  Hadoop提供的HDFS布式文件存储系统,提供了基于thrift的客户端访问支持,但是因为Thrift自身的访问特点,在高并发的访问情况下,thrift自身结构可能将会成为HDFS文件存储系统的一个性能瓶颈。我们先来看一下一不使用Thrfit方式访问HDFS文件系统的业务流程。

一、HDFS文件读取流程

  

流程说明:

  1. 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
  2. Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的DataNode地址;
  3. 客户端开发库Client会选取离客户端最接近的DataNode来读取block;如果客户端本身就是DataNode,那么将从本地直接获取数据.
  4. 读取完当前block的数据后,关闭与当前的DataNode连接,并为读取下一个block寻找最佳的DataNode;
  5. 当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。
  6. 读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续读。

二、HDFS文件写入流程

流程说明:

  1. 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
  2. Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;
  3. 当 客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以数据队列"data queue"的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表, 列表的大小根据在Namenode中对replication的设置而定。
  4. 开始以pipeline(管道)的形式将packet写入所 有的replicas中。开发库把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此 pipeline中的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线的形式。
  5. 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返回的ack packet后会从"ack queue"移除相应的packet。
  6. 如 果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的pipeline中移除, 剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的datanode,保持 replicas设定的数量。

三、关键词

  HDFSClient通过文件IO操作最终实现是通过直接访问DataNode进行。

四、Thrift的访问流程:猜测版

  

流程说明:

1.ThriftClient客户端将操作命令传给ThriftServer。

2.ThriftServer调用HDFSClient接口API实现HDFS读写操作,操作流程如和三所示。

五、疑问

  与DataNode发生数据交换的到底是ThriftServer还是ThriftClient,如果是ThriftServer,那么多个ThriftClient并行访问时,ThriftServer必将成为HDFS访问的性能瓶颈;如果是ThriftClient直接访问DataNode,那么理论依据何在呢?

六、示例程序

  下面是一个基于Thrift实现的HDFS客户端程序,实现了文件的访问和创建和读取

// HdfsDemo.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include <protocol/TBinaryProtocol.h>
#include <transport/TSocket.h>
#include <transport/TTransportUtils.h>
#include "ThriftHadoopFileSystem.h"

#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0500
#endif
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

int _tmain(int argc, _TCHAR* argv[])
{
    if (argc < 3)
    {
        std::cerr << "Invalid arguments!\n" << "Usage: DemoClient host port" << std::endl;
        //return -1;
    }
    boost::shared_ptr<TTransport> socket(new TSocket("192.168.230.133", 55952));//boost::lexical_cast<int>(argv[2])));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    ThriftHadoopFileSystemClient client(protocol);
    try
    {
        transport->open();
        Pathname path;
        //01_create directory
        path.__set_pathname("/user/hadoop");
        if(client.exists(path) == true)
        {
            printf("path is exists.\r\n");
        }
        else
        {
            printf("path is not exists.");
            //return 0;
        }
        //02_put file
        Pathname filepath;
        filepath.__set_pathname("/user/hadoop/in/test1.txt");
        /*
        FILE* localfile = fopen("E:\\project\\Hadoop\\HdfsDemo\\Debug\\hello.txt","rb");
        if (localfile == NULL)
        {
            transport->close();
            return 0;
        }
        ThriftHandle hdl;
        client.create(hdl,filepath);
        while (true)
        {
            char data[1024];
            memset(data,0x00,sizeof(data));
            size_t Num = fread(data,1,1024,localfile);
            if (Num <= 0)
            {
                break;
            }
            client.write(hdl,data);
        }
        fclose(localfile);
        client.close(hdl);
        */
        //03_get file
        /*
        ThriftHandle hd2;
        FileStatus stat1;
        client.open(hd2,filepath);
        client.stat(stat1,filepath);
        int index = 0;
        while(true)
        {
            string data;
            if (stat1.length <= index)
            {
                break;
            }
            client.read(data,hd2,index,1024);

            index += data.length();
            printf("==%s\r\n",data.c_str());
        }
        client.close(hd2);
        */

        //04_list files
        std::vector<FileStatus> vFileStatus;
        client.listStatus(vFileStatus,path);
        for (int i=0;i<vFileStatus.size();i++)
        {
            printf("i=%d file=%s\r\n",i,vFileStatus[i].path.c_str());
        }
        transport->close();
    } catch (const TException &tx) {
    std::cerr << "ERROR: " << tx.what() << std::endl;
    }
    getchar();
    return 0;
}

 七、源码分析

  1.文件创建:

/**
      * Create a file and open it for writing, delete file if it exists
      */
    public ThriftHandle createFile(Pathname path,
                                   short mode,
                                   boolean  overwrite,
                                   int bufferSize,
                                   short replication,
                                   long blockSize) throws ThriftIOException {
      try {
        now = now();
        HadoopThriftHandler.LOG.debug("create: " + path +
                                     " permission: " + mode +
                                     " overwrite: " + overwrite +
                                     " bufferSize: " + bufferSize +
                                     " replication: " + replication +
                                     " blockSize: " + blockSize);
        FSDataOutputStream out = fs.create(new Path(path.pathname),
                                           new FsPermission(mode),
                                           overwrite,
                                           bufferSize,
                                           replication,
                                           blockSize,
                                           null); // progress
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

  ThriftHandle的两端到底是谁呢?是ThriftClient和DataNode?还是ThriftServer与DataNode?

  2.文件写入

public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
      try {
        now = now();
        HadoopThriftHandler.LOG.debug("write: " + tout.id);
        FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
        byte[] tmp = data.getBytes("UTF-8");
        out.write(tmp, 0, tmp.length);
        HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
        return true;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

  写入时依赖的还是ThriftHandle?  

  3.文件读取

/**
     * read from a file
     */
    public String read(ThriftHandle tout, long offset,
                       int length) throws ThriftIOException {
      try {
        now = now();
        HadoopThriftHandler.LOG.debug("read: " + tout.id +
                                     " offset: " + offset +
                                     " length: " + length);
        FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
        if (in.getPos() != offset) {
          in.seek(offset);
        }
        byte[] tmp = new byte[length];
        int numbytes = in.read(offset, tmp, 0, length);
        HadoopThriftHandler.LOG.debug("read done: " + tout.id);
        return new String(tmp, 0, numbytes, "UTF-8");
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

 八、遗留问题

  ThriftHandle可以看做是Socket连接句柄,但是他的两端到底是谁呢?如果是ThriftClient代表的客户端则一切OK,那么我该如何证明呢?存疑待考!

时间: 2024-02-19 10:38:17

通过Thrift访问HDFS分布式文件系统的性能瓶颈分析的相关文章

Java访问Hadoop分布式文件系统HDFS的配置说明_java

配置文件 m103替换为hdfs服务地址. 要利用Java客户端来存取HDFS上的文件,不得不说的是配置文件hadoop-0.20.2/conf/core-site.xml了,最初我就是在这里吃了大亏,所以我死活连不上HDFS,文件无法创建.读取. <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <co

Hadoop (HDFS)分布式文件系统基本操作

Hadoop HDFS提供了一组命令集来操作文件,它既可以操作Hadoop分布式文件系统,也可以操作本地文件系统.但是要加上theme(Hadoop文件系统用hdfs://,本地文件系统用file://) 1. 添加文件,目录 HDFS文件系统(需要加hdfs://): 因为我们在core-site.xml中配置了fs.default.name 所以所有和HDFS打交道的命令都不需要加上前缀hdfs://192.168.129.35:9000 比如我们要在Hadoop 文件系统中创建一个目录叫

HDFS简介及用C语言访问HDFS接口操作实践

一.概述 近年来,大数据技术如火如荼,如何存储海量数据也成了当今的热点和难点问题,而HDFS分布式文件系统作为Hadoop项目的分布式存储基础,也为HBASE提供数据持久化功能,它在大数据项目中有非常广泛的应用. Hadoop分布式文件系统(Hadoop Distributed File System,HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统.HDFS是Hadoop项目的核心子项目,是一种具有高容错性.高可靠性.高可扩展性.高吞吐量等特征的分

分布式文件系统HDFS设计

<Hadoop 权威指南>上用这么一句话来描述HDFS: HDFS is a filesystem designed for storing very large files with streaming data access patterns, running on clusters of commodity hardware. 有几个关键性的词组:Very large files,Streaming data access,以及Commodity hardware.解下来一个一个解释.

Hadoop分布式文件系统HDFS的工作原理详述

Hadoop分布式文件系统(HDFS)是一种被设计成适合运行在通用硬件上的分布式文件系统.HDFS是一个高度容错性的系统,适合部署在廉价的机器上.它能提供高吞吐量的数据访问,非常适合大规模数据集上的应用.要理解HDFS的内部工作原理,首先要理解什么是分布式文件系统. 1.分布式文件系统 多台计算机联网协同工作(有时也称为一个集群)就像单台系统一样解决某种问题,这样的系统我们称之为分布式系统. 分布式文件系统是分布式系统的一个子集,它们解决的问题就是数据存储.换句话说,它们是横跨在多台计算机上的存

从HDFS看分布式文件系统的设计需求

分布式文件系统的设计需求大概是这么几个:透明性.并发控制.可伸缩性.容错以及安全需求等.我想试试从这几个角度去观察HDFS的设计和实现,可以更清楚地看出HDFS的应用场景和设计理念.     首先是透明性,如果按照开放分布式处理的标准确定就有8种透明性:访问的透明性.位置的透明性.并发透明性.复制透明性.故障透明性.移动透明性.性能透明性和伸缩透明性.对于分布式文件系统,最重要的是希望能达到5个透明性要求: 1)访问的透明性:用户能通过相同的操作来访问本地文件和远程文件资源.HDFS可以做到这一

分布式文件系统HDFS体系

系列文件列表: http://os.51cto.com/art/201306/399379.htm 1.介绍 hadoop文件系统(HDFS)是一个运行在普通的硬件之上的分布式文件系统,它和现有的分布式文件系统有着很多的相似性,然而和其他的分布式文件系统的区别也是很明显的,HDFS是高容错性的,可以部署在低成本的硬件之上,HDFS提供高吞吐量地对应用程序数据访问,它适合大数据集的应用程序,HDFS放开一些POSIX的需求去实现流式地访问文件数据,HDFS开始是为开源的apache项目nutch的

Hadoop白皮书(1):分布式文件系统HDFS简介

Hadoop 分布式文件系统 (HDFS) 是运行在通用硬件上的分布式文件系统.HDFS 提供了一个高度容错性和高吞吐量的海量数据存储解决方案.HDFS 已经在各种大型在线服务和大型存储系统中得到广泛应用,已经成为各大网站等在线服务公司的海量存储事实标准,多年来为网站客户提供了可靠高效的服务. 随着信息系统的快速发展,海量的信息需要可靠存储的同时,还能被大量的使用者快速地访问.传统的存储方案已经从构架上越来越难以适应近几年来的信息系统业务的飞速发展,成为了业务发展的瓶颈和障碍. HDFS 通过一

性能-分布式文件系统hdfs 有iops和tps这两种指标的测试吗?

问题描述 分布式文件系统hdfs 有iops和tps这两种指标的测试吗? 分布式文件系统hdfs 有iops和tps这两种指标的测试吗? 还有淘宝分布式文件系统,有iops的测试指标不?谢谢大神们