一种消息处理方法、装置和电子设备与流程
未命名
09-03
阅读:91
评论:0

1.本技术涉及信息技术领域,更具体的说,是涉及一种消息处理方法、装置和电子设备。
背景技术:
2.消息系统在分布式计算领域广泛应用,业务应用运行后需要与其他相关业务应用交换信息,信息的流转路径控制和分发模式普遍采用基于topic的消息队列来实现。
3.在电信计费领域有很多业务进程在运行中需要接受外部业务指令,如共享内存数据更新、临时任务触发等,由于电信计费领域用户数据量巨大,需要以分片批处理方式组织业务进程,需要在传递这些指令时做进程级区分准确送达目标进程。在这种场景下,典型消息队列的发布订阅方式的topic数量是不够用的,广播模式也会造成巨大的流量浪费影响处理性能。
4.现有技术中,采用的消息管理方案是master-agent(管理-代理)模式的代理分层。每个主机上部署一个agent进程代理本机所有业务进程的消息手法,另有管理进程master汇总agent的代理信息进行全局消息管理。
5.如图1所示的消息管理方案示意图,包括:管理员(admin)、用户(users)、管理进程(master)、各代理进程(agent)1-4,其中,所有agent进程与master进程建立网络连接。需要传递消息时,上层系统管理员/用户通过http/https接口把消息传递给master进程,master转发消息给agent,agent在自己主机上找到业务进程转交消息。反之,消息逆向流动。多个主机之间如果要互通消息,都要先通过agent发给master,再转给目标主机的agent。
6.采用该消息管理方案,主机上进程的通信能力依赖本机agent进程这个单点服务,导致主机的高可用性无法保证。
技术实现要素:
7.有鉴于此,本技术提供了一种消息处理方法、装置和电子设备,如下:
8.一种消息处理方法,包括:
9.消息代理集群从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;
10.确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;
11.控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。
12.可选的,上述的方法,还包括:
13.从消息队列中获得第二主题的第二消息,所述第二消息是任意消息代理器发起的心跳包;
14.分析所述第二消息,得到所述消息代理集群中处于运行状态的消息代理器个数;
15.基于处于运行状态的消息代理器个数满足预设个数条件,执行消息代理集群从消
息队列中获得第一主题的第一消息步骤;
16.基于处于运行状态的消息代理器个数不满足预设个数条件,生成报警信息。
17.可选的,上述的方法,所述从消息队列中获得第二主题的第二消息之后,还包括:
18.分析所述第二消息,得到所述消息代理器的地址信息;
19.基于所述地址信息向所述消息代理器发起第一连接请求;
20.基于接收到所述消息代理器反馈的连接确认信息,与所述消息代理器构建消息代理集群。
21.可选的,上述的方法,所述从消息队列中获得第一主题的第一消息之前,还包括:
22.获得所述消息代理集群中任意消息代理器上传的业务进程信息;
23.基于所述任务进程信息更新所述消息代理集群的连接列表,所述连接列表包括消息代理器与业务进程的对应关系。
24.可选的,上述的方法,所述确定所述第一消息对应于第一消息代理器连接的第一业务进程,包括:
25.分析所述第一消息,得到所述第一消息对应的第一业务进程;
26.基于所述连接列表,确定所述第一业务进程对应于第一消息代理器。
27.可选的,上述的方法,还包括:
28.接收第二业务进程的第二连接请求;
29.分析所述连接列表,得到所述消息代理集群中包含的消息代理器;
30.在所述消息代理集群包含的消息代理器中确定满足预设连接条件的第二消息代理器;
31.控制第二消息代理器接受所述第二业务进程,以使得所述第二业务进程通过所述第二消息代理器从消息队列获得消息。
32.可选的,上述的方法,所述在所述消息代理集群包含的消息代理器中确定满足预设连接条件的第二消息代理器,包括:
33.分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数
34.获取消息代理集群中每个消息代理器的进程连接配额;
35.基于所述消息代理集群中每个消息代理器连接的业务进程个数以及进程连接配额,确定满足预设连接条件的第二消息代理器。
36.可选的,上述的方法,还包括:
37.分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数;
38.基于所述消息代理集群中至少两个消息代理器连接的业务进程个数满足不均衡条件,控制所述至少两个消息代理器断开至少一个业务进程,以保留预设连接个数的业务进程;
39.接收第三业务进程对应的第二连接请求,所述第三业务进程是与所述消息代理器断开的业务进程;
40.基于所述消息代理集群中每个消息代理器连接的业务进程个数,确定第三消息代理器响应所述第二连接请求,连接所述第三业务进程,以使得所述消息代理集群中的消息
代理器连接的业务进程个数不满足不均衡条件。
41.一种消息处理装置,包括:
42.获得模块,用于从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;
43.确定模块,用于确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;
44.控制模块,用于控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。
45.一种电子设备,包括:
46.存储器、处理器;
47.其中,存储器存储有处理程序;
48.所述处理器用于加载并执行所述存储器存储的所述处理程序,以实现如上述任一项所述的消息处理方法的各步骤。
49.综上所述,本技术提供了一种消息处理方法、装置和电子设备,包括:消息代理集群从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。本实施例中,由于消息代理集群中的消息代理器共享消息队列中的消息,任意消息代理器从消息队列中获得第一消息,确定其对应的第一消息代理器连接的第一业务进程,则控制第一消息代理器将该第一消息发送给其连接的第一业务进行响应,通过一层作为中间件的消息代理集群互通本地进程路由和消息转发实现业务进程连接的轻量级寻址,只需要有有限个消息分类通道就能够实现百万级进程的消息管理,实现大量有状态业务进程的消息链路的高性能管理。
附图说明
50.为了更清楚地说明本技术实施例的技术方案,下面将对实施例描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本技术的实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据提供的附图获得其他的附图。
51.图1是现有技术中消息管理方案示意图;
52.图2是本技术提供的一种消息处理方法实施例1的流程图;
53.图3是本技术提供的一种消息处理方法实施例1中消息代理集群示意图;
54.图4是本技术提供的一种消息处理方法实施例2的流程图;
55.图5是本技术提供的一种消息处理方法实施例3的流程图;
56.图6是本技术提供的一种消息处理方法实施例4的流程图;
57.图7是本技术提供的一种消息处理方法实施例5的流程图;
58.图8是本技术提供的一种消息处理方法实施例6的流程图;
59.图9是本技术提供的一种消息处理方法实施例7的流程图;
60.图10是本技术提供的一种消息处理装置实施例的结构示意图。
具体实施方式
61.下面将结合本技术实施例中的附图,对本技术实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅仅是本技术一部分实施例,而不是全部的实施例。基于本技术中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本技术保护的范围。
62.如图2所示的,为本技术提供的一种消息处理方法实施例1的流程图,该方法应用于一电子设备,该电子设备是运行消息代理集群中任意消息代理器的设备,该方法包括以下步骤:
63.步骤s201:消息代理集群从消息队列中获得第一主题的第一消息;
64.其中,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器。
65.其中,消息代理集群包括一个或者多个消息代理器。
66.具体的,若一个系统中包含多个业务主机,该消息代理器设置于各个业务主机上,可以部署任意个,且不需要每个主机部署,通常分散到不同的业务主机上部署,分散的目的是任意业务主机宕机不影响系统运行。
67.具体实施中,一个系统中设置的消息代理集群包含5-10个消息代理器。
68.其中,该消息代理集群中的消息代理器共享该消息队列中的消息,无master、agent节点区分。
69.其中,业务控制端通过相应的业务进程将消息发送到消息队列中。
70.其中,该第一主题的消息是外部的业务消息,具体是业务控制端生成的消息,与消息代理器本身无关的消息。
71.例如,该第一主题的消息统称外部的业务消息,该外部的业务消息可以分为更细的主题(topic)的消息,如话费单、积分单等。
72.其中,该消息队列中包含由各种主题的消息,具体包含业务控制端生成的第一主题消息,以及消息代理集群中的消息代理器生成的第二主题消息。
73.需要说明的是,后续实施例中会针对第二主题的消息进行说明,本实施例中不做详述。
74.其中,消息代理集群中的任意消息代理器可从消息队列中取出消息,而且该消息被一消息代理器取出后,该消息队列中不再有该消息,相应的,其他消息代理器不能从消息队列中获得同一消息。
75.步骤s202:确定所述第一消息对应于第一消息代理器连接的第一业务进程;
76.其中,业务进程是每个业务主机上都可能启动的进程,它们与任意消息代理器进程建立连接并注册登记身份。
77.其中,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程。
78.需要说明的是,消息代理器是服务端,业务进程属于客户端,每个消息代理器实例上会有大量的业务进程接入,建立tcp(transmission control protocol,传输控制协议)长连接。
79.其中,由于消息代理集群中的消息代理器的信息共享,该第一消息可以是消息代理集群中任意的消息代理器获得的。
80.相应的,该消息代理器能够基于该第一消息中携带的信息确定其对应的第一业务进程,而第一业务进程是连接在第一消息代理器上,则确定该第一消息对应于第一消息代理器连接的第一业务进程。
81.步骤s203:控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。
82.其中,在确定了该第一消息是由第一业务进程处理,则控制连接第一业务进程的第一消息代理器将该第一消息发送给第一业务进程进行响应该第一消息。
83.其中,若获得该第一消息的是第一消息代理器,则该第一消息处理器直接将该第一消息发送给第一业务进程;若该第一消息是由除了第一消息代理器的其消息代理器获得,则将该第一消息发送给第一消息代理器,再由该第一消息代理器将该第一消息发送给第一业务进程。
84.如图3所示的是消息代理集群示意图,包括三个消息代理器301-303,消息代理器302连接业务进程304,消息代理器303连接业务进程305,消息代理器301从消息队列获得业务消息(第一消息),将该业务消息同步给消息代理器302,由业务进程302处理该第一消息。
85.需要说明的是,由于消息代理集合在消息队列的发布订阅过程中间增加消息中转的能力,接入消息队列接收外部消息,通过一层作为中间件的消息代理集群互通本地进程路由和消息转发实现业务进程连接的轻量级寻址,因此,只需要有有限个消息分类通道就能够实现百万级进程的消息管理,实现大量有状态业务进程的消息链路的高性能管理。
86.综上,本实施例提供的一种消息处理方法,包括:消息代理集群从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。本实施例中,由于消息代理集群中的消息代理器共享消息队列中的消息,任意消息代理器从消息队列中获得第一消息,确定其对应的第一消息代理器连接的第一业务进程,则控制第一消息代理器将该第一消息发送给其连接的第一业务进行响应,通过一层作为中间件的消息代理集群互通本地进程路由和消息转发实现业务进程连接的轻量级寻址,只需要有有限个消息分类通道就能够实现百万级进程的消息管理,实现大量有状态业务进程的消息链路的高性能管理。
87.如图4所示的,为本技术提供的一种消息处理方法实施例2的流程图,该方法包括以下步骤:
88.步骤s401:从消息队列中获得第二主题的第二消息;
89.其中,所述第二消息是任意消息代理器发起的心跳包。
90.其中,消息代理集群中的各个消息代理器向消息队列上传心跳包,该心跳包以第二主题的消息放入进入消息队列中。
91.具体实施中,消息代理器按照约定的周期上传心跳包,该心跳包中包含有该消息代理器的相关信息。
92.其中,各个消息代理器从消息队列中取出该第二消息,实现订阅其他消息代理器
的心跳,在消息代理器订阅其他消息代理器的心跳后,该其他消息代理器上传第二消息,会将该第二消息推送给该消息代理器。
93.步骤s402:分析所述第二消息,得到所述消息代理集群中处于运行状态的消息代理器个数;
94.基于处于运行状态的消息代理器个数满足预设个数条件,执行步骤s403-405,不满足,执行步骤s406。
95.其中,各个消息代理器从消息队列中获得的第二消息是该消息代理集群中各个消息代理器上传的。
96.具体实施中,可以按照约定的周期分析消息代理集群中处于运行状态的消息代理器个数。
97.具体的,分析该第二消息,能够确定该第二消息是由哪个消息代理器生成,相应的,若消息代理器能够生成心跳包,则表征该消息代理器处于运行状态,若在一个或者几个周期内均未获得某一消息代理器的第二消息,则确定该消息代理器挂了,不再处于运行状态。
98.需要说明的是,由于消息代理集群中所有的业务进程可以通过任意消息代理器连接,因此,只要有一个消息代理器实例存储即可让所有业务进程保证连接上。
99.其中,该消息代理集群采用的是分散部署的方式,消息代理器的个数可以是任意个,在每个主机上部署任意个(包括0)。
100.其中,该预设个数条件是非零即可,如1个到任意个。
101.其中,若处于运行状态的消息代理器非0,有至少一个,则可执行步骤s403-405,对于消息队列中的第一消息进行处理;若处于运行状态的消息代理为0个,无法实现对于消息队列中的第一消息进行处理,执行步骤s406。
102.步骤s403:基于处于运行状态的消息代理器个数满足预设个数条件,消息代理集群从消息队列中获得第一主题的第一消息;
103.步骤s404:确定所述第一消息对应于第一消息代理器连接的第一业务进程;
104.步骤s405:控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应;
105.其中,步骤s403-405与实施例1中的相应步骤一致,本实施例中不做赘述。
106.步骤s406:基于处于运行状态的消息代理器个数不满足预设个数条件,生成报警信息。
107.其中,若处于运行状态的消息代理不满足预设个数条件,具体是该消息代理集合中无处于运行状态的消息代理器,则无法实现对于消息队列中的第一消息进行处理,生成报警信息,以提示用户无消息代理器活着,需要进行维护。
108.综上,本实施例提供的一种消息处理方法,还包括:从消息队列中获得第二主题的第二消息,所述第二消息是任意消息代理器发起的心跳包;分析所述第二消息,得到所述消息代理集群中处于运行状态的消息代理器个数;基于处于运行状态的消息代理器个数满足预设个数条件,执行消息代理集群从消息队列中获得第一主题的第一消息步骤;基于处于运行状态的消息代理器个数不满足预设个数条件,生成报警信息。本实施例中,消息代理器通过发起心跳包作为第二消息放入消息队列中,以使得各个消息代理器能够确定当前消息
代理集群中消息代理器处于运行状态的个数,以保证消息代理集群中有消息代理器存活,让业务进程保证连接上,为保证业务进程提供基础。
109.如图5所示的,为本技术提供的一种消息处理方法实施例3的流程图,该方法包括以下步骤:
110.步骤s501:从消息队列中获得第二主题的第二消息;
111.其中,步骤s501与实施例2中的相应步骤一致,本实施例中不做赘述。
112.步骤s502:分析所述第二消息,得到所述消息代理器的地址信息;
113.需要说明的是,本实施例中,是对于构建消息代理集群的过程进行的说明,一般在系统创建或者创建新的消息代理器时执行步骤s502-504过程。
114.其中,消息代理器发起心跳包作为第二主题的第二消息放入消息队列中。
115.其中,该第二消息中携带有消息代理器所在节点的地址信息。
116.具体的,该地址信息包括ip(internet protocol,互联网协议)地址、端口等。
117.其中,任意能够读取该消息队列的消息代理器从该消息队列中获得第二消息,并且对于第二消息进行分析,得到该消息代理器的地址信息。
118.其中,若发送该心跳包的消息代理器尚不属于消息代理集群,则获得该第二消息的消息代理器未订阅该消息代理器的心跳,则需要将该发送心跳包的消息代理器加入消息代理集群。
119.步骤s503:基于所述地址信息向所述消息代理器发起第一连接请求;
120.其中,基于该地址信息,向该地址信息对应的地址发起第一连接请求,以请求与该消息代理器建立连接。
121.具体的,该第一连接请求用于请求与该消息代理器建立长连接。
122.步骤s504:基于接收到所述消息代理器反馈的连接确认信息,与所述消息代理器构建消息代理集群;
123.其中,该发送心跳包的消息代理器接收到第一连接请求后,可以反馈一连接确认信息,以实现与发起第一连接请求的消息代理器建立连接,实现与该消息代理器构建消息代理集群的过程。
124.其中,该构建的消息代理集群中是各个消息代理器组成的互联互通的网络。
125.相应的,后续步骤中,若是与第一消息代理器不同的消息代理器获得第一消息,通过其与第一消息代理器之间的长连接,将该第一消息转发给第一消息代理器。
126.步骤s505:分析所述第二消息,得到所述消息代理集群中处于运行状态的消息代理器个数;
127.基于处于运行状态的消息代理器个数满足预设个数条件,执行步骤s506-508,不满足,执行步骤s509。
128.步骤s506:基于处于运行状态的消息代理器个数满足预设个数条件,消息代理集群从消息队列中获得第一主题的第一消息;
129.步骤s507:确定所述第一消息对应于第一消息代理器连接的第一业务进程;
130.步骤s508:控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应;
131.步骤s509:基于处于运行状态的消息代理器个数不满足预设个数条件,生成报警
信息。
132.其中,步骤s505-509与实施例2中的相应步骤一致,本实施例中不做赘述。
133.综上,本实施例提供的一种消息处理方法,还包括:分析所述第二消息,得到所述消息代理器的地址信息;基于所述地址信息向所述消息代理器发起第一连接请求;基于接收到所述消息代理器反馈的连接确认信息,与所述消息代理器构建消息代理集群。本实施例中,基于消息代理器发送的第二消息,确定消息代理器的地址信息,进而基于该地址信息向该消息代理器发起第一连请求,以与该消息代理器构建消息代理集群,为后续消息代理器之间相互转发第一消息提供网络基础。
134.如图6所示的,为本技术提供的一种消息处理方法实施例4的流程图,该方法包括以下步骤:
135.步骤s601:获得所述消息代理集群中任意消息代理器上传的业务进程信息;
136.其中,该消息代理集群中的每个消息代理器,将其连接任务进程信息作为同一个主题(topic),将该任务进程信息作为消息添加到消息队列中。
137.其中,每个消息代理器中连接的任务进程统计为进程连接表,将该进程连接表作为业务进程信息上传给消息队列。
138.其中,消息代理集群中的消息代理器可以从该消息队列中获取任意消息代理器上传的业务进程信息。
139.其中,某个消息代理器获得该主题下的业务进程信息,实现对于该业务进程信息的订阅,后续其他消息代理器上传任务进程信息,都会推送给该消息代理器。
140.步骤s602:基于所述任务进程信息更新所述消息代理集群的连接列表;
141.其中,所述连接列表包括消息代理器与业务进程的对应关系。
142.其中,在获得该任务进程信息后,更新本地存储的消息代理集群的连接列表,该连接列表中包含有该消息代理集群中各个消息代理器连接的业务进程。
143.其中,该连接列表是该消息代理集群的进程地图,包含整个系统中所有任务进程和消息代理器的连接信息。
144.其中,该进程地图是各个消息代理器共享的自己连接的进程列表组成的集合,由此可知其他代理器都连接了多少进程。
145.需要说明的是,由于各个消息代理器中存储有该消息代理集群的连接列表,能够与其他消息代理器互通本地路由,互相分享进程连接路由,组成进程地图,为后续确定第一消息对应第一业务连接的消息代理器提供确认基础,实现了轻量级寻址,而且,单个形如消息队列topic的消息入口即可支持数十万进程的消息转发。
146.步骤s603:消息代理集群从消息队列中获得第一主题的第一消息;
147.步骤s604:确定所述第一消息对应于第一消息代理器连接的第一业务进程;
148.步骤s605:控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。
149.其中,步骤s603-605与实施例1中的相应步骤一致,本实施例中不做赘述。
150.综上,本实施例提供的一种消息处理方法,还包括:获得所述消息代理集群中任意消息代理器上传的业务进程信息;基于所述任务进程信息更新所述消息代理集群的连接列表,所述连接列表包括消息代理器与业务进程的对应关系。本实施例中,基于消息代理集群
中各个消息代理器共享的连接任务进程构建消息代理集群的连接列表,为后续确定第一消息对应第一业务连接的消息代理器提供确认基础,实现了轻量级寻址。
151.如图7所示的,为本技术提供的一种消息处理方法实施例5的流程图,该方法包括以下步骤:
152.步骤s701:获得所述消息代理集群中任意消息代理器上传的业务进程信息;
153.步骤s702:基于所述任务进程信息更新所述消息代理集群的连接列表;
154.步骤s703:消息代理集群从消息队列中获得第一主题的第一消息;
155.其中,步骤s706与实施例4中的相应步骤一致,本实施例中不做赘述。
156.步骤s704:分析所述第一消息,得到所述第一消息对应的第一业务进程;
157.其中,业务控制端通过一个固定的主题结构的消息队列发送消息。
158.因此,该第一主题的第一消息是采用固定形式的数据结构。
159.具体的,数据结构包括进程的业务逻辑编号、消息号及其他业务字段。
160.其中,该消息代理集群获得该第一消息后,通过业务逻辑编号查询到相应的业务进程是第一业务进程。
161.步骤s705:基于所述连接列表,确定所述第一业务进程对应于第一消息代理器;
162.其中,连接列表中记录着消息代理集群中各个消息代理器连接的业务集成。
163.具体的,查询该连接列表,确定该第一业务进程连接在第一消息代理器。
164.相应的,后续步骤中,控制该第一消息代理器将该第一消息转发给其连接的第一业务进程,以使得该第一业务进程响应该第一消息。
165.需要说明的是,第一业务进程如果需要向上给业务控制端发送消息,则通过该第一消息代理器将该消息发送给业务控制端。
166.步骤s706:控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。
167.其中,步骤s706与实施例4中的相应步骤一致,本实施例中不做赘述。
168.综上,本实施例提供的一种消息处理方法,包括:分析所述第一消息,得到所述第一消息对应的第一业务进程;基于所述连接列表,确定所述第一业务进程对应于第一消息代理器。本实施例中,分析第一消息,得到所述第一消息中携带的表征其对应的第一业务进程的信息,确定其对应的第一业务进程,在连接列表中确定该第一业务进程连接的第一消息代理器,实现了轻量级寻址的过程。
169.如图8所示的,为本技术提供的一种消息处理方法实施例6的流程图,该方法包括以下步骤:
170.步骤s801:获得所述消息代理集群中任意消息代理器上传的业务进程信息;
171.步骤s802:基于所述任务进程信息更新所述消息代理集群的连接列表;
172.步骤s803:消息代理集群从消息队列中获得第一主题的第一消息;
173.步骤s804:确定所述第一消息对应于第一消息代理器连接的第一业务进程;
174.步骤s805:控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应;
175.其中,步骤s803-805与实施例4中的相应步骤一致,本实施例中不做赘述。
176.步骤s806:接收第二业务进程的第二连接请求;
177.其中,该第二业务进程可以是新加入的业务进程,也可以是之前连接过某个消息代理器,而该消息代理器挂了,从新发起连接的业务进程。
178.其中,第二业务进程与消息代理集群中的任意一个消息代理器均未连接。
179.其中,接收到该第二业务进程的第二连接请求后,为该第二业务进程连接一个消息代理器。
180.其中,该第二连接请求可以是通过一预设接口向该消息代理集群发送的连接请求。
181.具体的,该消息代理集群中的任意一个消息连接器都可以作为接收到该第二连接请求的服务端。
182.具体实施中,消息代理集群设置有负载均衡地址,该第二业务进程可以向该负载均衡地址发起长连接,以实现请求连接消息代理器。
183.步骤s807:分析所述连接列表,得到所述消息代理集群中包含的消息代理器;
184.其中,分析前述步骤中更新的连接列表,确定消息代理集群中包含的消息代理器。
185.其中,该消息代理集群中包含的消息代理器,具体是当前处于运行状态的消息代理器,能够连接该第二业务进程。
186.步骤s808:在所述消息代理集群包含的消息代理器中确定满足预设连接条件的第二消息代理器;
187.其中,该预设连接条件具体是负载较少,或者是剩余负载配额较多。
188.其中,由于连接列表中包含有消息代理集群中包含的各个消息代理器,以及各个消息代理器连接的进程,因此,基于该连接列表中记录的信息选择一第二消息代理器。
189.其中,在接收到该第二连接请求后,为了实现消息代理集群负载均衡,需要将该第二业务进程连接到负载较少的一个消息代理器。
190.具体的,步骤s808,包括:
191.步骤s8081:分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数;
192.步骤s8082:获取消息代理集群中每个消息代理器的进程连接配额;
193.步骤s8083:基于所述消息代理集群中每个消息代理器连接的业务进程个数以及进程连接配额,确定满足预设连接条件的第二消息代理器。
194.其中,各个消息代理器连接的业务进程的个数以及其进程连接配额,可以通过消息代理器之间的长连接进行全局路由共享,为了保证该消息代理集群中的负载均衡,从中选择一个剩余配额较多的消息代理器作为第二消息代理器,以使得每个消息代理器均匀的接受消息链路,实现了消息代理集群中各个消息代理器的负载均衡。
195.步骤s809:控制第二消息代理器接受所述第二业务进程,以使得所述第二业务进程通过所述第二消息代理器从消息队列获得消息。
196.其中,在确定了第二消息代理器后,控制该第二消息代理器接受该第二业务进程,连接该第二业务进程,实现了将该第二业务进程连接到消息代理集群的目的,后续消息队列中发送给第二业务进程的消息通过该第二消息代理器获得。
197.具体的,该第二消息代理器可以直接从消息会队列获得或者的通过其他消息代理器获得发送给第二业务进程的第一主题的消息。
198.综上,本实施例提供的一种消息处理方法,还包括:接收第二业务进程的第二连接请求;分析所述连接列表,得到所述消息代理集群中包含的消息代理器;在所述消息代理集群包含的消息代理器中确定满足预设连接条件的第二消息代理器;控制第二消息代理器接受所述第二业务进程,以使得所述第二业务进程通过所述第二消息代理器从消息队列获得消息。本实施例中,在有业务进程请求连接到消息代理集群时,在消息代理集群中处于运行状态的消息代理器中选择第二消息代理器连接,后续该业务进程的消息都由该第二消息代理器进行中转。
199.如图9所示的,为本技术提供的一种消息处理方法实施例7的流程图,该方法包括以下步骤:
200.步骤s901:获得所述消息代理集群中任意消息代理器上传的业务进程信息;
201.步骤s902:基于所述任务进程信息更新所述消息代理集群的连接列表;
202.步骤s903:消息代理集群从消息队列中获得第一主题的第一消息;
203.步骤s904:确定所述第一消息对应于第一消息代理器连接的第一业务进程;
204.步骤s905:控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应;
205.其中,步骤s901-905与实施例6中的相应步骤一致,本实施例中不做赘述。
206.步骤s906:分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数;
207.其中,本实施例是针对消息代理集群的负载平衡过程进行的说明。
208.其中,当满足特定的触发条件时,触发该步骤s906。
209.其中,该特定的触发条件可以是达到约定的周期、某一个消息代理器的负载较大(如连接的业务进程个数达到约定连接个数上限)、部分业务进程被停止、消息代理器节点扩展实例数导致的消息代理器个数增加等。
210.其中,在消息代理集群运行过程中,其中的消息代理器更新连接列表,该连接列表中记载有该消息代理集群中每个消息代理器连接的业务进程个数。
211.其中,该消息代理集群较优的运行方式是各个消息代理器上连接的业务进程个数相同,各个消息代理器的负载平衡。
212.但是,实际运行过程中,可能由于各种原因,会导致某个/些消息代理器上连接的业务进程个数较多,使得该消息代理器需要中转的第一主题的消息较多,使得其上连接的业务进程可能需要等待,影响了消息处理速度,而在分布式场景中,由于有海量的数据需要处理,导致数据处理速度很慢。
213.步骤s907:基于所述消息代理集群中至少两个消息代理器连接的业务进程个数满足不均衡条件,控制所述至少两个消息代理器断开至少一个业务进程,以保留预设连接个数的业务进程;
214.其中,在确定了消息代理集群中各个消息代理器连接的业务进程个数后,分析该消息代理集群中的消息代理器的负载是否均衡。
215.具体的,任意两个消息处理器连接的业务进程个数差距大于某个个数阈值,则表征二者满足不均衡条件。
216.例如,一个消息处理器连接的业务进程个数是20个,另一个消息处理器连接的业
务进程个数是100个,二者差距为80个,远大于个数阈值10个,则确定二者满足不均衡条件。
217.其中,对于不满足均衡条件的消息代理器进行重新分配业务进程,以使其满足均衡条件。
218.具体的,断开部分业务进程,再将断开的业务进程重新分配连接。
219.具体实施中,可以预先设置每个消息代理器连接业务进程的最小连接数,将超过该最小连接数的业务进程断开,然后架构该断开的业务进程再次连接到消息代理集群中。
220.具体的,消息代理器对外http接口通知该业务进程对应的业务客户端触发连接重平衡,由该业务客户端主动断开并进行重连。
221.其中,该对外http接口还能够接收到外部的查询,实现对于该消息代理集合连接的任意业务进程的查询。
222.步骤s908:接收第三业务进程对应的第二连接请求,所述第三业务进程是于所述消息代理器断开的业务进程;
223.其中,接收到第三业务进程对应的业务客户端发出的第二连接请求,以请求将该第三业务进程重新连接到消息代理集群中。
224.步骤s909:基于所述消息代理集群中每个消息代理器连接的业务进程个数,确定第三消息代理器响应所述第二连接请求,连接所述第三业务进程,以使得所述消息代理集群中的消息代理器连接的业务进程个数不满足不均衡条件。
225.其中,消息代理集群中的任意消息代理器连接了一个业务进程后,都会与其他消息代理器共享该信息,因此,在接收到第二连接请求后,基于该消息代理集群中的各个消息代理器连接的业务进程个数,从中选择一个连接较少的一个作为第三消息代理器进行响应该第二连接请求,该第三消息代理器连接该第三业务进程,最终达到该消息代理集群中的各个消息代理器连接的业务进程个数均衡。
226.具体实施中,第二连接请求可以是业务客户端向任意一个消息代理器发起的,接收到该第二连接请求的消息代理器可以根据消息代理集群中各个消息代理器的情况决定是否接受此次连接,如果不接受,可以将该第二连接请求通过与其他消息代理器的长连接转发给其他消息代理器,迫使第三业务进程连接到连接了相对较少业务进程的消息代理器中。
227.综上,本实施例提供的一种消息处理方法,还包括:分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数;基于所述消息代理集群中至少两个消息代理器连接的业务进程个数满足不均衡条件,控制所述至少两个消息代理器断开至少一个业务进程,以保留预设连接个数的业务进程;接收第三业务进程对应的第二连接请求,所述第三业务进程是与所述消息代理器断开的业务进程;基于所述消息代理集群中每个消息代理器连接的业务进程个数,确定第三消息代理器响应所述第二连接请求,连接所述第三业务进程,以使得所述消息代理集群中的消息代理器连接的业务进程个数不满足不均衡条件。本实施例中,基于消息代理集群中各个消息代理器连接的业务进程个数不满足均衡条件时,实现对于消息代理集群中消息代理器连接的业务进行重新分片重连接平衡的过程,可在业务扩缩容时通过重建连接自动均衡系统连接数,实现了消息代理集群中整体的业务进程平衡。
228.与上述本技术提供的一种消息处理方法实施例相对应的,本技术还提供了应用该
消息处理方法的装置实施例。
229.如图10所示的为本技术提供的一种消息处理装置实施例的结构示意图,该装置应用于一电子设备,该电子设备是运行消息代理集群中任意消息代理器的设备,该装置包括以下结构:获得模块1001、确定模块1002和控制模块1003;
230.其中,该获得模块1001,用于从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;
231.其中,该确定模块1002,用于确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;
232.其中,该控制模块1003,用于控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。
233.可选的,还包括:
234.第二消息获得模块,用于从消息队列中获得第二主题的第二消息,所述第二消息是任意消息代理器发起的心跳包;
235.第一分析模块,用于分析所述第二消息,得到所述消息代理集群中处于运行状态的消息代理器个数;
236.基于处于运行状态的消息代理器个数满足预设个数条件,触发获得模块;
237.生成模块,用于基于处于运行状态的消息代理器个数不满足预设个数条件,生成报警信息。
238.可选的,所述还包括:
239.第二分析模块,用于从消息队列中获得第二主题的第二消息之后,分析所述第二消息,得到所述消息代理器的地址信息;
240.发起模块,用于基于所述地址信息向所述消息代理器发起第一连接请求;
241.构建模块,用于基于接收到所述消息代理器反馈的连接确认信息,与所述消息代理器构建消息代理集群。
242.可选的,,所述还包括:
243.业务进程信息获得模块,用于从消息队列中获得第一主题的第一消息之前,获得所述消息代理集群中任意消息代理器上传的业务进程信息;
244.更新模块,用于基于所述任务进程信息更新所述消息代理集群的连接列表,所述连接列表包括消息代理器与业务进程的对应关系。
245.可选的,确定模块,具体用于:
246.分析所述第一消息,得到所述第一消息对应的第一业务进程;
247.基于所述连接列表,确定所述第一业务进程对应于第一消息代理器。
248.可选的,还包括:
249.第一接收模块,接收第二业务进程的第二连接请求;
250.第三分析模块,用于分析所述连接列表,得到所述消息代理集群中包含的消息代理器;
251.确定模块,还用于在所述消息代理集群包含的消息代理器中确定满足预设连接条件的第二消息代理器;
252.控制模块,还用于控制第二消息代理器接受所述第二业务进程,以使得所述第二
业务进程通过所述第二消息代理器从消息队列获得消息。
253.可选的,所述确定模块,具体用于:
254.分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数
255.获取消息代理集群中每个消息代理器的进程连接配额;
256.基于所述消息代理集群中每个消息代理器连接的业务进程个数以及进程连接配额,确定满足预设连接条件的第二消息代理器。
257.可选的,还包括:
258.第四分析模块,用于分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数;
259.控制模块,还用于基于所述消息代理集群中至少两个消息代理器连接的业务进程个数满足不均衡条件,控制所述至少两个消息代理器断开至少一个业务进程,以保留预设连接个数的业务进程;
260.第二接收模块,用于接收第三业务进程对应的第二连接请求,所述第三业务进程是与所述消息代理器断开的业务进程;
261.确定模块,还用于基于所述消息代理集群中每个消息代理器连接的业务进程个数,确定第三消息代理器响应所述第二连接请求,连接所述第三业务进程,以使得所述消息代理集群中的消息代理器连接的业务进程个数不满足不均衡条件。
262.需要说明的是,本实施例中提供的一种消息处理装置中各个结构的功能解释,请参考前述方法实施例,本实施例中不做赘述。
263.综上,本实施例提供的一种消息处理装置,由于消息代理集群中的消息代理器共享消息队列中的消息,任意消息代理器从消息队列中获得第一消息,确定其对应的第一消息代理器连接的第一业务进程,则控制第一消息代理器将该第一消息发送给其连接的第一业务进行响应,通过一层作为中间件的消息代理集群互通本地进程路由和消息转发实现业务进程连接的轻量级寻址,只需要有有限个消息分类通道(topic)就能够实现百万级进程的消息管理,实现大量有状态业务进程的消息链路的高性能管理。
264.与上述本技术提供的一种消息处理方法实施例相对应的,本技术还提供了与该消息处理方法相应的电子设备以及可读存储介质。
265.其中,该电子设备,包括:存储器、处理器;
266.其中,存储器存储有处理程序;
267.所述处理器用于加载并执行所述存储器存储的所述处理程序,以实现如上述任一项所述的消息处理方法的各步骤。
268.具体该电子设备的实现消息处理方法,参考前述消息处理方法实施例即可。
269.其中,该可读存储介质,其上存储有计算机程序,所述计算机程序被处理器调用并执行,实现如上述任一项所述的消息处理方法的各步骤。
270.具体该可读存储介质存储的计算机程序执行实现消息处理方法,参考前述消息处理方法实施例即可。
271.本说明书中各个实施例采用递进的方式描述,每个实施例重点说明的都是与其他实施例的不同之处,各个实施例之间相同相似部分互相参见即可。对于实施例提供的装置
而言,由于其与实施例提供的方法相对应,所以描述的比较简单,相关之处参见方法部分说明即可。
272.对所提供的实施例的上述说明,使本领域专业技术人员能够实现或使用本技术。对这些实施例的多种修改对本领域的专业技术人员来说将是显而易见的,本文中所定义的一般原理可以在不脱离本技术的精神或范围的情况下,在其它实施例中实现。因此,本技术将不会被限制于本文所示的这些实施例,而是要符合与本文所提供的原理和新颖特点相一致的最宽的范围。
技术特征:
1.一种消息处理方法,其特征在于,包括:消息代理集群从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。2.根据权利要求1所述的方法,其特征在于,还包括:从消息队列中获得第二主题的第二消息,所述第二消息是任意消息代理器发起的心跳包;分析所述第二消息,得到所述消息代理集群中处于运行状态的消息代理器个数;基于处于运行状态的消息代理器个数满足预设个数条件,执行消息代理集群从消息队列中获得第一主题的第一消息步骤;基于处于运行状态的消息代理器个数不满足预设个数条件,生成报警信息。3.根据权利要求2所述的方法,其特征在于,所述从消息队列中获得第二主题的第二消息之后,还包括:分析所述第二消息,得到所述消息代理器的地址信息;基于所述地址信息向所述消息代理器发起第一连接请求;基于接收到所述消息代理器反馈的连接确认信息,与所述消息代理器构建消息代理集群。4.根据权利要求1所述的方法,其特征在于,所述从消息队列中获得第一主题的第一消息之前,还包括:获得所述消息代理集群中任意消息代理器上传的业务进程信息;基于所述任务进程信息更新所述消息代理集群的连接列表,所述连接列表包括消息代理器与业务进程的对应关系。5.根据权利要求4所述的方法,其特征在于,所述确定所述第一消息对应于第一消息代理器连接的第一业务进程,包括:分析所述第一消息,得到所述第一消息对应的第一业务进程;基于所述连接列表,确定所述第一业务进程对应于第一消息代理器。6.根据权利要求4所述的方法,其特征在于,还包括:接收第二业务进程的第二连接请求;分析所述连接列表,得到所述消息代理集群中包含的消息代理器;在所述消息代理集群包含的消息代理器中确定满足预设连接条件的第二消息代理器;控制第二消息代理器接受所述第二业务进程,以使得所述第二业务进程通过所述第二消息代理器从消息队列获得消息。7.根据权利要求6所述的方法,其特征在于,所述在所述消息代理集群包含的消息代理器中确定满足预设连接条件的第二消息代理器,包括:分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数获取消息代理集群中每个消息代理器的进程连接配额;基于所述消息代理集群中每个消息代理器连接的业务进程个数以及进程连接配额,确
定满足预设连接条件的第二消息代理器。8.根据权利要求4所述的方法,其特征在于,还包括:分析所述连接列表,得到所述消息代理集群中每个消息代理器连接的业务进程个数;基于所述消息代理集群中至少两个消息代理器连接的业务进程个数满足不均衡条件,控制所述至少两个消息代理器断开至少一个业务进程,以保留预设连接个数的业务进程;接收第三业务进程对应的第二连接请求,所述第三业务进程是与所述消息代理器断开的业务进程;基于所述消息代理集群中每个消息代理器连接的业务进程个数,确定第三消息代理器响应所述第二连接请求,连接所述第三业务进程,以使得所述消息代理集群中的消息代理器连接的业务进程个数不满足不均衡条件。9.一种消息处理装置,其特征在于,包括:获得模块,用于从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;确定模块,用于确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;控制模块,用于控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。10.一种电子设备,其特征在于,包括:存储器、处理器;其中,存储器存储有处理程序;所述处理器用于加载并执行所述存储器存储的所述处理程序,以实现如权利要求1-8任一项所述的消息处理方法的各步骤。
技术总结
本申请提供了一种消息处理方法、装置和电子设备,包括:消息代理集群从消息队列中获得第一主题的第一消息,所述第一消息是业务控制端发送到消息队列,所述消息代理集群包括至少一个消息代理器;确定所述第一消息对应于第一消息代理器连接的第一业务进程,所述第一业务进程是消息代理集群中第一消息代理器连接的任务进程;控制所述第一消息代理器将所述第一消息发送给所述第一业务进程响应。消息发送给所述第一业务进程响应。消息发送给所述第一业务进程响应。
技术研发人员:孔祥飞 韦昌太 彭迈
受保护的技术使用者:亚信科技(南京)有限公司
技术研发日:2023.06.21
技术公布日:2023/8/31
版权声明
本文仅代表作者观点,不代表航家之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)
航空之家 https://www.aerohome.com.cn/
飞机超市 https://mall.aerohome.com.cn/
航空资讯 https://news.aerohome.com.cn/