??斗地主捕鱼电竞提现秒到 广告位招租 - 15元/月全站展示
??支付宝搜索579087183领大额红包 ??伍彩集团官网直营彩票
??好待遇→招代理 ??伍彩集团官网直营彩票
??络茄网 广告位招租 - 15元/月全站展示
ES学习笔记-elasticsearch-hadoop导入hive数据到es的实现探究

转载   sbp810050504   2018-10-10   浏览量:108


各个业务数据“汇总到hive, 经过ETL处理后, 导出到数据库“是大数据产品的典型业务流程。这其中,sqoop(离线)和kafka(实时)几乎是数据总线的标配了。

但是有些业务也有不标准的,比如hive数据导入到ES. hive数据导入到ES, 官方组件是elasticsearch-hadoop. 其用法在前面的博客中已有介绍。 那么其实现原理是怎样的呢? 或者说, es-hadoop这家伙到底是怎么把hive表的数据弄到es中去的? 为了弄清楚这个问题, 我们首先需要有一个本地的源码环境。

s1: 下载elasticsearch-hadoop源码。

git clone https://github.com/elastic/elasticsearch-hadoop.git

s2: 编译源码。直接编译master即可。

gradlew distZip

s3: 编译成功后,导入到intellij。 这里注意导入build.gradle文件,就像maven项目导入pom文件一样。

s4: 在intellij中编译一次项目。

s5: 在本地启动一个es, 默认的端口即可。

s6: 运行测试用例AbstractHiveSaveTest.testBasicSave()。 直接运行是会报错的, 需要略微修改一下代码,添加一个类的属性:

    @Cla***ule
    public static ExternalResource hive = HiveSuite.hive;

如果是在windows环境下,需要新建packageorg.apache.hadoop.io.nativeio, 然后在该package下建立NativeIO.java类。 修改代码如下:

// old
    public static boolean access(String path, Acce***ight desiredAccess)
        throws IOException {
      return access0(path, desiredAccess.acce***ight());
    }

// new 
    public static boolean access(String path, Acce***ight desiredAccess)
        throws IOException {
      return true;
    }

这样就运行起来了一个本地的hive到es的代码??梢詃ebug,了解详细流程了。

在elasticsearch-hadoop这个比较庞大的项目中,修改代码也比较麻烦,因此可以单独建立一个项目hive-shgy, 然后改造这个测试类, 跑通testBasicSave()。

由于对gradle不熟悉, 还是建立maven项目, 项目的依赖如下:

    <repositories>
        <repository>
            <id>spring-libs</id>
            <url>//repo.spring.io/libs-milestone/</url>
        </repository>
    </repositories>
    <dependencies>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
            <version>2.6.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>     <!-- 桥接:告诉Slf4j使用Log4j2 -->
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.6.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.6</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-cli</artifactId>
            <version>1.2.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>6.3.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

这里用到了log4j2, 所以日志类放在前面。

接下来迁移测试代码。迁移的原则是 若无必要,不新增类。 如果只用到了类的一个方法,那么只迁移一个方法。 这里的测试代码迁移,其实就是围绕HiveEmbeddedServer2来构建的。个人感觉这里比较巧妙的是,通过HiveEmbeddedServer2启动了一个嵌入式的hive实例。能够执行hive sql, 而且是在一个jvm中,对于研究hive的实现原理来说,太酷了。

基础的环境搭建好后,就可以研究elasticsearch-hadoop的源码了, 先看源码的结构:

elasticsearch-hadoop/hive/src/main/java/org/elasticsearch/hadoop/hive$ tree .
.
├── EsHiveInputFormat.java
├── EsHiveOutputFormat.java
├── EsSerDe.java
├── EsStorageHandler.java
├── HiveBytesArrayWritable.java
├── HiveBytesConverter.java
├── HiveConstants.java
├── HiveFieldExtractor.java
├── HiveType.java
├── HiveUtils.java
├── HiveValueReader.java
├── HiveValueWriter.java
├── HiveWritableValueWriter.java
└── package-info.java

0 directories, 14 files

