查看原文
其他

懂车帝准实时指标体系架构及应用

连家漯 DataFunSummit
2024-09-11

导读 本文将介绍懂车帝准实时指标体系架构,及其建设过程中所解决的主要问题。

今天的介绍会围绕下面四点展开:

1. 业务背景及挑战

2. 技术架构如何应对业务挑战

3. 遇到的问题及解决办法

4. 成果及规划

分享嘉宾|连家漯 懂车帝 资深数仓研发工程师 

编辑整理|胡胜达 蔚来汽车

内容校对|李瑶

出品社区|DataFun


01

业务背景及挑战

首先来介绍一下懂车帝二手车业务,及二手车门店数仓面临的挑战。

1. 懂车帝二手车业务介绍

懂车帝二手车业务当前分为线上媒体和线下门店两部分。线下门店是 22 年 5 月启动的新业务,目标是通过自营门店为用户提供值得信赖的二手车交易服务,最终成为受信赖的信息交易服务平台。线上媒体主要是做信息服务,而线下门店主要聚焦交易方向。

2. 二手车门店数仓面临的挑战

由于二手车线下门店是新业务,业务发展迅速,系统迭代会比较频繁,数仓需求交付节奏也会比较快。另一个问题是架构不统一,当前的实时和离线是两套架构、两套代码,并且指标查询也有多套方案。这些因素的叠加导致了指标一致性差的问题,这个问题无论是在成熟的还是快速发展中的初创公司都会发生。成熟的公司由于发展时间较久,代码和人员的变更会使该问题更加严重。

门店数仓和线上媒体业务有一些区别,门店数仓主要做交易,可以理解为偏业务的数仓,线上媒体则是偏流量的数仓。业务数仓和流量数仓的区别主要有以下两点:
  • 业务数仓的业务流程会比较复杂,数据变化快,一个流程从开始到结束,各个节点会发生频繁的变化。
  • 流量数仓数据量比较大,但是数据一旦产生后(主要是以流量日志的形式),数据本身不会发生太大变化。

02

技术架构如何应对业务挑战

接下来介绍我们的技术架构是如何应对业务挑战的。

要解决指标一致性问题,我们从以下两个方面入手:
  • 存储计算层的统一,统一实时离线架构;
  • 查询服务层的统一,统一指标的生产、消费。
必须同时解决上述两个问题,才能最终解决指标一致性问题。如果只是单纯做了流批一体,在指标出口不做管理,是无法实现指标一致性的。

1. 统一实时离线架构

当前二手车门店数仓采用了离线、实时两套架构,两套代码分别支持离线和实时的计算,这样的处理会有以下问题:
  • 需要维护两套不同的计算代码,维护开发成本高;
  • 开发迭代过程中,非常容易出现指标口径不一致的问题。
应对这一问题,需要统一实时、离线的计算层和存储层。

(1)统一存储层

我们选用了湖仓一体的方案 Las 来统一存储层。

Las 是基于 Apache Hudi 深度定制的湖仓一体存储引擎。其主要特点如下:
  • 提供了湖仓一体可扩展、高可用、高性能的元数据服务,完全兼容 Hive Metastore,支持多种计算引擎。
  • 支持对海量数据的高效 Insert/Overwrite 能力,并且完全兼容 HSQL OLAP 能力。
  • 支持对历史数据的 Update/Delete 能力。
  • 支持对新增数据的 Upsert(主键)和 Append(非主键)能力。
  • 支持 Streaming Source/Sink,提供近实时分析能力,同时支持接入数据集看板。
Las 从用户的视角来看其实就是一张 Hive 表,底层也是基于 HDFS 做了存储。可以用 Flink 进行流式更新和写入,Spark 做批次的更新和写入。并且在 Las 表出现数据变化的时候,可以通过 Las 进行流式消费。存储层需要满足以上能力,才能实现流批一体在存储层的统一。

Las 借助 Catalog Service 为所有的查询统计引擎提供了统一的元数据视图,站在使用者的角度,使用 Las 表与使用 Hive 没有区别,可以通过 SQL 的方式统一访问。

Las 表的存储类型有 COPY_ON_WRITE 和 MERGE_ON_READ 两种。
COPY_ON_WRITE 目前很少使用,LAS 2.0 基本都是 MERGE_ON_READ 方式。MERGE_ON_READ 的存储方式和 Hbase 类似,对于流式的写操作,会先写 Delta Log,最终合并成一个版本,形成一个 Base File。并且在写文件之前会在内存里对数据进行排序,把排序后的数据块刷新到磁盘上,定期进行 Compaction 操作,这样的目的是为了让新的 Delta Log 形成新的 Base File,从而提升查询性能。

