MaxCompute Tunnel 技术原理及开发实战是怎样的

14次阅读
没有评论

这篇文章给大家介绍 MaxCompute Tunnel 技术原理及开发实战是怎样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

一、MaxCompute Tunnel 技术原理

MaxCompute Tunnel 技术原理及开发实战是怎样的

上图是架构图,可以看到对外的服务提供了一个统一的 SDK,然后集成到所有的外部服务里。在服务端,提供的服务可以大概分为 API 层和执行层。API 层有两个集群 Frontend 集群会负责控制流的介入,Tunnel 集群负责数据。在执行层分为控制集群和计算集群,控制集群会负责资源管控,meta 的管理,和权限管理这些功能,计算集群就负责实际的计算和存储。

可以看到,Tunnel 是属于 API 层的一个组件,专门负责数据的上传和下载。为什么这么做, 是因为这是一个大数据的系统,所以在 MaxCompute 上跑一个 SQL 其实就发了一条控制指令。由于目标的场景是一个大数据量的查询,比如说十亿条这种量级的,这是一个规模比较大的操作,如果在这个场景下想做数据的同步,就不能像 MySQL 传统输入一样通过 insert into,因为 insert into 走控制集群这个链路是非常浪费资源的,同时也会有一些限制。一次一行的效率特别低,因此设计了分开的控制流和数据流。

Tunnel 集群负责的功能是在 SDK 层提供了 Tunnel 的 API,让用户可以通过一个结构化的方式去访问数据。另外,Tunnel 是对外放出来的唯一的数据接口,会对用户写进来的数据做格式检查和权限校验,控制数据安全。同时会保证用户通过 Tunnel 写出来的数据用 SQL 可读,不用担心比如 SQL 读不了写进来的数据,或者写的数据和 SQL 读出来的值有差异。

另外一点,Tunnel 是直接访问存储层的,MaxCompute 在底层的存储是一个分布式文件系统,Tunnel 是直接访问这个文件系统的,这样在性能上就有了保证。也就是说,Tunnel 在理想情况下是可以保证单并发达到 10 兆每秒的吞吐能力,通过假并发也是可以水平扩展整个吞吐能力。

二、MaxCompute Tunnel 丰富的生态

MaxCompute 有非常丰富的生态,推荐首先要看一下有什么工具,或者有哪些服务可以做,建议优先使用一些成熟的服务,尽量不要自己先写代码。

官方的 SDK 有 Java SDK 和 Python SDK。

另外,官方还提供了三种工具。MaxCompute 客户端是一个命令行工具,在数据同步这方面支持用户把一个本地文件上传到 MaxCompute 里面,也可以通过下载一张表到一个本地文件上。MaxCompute Studio 是一个 idea 插件,它也支持文件上传下载这样的方式。MMA2.0 迁移工具是最近推出的一个工具,可以帮助用户把数据从现有的大数据系统里迁移到 MaxCompute 上,这些工具都是基于 SDK 开发的,都是通过 SDK 传输。

除了工具以外,MaxCompute 在第三方服务上也是集成的,比如云上的数据通道图,SLS(阿里云的日志服务),DataHub(数据通道),他们都是原生就支持 MaxCompute 投递的,Kafka 也是有官方的插件。

流计算方面,Blink,Spark 也都是有 MaxCompute 同步插件的。数据同步服务方面,DataWorks 的数据同步,实时同步和离线同步,都是支持 MaxCompute 同步的。

总结一下,如果有数据同步的需求,最好先看一下现有的服务是不是可以满足需求。如果觉得都满足不了,想要自己开发的话,可以看一下 SDK 是可以有哪些功能,和使用上的一些注意事项。

三、Tunnel 功能简介

MaxCompute Tunnel 技术原理及开发实战是怎样的

上图是 Tunnel 总体功能的表格。现在有两套 API,分批量数据通道和流式数据通道。

批量数据通道目标的场景单并发的吞吐量很大,这种理想的场景是传量大的数据,一次一批,QPS 和并发都不能特别高,但是单并发的吞吐量可以做得很大,这个在 API 上也有一些优化。

流式数据通道是新提供的一种服务,因为现在的上游服务大多数都是一些流式服务灌进来的,也就是说单并发可能流量没有那么大,但是都是比较细碎的数据,这种情况如果用批量数据通道会遇到很多限制。最明显的就是小文件问题,用批量数据通道写特别碎的数据进来会产生大量的碎片文件,跑 SQL 查询就会非常慢,用 Tunnel 下载也会非常慢。针对这种场景平台提供了流式数据通道服务,通过流式数据上来可以写得特别碎,一行写一次也可以,不需要担心小文件的问题,也不用担心并发的问题,并发可以无限多。流式数据通道是不限并发的,但是批量是限并发的。

