前言
在上篇博客《人工智能在线特征系统中的数据存取技术》中,我们围绕着在线特征系统存储与读取这两方面话题,针对具体场景介绍了一些通用技术,此外特征系统还有另一个重要话题:特征生产调度。本文将以美团点评酒旅在线特征系统为原型,介绍特征生产调度的架构演进及核心技术。架构演进共包含三个阶段,不同阶段面临的需求痛点和挑战各有不同,包括导入并发控制、特征变更原子切换、实时特征计算框架涉及、实时与离线调度融合等。本文我们将从业务需求角度出发,介绍系统演进的三个阶段所解决的主要问题和技术手段,然后把系统演化过程中的一些常见问题和解决方案抽象出来,放在特征生产技术章节统一讨论。
特征生产调度演进
从离线到在线
在线特征系统最核心的目标是将离线的特征数据通过在线服务的方式,提供给策略系统使用。在线特征系统的出现是为了实现如下的系统目标:
- 将离线的特征数据,以接口访问的形式提供给线上策略系统使用
- 特征数据每日更新一次
- 支撑的数据量在百亿级以上,可以水平扩展
- 每秒特征访问量峰值达到百万,平均响应延迟在20ms以内
从整体系统功能上来划分,在线特征系统需要做两件事情:第一,每日将离线更新的特征数据写入到存储引擎,这里我们选用分布式KV(Key-Value)存储引擎Tair作为线上存储引擎,利用公司的ETL工具定期将离线数据写入到Tair;第二,提供接口服务,我们搭建了一个基于Thrift接口协议的RPC服务来对外提供特征读取服务。
由于不同特征集查询方式都相同,只是数据不同,因此在Service层我们把一组特征集合以及它的查询维度抽象成Domain。举个例子,Domain=ABC表示用户基础画像特征,包含性别、年龄、星座等特征,同时它又定义了查询维度为用户ID。这样对于不同的特征集,只需要调用同一个接口,传入不同的Domain即可。
在这一阶段,系统的重点是搭建一套特征导入、存储、读取的流程。我们利用公司提供的工具和组件迅速完成了任务。当有新的特征表需要接入时,开发一个导入ETL,在服务端做相应的配置即可生效。同时,结构上的松散也带来很大的灵活性。在业务发展初期,团队组织结构单一,需求量少,变化快,种类多,系统保持简单、松耦合,有助于灵活应对不断变化的需求。
从手动到自动
随着每日接入Domain数量的增加,接入新Domain工作显得繁琐而效率低下:每接入一个新的特征表,需要开发ETL,而且ETL需要测试、上线、配置调度。因此,我们重新设计了数据导入的方案。
元数据驱动,平台化导入
ETL工具需要开发数据导入脚本,它的灵活性相对较高,写出错的可能性也很大,测试和审核流程难以避免,新入职同学更是需要较大的学习成本。而对于特征导入这个需求,它的模式固化,可以抽取出以下元数据:
- 数据源信息:离线数据库、表名称等。
- 存储引擎信息:引擎类型、机房、IP等。
- 存储格式信息:Key字段、Value字段等。
- 特征更新信息:更新周期、分区字段、分区方式等。
根据这些元数据,将导入流程都固化下来,可以进行平台化的统一调度。用户通过填写或选择少量的表单信息注册任务,出错的可能性大大降低,流程也可以从原来的写ETL代码、测试作业、配置调度、上线审核,简化成了填写表单和审核。接入流程从原来的几个小时,缩短到几分钟。同时,存储引擎从原来的仅支持Tair,到现在Squirrel(美团点评基于Redis的KV分布式存储中间件)等多引擎加入,系统调度架构如下。
- 控制台(Console)是元数据的入口,用户在这里完成表单的填写,元数据落入Settings模块的MySQL库中。
- 调度模块(Scheduler)从Settings模块读取元数据,每日扫描需要导入的Hive表,待当日离线数据生产完成,便会启动Map Reduce Job来执行导入工作。
- 接口服务(Service)接收来自客户端的请求,根据Domain名称从Settings库中加载Domain元数据,然后从存储引擎取到对应的特征信息。由于调度模块与接口服务模块统一了元数据,因此新特征的接入可以实现服务端工作零成本,新上线的Domain可以直接从服务接口取到数据,无需任何人工操作。
阶段二的完成大大简化了离线特征的上线流程,使接入工作从几个小时缩短到几分钟,也降低了出错的可能性。导入平台化的实现,也为通用性优化功能提供了土壤:数据压缩功能使得内存、带宽资源得到了更充分的利用;多引擎存储功能满足了需求方对性能的不同要求;导入调度功能解决了更新流量峰值的问题,提高了系统的整体可用性。
从天级到秒级
迄今为止,原始特征数据都是离线的,且更新周期都是一天,这跟离线数据仓库的T+1模式有关。而很多关键的业务指标希望做到实时化,特征工程也是如此。用户近几分钟、近几秒的行为信息往往比很多离线特征更具有价值,实时特征必然会在策略系统中发挥越来越重要的作用。
参考离线特征的计算过程,离线大部分是利用了数据平台的ETL工具,它的输入输出都相对固定,只能落地到Hive,用户大部分的精力只需要关心计算逻辑。因此从离线Hive导入到线上存储引擎,成为了特征系统的主要工作,无需操心特征计算。而目前公司没有很完备的、类似Hive SQL的计算框架支持实时特征计算,生产计算实时特征需要自己写流式处理作业。因此我们有必要提供一个专用、便捷的特征计算工具来支持常见特征的计算工作,利用简单配置完成实时特征计算。
实时部分的系统架构如上图所示,与离线类似,Console部分接受用户的表单配置并将元数据写入Settings持久化。Scheduler会负责读取Settings的元数据信息调度实时特征生产任务。我们采用Storm流式服务计算实时特征,从实时数据仓库的Kafka Topic接收流式数据,并按照预先配置好的特征计算逻辑生产、计算实时特征,然后写入到线上存储引擎。
下面详细讨论一下我们对于实时特征计算的平台化以及优化方案。
实时特征计算平台化
算法使用的特征有繁有简,复杂多变,设计一个自动化的实时特征计算系统难度很大。回到业务需求,我们的目的是通过特征生产系统来简化开发工作量,而非完全取代特征开发;因此我们选择一部分常见的实时特征类型,实现自动化生产和导入。对于更复杂的实时特征,提供了更新接口来支持第三方特征生产程序对接。
以下是系统支持配置化生产的特征类型。首先是不同的时间跨度分类:
- 固定时间窗口,时间窗口的起止时间点是固定的,比如某日的销售额。
- 滑动时间窗口,时间窗口的长度是固定的,但起止时间点一直在向前滚动,比如近2小时销售额。
- 无限时间窗口,时间窗口的起点是固定的,但终止时间点一直在向前滚动,比如商家历史上销售总额。
销售额这个指标其实是对订单金额做求和(SUM)操作,总结常见的计算类型有如下几种:
- 求和(SUM),如销售额。
- 计数(COUNT),如订单量。
- 最大值(MAX),如最大订单金额。
- 最小值(MIN),如最小订单金额。
- 平均数(AVG),如平均订单金额。
- 去重计数(DISTINCT COUNT),如页面的用户浏览量(同一个用户多次浏览算一次)。
- 最新值(LAST),如最后支付时间。
- 列表(LIST),如最近的支付用户ID列表。
以上时间窗口与指标的组合,一共支持24种常见特征的计算类型。
对于实现上述特征的计算,主要包含如下三个抽象步骤: 1. 读取相关的数据(如上次特征值,或一些中间结果)。 2. 根据收到的业务数据,以及步骤1取到的数据进行计算(如累加或求去重数),得到新的特征值(和中间结果)。 3. 将特征(和中间结果)更新到系统。
不同时间窗口的实现方式应该尽量跟计算类型解耦,可以抽象出各自的处理方式: 1. 固定时间窗口,这类特征应该将时间窗的标识放在特征的Key当中。例如某商户某日销售额这个特征,将Key设置成${商户ID}_${日期},这样可以实现时间窗的自然滚动。 2. 滑动时间窗口,常见的做法是缓存时间窗内的所有明细数据作为中间结果,当新的明细数据到来时,删除时间窗内过期的明细数据,并利用缓存的明细数据重新计算特征值。但这种实现方式缺点是当滑动时间窗的跨度较大时,需要缓存大量中间结果,可能成为系统瓶颈。对于这个问题,我们采用了延迟队列的实现方式。
延迟队列实现滑动时间窗,当新的明细数据到来时,会直接累计到特征值,同时将明细数据发送到延迟队列。延迟队列的作用是可以将数据延迟指定时间后重新发送回系统。系统接收到延迟消息时,再从特征值中抵消该部分数据(例如计算近2小时销售额,收到订单数据后累加销售额,收到延迟订单消息则减去销售额),这样可以只保留特征值,无需缓存明细数据即可实现窗口滑动的逻辑。延迟队列的实现方式只适用于可抵消的计算类型,如求和、计数等,但像最大值、最小值、去重计数等无法满足
- 无限时间窗口,简单粗暴的方式是回溯所有历史消息即可。然而这样存在的问题是,第一,流式实时数据本身一般不会持久化保留太长的时间(通常是几天);第二,这种方式太耗费资源,特征的每一次更新都涉及多次RPC。较为合适的办法是离线数据计算特征的基准值,实时数据基于离线计算结束的时间点继续累积。详细过程参考下文数据融合与数据恢复。
为了保证数据可靠性与查询效率,中间结果和特征都存放在分布式Key-Value存储引擎中。下图是Storm计算框架的拓扑逻辑图,其中Calc Bolt承担着不同计算类型的实现,而Mafka Delay Topic则是延迟队列组件,用于实现滑动时间窗口。
上述24个特征是常见的一些实时统计类特征,开发者只需要填写表单,选择需要的特征类型即可完成特征开发工作。对于现阶段不支持配置实现的个性化、计算逻辑复杂的特征,开发者可以自己开发Storm拓扑实现计算逻辑(对应实时特征生产调度图中灰色的Third Party模块),并通过更新接口写入到线上存储引擎。
实时特征计算优化
从上述支持的特征列表中可以看出,实时计算框架目前只支持聚合、明细列表这样的简单特征。即便如此,实时特征计算还是面临很大的挑战。离线特征只需要计算出更新周期内特征的最终值即可,而实时特征需要把每次特征变化都要实时计算出来,它既要计算的快,又要计算的多,因此它无法支持很大量的数据。
当面临数据计算量的挑战时,优化思路之一是利用一些中间结果或上次计算结果简化计算量,化全量计算为增量计算。例如求平均数这种特征,你可以存住所有的明细数据,当新的一条明细数据加入进来时,将所有明细数据求和再除以总数。这样需要O(N)的时间和空间复杂度,N是明细数据个数。而你也可以仅保留总和跟总数,每次更新只要做一次加法和除法即可。
另一种优化思路是利用近似计算。比如求去重数(DISTINCT COUNT)这种指标,要精确计算可能很难找到一个时空复杂度都比较低的方案,而如果可以忍受近似计算的误差,HyperLogLog算法是一个不错的选择。
特征生产调度技术
在生产调度演进过程中,会不断遇到各种系统问题,如可靠性、一致性、性能等等。在这一章节我们把特征生产调度中一些常见的技术手段,以及常见问题的解决方案汇总起来呈现给大家。
逻辑存储层
逻辑存储层的含义是Domain的元数据并不直接存放与存储相关的信息,而是将这些信息抽象成Storage元数据,如下图所示。其中Domain存储了访问控制、离线源信息、Storage ID等信息,而Storage则存储了存储介质、特征元数据、数据存储格式等与存储相关的信息。Domain与Storage是一对多的关系。
抽象存储层Storage有很多好处:
- 支持数据版本灵活切换。一个Domain可能存在多个Storage数据版本,而只有一个是生效的。由于线上存储着多份版本数据,Domain与Storage的对应关系自由切换,从而可以实现不同数据版本的自由变更。
- 可以做到读写分离。这个特性对离线特征更新是一个好消息,因为离线特征对时效性要求不高,因此可以做到读的Storage跟写的Storage不同,在合适的时机切换读取的Storage。这样切换表结构、更换存储引擎都可以平滑完成,而对读取方做到完全透明。
- 实现数据切换的原子性。一次数据导入从Domain上看并不是原子操作(更新一个Key-Value对是原子操作,但是整个离线表导入到KV存储引擎并不是原子的),Storage的引入可以实现Domain导入的原子性,当数据格式、特征元数据发生变化时可以保证数据读取的一致性。
增量更新与数据一致性
对于每日的离线特征更新,我们发现有些虽然总数据量庞大,但每天的变化比较少。比如用户画像,有很多沉睡的用户他的特征基本不发生变化。如果每天将全量数据刷到线上,其实做了很多无谓的更新操作,对系统资源是一个巨大的浪费。尤其是更新线上存储引擎,写入压力将导致在线服务稳定性的波动。因此考虑在更新前计算出特征的增量变化数据,只更新变化的部分。而计算增量数据需要有线上特征集合的完整离线数据备份——数据镜像。
数据镜像(SNAPSHOT)是对线上存储引擎数据的离线备份。由于KV存储的特点适用于随机访问,而对顺序访问(如遍历)的支持并不是其强项,因此通过构造离线数据镜像,可以一定程度上帮助我们更为方便的操作线上KV存储引擎中的数据。这里主要是为了支持增量更新和数据恢复功能。
如下图所示同一个更新周期(Period)内需要做两次数据处理流程:归档(Archive)和同步(Sync)。Archive会将上一个更新周期的SNAPSHOT和这个更新周期的特征数据表做差集和并集。差集的结果是增量数据(Diff),并集的结果是该更新周期内的SNAPSHOT。对于数据量大而Diff又少的特征集合来说,增量更新会极大的节约线上的资源。
增量更新可能带来数据一致性的问题。如果Sync步骤出现了少量数据更新失败(比如写操作偶然性超时),会导致SNAPSHOT与KV存储引擎的数据不一致。这种问题在全量更新时并不是什么大问题,当数据在后续更新周期内全量写入时,可以认为总会修复上次的更新失败问题。然而在增量更新时,这种错误是永久性的。因此我们在生成SNAPSHOT时为每条数据附上一条租约(Lease),当租约到期时,强制将该条数据加入Diff参与当次更新,这样可以保证数据的最终一致性。Lease的时间我们可以对每一条数据进行随机分布,这样需要更新的数据会平稳的分布到每一天而不出现明显尖峰。Lease机制其实是全量更新到增量更新的一个平滑过渡,Lease为0时是全量更新,Lease为无穷大时是增量更新。
写入削峰
随着离线特征表增多,同一时刻进行数据导入的作业相互抢占资源,未加控制的写入速度影响了KV存储引擎的正常读取,甚至引起雪崩。实时特征也面临类似问题,实时数据流容易随着集群的状况、业务的特点出现流量峰谷,如果没有消费速度的限制,很容易导致存储引擎压力突增突减,甚至将其打垮。
离线与实时通过不同手段控制并发写入线上存储速度。离线更新的特点是:
- 更新具有周期性,需要同步时流量很大,同步结束后流量变为0
- 对更新延迟性要求不高(往往在小时级别)
- 写入方完全是特征系统内部模块(每个Sync作业)
我们的目标是尽快将这些数据同步到线上存储引擎,同时兼顾写入速度(影响更新延迟)和集群资源(线上存储压力)。鉴于离线更新的特点,且Sync作业本来就由调度器管理,因此很容易将并发控制实现在调度器内部。调度器会控制每个存储引擎的最大Sync作业并发数量,同时每个Sync作业内部并发的写入速度也是固定的。负载限制的关系如下:
同步中的作业数 * 作业内部并发度 ≤ 线上存储引擎的最大写入压力
而实时特征更新的特点是:
- 每时每刻都有写入的流量
- 流量随着业务时间变化会有波动
- 对更新延迟要求较高(往往在秒级)
- 写入方有特征系统内部模块,也有第三方的服务
由于写入方可能来自特征系统外部,难以统一控制写入方速度,因此我们没有像离线一样让写入方直接操作线上存储,而是在两者之间增加了一个Updater服务(参考图5.实时特征生产调度),由它控制每个写入方的速度。实时特征流量波动大,且对更新延迟要求高,新接入的实时特征需要预估流量峰值并配置到Updater服务中。对于超过预设流量的请求予以拒绝或延迟。
原子更新
离线特征与实时特征面临的原子更新问题各有不同。离线更新的粒度为天级别,所有特征一天只更新一次,有的特征集合希望保证天级别的更新是原子的。即不希望任意时刻出现一部分特征是昨天的值,一部分特征是今天的值。这个问题利用上文提到的逻辑存储层可以很好的解决,这里不再赘述。
然而实时特征生产更新却面临另一种问题。很多时候需要先读取特征当前值,然后基于当前值做计算得到新值写入KV存储引擎,一次更新过程涉及到读取,计算、写入三步。因此如果要保证数据更新的一致性,必须要保证一次更新的读、算、写操作的原子性或者事务性。对于原子更新的需求主要有两类解决方案:
- 生产方通过数据分组,保证相同Key的数据只通过一个线程更新,系统配置化生产的特征都基于Storm计算框架,实现起来非常方便。
- 如果一些第三方(Third Party)不方便做数据分组,我们通过系统内Updater服务提供的CAS(Compare And Swap)接口,生产方调用CAS接口进行更新,同样也可以做到原子更新。
数据融合与数据恢复
如果说实时数据是离线数据的延伸,那么离线数据可以说是实时数据的备份,二者是相辅相成的。理论上,利用实时数据可以计算出所有想要的特征,但离线数据可以从不同方面解决实时特征计算中诸多棘手问题:
- 提升效率。可以利用离线计算来提升效率。例如计算每个商家有史以来的营业额,如果全部采用实时数据,那将要实时回放历史上所有订单数据,这样的数据量和计算代价都是巨大的,此时可以利用离线框架计算出历史营业额,在特征初始化时将离线计算好的历史营业额导入线上存储引擎。之后的特征计算更新依赖实时框架,这样可以节省系统开销。
- 提升可靠性。可以利用离线计算和导入校正实时更新可能产生的误差,提升数据可靠性。实时特征计算采用Storm框架,可以保证数据记录不漏(At Least Once),但不能保证不重(At Most Once)。从系统设计的角度看,对于实时流式处理要做到确保计算一次(Exactly Once)的代价往往很高,相比于让流式计算绝对可靠,与离线计算结果融合往往是更合适的选择。对于像每日营业额这种固定时间窗的特征,实时更新流程只会更新当前时间窗内的特征(今日营业额),而并不会改动历史时间窗的数据,因此历史时间窗的特征可以利用离线数据重新校正一次,这样可以保证数据的最终正确性。
上图为离线实时特征生产的整体架构。离线与实时的数据融合,需要一个更强大的调度器,它负责协调离线任务与实时任务的关系,高效、可靠的完成数据导入工作。离线作业与实时作业的调度关系分为两种:
- 离线只初始化一次,后续只有实时数据从基于离线初始化的值做累积运算。如下图的离线初始化。这种调度类型常见于无限时间窗口的一些计算指标,如商户最后一次订单时间,用户累积消费金额等。
- 离线与实时作业并存,离线作业定期复写历史数据,实时作业更新最近数据。如下图的离线定期修复。这种调度类型常见于提升固定时间窗特征的可靠性,如商户每日营业额等,这类特征在Key中携带时间信息,特征数据天然按时间窗分区,离线与实时作业更新不同分区的数据而互不影响。
数据恢复是指当线上数据发生问题的时候(可能由于数据源问题、线上故障、硬件故障等)如何修复线上数据,使其恢复到正常状态。数据恢复是效率和可靠性的双重考验,越快速的恢复到正常状态,系统的可靠性就越高。离线增量更新的特征与实时特征都是在原有特征基础上累积计算,一旦某一时刻数据出现问题需要重导数据,只能从第一次增量开始重新累积,这无疑是及其低效的。如果能够定期备份线上特征的数据镜像,当实时更新从某一时刻出现故障时,可以用最近一次正确的离线SNAPSHOT版本刷新数据。离线数据最新的SNAPSHOT应与线上特征数据保持一致,而实时特征的SNAPSHOT会有一定延迟,这时只要将上游实时流数据回退到SNAPSHOT时间点重新开始消费(如下图所示),这样相比没有SNAPSHOT可以较为快速的恢复故障。
数据恢复功能是离线与实时架构融合的产物,只不过它的离线数据不是业务上产出的某张离线表,而是离线镜像数据SNAPSHOT。
后记
一个完整的在线特征系统数据流涵盖加载、计算、导入、存储、读取五个步骤。从两类数据的五个步骤来看,在线特征系统截至目前还并不完整;而深入到每一个步骤,还有很多功能特性需要继续完善:支持离线计算框架、支持更多的实时计算类型、实时特征计算高可用、缩短数据恢复时间、特征实时监控等等。在与其他团队交流时,也有将特征系统深入到策略系统内部,实现算法、特征迭代一体化流程。在线特征系统的工作仍任重而道远。
能力所限,难免管中窥豹,挂一漏万。欢迎感兴趣同学一起交流。
作者简介
杨浩,美团平台及酒旅事业群数据挖掘系统负责人,2011年毕业于北京大学,曾担任107间联合创始人兼CTO,2016年加入美团点评。长期致力于计算广告、搜索推荐、数据挖掘等系统架构方向。
伟彬,美团平台及酒旅事业群数据挖掘系统工程师,2015年毕业于大连理工大学,同年加入美团点评,专注于大数据处理技术与高并发服务。
最后发个广告,美团平台及酒旅事业群数据挖掘组长期招聘数据挖掘算法、大数据系统开发、Java后台开发方面的人才,有兴趣的同学可以发送简历到yanghao13#meituan.com。
如发现文章有错误、对内容有疑问,都可以关注美团技术团队微信公众号(meituantech),在后台给我们留言。
分享一线技术实践,沉淀成长学习经验