在读取的时候为了获取一个最新的、正确的数据版本,需要将 Deta Log 和Base File 合并,通过排序拿到最新版本的数据。这里会涉及数据可见性的问题,因为数据的写入是通过刷新的方式实现的,在流式的场景下,数据的可见性是与 Flink 任务的 Checkpoint 频率相关的。如果 Checkpoint 频率高,那么数据的时效性也会高,所以 Las 实现的是准实时的数据查询,而不是实时的。

Las 的表索引类型主要有 HIVE_BUCKET_INDEX 和 NON_INDEX 两种。如果要有高效的去重和 CRUD 能力,需要选择有主键的,如果没有主键,性能会比较差。这里需要注意的是,如果采用了 Insert Overwrite 的方式,是批量的插入操作,在引擎侧是不会做去重的,这时候需要在业务侧做去重。

除了前面提到的特性,Las 还有其它一些重要特性,也为实现流批一体起到了关键作用。

首先是多流列拼接,可以用来支持一张表按照不同的列、不同的优先级的 SLA 的保障。尤其是在宽表的场景下,每一列的数据可能会来源于不同的业务域,它们的就绪时间是不一样的。在批的场景下,SLA 的数据就绪时间是由最晚的一列数据产出时间决定的,这样会影响部分应用场景,因为有些业务场景是不关心个别产出时间比较晚的字段的。在这种情况下,如果能提前产出部分列的数据,就能提升业务使用的时效性。多流列拼接的底层实现比较简单,由于 Las 是支持主键更新的,可以用多个流往同一张表去写,这样就可以实现提前就绪的数据提前写入,后就绪的数据后写入。

另外,对于多流 join 大状态导致作业不稳定的问题,相信大家做实时作业的时候都会遇到。如果在数据量比较大的场景下用 Flink 去做多流 join,为了避免作业不稳定的问题,通常不会用 Flink 的大状态,而是用外部存储(比如 Redis)。但是这样又会引入一个新的问题,即需要自己去实现这个存取逻辑,并且需要考虑不同的流到达的时机不一样,进行流和流之间的匹配,这样会导致实现的成本很高,代码的逻辑比较复杂。所以我们可以不在 Flink 上做 join,而是在存储层做 join,可以用多个流往同一张表的同一个主键去写,等到数据写完后,就自然解决了这个问题。

另一个重要特性是支持自定义聚合逻辑。主键表默认的聚合逻辑是取最新的一条数据,但是部分场景下需要对某些字段做 sum/min/max/count 等自定义聚合操作,复杂场景下还会有 top20 等需求。这里是借助 Hive 的自定义聚合函数 UDF 实现的,这个能力在 Doris 里也是支持的。

接下来是归档的能力,在流转批的场景下需要提供归档能力,根据数据的时间戳归档到相应的数据分区里。

最后一个重要的能力是增量消费。增量消费 Las 表的时候,上游流式写入,下游也要能够流式消费。否则下游只能去扫描 Las 表,延迟会加大。所以增量消费是很重要的。

还有一个尚未上线的 Time Travel 能力,即支持查询历史任意时间戳全量数据。比如一张 ODS 表接了一张 MySQL 的全量表,如果想生成 MySQL 表每天的数据快照做历史数据回溯,采用目前批量处理的方式会存在一个问题,这个全量表会存很多份。数据不一定每天都会发生变化,只有部分天里会发生变化,如果每天都生成快照,就会有大量的数据冗余。如果具备了 Time Travel 的能力,可以支持按照任意时间戳查询任意的版本,替代快照表的功能,这样可以节省很多存储开销。

(2)统一计算层

统一计算层的目的是为了避免一套代码用两套逻辑来实现。当前 Las 支持 Spark SQL、Flink SQL 直接读写,理论上可以通过 Flink SQL 完成离线实时场景的计算。但是现状是 Flink SQL 的语义完备性与 Las 的兼容性都还存在诸多问题,所以目前尚未真正实现计算层的统一。我们的解决办法是,由于存储层是统一的,因此针对同一份逻辑,只让一个任务,一种语言来实现,不同的任务可以使用不同的语言。

2. 统一查询服务

指标数据产生后如何让下游的用户去消费指标其实就是所谓的查询层,这里主要是实现统一指标生产和统一指标消费。

(1)统一指标生产