从表格中可以看到,通过 Tunnel 是可以访问这几种资源的:普通表,Hash Clustered 表,Range Clustered 表和 Transactional 表,最后是查询结果,这些都是可以下载的;普通表两种上传都支持;Hash Clustered 表和 Range Clustered 表并不适合 Tunnel 去写,因为数据在存储上需要做一个系统,而且会排序,而 Tunnel 集群规模没有计算机集群那么大,没有这个能力去做排序。因此,这种表一般经典的用法就是先写一张普通表,然后通过 SQL 做一个 insert overwrite,生成一张 Hash Clustered 表或者 Range Clustered 表。

流式上传在架构上做了升级,有一个异步处理的机制,会把用户写进来的数据在后台进行加工,所以后面会支持 Hash Clustered 表。

Transactional 表是说,MaxCompute 的普通表是不支持 update 或者 delete 的,系统最近在 SQL 上支持了这个语法,就是用户可以 update,也可以 delete,也可以支持 transaction。批量上传的 API 现在是支持 Transactional 表,但是只支持 append,也称为 insert into,它是不能从 Tunnel 的 API 上去 update 的。流式的也正在规划中,后续可能会连 update 也一起完成。批量的可能不会做 update 这个功能,但是批量的现在就可以 append 这种 Transactional 表。

查询结果就是说,如果跑一个 SQL,在 odpscmd 客户端或者 DataWorks 上对查询结果有 1 万条的限制。但是这个查询结果可以通过 Tunnel 下载,就不受条数限制,可以下载完整的查询结果到本地。

总而言之,如果使用 SDK 的话,就可以做到表格里的这些功能。

四、SDK 的使用方式

1)基本配置

如果想开发的话有哪些东西需要配置,不管上传、下载,还是流式上传,这些配置都是一样的。首先需要创建一个 ODPS 对象和一个 Table Tunnel 对象。如果想用 SDK 跑 SQL,要创建 ODPS;TableTunnel 是 Tunnel 入口的一个类,所有的功能都是从这个类发起的。

然后看具体的配置项,图中左侧列举的是比较关键的几个。Access ID 和 Access Key 就是账号信息,阿里云通过这个来表示一个账号。

ODPS Endpoint 是服务的一个入口,现在在公共云上应该有 21 个 region,包括金融云和政务云,中国有 7 个,海外有 14 个。每个 region 的 endpoint 是不一样的,使用时需要找到自己购买的 region 服务,并正确填写 endpoint 进去。

Tunnel Endpoint 是可选的,如果不填,系统会通过所填的 ODPS endpoint 自动路由到对应的 Tunnel endpoint 上。在公共云上的网络环境比较复杂,分公网域名和内网域名,内网域名还分经典网络和 VBC,也许会有路由的 endpoint 网络不通这种场景,这个时候平台提供了一个接口,用户可以把能访问的 Tunnel endpoint 填进来,这样就会优先用所填的 Tunnel endpoint 而不会用路由的,但 99% 的情况下是不用填。

Default project 这个参数是在弹内经常用的。MaxCompute 的权限管理非常丰富,比如如果在公共云上有多个 project,想要控制数据在跨 project 流动的话,就可以通过这个参数来配置。ODPS 里设置的 Default Project 可以理解为是原 project,下面的 create Stream Session 里面又有一个 project,即要访问数据所在的 project。如果这两个 project 不一样,系统会检查这个权限,用户是否可以访问目标 project 的数据。如果跑 SQL,它也会根据原 project 来控制资源的使用。如果只有一个,这两个填成一样的就可以。

一般来说,Access ID,Access Key, 和 ODPS Endpoint 是必选的,Tunnel Endpoint 可选,Default project 如果只有一个只填一个就行了。

2)具体的上传接口

接下来展示具体的上传接口。首先看批量上传。

【批量上传】

上图中可以看到,批量上传的流程是先创建一个 upload session (第 31 行), 然后 open writer,用 writer 去写数据,然后 close,再 upload session 加 commit。

Upload session 可以理解为是一个会话的对象,类似于 transaction 的概念。这次上传是以 upload session 为单位的,即最终 upload session commit 成功了这些数据才是可见的。在一个 upload session 内部可以 open 多个 writer,并且多个 writer 可以并发上传,但是 writer 是有状态的,需要给它指定一个不重复的 block ID,避免产生覆盖。Upload session 也是有状态的,没有 commit 就不可见;如果 commit 成功了,这个 session 就结束了,暂时就不能再去 open writer。Writer 的实现原理是 open 一个 writer 请求,系统会发一个 HTP 请求到服务端,然后保持这个长链接,写数据时平台会实时地把数据写到服务端,writer 是写一个临时目录。根据这个机制可以看到,如果 writer 或者 close 失败了,就相当于这个长连接断了。所以 writer 和 close 这两个接口是不能重试的,如果 writer 中间有任何阶段失败了,就需要重新写。

除了正常的 commit 之外,MaxCompute 还支持让用户检查数据正确性。比如用户 open 了五个 writer,commit 的时候可以把这五个 ID 当成例子上传确认。如果检查到服务端与这个例子不一致,commit 就会报错。

