消息队列的重发方法及装置、电子设备及存储介质与流程
未命名
08-29
阅读:88
评论:0

1.本技术涉及分布式技术,尤其涉及一种消息队列的重发方法及装置、电子设备及存储介质。
背景技术:
2.在数据业务技术中,消息是指在两台计算机间传送的数据单位,消息队列则是在消息的传输过程中保存消息的容器,是一种分布式应用间交换信息的信息处理技术,主要用于保证消息的传递,为应用程序提供消息处理和消息队列功能。通过生产、消费的操作将消息推送至容器或拉取出来,这种处理机制能够有效降低分布式系统的耦合性,简化各子节点之间的通讯复杂性。在现有技术中,消息队列一般采用异步方式生产消息,当生产者使用调用发送消息当方法时,通过设置回调函数,并根据消息的发送状态确定下一步动作,例如,当消息发送失败时,可以在回调函数中设置重发操作。
3.而当网络发生异常时,生产者无法收到回调函数,即此时生产者无法判断本次消息的实际发送状态是否成功,继而无法及时处理生产者消息,因此,现有的消息队列的重发方法存在生产者消息丢失率高可靠性低的技术问题。
技术实现要素:
4.本技术提供一种消息队列的重发方法及装置、电子设备及存储介质,用以解决生产者消息丢失率高可靠性低的技术问题。
5.第一方面,本技术提供一种消息队列的重发方法,包括:
6.获取消息发送表,基于消息发送表获取消息发送表中每条消息的分布式锁记录;
7.基于分布式锁记录,判断消息的分布式锁状态;
8.在分布式锁处于未锁定状态时,基于分布式锁记录确定消息的轮询起点;
9.基于轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发。
10.可选地,基于分布式锁记录确定消息的轮询起点,包括:
11.基于分布式锁记录确定消息的分布式锁的上一次处理时间;
12.基于分布式锁的上一次处理时间与当前时间,确定消息的轮询起点。
13.可选地,基于分布式锁的上一次处理时间与当前时间,确定消息的轮询起点,包括:
14.基于预设时间间隔确定多个时间区间,并将当前时间对应的时间区间确定为游标分区;
15.判断分布式锁的上一次处理时间与预设时间间隔之和是否晚于当前时间;
16.若是,则将游标分区确定为消息的轮询起点。
17.可选地,基于分布式锁的上一次处理时间与当前时间,确定消息的轮询起点,还包括:
18.在分布式锁的上一次处理时间与预设时间间隔之和早于当前时间时,将分布式锁的上一次处理时间对应的时间分区确定为消息的轮询起点。
19.可选地,基于轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发之后,重发方法还包括:
20.基于轮询起点,确定多个时间分区的轮询顺序;
21.基于轮询顺序,对下一时间分区中满足预设条件的消息进行重发;其中,预设条件是指发送时间超过预设时长且发送次数不超过预设阈值。
22.可选地,基于轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发,包括:
23.获取消息发送表中轮询起点对应的至少一个第一消息,并获取第一消息的发送状态;
24.将发送状态为发送中的第一消息确定为第二消息,获取第二消息的发送时间;
25.将发送时间超过预设时长的第二消息确定为第三消息,获取第三消息的发送次数;
26.对发送次数超过预设阈值的第三消息进行重发。
27.可选地,在获取消息发送表之前,包括:
28.基于斯普瑞布特spring boot读取目标数据源,并建立mybat is与目标数据源所在的数据库之间的连接;
29.基于mybatis与目标数据源所在的数据库的连接,将目标数据源缓存至斯普瑞布特spring boot相应的容器之中;
30.定时扫描斯普瑞布特spring boot相应的容器中的目标数据源,基于目标数据源中处于发送中或发送失败的消息建立消息发送表。
31.第二方面,本技术提供一种消息队列的重发装置,包括:
32.获取模块,用于获取消息发送表,基于消息发送表获取消息发送表中每条消息的分布式锁记录;
33.判断模块,用于基于分布式锁记录,判断消息的分布式锁状态;
34.确定模块,用于在分布式锁处于未锁定状态时,基于分布式锁记录确定消息的轮询起点;
35.重发模块,用于基于轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发。
36.可选地,确定模块用于:
37.基于分布式锁记录确定消息的分布式锁的上一次处理时间;
38.基于分布式锁的上一次处理时间与当前时间,确定消息的轮询起点。
39.可选地,确定模块还用于:
40.基于预设时间间隔确定多个时间区间,并将当前时间对应的时间区间确定为游标分区;
41.判断分布式锁的上一次处理时间与预设时间间隔之和是否晚于当前时间;
42.若是,则将游标分区确定为消息的轮询起点。
43.可选地,确定模块还用于:
44.在分布式锁的上一次处理时间与预设时间间隔之和早于当前时间时,将分布式锁的上一次处理时间对应的时间分区确定为消息的轮询起点。
45.可选地,装置还用于:
46.基于轮询起点,确定多个时间分区的轮询顺序;
47.基于轮询顺序,对下一时间分区中满足预设条件的消息进行重发;其中,预设条件是指发送时间超过预设时长且发送次数不超过预设阈值。
48.可选地,装置还用于:
49.获取消息发送表中轮询起点对应的至少一个第一消息,并获取第一消息的发送状态;
50.将发送状态为发送中的第一消息确定为第二消息,获取第二消息的发送时间;
51.将发送时间超过预设时长的第二消息确定为第三消息,获取第三消息的发送次数;
52.对发送次数超过预设阈值的第三消息进行重发。
53.可选地,装置还用于:
54.基于斯普瑞布特spring boot读取目标数据源,并建立mybat is与目标数据源所在的数据库之间的连接;
55.基于mybatis与目标数据源所在的数据库的连接,将目标数据源缓存至斯普瑞布特spring boot相应的容器之中;
56.定时扫描斯普瑞布特spring boot相应的容器中的目标数据源,基于目标数据源中处于发送中或发送失败的消息建立消息发送表。
57.第三方面,本技术提供一种电子设备,包括:处理器,以及与处理器通信连接的存储器;
58.存储器存储计算机执行指令;
59.处理器执行存储器存储的计算机执行指令,以实现如第一方面的消息队列的重发方法。
60.第四方面,本技术提供一种计算机可读存储介质,计算机可读存储介质中存储有计算机执行指令,计算机执行指令被处理器执行时用于实现如第一方面的消息队列的重发方法。
61.第五方面,本技术提供了一种程序产品,该程序产品包括计算机程序,计算机程序被处理器执行实现如第一方面的消息队列的重发方法。
62.本技术提供的消息队列的重发方法及装置、电子设备及存储介质,通过在消息发送表中消息对应的分布式锁处于未锁定状态时,基于分布式锁记录确定轮询起点,并基于轮询起点对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发的手段,基于数据库表存储消息并记录消息的发送状态,在对消息进行发送操作时将消息的状态置为发送中,若此时网络异常,则该条消息记录的状态会长期处于发送中,通过扫表操作确定网络异常产生的生产者丢失消息,在消息对应的分布式锁未锁定时基于轮询起点分批对已经超时但发送次数未超过预设阈值的消息进行重发,可以精准确定生产者丢失消息并对其进行重发,实现了降低生产者消息丢失率提高消息发送可靠性的技术效果。
附图说明
63.此处的附图被并入说明书中并构成本说明书的一部分,示出了符合本技术的实施例,并与说明书一起用于解释本技术的原理。
64.图1为本技术实施例提供的消息队列的重发方法流程图一;
65.图2为本技术实施例提供的消息队列的重发方法流程图二;
66.图3为本技术实施例提供的消息队列的重发方法流程图三;
67.图4为本技术实施例提供的消息队列的重发装置的结构示意图;
68.图5为本技术实施例提供的一种电子设备的结构示意图。
69.通过上述附图,已示出本技术明确的实施例,后文中将有更详细的描述。这些附图和文字描述并不是为了通过任何方式限制本技术构思的范围,而是通过参考特定实施例为本领域技术人员说明本技术的概念。
具体实施方式
70.这里将详细地对示例性实施例进行说明,其示例表示在附图中。下面的描述涉及附图时,除非另有表示,不同附图中的相同数字表示相同或相似的要素。以下示例性实施例中所描述的实施方式并不代表与本技术相一致的所有实施方式。相反,它们仅是与如所附权利要求书中所详述的、本技术的一些方面相一致的装置和方法的例子。
71.现有的消息队列一般采用异步方式生产消息,当生产者使用调用发送消息当方法时,可以设置回调函数,根据回调函数返回的消息发送状态确定是否需要对消息进行重发,具体的,生产消息的回调函数将返回发送失败的消息,在消息发送失败时在回调函数中设置重发操作。但是,在网络发生异常时,生产者无法收到回调函数,即此时生产者无法判断本次消息是否发送成功,因此无法及时处理生产者消息丢失,存在生产者消息丢失率高可靠性低的技术问题。
72.本技术提供的消息队列的重发方法及装置、电子设备及存储介质,通过在消息发送表中消息对应的分布式锁处于未锁定状态时,基于分布式锁记录确定轮询起点,并基于轮询起点对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发的手段,基于数据库表存储消息并记录消息的发送状态,在对消息进行发送操作时将消息的状态置为发送中,若此时网络异常,则该条消息记录的状态会长期处于发送中,通过扫表操作确定网络异常产生的生产者丢失消息,在消息对应的分布式锁未锁定时基于轮询起点分批对已经超时但发送次数未超过预设阈值的消息进行重发,可以精准确定生产者丢失消息并对其进行重发,实现了降低生产者消息丢失率提高消息发送可靠性的技术效果。
73.下面以具体的实施例对本技术的技术方案以及本技术的技术方案如何解决上述技术问题进行详细说明。下面这几个具体的实施例可以相互结合,对于相同或相似的概念或过程可能在某些实施例中不再赘述。下面将结合附图,对本技术的实施例进行描述。
74.图1为本技术实施例提供的消息队列的重发方法流程图一。如图1所示,本实施例提供的消息队列的重发方法,包括:
75.s101、获取消息发送表,基于消息发送表获取消息发送表中每条消息的分布式锁记录;
76.本实施例中,采用落库方式,即每次操作对应落到数据库中,辅助进行生产者消息
重发操作。基于斯普瑞布特spring boot读取目标数据源,并建立mybatis与目标数据源所在的数据库之间的连接,基于mybatis与目标数据源所在的数据库的连接,将目标数据源缓存至斯普瑞布特spring boot相应的容器之中,记录每条消息的发送状态,发送状态包括发送中、发送成功或发送失败,基于定时任务模块定时扫描斯普瑞布特spring boot相应的容器中的目标数据源,基于目标数据源中处于发送中或发送失败的消息建立消息发送表,并获取消息发送表中每条消息的分布式锁记录。此外,只在第一次搭建服务的时候通过斯普瑞布特spring boot的自动配置功能读取目标数据源,建立mybatis链接,并将目标数据源缓存于spring boot相应的容器之中。定时任务模块负责提供消息生产定时重试作业,可以在yml配置文件或前端运维页面中进行设置,同时,前端运维页面可以向消息队列的重发过程提供实时状态可视化监控。
77.s102、基于分布式锁记录,判断消息的分布式锁状态;
78.本实施例中,分布式锁记录包括分布式锁状态、分布式锁上一次处理时间、上一次处理时间的时间分区,当分布式锁状态为锁定时,无法对相应消息进行任何处理。
79.本实施例中,分布式锁基于mysql数据库实现,用来保证轮询过程中处理的每一条消息的强一致性,通过创建记录分布式锁的数据库表,每条分布式锁记录用于表征一个消息记录的锁,可以通过设置状态status字段记录分布式锁的状态,例如设置两个常量分别代表锁定和未锁定状态。分布式锁记录还可以基于其他字段记录重试服务名、服务所在物理单元定位。
80.s103、在分布式锁处于未锁定状态时,基于分布式锁记录确定消息的轮询起点;
81.本实施例中,基于消息的分布式锁的上一次处理时间确定消息对应的时间分区;基于预设时间间隔将消息记录表中的消息分类至相应的时间分区;通过yml配置文件可以对预设时间间隔的大小进行修改;预设时间间隔可以是1分钟,也可以是10分钟,还可以是1小时,在此不对预设时间间隔进行具体限定;在一具体示范例中,预设时间间隔为10分钟,时间分区的划分方式如下表所示,将消息记录表当日时间从00:00分开始,基于预设时间间隔划分为144个时间分区,分别命名为p01、p02、p03...p144。
[0082][0083]
本实施例中,基于消息的分布式锁记录对应的时间分区与当前时间对应的时间分区确定轮询起点,即轮询开始的时间分区。将当前时间对应的时间分区确定为游标分区,将当前时间与消息对应的分布式锁的上一次处理时间进行比较,若分布式锁的上一次处理时间加上预设时间间隔晚于当前时间,则使用游标分区作为轮询起点;否则,则使用分布式锁的上一次处理时间对应的时间分区作为轮询起点。
[0084]
s104、基于轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的消
息进行重发。
[0085]
本实施例中,根据确定的轮询起点,将每一个时间分区的消息作为一个批次,分批从消息记录表中取出消息记录,检查每条消息的发送状态,若发送状态显示为发送中,且发送时间超过预设时长且发送次数不超过预设阈值则对消息进行重发;若发送时间超过预设时长且发送次数超过预设阈值,则将消息记录转发至管理端以使运维人员可以对消息进行重发或者删除。此外,在对消息按时间分区进行分批处理时,需要根据时间分区确定对应的时间分区开始时间和时间分区结束时间,在时间分区开始时间时开始对消息进行处理,在时间分区结束时间结束对消息的处理。
[0086]
本实施例中,可以基于重发模块对消息进行重发,重发时重发模块检查每条消息的发送状态,发送状态包括发送中、发送成功、消息废弃、重发次数,在消息的发送状态为发送中且发送时间超过预设时长且发送次数不超过预设阈值时,对消息进行重发。
[0087]
本技术提供的消息队列的重发方法,通过在消息发送表中消息对应的分布式锁处于未锁定状态时,基于分布式锁记录确定轮询起点,并基于轮询起点对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发的手段,基于数据库表存储消息并记录消息的发送状态,在对消息进行发送操作时将消息的状态置为发送中,若此时网络异常,则该条消息记录的状态会长期处于发送中,通过扫表操作确定网络异常产生的生产者丢失消息,在消息对应的分布式锁未锁定时基于轮询起点分批对已经超时但发送次数未超过预设阈值的消息进行重发,可以精准确定生产者丢失消息并对其进行重发,实现了降低生产者消息丢失率提高消息发送可靠性的技术效果。
[0088]
图2为本技术实施例提供的消息队列的重发方法流程图二,本实施例提供的一种消息队列的重发方法,包括:
[0089]
s201、基于斯普瑞布特spring boot读取目标数据源,并建立mybatis与目标数据源所在的数据库之间的连接;基于mybat is与目标数据源所在的数据库的连接,将目标数据源缓存至斯普瑞布特spring boot相应的容器之中;定时扫描斯普瑞布特spring boot相应的容器中的目标数据源,基于目标数据源中处于发送中或发送失败的消息建立消息发送表;
[0090]
s202、获取消息发送表,基于消息发送表获取消息发送表中每条消息的分布式锁记录;
[0091]
s203、基于分布式锁记录,判断消息的分布式锁状态;
[0092]
s204、在分布式锁处于未锁定状态时,基于分布式锁记录确定消息的分布式锁的上一次处理时间;基于预设时间间隔确定多个时间区间,并将当前时间对应的时间区间确定为游标分区;判断分布式锁的上一次处理时间与预设时间间隔之和是否晚于当前时间;若是,则将游标分区确定为消息的轮询起点;
[0093]
s205、在分布式锁的上一次处理时间与预设时间间隔之和早于当前时间时,将分布式锁的上一次处理时间对应的时间分区确定为消息的轮询起点;
[0094]
s206、获取消息发送表中轮询起点对应的至少一个第一消息,并获取第一消息的发送状态;将发送状态为发送中的第一消息确定为第二消息,获取第二消息的发送时间;将发送时间超过预设时长的第二消息确定为第三消息,获取第三消息的发送次数;对发送次数超过预设阈值的第三消息进行重发;
[0095]
s207、基于轮询起点,确定多个时间分区的轮询顺序;基于轮询顺序,对下一时间分区中满足预设条件的消息进行重发;其中,预设条件是指发送时间超过预设时长且发送次数不超过预设阈值。
[0096]
通过上述s201至s207,可以实现基于分布式锁记录确定消息的轮询起点,通过数据库轮询的消息队列重试方法,基于时间分区规则,分批次轮询消息发送表中的消息,识别发送超时且重试次数不超过预设阈值的消息并进行重发,降低了生产者消息丢失率提高了消息发送可靠性。
[0097]
图3为本技术实施例提供的消息队列的重发方法流程图三,本实施例提供的一种消息队列的重发方法,包括:
[0098]
s301、基于斯普瑞布特spring boot读取目标数据源,并建立mybatis与目标数据源所在的数据库之间的连接;基于mybatis与目标数据源所在的数据库的连接,将目标数据源缓存至斯普瑞布特spring boot相应的容器之中;基于定时任务模块定时扫描斯普瑞布特spring boot相应的容器中的目标数据源,基于目标数据源中处于发送中或发送失败的消息建立消息发送表;
[0099]
s302、获取消息发送表,基于消息发送表获取消息发送表中每条消息的分布式锁记录;其中,分布式锁记录基于分布式锁模块创建;
[0100]
s303、基于分布式锁记录,判断消息的分布式锁状态;
[0101]
s304、在分布式锁处于未锁定状态时,基于分布式锁记录确定消息的分布式锁的上一次处理时间;基于预设时间间隔确定多个时间区间,并将当前时间对应的时间区间确定为游标分区;判断分布式锁的上一次处理时间与预设时间间隔之和是否晚于当前时间;若是,则将游标分区确定为消息的轮询起点;
[0102]
s305、在分布式锁的上一次处理时间与预设时间间隔之和早于当前时间时,将分布式锁的上一次处理时间对应的时间分区确定为消息的轮询起点;
[0103]
s306、获取消息发送表中轮询起点对应的至少一个第一消息,并获取第一消息的发送状态;将发送状态为发送中的第一消息确定为第二消息,获取第二消息的发送时间;将发送时间超过预设时长的第二消息确定为第三消息,获取第三消息的发送次数;对发送次数超过预设阈值的第三消息进行重发;
[0104]
s307、基于轮询起点,确定多个时间分区的轮询顺序;根据轮询顺序,基于重发模块对下一时间分区中满足预设条件的消息进行重发;其中,预设条件是指发送时间超过预设时长且发送次数不超过预设阈值;
[0105]
s308、基于重发模块将发送时间超过预设时长且发送次数超过预设阈值消息转发至管理端模块,以使运维人员重发或者删除消息。
[0106]
本技术提供的消息队列的重发方法,通过在消息发送表中消息对应的分布式锁处于未锁定状态时,基于分布式锁记录确定轮询起点,并基于轮询起点对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发的手段,基于数据库表存储消息并记录消息的发送状态,在对消息进行发送操作时将消息的状态置为发送中,若此时网络异常,则该条消息记录的状态会长期处于发送中,通过扫表操作确定网络异常产生的生产者丢失消息,在消息对应的分布式锁未锁定时基于轮询起点分批对已经超时但发送次数未超过预设阈值的消息进行重发,可以精准确定生产者丢失消息并对其进行重发,实现了降低生产者
消息丢失率提高消息发送可靠性的技术效果。
[0107]
图3为本技术实施例提供的消息队列的重发装置的结构示意图。如图4所示,本技术实施例提供的一种消息队列的重发装置400,包括:获取模块401、判断模块402、确定模块403以及重发模块404,
[0108]
获取模块401,用于获取消息发送表,基于消息发送表获取消息发送表中每条消息的分布式锁记录;
[0109]
判断模块402,用于基于分布式锁记录,判断消息的分布式锁状态;
[0110]
确定模块403,用于在分布式锁处于未锁定状态时,基于分布式锁记录确定消息的轮询起点;
[0111]
重发模块404,用于基于轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发。
[0112]
一种可能的实现方式中,确定模块用于:
[0113]
基于分布式锁记录确定消息的分布式锁的上一次处理时间;
[0114]
基于分布式锁的上一次处理时间与当前时间,确定消息的轮询起点。
[0115]
一种可能的实现方式中,确定模块还用于:
[0116]
基于预设时间间隔确定多个时间区间,并将当前时间对应的时间区间确定为游标分区;
[0117]
判断分布式锁的上一次处理时间与预设时间间隔之和是否晚于当前时间;
[0118]
若是,则将游标分区确定为消息的轮询起点。
[0119]
一种可能的实现方式中,确定模块还用于:
[0120]
在分布式锁的上一次处理时间与预设时间间隔之和早于当前时间时,将分布式锁的上一次处理时间对应的时间分区确定为消息的轮询起点。
[0121]
一种可能的实现方式中,该装置还用于:
[0122]
基于轮询起点,确定多个时间分区的轮询顺序;
[0123]
基于轮询顺序,对下一时间分区中满足预设条件的消息进行重发;其中,预设条件是指发送时间超过预设时长且发送次数不超过预设阈值。
[0124]
一种可能的实现方式中,该装置还用于:
[0125]
获取消息发送表中轮询起点对应的至少一个第一消息,并获取第一消息的发送状态;
[0126]
将发送状态为发送中的第一消息确定为第二消息,获取第二消息的发送时间;
[0127]
将发送时间超过预设时长的第二消息确定为第三消息,获取第三消息的发送次数;
[0128]
对发送次数超过预设阈值的第三消息进行重发。
[0129]
一种可能的实现方式中,该装置还用于:
[0130]
基于斯普瑞布特spring boot读取目标数据源,并建立mybat is与目标数据源所在的数据库之间的连接;
[0131]
基于mybatis与目标数据源所在的数据库的连接,将目标数据源缓存至斯普瑞布特spring boot相应的容器之中;
[0132]
定时扫描斯普瑞布特spring boot相应的容器中的目标数据源,基于目标数据源
中处于发送中或发送失败的消息建立消息发送表。
[0133]
本技术提供的消息队列的重发装置,包括获取模块、判断模块、确定模块以及重发模块,通过获取模块获取消息发送表中消息对应的分布式锁记录,在分布式锁处于未锁定状态时,通过判断模块和确定模块基于分布式锁记录确定轮询起点,通过重发模块基于轮询起点对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发,可以基于数据库表存储消息并记录消息的发送状态,在对消息进行发送操作时将消息的状态置为发送中,若此时网络异常,则该条消息记录的状态会长期处于发送中,通过扫表操作确定网络异常产生的生产者丢失消息,在消息对应的分布式锁未锁定时基于轮询起点分批对已经超时但发送次数未超过预设阈值的消息进行重发,可以精准确定生产者丢失消息并对其进行重发,实现了降低生产者消息丢失率提高消息发送可靠性的技术效果。
[0134]
图5为本技术实施例提供的一种电子设备的结构示意图,该设备包括:
[0135]
处理器501和存储器502;
[0136]
存储器存储计算机执行指令;
[0137]
处理器执行存储器502存储的计算机执行指令,使得电子设备执行如上述的消息队列的重发方法。
[0138]
应理解,上述处理器501可以是中央处理单元(英文:central processing unit,简称:cpu),还可以是其他通用处理器、数字信号处理器(英文:digital signal processor,简称:dsp)、专用集成电路(英文:application specific integrated circuit,简称:asic)等。通用处理器可以是微处理器或者该处理器也可以是任何常规的处理器等。结合发明所公开的方法的步骤可以直接体现为硬件处理器执行完成,或者用处理器中的硬件及软件模块组合执行完成。存储器502可能包含高速随机存取存储器(英文:random access memory,简称:ram),也可能还包括非易失性存储器(英文:non-volat ile memory,简称:nvm),例如至少一个磁盘存储器,还可以为u盘、移动硬盘、只读存储器、磁盘或光盘等。
[0139]
本技术实施例相应还提供一种计算机可读存储介质,计算机可读存储介质中存储有计算机执行指令,计算机执行指令被处理器执行时用于实现如上述的消息队列的重发方法。
[0140]
本公开实施例提供一种计算机程序产品,包括计算机程序,该计算机程序被处理器执行时实现如上述的消息队列的重发方法。
[0141]
本领域技术人员在考虑说明书及实践这里公开的发明后,将容易想到本技术的其他实施方案。本技术旨在涵盖本技术的任何变型、用途或者适应性变化,这些变型、用途或者适应性变化遵循本技术的一般性原理并包括本技术未公开的本技术领域中的公知常识或惯用技术手段。说明书和实施例仅被视为示例性的,本技术的真正范围和精神由下面的权利要求书指出。
[0142]
应当理解的是,本技术并不局限于上面已经描述并在附图中示出的精确结构,并且可以在不脱离其范围进行各种修改和改变。本技术的范围仅由所附的权利要求书来限制。
技术特征:
1.一种消息队列的重发方法,其特征在于,包括:获取消息发送表,基于所述消息发送表获取所述消息发送表中每条消息的分布式锁记录;基于所述分布式锁记录,判断所述消息的分布式锁状态;在所述分布式锁处于未锁定状态时,基于所述分布式锁记录确定所述消息的轮询起点;基于所述轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的所述消息进行重发。2.根据权利要求1所述的重发方法,其特征在于,所述基于所述分布式锁记录确定所述消息的轮询起点,包括:基于所述分布式锁记录确定所述消息的分布式锁的上一次处理时间;基于所述分布式锁的上一次处理时间与当前时间,确定所述消息的轮询起点。3.根据权利要求2所述的重发方法,其特征在于,所述基于所述分布式锁的上一次处理时间与当前时间,确定所述消息的轮询起点,包括:基于预设时间间隔确定多个时间区间,并将当前时间对应的时间区间确定为游标分区;判断所述分布式锁的上一次处理时间与所述预设时间间隔之和是否晚于当前时间;若是,则将所述游标分区确定为所述消息的所述轮询起点。4.根据权利要求3所述的重发方法,其特征在于,所述基于所述分布式锁的上一次处理时间与当前时间,确定所述消息的轮询起点,还包括:在所述分布式锁的上一次处理时间与所述预设时间间隔之和早于当前时间时,将所述分布式锁的上一次处理时间对应的时间分区确定为所述消息的所述轮询起点。5.根据权利要求4所述的重发方法,其特征在于,在所述基于所述轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的所述消息进行重发之后,所述重发方法还包括:基于所述轮询起点,确定多个所述时间分区的轮询顺序;基于所述轮询顺序,对下一时间分区中满足预设条件的所述消息进行重发;其中,所述预设条件是指发送时间超过所述预设时长且发送次数不超过所述预设阈值。6.根据权利要求4所述的重发方法,其特征在于,所述基于所述轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的所述消息进行重发,包括:获取所述消息发送表中所述轮询起点对应的至少一个第一消息,并获取所述第一消息的发送状态;将发送状态为发送中的所述第一消息确定为第二消息,获取所述第二消息的发送时间;将发送时间超过所述预设时长的所述第二消息确定为第三消息,获取所述第三消息的发送次数;对发送次数超过所述预设阈值的所述第三消息进行重发。7.根据权利要求1所述的重发方法,其特征在于,在所述获取消息发送表之前,包括:基于斯普瑞布特spring boot读取目标数据源,并建立mybatis与所述目标数据源所在
的数据库之间的连接;基于mybatis与所述目标数据源所在的数据库的连接,将所述目标数据源缓存至斯普瑞布特spring boot相应的容器之中;定时扫描所述斯普瑞布特spring boot相应的容器中的所述目标数据源,基于所述目标数据源中处于发送中或发送失败的消息建立所述消息发送表。8.一种消息队列的重发装置,包括:获取模块,用于获取消息发送表,基于所述消息发送表获取所述消息发送表中每条消息的分布式锁记录;判断模块,用于基于所述分布式锁记录,判断所述消息的分布式锁状态;确定模块,用于在所述分布式锁处于未锁定状态时,基于所述分布式锁记录确定所述消息的轮询起点;重发模块,用于基于所述轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的所述消息进行重发。9.一种电子设备,其特征在于,包括:处理器,以及与所述处理器通信连接的存储器;所述存储器存储计算机执行指令;所述处理器执行所述存储器存储的计算机执行指令,以实现如权利要求1至7任一项所述的消息队列的重发方法。10.一种计算机可读存储介质,其特征在于,所述计算机可读存储介质中存储有计算机执行指令,所述计算机执行指令被处理器执行时用于实现如权利要求1至7任一项所述的消息队列的重发方法。
技术总结
本申请提供了一种消息队列的重发方法及装置、电子设备及存储介质,涉及分布式技术。该方法包括获取消息发送表,基于消息发送表获取消息发送表中每条消息的分布式锁记录;基于分布式锁记录,判断消息的分布式锁状态;在分布式锁处于未锁定状态时,基于分布式锁记录确定消息的轮询起点;基于轮询起点,对发送时间超过预设时长且发送次数不超过预设阈值的消息进行重发。通过本申请的方法可以实现降低生产者消息丢失率提高消息发送可靠性的技术效果。者消息丢失率提高消息发送可靠性的技术效果。者消息丢失率提高消息发送可靠性的技术效果。
技术研发人员:王艺羲 杨涛 王丹 刘新兰 赵春晖 雷声 朱伟 王婧 梁文婧 安妮
受保护的技术使用者:中国银行股份有限公司
技术研发日:2023.05.26
技术公布日:2023/8/28
版权声明
本文仅代表作者观点,不代表航家之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)
航空之家 https://www.aerohome.com.cn/
飞机超市 https://mall.aerohome.com.cn/
航空资讯 https://news.aerohome.com.cn/