云音乐线上场景众多,比如直播、评论、广告,各个业务线都会有消息场景比如发奖券,也会有延迟消息和事务消息场景,以及大数据做埋点数据、数据清洗、离线处理等。
云音乐线上 RocketMQ topic 为 1 万+/天,QPS 流量峰值为150万/s,日消息量千亿级别。为了支撑庞大的数据规模和场景,除了搭建开源RocketMQ集群,我们也做了监控的完善和工具体验。监控完善主要包括对整个集群的容量、状态、水位进行健康状态的监控,针对消息的发送和消费提供流量、延迟、失败、耗时等监控指标。基于以上监控指标,还需搭建一套业务巡检体系,以实现线上告警。
另外,我们也提供改了一些工具帮助业务方提升使用 RocketMQ 的体验,比如数据迁移和同步消息路由的组件,提供稳定性保障的限流能力、降级能力以及动态参数干预的预案能力。当线上业务方发现消费不符合预期时,需要提供查询帮助其快速定位,以及提供死信处理工具等。
云音乐目前有三个机房,每个机房部署了一套 RocketMQ 集群,除了 Manesrv、 HA 等基础组件,还有自研或开源改造的组件,比如 monitor 组件、告警巡检组件、降级维稳组件等。
每个机房里有一套平台化的管控组件,管控端包含提工单、上下线、查数据、订阅问题,还包括一套消息路平台和数据库。
网易云音乐拥有多个流量入口,不同业务的数据和流量需要做隔离,每个租户下都是一套独立的业务线。而物理隔离成本过高,因此我们实现了逻辑隔离。各个业务之间流量不互通,逻辑上无法相互调用,且租户下所有 topic 名字一致,中台只需要切换租户名,无需改动任何其配置、代码,即可直接上线。
所有 topic 都在一个物理集群内,每个租户有自己的一套逻辑集群,逻辑集群内有自己的 topic,不同逻辑集群之间的 topic 同名,实现了多租户隔离。
随着云音乐的业务愈发庞大,业务方提出了更多需求。比如异地多活,消息需要在多个机房消费,比如通用埋点数据,需要将多个产品的数据汇总到机场的数据处理集群做离线处理,比如架构升级,不同单元间的流量能够动态调度。
基于以上需求,消息路由需要实现以下几个功能:
①跨机房消息复制。
②流量去重:消息路由在复制时不可避免会有失败,因此必然有内部的重试,可能会导致有消息重复;此外,双向路由必然需要提供双向复制,而两边 topic 名字一样,复制时会导致错乱,因此需要有标签来实现流量重。
③数据迁移任务。
④监控完善,进度可控。
云音乐的消息路由实现方案如上图所示。
首先,在管控平台会维护一套路由任务元数据表,业务方可以提工单或者通过其他方式申请路由任务,支持任意机房的任意两个 topic 之间做消息路由。任务提交之后,消息路由集群会定时同步管控端上的消息路由任务的状态,同时将消息发送到目标 topic 。路由任务能够自行上报监控数据、消费延迟、堆积监控报表等,可在管控端进行查看。
云音乐的数据处理任务包括埋点、trace,大多使用Flink。但由于开源方向没有与我们的需求非常匹配的 connector ,因此我们封装实现了自己的 RocketMQ Flink connector。
因为内部封装了接口和集群配置,RocketMQ 作为 Flink 的 source 和 sink 需要有数据源的配置。我们对数据源做了封装,比如 connector 如何解析元数据,从而正确地连接数据源、读写消息。
大数据任务的特点为测试环境与线上数据会混在一起,多环境都有接入需求,因此我们设计了一套元数据,使得 connect 能够连接多环境且能够处理多环境里面流量标、环境标等标签的过滤。
Flink有自己的 checkpoint 机制,只有在做 checkpoint 时才会将 consumer offset 提交给 broker ,同时需要对 consumer offset 进行管理,否则消费位点消失会导致数据重新消费,因此我们实现了 state 管理机制。
Flink的 spot task 比较敏感,抛出错误则会导致 task 重新执行,连续重复几次后会导致TaskManager failover 。此外,RocketMQ 在网络场景下时常出现broker busy 或网络问题导致发送失败异常。我们针对Flink 定制了一套异常场景处理,使其变得不敏感。
此外,我们目前面临的线上问题主要包括消息流量激增、机器负载高、大数据任务突刺、重置消费位点等。集群突然出现大流量行动时,其稳定性会受到极大冲击,频繁发送失败,线上其业务也会受到 topic 的影响。
面对以上问题,除了提供隔离能力外,也需要限流降级的能力。
第一,服务端的发送限流。支持 topic 级别,也支持 group 级别。后续将支持客户端级别,支持多个维度的发送端限流。
第二,全局消费限流。分为 topic 和 group,可以对整个 group 消费关系下所有机器的总量进行限流,适用于大数据场景。
第三,单机消费限流。适用于线业务场景,因为在线业务场景每台机器的负载有限,不希望某个业务无上限地占用资源,因此需要对单机限流。在线业务集群容量不够时,可以做动态扩容来增加容量。增加集群容量时无需修改全局容量。
上图折线图反应了开启单机限流之后,消费数据随着发布缓慢平稳上涨,解决了流量突刺,提升线上集群稳定性和消费服务的负载平稳。
随着集群规模增大,逐渐出现了消息延迟的情况。经排查发现,producer 能够正常发送消息到 broker ,但是由于数据量非常大,后台创建 consumer queue 的速度跟不上发送速度,导致消费延迟。其次,消费也面临瓶颈,跟不上发送速度,因为同一个发送可能存在多个消费方。
针对以上问题,我们进行了索引优化。
开源版本下, commit log 写入之后,会有 Reput service 方法建 consumer queue 、index 索引等一套流程,从头扫到尾,块状地建立 consumer queue。
而我们发现,保证顺序性和位点的有序性的前提下,可以并发地建索引,只需处理好位点的提交即可。因此,我们设置了 reput queue 异步线程池,里面有不同的 reput task ,每个 task 建立自己的 comment log 索引。建好之后,索引并不是立刻可见。建好之后会有全局的索引往后推,如果前面的 commit log 索引已经建好,则后面的索引也立马可见,提升了索引的建立效率。
上图为索引优化前后的性能对比。横轴代表 topic 数量,纵轴代表建索引的速度。灰色线代表建索引的速度,橙色线代表发送速度。
优化前,topic 较少时,建索引的速度慢于发送速度。随着 topic 数量增多,两者速度逐渐一致,但性能均明显下降。
优化后,建索引的速度基本与发送速度持平,且性能不会随着 topic 数量增加而大幅下降。优化后建索引的性能达到优化前的3倍,保证了消费性能。
广播消费场景下,通常需要确保所有机器上的数据最终一致。而开源的广播消费失败后不会重试也不会告警。且消费位点为 local,不会上报到远端,如果本地服务重启则offset 丢失,并且无法做预案干预。同时因为不上报,缺少问题定位的能力。
我们的解决方案为逻辑 group +实际 group 。
逻辑 group 指业务方在代码和在管控平台申请的 consumer group 。申请 group 之后,在客户端进行设定,将其标识为新版广播消费的 group,每个实例启动时在逻辑group 后加上扩展名来生成实际 group 。
实际 group 可以进行正常的集群消费,也可以用复用集群消费的所有能力,包括租户隔离、消息路由、监控、限流能力等,最终就解决了广播消费的问题,能够使用死信、重试、重置消费位点、位点查询、监控告警等能力。
此外,云音乐日常还会出现线上流量突增来不及发布,或线上流量与预期不符而发布成本过高等问题。
为此,我们为业务方提供了实时线程式调整的能力。开源版本中,每个客户端都会向broker 做 Consumer RunningInfo 的上报,包括是否消费暂停、subscribe 的状态、订阅了哪些 topic、消费位点等。我们在在上报信息里加上了每个 topic 自己消费的线程池的 coresize、maxsize ,并在管控端展示,使用户能够实时感知当前线程池的状态。
此外,我们提供了非常简单的修改方式。在管控端修改 kv config 并上报到 NameSvr ,NameSvr 监听并下发。客户端监听到 kv config 变化后将最新配置下拉。然后再本地找到 topic 对应的线程池,修改 coresize、maxsize 值。
此前,业务在线上发现问题后发布往往需要 10-20 分钟起步。而现在只需修改一个参数、下发、轮询即可完成,整个过程不超过 30 秒。
云音乐在 RocketMQ 的未来规划如下:
第一,云原生。云原生有弹性扩缩容的能力,可以更好地节约成本以及应对线上突发风险。
第二,提效率。比如 Topic 签迁移、从一个集群迁移到另一个集群、从顺序消息改为非顺序消息等操作目前还未实现完全白屏化,后续会针对此方面提高效率,提高用户体验。
第三,开源社区交流贡献。