总结一下,基本的功能点有:

批量上传是有状态并发;

commit 成功后数据才可见;

支持 insertOverwrite, 也支持 InsertInto 语义。

Insert overwrite 指 commit 的时候支持使用某个 upload session 的数据直接 overwrite 掉一整个分区或者一张表,类似 SQL 的 Insert 和 Overwrite 的功能。

这个功能也有使用限制。

第一,一个 upload session 不能超过 2 万个 Block。

第二,Block ID 会导致数据覆盖。

第三,upload session 24 小时过期,因为 writer 数据是写在存储的临时目录的,临时数据有回收周期,超过 24 小时,writer 写过的数据就有可能被回收掉,这个就限制了 upload session 的生命周期。

第四,如果 open 了一个 writer 但是不写数据,就相当于占了一个空闲链接,服务端会把这个链接直接断掉。

【流式上传】

接下来看一下流式上传的接口。前文中有提到,流式上传是在 API 上做了简化,还去掉了并发的限制和时间的限制。

MaxCompute Tunnel 技术原理及开发实战是怎样的

图中可以看到,接口是 CreateStreamUploadSession,写数据的从 writer 改成了 RecordPack。所谓的 pack 其实相当于一个内存里的 buffer,可以用 pack.append(record),比如判断 size 只需要判断这个 buffer 足够大或者条数足够多,然后再 flush 就可以了(42 到 44 行)。Pack 并不是写网络的,而是写内存的。因此,不同于 writer,flush 是可以重试的,因为数据都在内存里。并且 Pack 也没有状态,不需要关心 writer 的 Block ID 等等。另外,因为 flush 成功后数据就可见了,所以 session 也没有 commit 这种状态。因此,如果要开发分布式服务,这个相比批量上传就简化很多,没有太多的限制,只需要确保本机内存是否够大就好了。

同时系统还支持了内存的复用,即 flush 过以后的 pack 是可以复用的。系统会把上次写满的内存留住,避免产生 GC。流式上传只支持 InsertInto,因为在 API 上没有另外的接口,所以 InsertOverwrite 语义现在是不支持的。另外,流式服务是支持异步数据处理的,也就是除了保证用户通过流式写上来的数据可读之外,服务端还有一个机制能识别出来新写进来的数据和存量数据,可以对新写出来的数据做一些异步的处理,比如 zorder by 排序和墨纸。

ZorderBy 排序是指一种数据的组织方式,可以把存在 MaxCompute 的数据按某些规则重新组织一遍,这样查询的时候效率会非常高。墨纸是指支持把数据在后端重新写一遍,把一些很碎的数据重新组织成存储数据存储效率较高的数据文件。在这个基础上还可以做一些排序和其他的处理,后续会再加更多的功能。

流式上传也会有一些限制。首先在写的时候,系统会对这个表加锁,流式写的时候其他的操作是不能写的,比如 InsertInto 和 Insert Overwrite 是会失败的,要把流式停掉之后才能正常写。另外,DDL 有一些延迟,如果要 drop table 或者 rename table 的话,可能 drop 完还能成功写几条数据,会有最多 60 秒的延迟。如果有这种场景,建议先把流式停掉再去 drop 或者 rename。

【批量下载】

接下来介绍批量下载的接口。

MaxCompute Tunnel 技术原理及开发实战是怎样的

图中可以看到,TableTunnel 创建了一个叫 downloadSession 的对象。可以得到 record Count,指一个分区或者一张表的总行数。下面是 open reader,可以和批量上传对应起来: reader 和 writer; uploadSession 和 downloadSession。Openreader 是按 record 来区分的,比如有 1000 行,可以分十个 100 行并发下载。Download 支持列裁剪,也就是可以下载其中几列。下载查询结果就把 TableTunnel 入口类改成 InstanceTunnel,odps 也是一样,53 行就不是 project, table 了,是一个 InstanceID。

使用限制方面,和批量上传类似,DownloadSession 限制也是 24 小时,因为它也有临时文件。同样空闲链接 120 秒超时,另外还有 Project 级别并发限流,性能受碎片文件影响。

五、最佳实践

如果并发很高,不推荐走批量接口,因为并发限流是 project 级别的,如果上传或者下载的限额打满,整个 project 的批量上传都会失败。

这种接口推荐把并发降下来,然后充分利用并发约 10 兆每秒的吞吐能力。流式因为架构上的原因,是不受并发限制的。QPS 不建议批量上传,因为碎片文件的问题,不建议用特别高的 QPS 来用批量的接口写数据。如果 QPS 和并发都不高,使用这三种方式都不会很受限制。

另外有几个场景,transaction 现在支持批量上传,流式上传后续会跟进。目前流式上传不支持 Insert Overwrite,可能后面也不一定会开发,因为这个场景明显是一个批量的语义。

关于 MaxCompute Tunnel 技术原理及开发实战是怎样的就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。