Apache Flink 从 1.9.0 版本开始增加了与 Hive 集成的功能,用户可以通过 Flink 来访问 Hive 的元数据,以及读写 Hive 中的表。本文将主要从项目的设计架构、最新进展、使用说明等方面来介绍这一功能。
Flink on Hive 介绍
Hive 是大数据领域最早出现的 SQL 引擎,发展至今有着丰富的功能和广泛的用户基础。之后出现的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了与 Hive 集成的功能,从而方便用户使用现有的数据仓库、进行作业迁移等。因此我们认为提供与 Hive 交互的能力对于 FlinkSQL 也是非常重要的。
设计架构
与 Hive 集成主要包含了元数据和实际表数据的访问,因此我们会从这两方面介绍一下该项目的架构。
元数据
为了访问外部系统的元数据,Flink 提供了 ExternalCatalog 的概念。但是目前 ExternalCatalog 的定义非常不完整,基本处于不可用的状态。因此,我们提出了一套全新的 Catalog 接口来取代现有的 ExternalCatalog。新的 Catalog 能够支持数据库、表、分区等多种元数据对象;允许在一个用户 Session 中维护多个 Catalog 实例,从而同时访问多个外部系统;并且 Catalog 以可插拔的方式接入 Flink,允许用户提供自定义的实现。下图展示了新的 Catalog API 的总体架构。
创建 TableEnvironment 的时候会同时创建一个 CatalogManager,负责管理不同的 Catalog 实例。TableEnvironment 通过 Catalog 来为 Table API 和 SQL Client 用户提供元数据服务。
目前 Catalog 有两个实现,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元数据管理机制,将所有元数据保存在内存中。而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。另一方面,HiveCatalog 也可以用来处理 Flink 自身的元数据,在这种场景下,HiveCatalog 仅将 Hive Metastore 作为持久化存储使用,写入 Hive Metastore 中的元数据并不一定是 Hive 所支持的格式。一个 HiveCatalog 实例可以同时支持这两种模式,用户无需为管理 Hive 和 Flink 的元数据创建不同的实例。
另外,我们设计了 HiveShim 来支持不同版本的 Hive Metastore。目前支持的 Hive 版本包括 2.3.4 和 1.2.1。
表数据
我们提供了 Hive Data Connector 来读写 Hive 的表数据。Hive Data Connector 尽可能的复用了 Hive 本身的 Input/Output Format 和 SerDe 等类,这样做的好处一方面是减少了代码重复,更重要的是可以最大程度的保持与 Hive 的兼容,即 Flink 写入的数据 Hive 可以正常读取,并且反之亦然。
与 HiveCatalog 类似的,Hive Data Connector 目前支持的 Hive 版本也是 2.3.4 和 1.2.1。
项目进展
Flink 与 Hive 集成的功能会在 1.9.0 版本中作为试用功能发布,用户可以通过 Table API 或者 SQL Client 的模式与 Hive 进行交互。下面列出的是在 1.9.0 中已经支持的功能:
提供简单的 DDL 来读取 Hive 元数据,比如 show databases、show tables、describe table 等。
可通过 Catalog API 来修改 Hive 元数据,如 create table、drop table 等。
读取 Hive 数据,支持分区表和非分区表。
写 Hive 数据,支持非分区表。
支持 Text、ORC、Parquet、SequenceFile 等文件格式。
支持调用用户在 Hive 中创建的 UDF。
由于是试用功能,因此还有一些方面不够完善,下面列出的是在 1.9.0 中缺失的功能:
不支持INSERT OVERWRITE。
不支持写分区表。
不支持ACID表。
不支持Bucket表。
不支持View。
部分数据类型不支持,包括Decimal、Char、Varchar、Date、Time、Timestamp、Interval、Union等。
如何应用
添加依赖
使用 Flink 与 Hive 集成的功能,用户首先需要添加相应的依赖。如果是使用 SQL Client,则需要将依赖的 jar 添加到 Flink 的 lib 目录中;如果使用 Table API,则需要将相应的依赖添加到项目中(如pom.xml)。
如上文所述,目前支持的 Hive 版本包括 2.3.4 和 1.2.1,下表列出的是针对不同版本所需的依赖。
其中 flink-shaded-hadoop-2-uber 包含了 Hive 对于 Hadoop 的依赖。如果不用 Flink 提供的包,用户也可以将集群中使用的 Hadoop 包添加进来,不过需要保证添加的 Hadoop 版本与 Hive 所依赖的版本是兼容的(Hive 2.3.4 依赖的 Hadoop 版本是 2.7.2;Hive 1.2.1 依赖的 Hadoop 版本是 2.6.0)。
依赖的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用户集群中 Hive 所提供的 jar 包,详情请见支持不同的 Hive 版本。
配置 HiveCatalog
要与 Hive 交互,必须使用 HiveCatalog,下面介绍一下如何配置 HiveCatalog。
SQL Client
使用 SQL Client 时,用户需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的“catalogs”列表中可以指定一个或多个 Catalog 实例。以下的示例展示了如何指定一个 HiveCatalog:
catalogs: # A typical catalog definition looks like: - name: myhive type: hive hive-conf-dir: /path/to/hive_conf_dir hive-version: 2.3.4 |
其中 name 是用户给每个 Catalog 实例指定的名字, Catalog 名字和 DB 名字构成了 FlinkSQL 中元数据的命名空间,因此需要保证每个 Catalog 的名字是唯一的。type 表示 Catalog 的类型,对于 HiveCatalog 而言,type 应该指定为 hive。hive-conf-dir 用于读取 Hive 的配置文件,用户可以将其设定为集群中 Hive 的配置文件目录。hive-version 用于指定所使用的 Hive 版本,可以设定为 2.3.4 或者 1.2.1。
指定了 HiveCatalog 以后,用户就可以启动 sql-client,并通过以下命令验证 HiveCatalog 已经正确加载。
Flink SQL> show catalogs; default_catalog myhive Flink SQL> use catalog myhive; |
其中 show catalogs 会列出加载的所有 Catalog 实例。需要注意的是,除了用户在sql-client-defaults.yaml 文件中配置的 Catalog 以外,FlinkSQL 还会自动加载一个 GenericInMemoryCatalog 实例作为内置的 Catalog,该内置 Catalog 默认名字为 default_catalog。
使用 use catalog 可以设定用户 Session 当前的 Catalog。用户在 SQL 语句中访问元数据对象(如 DB、Table 等)时,如果不指定 Catalog 名字,则 FlinkSQL 会在当前 Catalog 中进行查找。
Table API
下面的代码展示了如何通过 TableAPI 来创建 HiveCatalog,并注册到 TableEnvironment。
String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/path/to/hive_conf_dir"; String version = "2.3.4"; TableEnvironment tableEnv = …; // create TableEnvironment HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog(name, hiveCatalog); tableEnv.useCatalog(name); |
将 HiveCatalog 注册到 TableEnvironment 以后,就可以在通过 TableEnvironment 提交 SQL 的时候访问 HiveCatalog 中的元数据了。与 SQL Client 类似, TableEnvironment 也提供了 useCatalog 接口让用户设定当前 Catalog。
读写 Hive 表
设置好 HiveCatalog 以后就可以通过 SQL Client 或者 Table API 来读写 Hive 中的表了。
SQL Client
假设 Hive 中已经有一张名为 src 的表,我们可以用以下的 SQL 语句来读写这张表。
Flink SQL> describe src; root |-- key: STRING |-- value: STRING Flink SQL> select * from src; key value 100 val_100 298 val_298 9 val_9 341 val_341 498 val_498 146 val_146 458 val_458 362 val_362 186 val_186 …… …… Flink SQL> insert into src values ('newKey','newVal'); |
Table API
类似的,也可以通过 Table API 来读写上面提到的这张表。下面的代码展示了如何实现这一操作。
TableEnvironment tableEnv = …; // create TableEnvironment tableEnv.registerCatalog("myhive", hiveCatalog); // set myhive as current catalog tableEnv.useCatalog("myhive"); Table src = tableEnv.sqlQuery("select * from src"); // write src into a sink or do further analysis …… tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')"); tableEnv.execute("insert into src"); |
支持不同的 Hive 版本
Flink 1.9.0 中支持的 Hive 版本是 2.3.4 和 1.2.1,目前我们只针对这两个版本进行了测试。使用 SQL Client 时,如果用户没有在 sql-client-defaults.yaml 文件中指定 Hive 版本,我们会自动检测 classpath 中的 Hive 版本。如果检测到的 Hive 版本不是 2.3.4 或 1.2.1 就会报错。
借助 Hive 兼容性的保证,其它不同的小版本也比较可能是可以正常工作的。因此,如果用户使用的 Hive 小版本与我们所支持的不同,可以指定一个支持的版本来试用与 Hive 集成的功能。比如用户使用的 Hive 版本是 2.3.3,可以在 sql-client-defaults.yaml 文件或者代码中将 Hive 版本指定为 2.3.4。
执行模式与 Planner 的选择
Flink 1.9.0 中 Hive 的 TableSink 只能在 batch 模式下工作,因此如果用户想要使用 Hive 的 TableSink,需要将执行模式设置为 batch。
Flink 1.9.0 增加了新的 blink planner,由于 blink planner 相比于原来的 planner 功能更加全面,因此我们建议在使用 FlinkSQL 与 Hive 集成时使用 blink planner。后续新的功能也可能会只支持 blink planner。
使用 SQL Client 时可以像这样在 sql-client-defaults.yaml 中指定执行模式和 planner:
execution: # select the implementation responsible for planning table programs # possible values are 'old' (used by default) or 'blink' planner: blink # 'batch' or 'streaming' execution type: batch |
对应的 Table API 的写法如下:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); |
后期规划
我们会在 Flink 后续版本中进一步完善与 Hive 集成的功能,预计会在 1.10.0 版本中实现 Production-Ready。我们在后续版本中计划开展的工作包括:
更完整的数据类型支持
支持写分区表,包括静态和动态分区
支持 INSERT OVERWRITE
支持 View
更完整的 DDL、DML 的支持
支持 Hive 的 TableSink 在 streaming 模式下工作,以便用户将流式数据写入到 Hive 中
测试并支持更多的 Hive 版本
支持 Bucket 表
性能测试与优化
上文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理