这里简要描述一下elasticsearch-hadoop将hive数据同步到es的原理, Hive开放了StorageHandler的接口。通过StoreageHandler, 可以使用SQL将数据写入到es,同时也可以使用SQL读取ES中的数据。 所以, 整个es-hive, 其入口类为EsStorageHandler, 这就是整个功能的框架。 了解了EsStorageHandler后,接下来很重要的一个类就是EsSerDe, 是序列化反序列化的功能组件。它是一个桥梁,通过它实现ES数据类型和Hive数据类型的转换。 核心类就是这两个了。

了解了代码的原理及结构,就可以自己仿照实现hive数据同步到mongo, hive数据同步到redis 等其他的功能了。 这样做的好处是业务无关, 一次开发,多次使用。方便管理维护。

最后总结一下,本文没有直接给出答案, 而是记录了寻找答案的过程。 通过这个过程,学会将hive数据同步到其他NoSQL中,这个实践比理解源码更重要。

转载自://blog.51cto.com/sbp810050504/2298491

招聘 不方便扫码就复制添加关注:程序员招聘谷,微信号:jobs1024



20180607早课记录26-Hive
1.hive哪些sql会触发mrjob带聚合函数,某些insert,还有createtableasselect2.createtabletasselect...这样的SQL会不会创建mrjob会3.hive的数据分为哪两块分别存储哪里元数据和真实数据,分别存储在mysql,hdfs中4.一般工作中,udf编写是很多的,那么怎样临时生效,永久生效?临时生效就是addjar,然后createtempo
20180604早课记录24-Hive
1.hive数据分为哪两块?分别存储在哪?元数据和数据本身;mysql等关系型数据库和hdfs2.hive的建表SQL语句你们觉得里有哪些内容?(不光光是字段的定义)定义分区,分隔字符,内外表之分3.默认的换行符和分割符是什么?/t/r4.加载数据或本地数据到hive表或者覆盖hive表,语法是什么?LOADDATALOCALINPATH'xxx'OVERWRITEINTOTABL
Hive环境搭建及简单使用
前言:上篇文章我们介绍了Hive源码编译的相关内容,本篇文章将主要介绍Hive环境的搭建步骤及简单使用。1.下载安装包并解压#下载地址//archive.cloudera.com/cdh5/cdh/5/选择hive-1.1.0-cdh5.7.0.tar.gz包,也可用我们上篇文章编译来的包[[email protected]~]$lltotal32drwxrwxr-x4hadoophado
Hive 高级编程——深入浅出学Hive
目录:初始HiveHive安装与配置Hive内建操作符与函数开发HiveJDBChive参数Hive高级编程HiveQLHiveShell基本操作hive优化Hive体系结构Hive的原理配套视频课程第一部分:产生背景
hadoop hive的安装
以下是本人hive的安装过程:hive是Hadoop中最常用的工具,可以说是必装工具。按apache官方文档,推荐使用svn下载后编译,但build时,因为依赖关系,整了很久,下了很多包也没有成功。推荐使用tar.gz包,直接安装...
20180531早课记录22-Hive
1.hive是什么?基于Hadoop的一个数据仓库工具2.hive的默认使用什么数据库?生产上我们一般用什么?默认使用derby生产使用MySQL3.hive的元数据存储在哪?数据存储在哪?MySQLHDFS4.hive的SQL语法和什么类似和MySQL语法类似5.hive底层执行计算引擎是什么MapReduce/tez/spark6.hive使用mysql做元数据存储,那么部署过程中,注意什么注
hive错误FAILED:SemanticException Error10041 :Nopartitionpredicatefoundfor如何解决?
hive错误FAILED:SemanticException[Error10041]:Nopartitionpredicatefoundfor如何解决?
关于Hive调优的实例讲解
关于Hive调优的实例讲解。1javaheapspace异常,如果仅设置:setmapredmapchildjavaopts=-Xmx2048m;
如何理解HIve函数row_number?
如何理解HIve函数row_number?使用语法:使用语法:ROW_NUMBER()OVER(PARTITIONBYCOLUMNORDERBYCOLUMN)
hive中leftjoin、leftouterjoin和leftsemijoin的区别详解
hive中leftjoin、leftouterjoin和leftsemijoin的区别详解。先说结论,再举例子。