消息处理方法、装置、平台、计算机设备及存储介质与流程

未命名 10-21 阅读:61 评论:0


1.本技术涉及互联网技术领域,尤其涉及一种消息处理方法、装置、平台、计算机设备及存储介质。


背景技术:

2.在高并发的业务场景中,为了可以对业务消息进行及时处理,常常通过对业务系统的不断扩展来提升消息的吞吐量。然而,对业务系统的扩展需要较大的成本,且即使对业务系统进行扩展,也会由于系统中发现热点现象,导致出现数据倾斜的问题。
3.有鉴以此,亟需提供一种在不对业务系统进行扩展的前提下,来提升消息的吞吐量,并避免出现数据倾斜的问题。


技术实现要素:

4.有鉴于此,现提供一种消息处理方法、装置、平台、计算机设备及存储介质,以解决现有技术在提升消息的吞吐量时,需要对业务系统进行不断的扩展的问题,且扩展的系统会产生数据倾斜的问题。
5.本技术提供了一种消息处理平台,所述消息处理平台包括生产者节点及消费者节点,所述消费者节点包括消息中间件、事件获取模块、解析模块及消息并发处理模块,所述消息中间件包括多个消息存储分区,所述消息并发处理模块包括多个并发消息存储队列、多个消息处理线程、热点分析单元及热点转移单元、热点队列及热点线程,每一个消息处理线程与一个并发消息存储队列一一对应,其中:
6.所述生产者节点,用于产生事件消息,根据所述事件消息的类型将所述事件消息发送至所述消息中间件中与所述类型相对应的目标消息存储分区中进行存储;
7.所述事件获取模块,用于从所述目标消息存储分区中获取事件消息;
8.所述解析模块,用于对获取到事件消息进行解析,得到解析结果,并根据解析结果将获取到的事件消息分发至对应的并发消息存储队列中;
9.每一个并发消息存储队列,用于存储所述解析模块发送的事件消息;
10.所述热点分析单元,用于对各个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;
11.所述热点转移单元,用于将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;
12.所述事件获取模块,还用于从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;
13.所述热点线程,用于从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至所述数据库中;
14.每一个消息处理线程,用于从对应的并发消息存储队列中获取事件消息,并将获取到的非热点事件消息写入至数据库中。
15.可选地,所述热点分析单元,还用于对各个并发消息存储队列中存储的各条事件消息中的出现频次及处理耗时进行分析,并根据分析结果确定热点事件消息。
16.可选地,所述消息并发处理模块还包括重投递单元、重投递队列及重投递线程,其中:
17.所述重投递单元,用于在检测到事件消息写入所述数据库失败时,将写入失败的事件消息重新投递至所述重投递队列中;
18.所述重投递线程,用于在所述失败的事件消息重新投递至所述重投递队列的时间达到预设时间后,从所述重投递队列中获取事件消息,并将获取到的事件消息写入至所述数据库中。
19.可选地,所述消费者节点还包括消息处理模式确定模块及消息聚合处理模块,所述消息聚合处理模块包括多个聚合消息存储队列、多个消息聚合线程,每一个消息聚合线程与一个聚合消息存储队列一一对应,其中:
20.所述消息处理模式确定模块,用于根据预设的配置信息确定消息处理模式,所述消息处理模式包括并发处理模式及聚合处理模式;
21.每一个聚合消息存储队列,用于存储所述解析模块发送的事件消息;
22.每一个消息聚合线程,用于从对应的聚合消息存储队列中获取事件消息,对获取的事件消息进行聚合处理,在聚合到的事件消息的数量达到预设数量或者聚合时间达到预设时间时,将聚合到的事件消息批量写入至所述数据库中。
23.可选地,所述生产者节点,还用于检测所述生产者节点与所述消息中间件之间的传输通道是否发生故障,若发生故障,则将所述产生的事件消息发送至所述事件获取模块。
24.可选地,消息处理平台还包括确认字符模块,其中:
25.所述事件获取模块,还用于将获取到的事件消息通过链表的方式有序串联,并为每一个事件消息添加处理状态;
26.所述事件获取模块,还用于在事件消息被处理后,将已处理的事件消息的处理状态修改为已处理;
27.所述确认字符模块,用于定时检测所述链表中的各个事件消息的处理状态,并在所述链表中的目标事件消息的前面所有的事件消息的处理状态都为已处理时,将所述目标事件消息的位置标记为确认字符位置。
28.本技术还提供了一种消息处理方法,应用于上述所述的消息处理平台中,所述方法包括:
29.对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;
30.将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;
31.从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;
32.从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至数据库中。
33.可选地,所述对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息包括:
34.对各个并发消息存储队列中存储的各条事件消息中的出现频次及处理耗时进行分析,得到热点分析结果;
35.根据所述热点分析结果确定热点事件消息。
36.本技术还提供了一种消息处理装置,包括:
37.分析模块,用于对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;
38.转移模块,用于将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;
39.存储模块,用于从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;
40.写入模块,用于从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至数据库中。
41.本技术还提供了一种计算机设备,所述计算机设备,包括存储器、处理器以及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现上述方法的步骤。
42.本技术还提供了一种计算机可读存储介质,其上存储有计算机程序,所述计算机程序被处理器执行时实现上述方法的步骤。
43.本实施例中消息处理平台通过在保持分区不变的情况下,通过增设一些队列的方式,使得当收到事件消息后,可以将同一个消息存储分区中存储的事件消息分发到不同的队列中,这样,再通过每个队列绑定独立的处理线程对队列中的事件消息进行处理,即可以实现对事件消息的并发处理,从而提高消息的吞吐量。此外,本实施例中,通过对各个队列中存储的事件消息进行实时分析,从而可以找出热点事件消息,并在找出热点事件消息后,将该热点事件消息转移至热点队列中,并同步启动热点线程将热点队列中热点事件消息写入至数据库中,从而实现将热点事件消息与其他事件消息进行隔离,并使得热点事件消息可以被正常处理,避免出现数据倾斜现象。
附图说明
44.图1为本技术实施例的消息处理平台的一实施例的应用环境示意图;
45.图2为本技术所述的消息处理平台的一种实施例的架构图;
46.图3为本技术所述消息处理方法的一种实施例的流程图;
47.图4为本技术的消息并发处理模块对事件消息处理的流程图;
48.图5为本技术的消息聚合处理模块对事件消息处理的流程图;
49.图6为本技术的热点事件消息分析的示意图;
50.图7为本技术所述消息处理方法的另一种实施例的流程图;;
51.图8为本技术所述的消息处理装置的一种实施例的程序模块图;
52.图9为本技术实施例提供的执行消息处理方法的计算机设备的硬件结构示意图。
具体实施方式
53.以下结合附图与具体实施例进一步阐述本技术的优点。
54.这里将详细地对示例性实施例进行说明,其示例表示在附图中。下面的描述涉及附图时,除非另有表示,不同附图中的相同数字表示相同或相似的要素。以下示例性实施例中所描述的实施方式并不代表与本公开相一致的所有实施方式。相反,它们仅是与如所附权利要求书中所详述的、本公开的一些方面相一致的装置和方法的例子。
55.在本公开采用的术语是仅仅出于描述特定实施例的目的,而非旨在限制本公开。在本公开和所附权利要求书中所采用的单数形式的“一种”、“所述”和“该”也旨在包括多数形式,除非上下文清楚地表示其它含义。还应当理解,本文中采用的术语“和/或”是指并包含一个或多个相关联的列出项目的任何或所有可能组合。
56.应当理解,尽管在本公开可能采用术语第一、第二、第三等来描述各种信息,但这些信息不应限于这些术语。这些术语仅用来将同一类型的信息彼此区分开。例如,在不脱离本公开范围的情况下,第一信息也可以被称为第二信息,类似地,第二信息也可以被称为第一信息。取决于语境,如在此所采用的词语“如果”可以被解释成为“在
……
时”或“当
……
时”或“响应于确定”。
57.在本技术的描述中,需要理解的是,步骤前的数字标号并不标识执行步骤的前后顺序,仅用于方便描述本技术及区别每一步骤,因此不能理解为对本技术的限制。
58.下面提供本技术的示例性应用环境。图1示意性示出了根据本技术实施例的环境应用示意图。
59.消息处理平台2,包括生产者节点及消费者节点。其中,生产者节点用于产生事件消息。所述消费者节点用于对生产者节点产生的事件消息进行消费,并将消费到的事件消息存储至数据库4中。
60.数据库4可以为各种类型的数据库,比如为关系型数据库oracle、mysql等,非关系型数据库mongod、cassandra、redis、hbase等。
61.消息处理平台2和数据库4可以通过网络连接。网络可以包括各种网络设备,例如路由器、交换机、多路复用器、集线器、调制解调器、网桥、中继器、防火墙和/或代理设备等。网络可以包括物理链路,例如同轴电缆链路、双绞线电缆链路、光纤链路及其组合和/或类似物。网络可以包括无线链路,例如蜂窝链路、卫星链路、wi-fi链路和/或类似物。
62.下面,将在上述示例性应用环境下提供若干个实施例,来说明本技术中的评论数据处理方案。
63.参阅图2,其为本技术一实施例的消息处理平台的架构图。所述消息处理平台包括生产者节点20及消费者节点21。所述消费者节点21包括消息中间件210、事件获取模块211、解析模块212及消息并发处理模块212,消息中间件210包括多个消息存储分区,所述消息并发处理模块212包括多个并发消息存储队列2120、多个消息处理线程2121、热点分析单元2122、热点转移单元2123及热点线程2124,每一个消息处理线程2120与一个并发消息存储队列2121一一对应。
64.生产者节点20,作为事件消息的产生者,可以为各种类型的设备。例如,可以是平板电脑、笔记本电脑、台式计算机、机架式服务器、刀片式服务器、塔式服务器或机柜式服务器(包括独立的服务器,或者多个服务器所组成的服务器集群)等。
65.生产者节点20,用于产生事件消息,并根据所述事件消息的类型将所述事件消息发送至所述消息中间件中与所述类型相对应的目标消息存储分区中进行存储。
66.具体地,所述事件消息为基于用户的行为事件所生成的消息,比如,该事件消息可以为用户基于点赞行为所生成的点赞消息,或者为用户基于评论行为所生成的评论消息等。
67.在本实施例中,该事件消息按照数据的格式可以抽象为以下4种类型:
68.1.sendbytes,发送字节类型的消息;
69.2.sendstring,发送字符串类型的消息;
70.3.sendjson,发送json类型字符串的消息;
71.4.sendbatch,批量发送字节类型的消息。
72.可以理解的是,生产者节点20在产生事件消息后,为了使得不同数据类型的事件消息可以方便地写入至消息中间件中,可以在产生事件消息后对事件消息进行数据格式转换,以将事件消息的数据格式转换为预设设定的数据格式的事件消息。
73.所述消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。其中,所述消息中间件可以为rabbitmq、rocketmq、kafka、pulsar、databus等。在本实施例中,所述消息中间件优选为kafka,相应地,所述消息中间件中的多个消息存储分区为所述kafka中的多个主题(topic)。
74.在一具体实施方式中,针对生产者节点20产生的不同类型的事件消息,会将其存储至消息中间件中与所述类型相对应的目标消息存储分区中。
75.需要说明的是,上述所述的不同类型的事件消息指的是针对数据对象的功能类型进行划分得到的不同种类的事件消息的,比如,针对视频a的点赞消息与视频a弹幕消息,其属于两种不同类型的事件消息。
76.作为示例,当所述消息中间件210为kafka,消息存储分区为kafka中的topic(主题),则针对点赞消息,会将其存储至kafka中的topic a,针对评论消息,会将其存储至kafka中的topic b。
77.其中,kafka是由apache软件基金会开发的一个开源流处理平台,由scala和java编写。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。topic(主题)是一个类别的名称,同类消息发送到同一个topic下面。对于每一个topic,下面可以有多个分区(partition)日志文件。
78.作为示例,当所述消息中间件210为rabbitmq,消息存储分区为rabbitmq中的queue(队列),则针对点赞消息,会将其存储至rabbitmq中的queue a,针对评论消息,会将其存储至rabbitmq中的queue b。
79.其中,rabbitmq是一套开源(mpl)的消息队列服务软件,是由lshift提供的一个advanced message queuing protocol(amqp)的开源实现,由以高性能、健壮以及可伸缩性出名的erlang写成。rabbitmq用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据rabbitmq配置的转发机制接收服务端发来的消息。rabbitmq依据指定的转发规则进行消息的转发、缓冲和持久化操作。
80.事件获取模块211,用于从所述目标消息存储分区获取事件消息。
81.具体地,所述目标存储分区指的存储所述事件消息的存储区间。
82.作为示例,当所述目标消息存储分区为kafka中的目标topic(主题)时,事件获取
模块211在从所述目标topic中获取事件消息时,若所述目标topic包含多个分区(partition),则事件获取模块211会从所述目标topic主题中的多个分区中获取事件消息。比如,所述目标topic包括3个partition,则事件获取模块211会从这3个partition中获取事件消息。
83.作为示例,当所述目标消息存储分区为rabbitmq中的queue(队列)时,则事件获取模块211会从目标queue中获取其中存储的事件消息。
84.解析模块212,用于对获取到事件消息进行解析,得到解析结果,并根据解析结果将获取到的事件消息分发至对应的并发消息存储队列中。
85.具体地,事件获取模块211在获取到事件消息后,会调用解析模块212对事件消息进行解析,从而得到解析结果,该解析结果用于确定该事件消息需要分发至哪个并发消息存储队列中。
86.在一实施方式中,解析模块212在对事件消息进行解析时,可以根据预设的配置文件确定对该事件消息的解析方法,然后根据该解析方法对事件消息进行解析。比如,该事件消息为视频的点赞消息,则可以预先在配置文件中配置视频的点赞消息的解析方法为根据视频id的哈希值进行确定存储队列。这样,在解析模块212对点赞消息进行解析时,会从该点赞消息中提取出视频id信息,然后根据该视频id进行哈希计算,得到哈希值,最后根据哈希值来确定存储队列。比如,哈希值为1时,确定存储队列为并发消息存储队列1;哈希值为2时,确定存储队列为并发消息存储队列2等。
87.每一个并发消息存储队列2120,用于存储解析模块212发送的事件消息。
88.具体地,解析模块212在对事件消息进行解析后,会根据不同的解析结果将各个事件消息分发至对应的并发消息存储队列2120中,这样,每一个并发消息存储队列2120即可以存储与该并发消息存储队列相匹配的事件消息。
89.需要说明的是,该并发消息存储队列2120的数量可以由用户根据实际情况在配置信息中进行设定。当用户需要提升消息的吞吐量时,只需要增加并发消息存储队列2120的数量即可。
90.为了便于理解本技术中消息并发处理模块212对从消息中间件中获取到的事件消息的并发处理流程,以下结合4进行详细说明。
91.消息并发处理模块在从消息中间件(kafka为例)中获取到事件消息后,会将事件消息分发到不同的并发消息存储队列(图4中的内存队列),之后,每一个与并发消息存储队列绑定的消息处理线程(图4中的thread处理线程)会从其中获取事件消息。
92.所述热点分析单元2122,用于对各个并发消息存储队列2120中存储的事件消息进行分析,并根据分析结果确定热点事件消息。
93.具体地,当系统中出现热点事件时,比如,大量的用户聚集在同一个视频下面点赞,针对这种场景,系统会产生严重的数据倾斜。为了解决上述问题,本实施例中,通过在消息处理平台中设置热点分析单元2122来对各个并发消息存储队列2120中存储的事件消息进行分析,以便可以及时发现热点事件消息,这样,后续可以将热点事件消息与普通的事件消息进行隔离,避免热点事件消息影响到其他事件消息的处理。
94.在一示例性的实施方式中,热点分析单元2122,用于对各个并发消息存储队列中存储的各条事件消息中的出现频次及处理耗时进行分析,并根据分析结果确定热点事件消
息。
95.具体地,热点分析单元2122在对各条事件消息中的出现频次进行分析时,可以是根据各条事件消息中关键标识信息进行分析的。比如,该事件消息为视频的点赞消息,则在对事件消息中的出现频次进行分析时,是统计具有同一个视频id的点赞信息的出现次数。
96.在一实施方式中,热点分析单元2122在进行热点事件消息的分析时,也可以单独根据各条事件消息中的出现频次或处理耗时来进行确定。
97.所述热点转移单元2123,用于将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中。
98.具体地,所述热点消息存储分区为用于存储热点事件消息的消息存储分区,所述热点消息存储分区可以预先设定,也可以根据实际情况进行选择,在本实施例中不作限定。
99.在本实施例中,热点分析单元2122在确定出哪些事件消息属于热点事件消息后,可以立即通知热点转移单元2123将热点事件消息转移至热点消息存储分区。在一实施例方式中,也可以在发现并发消息存储队列出现阻塞时,再通过热点分析单元2122将热点事件消息转移至热点消息存储分区。
100.在一示例性的实施方式中,热点转移单元2123在对确定的热点事件消息进行转移时,可以将所有的热点事件消息都转移至热点消息存储分区中,也可以根据处理压力(转移所需要消耗的资源的多少)来决定是否只将部分的热点事件消息都转移至热点消息存储分区中。
101.事件获取模块211,还用于从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中。
102.具体地,在将热点事件消息进行转移后,事件获取模块211会及时将热点事件消息进一步转移至独立的热点队列中,实现对热点事件消息与普通事件消息的隔离,避免出现数据拥塞的情况。
103.所述热点线程2124,用于从所述热点队列中获取热点事件消息,并将获取到的热点事件消息写入至所述数据库中。
104.具体地,在将热点事件消息存储至热点队列后,为了使得热点事件消息可以被及时写入至数据库中,会同步启动热点线程2124从热点队列中消费热点事件消息,并将消费到的热点事件消息写入至数据库中。
105.每一个消息处理线程2121,用于从对应的并发消息存储队列中获取事件消息,并将获取到的非热点事件消息写入至数据库中。
106.具体地,每一个消息处理线程2121与一个并发消息存储队列2120进行绑定,这样,每一个消息处理线程2121可以从其对应的并发消息存储队列2120获取事件消息,并将获取到的非热点事件消息写入至数据库中。
107.其中,所述非热点事件消息指的是普通的事件消息。
108.本实施例中消息处理平台通过在保持分区不变的情况下,通过增设一些队列的方式,使得当收到事件消息后,可以将同一个消息存储分区中存储的事件消息分发到不同的队列中,这样,再通过每个队列绑定独立的处理线程对队列中的事件消息进行处理,即可以实现对事件消息的并发处理,从而提高消息的吞吐量。此外,本实施例中,通过对各个队列中存储的事件消息进行实时分析,从而可以找出热点事件消息,并在找出热点事件消息后,
将该热点事件消息转移至热点队列中,并同步启动热点线程将热点队列中热点事件消息写入至数据库中,从而实现将热点事件消息与其他事件消息进行隔离,并使得热点事件消息可以被正常处理,避免出现数据倾斜现象。
109.在一示例性的实施方式中,消息并发处理模块212还可以包括重投递单元、重投递队列及重投递线程,其中:
110.所述重投递单元,用于在检测到事件消息写入所述数据库失败时,将写入失败的事件消息重新投递至所述重投递队列中。
111.具体地,重投递队列为用于存储写入失败的事件消息的队列。
112.所述重投递线程,用于在所述失败的事件消息重新投递至所述重投递队列的时间达到预设时间后,从所述重投递队列中获取事件消息,并将获取到的事件消息写入至所述数据库中。
113.具体地,重投递线程为用于对重投递队列中存储的事件消进行处理的线程。所述预设时间可以预先设定,也可以根据实际情况进行设定与调整,在本实施例中不作限定。比如,所述预设时间为1分钟。
114.本实施例中,通过在事件消息写入至数据库失败时,会对事件消息进行重新投递,并通过重投递线程延迟一段时间后进行重新写入,从而尽可能地使得所有的事件消息都可以被写入至数据库中。
115.在一示例性的实施方式中,消费者节点21还可以包括消息处理模式确定模块及消息聚合处理模块,所述消息聚合处理模块包括多个聚合消息存储队列、多个消息聚合线程,每一个消息聚合线程与一个聚合消息存储队列一一对应,其中:
116.所述消息处理模式确定模块,用于根据预设的配置信息确定消息处理模式,所述消息处理模式包括并发处理模式及聚合处理模式。
117.具体地,所述并发处理模式为采用消息并发处理模块对事件消息进行处理的模式,所述聚合处理模式为采用所述消息聚合处理模块对事件消息进行处理的模式。
118.每一个聚合消息存储队列,用于存储所述解析模块发送的事件消息。
119.具体地,所述聚合消息存储队列为在聚合处理模式下对所述解析模块发送的事件消息进行存储的队列。
120.每一个消息聚合线程,用于从对应的聚合消息存储队列中获取事件消息,对获取的事件消息进行聚合处理,并在聚合到的事件消息的数量达到预设数量或者聚合时间达到预设时间时,将聚合到的事件消息批量写入至所述数据库中。
121.具体地,所述消息聚合线程为负责对聚合消息存储队列中存储的事件消息进行聚合处理的线程。
122.作为示例,所述预设数量为100,所述预设时间为5分钟,则消息聚合线程会在聚合到100个事件消息,或者在聚合时间达到5分钟时,将聚合到的事件消息一次性批量写入至所述数据库中。
123.在一实施方式中,在程序关闭的时候,可以将聚合事件处理完再退出,并做好panic保护,避免进程崩溃的可能。
124.本实施例中,通过增设消息聚合处理模块,使得在对事件消息进行处理时,可以大大减少写入数据库的请求次数,提高消息处理平台的性能。
125.为了便于理解本技术中消息聚合处理模块对从消息中间件中获取到的事件消息的聚合处理流程,以下结合图5进行详细说明。
126.消息聚合处理模块在从消息中间件中获取到事件消息后,会对事件消息进行解析,然后根据解析结果将事件消息分发到不同的聚合消息存储队列中,之后,每一个与聚合消息存储队列绑定的消息聚合线程会从其中获取事件消息,并对获取到的事件消息进行聚合处理,并可以将聚合处理得到的多个事件消息放置在一个或者多个数据聚集通道(channel)中,最后,在各个数据聚集通道中的事件消息的数量或聚合时间达到预设时间时,将其写入至数据库中。
127.在一示例性的实施方式中,生产者节点20,还用于检测所述生产者节点20与所述消息中间件之间的传输通道是否发生故障,若发生故障,则将所述产生的事件消息发送至所述事件获取模块211。
128.具体地,为了使得在产者节点20与所述消息中间件之间的传输通道发送故障的时候,生产者节点20产生的事件消息仍然可以被处理,生产者节点20在检测到与消息中间件之间的传输通道发生故障时,会通过事件获取模块211提供的api接口直接将所述产生的事件消息发送至所述事件获取模块211,并利用多个并发消息存储队列作为缓冲区,从而使得在传输通道故障时,也可以保证消息链路的可用性。
129.在本实施例中,若生产者节点20与所述消息中间件之间的传输通道未发生故障,则会根据所述事件消息的类型将所述事件消息发送至所述消息中间件中与所述类型相对应的目标消息存储分区中进行存储。
130.本实施例中,通过在检测到与消息中间件之间的传输通道发生故障时,自动切换到http的通道上,将消息直接推向消费方,进入消费方的队列中,并利用现成的多级队列作为缓冲区,通过这种方式保障在主通道故障时消息链路的可用性。
131.在一示例性的实施方式中,消息处理平台还包括确认字符(ack,acknowledge character)模块,其中:
132.所述事件获取模块211,还用于将获取到的事件消息通过链表的方式有序串联,并为每一个事件消息添加处理状态。
133.具体地,为了维持各个事件消息的先后顺序,事件获取模块211会将获取到的各个事件消息按照其生成时间通过链表的方式有序串联起来,即形成如下链表:
134.事件消息1-》事件消息2-》事件消息3-》
…‑
》事件消息n-1-》事件消息n。
135.需要说明的是,事件消息n的产生时间晚于事件n-1的产生时间。
136.所述处理状态用于表明该事件消息是否被处理,即该事件消息是否被成功写入至数据库中。
137.在一实施方式中,所述处理状态可以包括未处理、处理中、已处理、处理失败四个状态。
138.所述事件获取模块211,还用于在事件消息被处理后,将已处理的事件消息的处理状态修改为已处理。
139.具体地,事件获取模块211在检测到事件消息已经被成功处理后,及时将该事件消息标记为已处理,以便后续可以根据该处理状态确定各个事件消息是否被处理。
140.所述确认字符模块,用于定时检测所述链表中的各个事件消息的处理状态,并在
所述链表中的目标事件消息的前面所有的事件消息的处理状态都为已处理时,将所述目标事件消息的位置标记为确认字符(ack)位置。
141.具体地,确定字符位置(ack)位置用于记录从消息存储分区中成功获取到的事件消息的offset(偏移地址),该ack位置用于在出现故障时,实现数据的恢复。
142.本实施例中,通过将事件消息用链表的方式有序串联起来,并在每个事件消息上增加处理标记,当消息被处理后,将消息标记成为已处理,定期检查链表状态,某个消息前面所有的事件都被处理后,才将消息设置确认字符状态,通过这种方式,解决了事件丢失的问题。
143.为了便于理解本技术中消息处理平台对事件消息的处理流程,以下结合7进行详细说明。
144.当从消息中间件中获取到事件消息后,在确定对事件消息的处理模式为并发处理模式时,会将事件消息分发至不同的队列(图7中的内存队列1-内存队列n)中,之后,会通过与队列绑定的线程(图7中的thread 1-thread n)从对应的队列中获取事件消息。此外,在检测到队列中存在热点事件消息时,会将该队列中的热点事件消息进行热点转移,以将该热点事件消息转移至热点队列中。当对事件消息写入至数据库失败时,会将事件消息写入至重试队列,以实现对事件消息的重投递。确定对事件消息的处理模式为聚合处理模式时,会将队列中存储的事件消息进行聚合处理,并在聚合处理到一点数量或者一定时间后,通过线程将聚合的事件消息进行批量处理。
145.在一示例性的实施方式中,参阅图3,所述方法包括:
146.步骤s30,对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;
147.在一示例性的实施方式中,对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息包括:对各个并发消息存储队列中存储的各条事件消息中的出现频次及处理耗时进行分析,并根据分析结果确定热点事件消息。
148.在一具体场景中,参阅图5,在确定热点事件消息时,会根据每一个事件消息中的关键标识信息(如5中的key)对每一个并发消息存储队列中的存储的事件消息的出现次数以及处理耗时进行统计,从而得到各个消息事件的总出现次数和总处理耗时,之后,根据这个统计信息即可以确定出热点事件消息。
149.步骤s31,将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;
150.步骤s32,从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;
151.步骤s33,从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至所述数据库中。
152.在一示例性的实施方式中,所述方法还包括:
153.在检测到事件消息写入所述数据库失败时,将写入失败的事件消息重新投递至所述重投递队列中;
154.在所述失败的事件消息重新投递至所述重投递队列的时间达到预设时间后,从所述重投递队列中获取事件消息,并将获取到的事件消息写入至所述数据库中。
155.在一示例性的实施方式中,所述方法还包括:
156.从并发消息存储队列中获取事件消息;
157.对获取的事件消息进行聚合处理,在聚合到的事件消息的数量达到预设数量或者聚合时间达到预设时间时,将聚合到的事件消息批量写入至所述数据库中。
158.具体地,为了减少写入数据库的请求次数,提高消息处理平台的性能,在将并发消息存储队列中存储的事件消息写入至数据库时,可以通过一个单独的线程负责事件聚合,在聚合到一定的数量或者时间后,将事件批量打包,一次性更新数据库。
159.参阅图8所示,是本技术消息处理装置50一实施例的程序模块图。
160.本实施例中,所述消息处理装置50包括一系列的存储于存储器上的计算机程序指令,当该计算机程序指令被处理器执行时,可以实现本技术各实施例的拍摄功能。在一些实施例中,基于该计算机程序指令各部分所实现的特定的操作,消息处理装置50可以被划分为一个或多个模块,具体可以划分的模块如下:
161.分析模块51,用于对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;
162.转移模块52,用于将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;
163.存储模块53,用于从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;
164.写入模块54,用于从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至所述数据库中。
165.在一示例性的实施方式中,分析模块51,还用于对各个并发消息存储队列中存储的各条事件消息中的出现频次及处理耗时进行分析,并根据分析结果确定热点事件消息。
166.图9示意性示出了根据本技术实施例的适于实现消息处理方法的计算机设备6的硬件架构示意图。本实施例中,计算机设备6是一种能够按照事先设定或者存储的指令,自动进行数值计算和/或信息处理的设备。例如,可以是平板电脑、笔记本电脑、台式计算机、机架式服务器、刀片式服务器、塔式服务器或机柜式服务器(包括独立的服务器,或者多个服务器所组成的服务器集群)等。如图9所示,计算机设备6至少包括但不限于:可通过系统总线相互通信链接存储器120、处理器121、网络接口122。其中:
167.存储器120至少包括一种类型的计算机可读存储介质,该可读存储介质可以是易失性的,也可以是非易失性的,具体而言,可读存储介质包括闪存、硬盘、多媒体卡、卡型存储器(例如,sd或dx存储器等)、随机访问存储器(ram)、静态随机访问存储器(sram)、只读存储器(rom)、电可擦除可编程只读存储器(eeprom)、可编程只读存储器(prom)、磁性存储器、磁盘、光盘等。在一些实施例中,存储器120可以是计算机设备6的内部存储模块,例如该计算机设备6的硬盘或内存。在另一些实施例中,存储器120也可以是计算机设备6的外部存储设备,例如该计算机设备6上配备的插接式硬盘,智能存储卡(smart media card,简称为smc),安全数字(secure digital,简称为sd)卡,闪存卡(flash card)等。当然,存储器120还可以既包括计算机设备6的内部存储模块也包括其外部存储设备。本实施例中,存储器120通常用于存储安装于计算机设备6的操作系统和各类应用软件,例如消息处理方法的程序代码等。此外,存储器120还可以用于暂时地存储已经输出或者将要输出的各类数据。
168.处理器121在一些实施例中可以是中央处理器(central processing unit,简称为cpu)、控制器、微控制器、微处理器、或其它拍摄芯片。该处理器121通常用于控制计算机设备6的总体操作,例如执行与计算机设备6进行数据交互或者通信相关的控制和处理等。本实施例中,处理器121用于运行存储器120中存储的程序代码或者处理数据。
169.网络接口122可包括无线网络接口或有线网络接口,该网络接口122通常用于在计算机设备6与其它计算机设备之间建立通信链接。例如,网络接口122用于通过网络将计算机设备6与外部终端相连,在计算机设备6与外部终端之间的建立数据传输通道和通信链接等。网络可以是企业内部网(intranet)、互联网(internet)、全球移动通讯系统(global system of mobile communication,简称为gsm)、宽带码分多址(wideband code division multiple access,简称为wcdma)、4g网络、5g网络、蓝牙(bluetooth)、wi-fi等无线或有线网络。
170.需要指出的是,图9仅示出了具有部件120~122的计算机设备,但是应理解的是,并不要求实施所有示出的部件,可以替代的实施更多或者更少的部件。
171.在本实施例中,存储于存储器120中的消息处理方法可以被分割为一个或者多个程序模块,并由一个或多个处理器(本实施例为处理器121)所执行,以完成本技术。
172.本技术实施例提供了一种计算机可读存储介质,计算机可读存储介质其上存储有计算机程序,计算机程序被处理器执行时实现实施例中的消息处理方法的步骤。
173.本实施例中,计算机可读存储介质包括闪存、硬盘、多媒体卡、卡型存储器(例如,sd或dx存储器等)、随机访问存储器(ram)、静态随机访问存储器(sram)、只读存储器(rom)、电可擦除可编程只读存储器(eeprom)、可编程只读存储器(prom)、磁性存储器、磁盘、光盘等。在一些实施例中,计算机可读存储介质可以是计算机设备的内部存储单元,例如该计算机设备的硬盘或内存。在另一些实施例中,计算机可读存储介质也可以是计算机设备的外部存储设备,例如该计算机设备上配备的插接式硬盘,智能存储卡(smart media card,简称为smc),安全数字(secure digital,简称为sd)卡,闪存卡(flash card)等。当然,计算机可读存储介质还可以既包括计算机设备的内部存储单元也包括其外部存储设备。本实施例中,计算机可读存储介质通常用于存储安装于计算机设备的操作系统和各类应用软件,例如实施例中的消息处理方法的程序代码等。此外,计算机可读存储介质还可以用于暂时地存储已经输出或者将要输出的各类数据。
174.以上所描述的装置实施例仅仅是示意性的,其中作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,或者也可以分布到至少两个网络单元上。可以根据实际的需要筛选出其中的部分或者全部模块来实现本技术实施例方案的目的。本领域普通技术人员在不付出创造性的劳动的情况下,即可以理解并实施。
175.通过以上的实施方式的描述,本领域普通技术人员可以清楚地了解到各实施方式可借助软件加通用硬件平台的方式来实现,当然也可以通过硬件。本领域普通技术人员可以理解实现上述实施例方法中的全部或部分流程是可以通过计算机程序来指令相关的硬件来完成,所述的程序可存储于一计算机可读取存储介质中,该程序在执行时,可包括如上述各方法的实施例的流程。其中,所述的存储介质可为磁碟、光盘、只读存储记忆体(read-onlymemory,rom)或随机存储记忆体(randomaccessmemory,ram)等。
176.最后应说明的是:以上各实施例仅用以说明本技术的技术方案,而非对其限制;尽管参照前述各实施例对本技术进行了详细的说明,本领域的普通技术人员应当理解:其依然可以对前述各实施例所记载的技术方案进行修改,或者对其中部分或者全部技术特征进行等同替换;而这些修改或者替换,并不使相应技术方案的本质脱离本技术各实施例技术方案的范围。