通过指标拆解流程,实现指标口径清晰准确的定义。我们采用阿里的 OneData 指标拆解方法,按照业务板块、数据域、业务过程等对所有的指标进行拆解。通过这样的拆解,可以实现把一个指标明确绑定到一个业务过程上去,包括一些维度的限定我们也可以做统一的管理,以达到给定一个指标就可以很清楚是如何产生的。

有了指标的定义之后,需要建立指标生产的 SOP,明确指标口径负责人及生命周期管理,杜绝指标同名不同义、同义不同名等问题。

前面提到的主要是元数据的定义,元数据定义完成后需要完成指标和模型的绑定,这样下游才能知道。或者通过指标去路由到真正的物理表,查询到最终的指标结果。具体来说我们会将指标模型在 Dataleap 指标平台上进行指标定义和维度的绑定,以解决指标检索与路由的问题,帮助下游使用方准确获取到所需指标。

(2)统一指标消费

借助 Dataleap 平台,我们可以基于指标服务,通过配置化的方式去完成数据看板的开发。输入查询码后可以选择对应的维度,通过配置即可直接生产看板。看板暴露给用户的过程中,需要做元数据的查询和透传,查询元数据后可以获取到指标的定义、指标如何产生等信息,也就是用业务的语言告知用户指标的口径是什么。

最终架构如下:

在该架构中,将数据源集成到 Las 表,然后通过 FlinkSQL 或者 HSQL 做读写,最终同步到 ClickHouse。此外我们还会做一些天级别的快照作为存储,并提供一个数据服务层,包括 DataLeap 指标平台、OneService 数据服务和风神敏捷 BI 平台。其中 OneService 服务是为了解决一些数据未纳入指标平台的问题,比如明细数据,可以通过 OneService 服务获取。风神敏捷 BI 平台是为了满足用户的一些数据分析需求,可以直接通过 Presto 查询数据表。架构的最下游包括实时大屏、数据看板、订阅推送和自助分析这四类数据应用。

当前结构的优点主要包括:
  • 与现有框架无缝集成;
  • 分钟级延迟满足业务要求;
  • 规范化指标生产与消费;
  • 学习开发运维成本低。
同时,它也存在着一些缺点:
  • 无法满足秒级时延;
  • 功能完备性还有待提升空间;
  • 具有额外的存储开销。
03

遇到的问题及解决办法

在指标体系的构建过程中,我们主要解决了如下四个问题:
  • Las 读写性能差
    在 Las2.0 版本中,通过一系列优化,使性能得到了提升。
  • Flink SQL 多表复杂 JOIN 场景下存在性能问题
    通过 HSQL 微批以及 Las 多流列拼接等方式解决了该问题。
  • 微批场景下 HSQL 调度等待
    将 HSQL 放到常驻进程的 Spark 集群上执行,避免申请资源时的等待。
  • 历史数据回溯
    新增 ODS 的 Hive 数据分区表,用于异常回溯。在后续,希望通过 Time Travel 的能力进一步改善,避免在 ODS 层存储大量快照。
04

成果及规划

目前成果主要体现在以下几点:
  • 手车门店所有指标(600+)实现了分钟级准实时化;
  • 二手车门店全面实现了流批一体;
  • 解决了二手车门店指标一致性的问题。
下一步规划为:
  • 二手车门店指标秒级实时化
    这会对存储有更高的要求,目前基于 HDFS 和 Las 的存储可能无法满足,会考虑迁移到基于内存的存储,这样 Checkpoint 就不会再有问题。
  • 真正实现计算层统一
    目前计算层还没真正实现统一,只是采用了一个变通的方法,即一个逻辑只用一份代码去写。将来要实现真正的计算层统一。
以上就是本次分享的内容,谢谢大家。


分享嘉宾

INTRODUCTION


连家漯

懂车帝

资深数仓研发工程师

先后在百度、爱奇艺、美团从事数仓研发工作,前期主要负责数仓建模、开发、数据治理相关工作,后期主要聚焦数据产品及数据服务相关工作;当前负责懂车帝二手车数仓相关数据建设。


往期推荐


华为盘古大模型微调实践

算法&大数据如何赋能?OPPO推荐领域降本增效指南

人工智能在跨境风控中的应用

小米 OLAP 引擎在 Trino 的应用实践

标签与指标融合应用业务案例详解

阿里飞猪供应链算法之推荐和选品模型

基于 StarRocks 和 Paimon 打造湖仓分析新范式

腾讯金融 AI 开发平台落地实践

生成式AI在育碧3A游戏图像生成的实践应用

字节跳动数据集成引擎 BitSail 开源架构演进和实践

金融行业在数仓建设与数据治理的最佳实践


点个在看你最好看

继续滑动看下一个
DataFunSummit
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存