技术特征:
1.一种消息处理平台,所述消息处理平台包括生产者节点及消费者节点,其特征在于,所述消费者节点包括消息中间件、事件获取模块、解析模块及消息并发处理模块,所述消息中间件包括多个消息存储分区,所述消息并发处理模块包括多个并发消息存储队列、多个消息处理线程、热点分析单元及热点转移单元、热点队列及热点线程,每一个消息处理线程与一个并发消息存储队列一一对应,其中:所述生产者节点,用于产生事件消息,根据所述事件消息的类型将所述事件消息发送至所述消息中间件中与所述类型相对应的目标消息存储分区中进行存储;所述事件获取模块,用于从所述目标消息存储分区中获取事件消息;所述解析模块,用于对获取到事件消息进行解析,得到解析结果,并根据解析结果将获取到的事件消息分发至对应的并发消息存储队列中;每一个并发消息存储队列,用于存储所述解析模块发送的事件消息;所述热点分析单元,用于对各个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;所述热点转移单元,用于将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;所述事件获取模块,还用于从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;所述热点线程,用于从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至所述数据库中;每一个消息处理线程,用于从对应的并发消息存储队列中获取事件消息,并将获取到的非热点事件消息写入至数据库中。2.根据权利要求1所述的消息处理平台,其特征在于,所述热点分析单元,还用于对各个并发消息存储队列中存储的各条事件消息中的出现频次及处理耗时进行分析,并根据分析结果确定热点事件消息。3.根据权利要求1所述的消息处理平台,其特征在于,所述消息并发处理模块还包括重投递单元、重投递队列及重投递线程,其中:所述重投递单元,用于在检测到事件消息写入所述数据库失败时,将写入失败的事件消息重新投递至所述重投递队列中;所述重投递线程,用于在所述失败的事件消息重新投递至所述重投递队列的时间达到预设时间后,从所述重投递队列中获取事件消息,并将获取到的事件消息写入至所述数据库中。4.根据权利要1至3任一项所述的消息处理平台,其特征在于,所述消费者节点还包括消息处理模式确定模块及消息聚合处理模块,所述消息聚合处理模块包括多个聚合消息存储队列、多个消息聚合线程,每一个消息聚合线程与一个聚合消息存储队列一一对应,其中:所述消息处理模式确定模块,用于根据预设的配置信息确定消息处理模式,所述消息处理模式包括并发处理模式及聚合处理模式;每一个聚合消息存储队列,用于存储所述解析模块发送的事件消息;每一个消息聚合线程,用于从对应的聚合消息存储队列中获取事件消息,对获取的事
件消息进行聚合处理,在聚合到的事件消息的数量达到预设数量或者聚合时间达到预设时间时,将聚合到的事件消息批量写入至所述数据库中。5.根据权利要求1所述的消息处理平台,其特征在于,所述生产者节点,还用于检测所述生产者节点与所述消息中间件之间的传输通道是否发生故障,若发生故障,则将所述产生的事件消息发送至所述事件获取模块。6.根据权利要求5所述的消息处理平台,其特征在于,消息处理平台还包括确认字符模块,其中:所述事件获取模块,还用于将获取到的事件消息通过链表的方式有序串联,并为每一个事件消息添加处理状态;所述事件获取模块,还用于在事件消息被处理后,将已处理的事件消息的处理状态修改为已处理;所述确认字符模块,用于定时检测所述链表中的各个事件消息的处理状态,并在所述链表中的目标事件消息的前面所有的事件消息的处理状态都为已处理时,将所述目标事件消息的位置标记为确认字符位置。7.一种消息处理方法,应用于权利要求1至6任一项所述的消息处理平台中,其特征在于,所述方法包括:对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至数据库中。8.根据权利要求7所述的消息处理方法,其特征在于,所述对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息包括:对各个并发消息存储队列中存储的各条事件消息中的出现频次及处理耗时进行分析,得到热点分析结果;根据所述热点分析结果确定热点事件消息。9.一种消息处理装置,其特征在于,包括:分析模块,用于对多个并发消息存储队列中存储的事件消息进行分析,并根据分析结果确定热点事件消息;转移模块,用于将确定的热点事件消息转移至所述消息中间件中的热点消息存储分区中;存储模块,用于从所述热点消息存储分区中获取热点事件消息,将从所述热点消息存储分区中获取到的热点事件消息存储至所述热点队列中;写入模块,用于从所述热点队列中获取热点事件消息,将从所述热点队列中获取到的热点事件消息写入至数据库中。10.一种计算机设备,所述计算机设备,包括存储器、处理器以及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现权利要求
7或8所述的方法的步骤。11.一种计算机可读存储介质,其上存储有计算机程序,其特征在于:所述计算机程序被处理器执行时实现权利要求7或8所述的方法的步骤。

技术总结
本申请公开了一种消息处理平台,包括:生产者节点,用于产生事件消息,根据所述事件消息的类型将所述事件消息发送至所述消息中间件中与所述类型相对应的目标消息存储分区中进行存储;事件获取模块,用于从所述目标消息存储分区中获取事件消息;所述解析模块,用于对获取到事件消息进行解析,得到解析结果,并根据解析结果将获取到的事件消息分发至对应的并发消息存储队列中;每一个并发消息存储队列,用于存储所述解析模块发送的事件消息;每一个消息处理线程,用于从对应的并发消息存储队列中获取事件消息,并将获取到的事件消息写入至数据库中。入至数据库中。入至数据库中。


技术研发人员:王旭 王伟锋
受保护的技术使用者:上海哔哩哔哩科技有限公司
技术研发日:2023.07.19
技术公布日:2023/10/19
版权声明

本文仅代表作者观点,不代表航家之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)

航空之家 https://www.aerohome.com.cn/

航空商城 https://mall.aerohome.com.cn/

航空资讯 https://news.aerohome.com.cn/

分享:

扫一扫在手机阅读、分享本文

评论

相关推荐