一种流水线式任务处理方法、装置、电子设备及存储介质与流程

未命名 09-29 阅读:70 评论:0


1.本发明涉及数据处理技术领域,尤其涉及一种流水线式任务处理方法、装置、电子设备及存储介质。


背景技术:

2.现如今,流水线式任务处理是程序开发领域较为常见的一种任务处理模式,程序中处理任务的方式与一条流水线生产一件产品类似,一个任务被置于流水线上,逐一流经每个处理节点,每个节点根据其处理逻辑,对任务进行加工或根据任务内容进行额外操作,末尾节点任务处理完成,代表着一次流水线式任务处理过程的结束。
3.在相关技术中,流水线式任务处理带来了更多要求,以应对不同类型应用和不同开发场景的需求,在面对各种场景下的流水线式任务处理,开发对应的流水线功能,但是随着任务量的增多,使得流水线式任务处理的任务处理效率较低。


技术实现要素:

4.本发明实施例提供一种流水线式任务处理方法、装置、电子设备及存储介质,以解决现有技术中存在对流水线式任务处理效率较低的问题。
5.为了解决上述问题,本发明是这样实现的:第一方面,本发明实施例提供一种流水线式任务处理方法,包括:获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体,所述第一任务结构体包括所述目标任务的任务参数和任务编号;运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,所述第一目标程序为预先配置于所述第一任务处理单元中的程序,所述流水线中包括至少一个任务处理单元;在所述流水线包括第二任务处理单元的情况下,将所述第二任务结构体传输至所述第二任务处理单元,运用所述第二任务处理单元按照所述任务编号调用第二目标程序,依据所述第二目标程序对所述第二任务结构体的任务参数进行任务处理操作,得到第三任务结构体,以完成对所述目标任务的任务处理;其中,所述第二目标程序为预先配置于所述第二任务处理单元中的程序。
6.可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,包括:在所述任务编号符合第一预设条件的情况下,运用所述分发器将所述第一任务结构体发送至匹配第一目标数量的直连处理器,所述第一目标数量依据所述任务编号生成;运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;
其中,所述第一预设条件表示所述目标任务中包括所述任务编号的个数达到第一预设值。
7.可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;在所述获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体之后,所述方法还包括:在所述任务参数符合第二预设条件的情况下,运用所述流水线中的所述分发器将所述目标任务发送至匹配第二目标数量的直连处理器,所述第二目标数量依据所述任务参数生成;运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第二预设条件表示所述目标任务中具有相同参数的任务的个数达到第二预设值。
8.可选地,任务处理单元通过如下方式对任务结构体进行更新处理:在接收到所述目标任务的情况下,获取与所述任务处理单元匹配的任务处理程序,并依据所述任务处理程序确定对应的函数名称;依据所述函数名称查询所述任务处理单元匹配的执行函数;调用所述执行函数对所述任务结构体进行任务处理。
9.可选地,所述第一任务处理单元还包括定时处理器;在所述获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体之后,所述方法还包括:获取任务定时处理指令,所述任务定时处理指令用于将所述第一任务结构体按照预设时间间隔进行缓存;基于所述任务定时处理指令运用所述直连处理器调用任务数据暂存接口,将所述第一任务结构体暂存至任务数据列表中,所述任务数据列表配置于任务数据缓存单元;在所述第一任务结构体储存至所述任务数据缓存单元的时间达到所述预设时间间隔的情况下,运用所述定时处理器将所述第一任务结构体从所述任务数据缓存单元取出,并发送至定时处理器,所述第一任务结构体由所述定时处理器转发至所述直连处理器;运用所述直连处理器执行所述第一任务处理目标程序对所述第一任务结构体中的任务参数进行更新处理,得到所述第二任务结构体。
10.第二方面,本发明实施例还提供了一种流水线式任务处理装置,包括:第一处理模块,用于获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体,所述第一任务结构体包括所述目标任务的任务参数和任务编号;第二处理模块,用于运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,所述第一目标程序为预先配置于所述第一任务处理单元中的程序,所述流水线中包括至少一个任务处理单元;第三处理模块,用于在所述流水线包括第二任务处理单元的情况下,将所述第二任务结构体传输至所述第二任务处理单元,运用所述第二任务处理单元按照所述任务编号
调用第二目标程序,依据所述第二目标程序对所述第二任务结构体的任务参数进行任务处理操作,得到第三任务结构体,以完成对所述目标任务的任务处理;其中,所述第二目标程序为预先配置于所述第二任务处理单元中的程序。
11.可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;所述第二处理模块包括:发送单元,用于在所述任务编号符合第一预设条件的情况下,运用所述分发器将所述第一任务结构体发送至匹配第一目标数量的直连处理器,所述第一目标数量依据所述任务编号生成;第一处理单元,用于运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第一预设条件表示所述目标任务中包括所述任务编号的个数达到第一预设值。
12.可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;所述流水线式任务处理装置还包括:发送模块,用于在所述任务参数符合第二预设条件的情况下,运用所述流水线中的所述分发器将所述目标任务发送至匹配第二目标数量的直连处理器,所述第二目标数量依据所述任务参数生成;第四处理模块,用于运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第二预设条件表示所述目标任务中具有相同参数的任务的个数达到第二预设值。
13.可选地,任务处理单元通过如下方式对任务结构体进行更新处理:在接收到所述目标任务的情况下,获取与所述任务处理单元匹配的任务处理程序,并依据所述任务处理程序确定对应的函数名称;依据所述函数名称查询所述任务处理单元匹配的执行函数;调用所述执行函数对所述任务结构体进行任务处理。
14.可选地,所述第一任务处理单元还包括定时处理器;所述流水线式任务处理装置还包括:获取模块,用于获取任务定时处理指令,所述任务定时处理指令用于将所述第一任务结构体按照预设时间间隔进行缓存;缓存模块,用于基于所述任务定时处理指令运用所述直连处理器调用任务数据暂存接口,将所述第一任务结构体暂存至任务数据列表中,所述任务数据列表配置于任务数据缓存单元;第五处理模块,用于在所述第一任务结构体储存至所述任务数据缓存单元的时间达到所述预设时间间隔的情况下,运用所述定时处理器将所述第一任务结构体从所述任务数据缓存单元取出,并发送至定时处理器,所述第一任务结构体由所述定时处理器转发至所述直连处理器;第六处理模块,用于运用所述直连处理器执行所述第一任务处理目标程序对所述第一任务结构体中的任务参数进行更新处理,得到所述第二任务结构体。
15.第三方面,本发明实施例还提供一种电子设备,包括处理器、存储器及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述计算机程序被所述处理器执行时实现如上述第一方面所述的流水线式任务处理方法中的步骤。
16.第四方面,本发明实施例还提供一种可读存储介质,用于存储程序,所述程序被处理器执行时实现如上述第一方面所述的流水线式任务处理方法中的步骤。
17.在本发明实施例中,首先获取来自任务源发送的目标任务,并将目标任务的格式进行转化,以传输至流水线的第一任务处理单元中,接着,第一任务处理单元可以根据目标任务的任务编号对第一任务结构体进行任务处理,具体为调用第一目标程序,以得到第二任务结构体,在流水线包括两个任务处理单元的情况下,继续以任务编号为基础,调用与第二任务处理单元关联的第二目标程序对第二任务结构体进行任务处理,以得到第三任务结构体,从而完成对目标任务的任务处理。本发明实施例实现对目标任务的并发处理,并且支持以任务编号为基础的分发策略对目标任务进行分发,从而提高对目标任务的任务处理效率。
附图说明
18.为更清楚地说明本发明实施例的技术方案,下面将对本发明实施例描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动性的前提下,还可以根据这些附图获得其他的附图。
19.图1是本发明实施例提供的一种流水线式任务处理方法的流程示意图;图2是本发明实施例提供的另一种流水线式任务处理方法的流程示意图;图3是本发明实施例提供的一种流水线式任务处理装置的结构图;图4是本发明实施提供的一种电子设备的结构示意图。
具体实施方式
20.下面将结合本发明实施例中的附图,对本发明实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有作出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。
21.请参见图1,图1是本发明实施例提供的一种流水线式任务处理方法的流程示意图,如图1所示,包括以下步骤:步骤101、获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体,所述第一任务结构体包括所述目标任务的任务参数和任务编号;步骤102、运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,所述第一目标程序为预先配置于所述第一任务处理单元中的程序,所述流水线中包括至少一个任务处理单元;步骤103、在所述流水线包括第二任务处理单元的情况下,将所述第二任务结构体
传输至所述第二任务处理单元,运用所述第二任务处理单元按照所述任务编号调用第二目标程序,依据所述第二目标程序对所述第二任务结构体的任务参数进行任务处理操作,得到第三任务结构体,以完成对所述目标任务的任务处理;其中,所述第二目标程序为预先配置于所述第二任务处理单元中的程序。
22.其中,上述流水线式任务处理方法所包括的步骤101、步骤102以及步骤103可以是由电子设备来执行,例如计算机等设备,对此本发明实施例不作限定。
23.另外,本发明实施例可以是基于go语言来实现,go语言是一种支持协程并发型的编程语言,语言原生具备管道类型的数据结构用以支持协程间通信,基于go语言实现流水线式任务处理开发框架在具备一定优势。
24.应理解,不同场景下使用的流水线式任务处理程序的关键差异在于每个节点的任务处理逻辑不同,将流水线式的程序骨架从处理逻辑中抽离,在go语言的基础上,本发明实施例可以理解为实现一套通用流水线式任务处理的开发框架,框架使用者只需要编写每个节点上的任务处理程序,任务的流转由开发框架负责,能减少重复开发工作量,提高程序开发效率。
25.在步骤101中,获取来自任务源发送的目标任务,其中,上述任务源可以理解为任务产生的源头,任务的产生通常可以来自于订阅消息系统(例如:kafka)获取的任务数据,也可以是获取数据库(redis)中指定的数据,或者是获取数据库(mysql)中某一数据表中的数据,以上三种获取目标任务的举例在本发明实施例中都可以通过相关手段实现,对此本发明实施例不作限定。
26.另外,上述目标任务可以是包括多个任务,那么上述目标任务可以理解为任务的集合,因为任务源的不同,因此需要将上述目标任务进行格式转换,得到与流水线匹配第一任务结构体,例如:在本发明实施例中可以采用json格式对任务结构体进行串行化处理。
27.其中,上述第一任务结构体包括任务参数和任务编号,上述任务编号是在任务结构体创建时赋予的唯一标号,也可以理解为标识符,而上述任务参数则可以理解为上述第一任务结构体中的参数,对步骤103中的第二任务结构体不作多余赘述。
28.应理解,上述流水线可以理解为对上述目标任务进行任务处理的流水线,在上述流水线中可以包括多个任务处理单元,每一个任务处理单元可以执行不同的任务处理程序,将目标任务依次经过每一个任务处理单元的处理,以完成任务处理流程。
29.在步骤102中,运用上述流水线中的第一任务处理单元按照上述任务编号调用第一目标程序,依据上述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,其中,在第一任务处理单元中可以设置有多个控制器或是处理器,用于执行任务处理程序(在本发明实施例中为上述第一目标程序)对第一任务结构体进行任务处理,因此需要将第一任务结构体传输至目标控制器或是目标处理器。
30.在本发明实施例中,可以是运用上述流水线中的第一任务处理单元按照上述任务编号来选择目标处理器(控制器)执行上述第一目标程序,即按照上述任务编号散列分布至不同的处理器(控制器),从而防止一个处理器在时间段内处理多个任务结构体,而有些处理器却没有任务结构体可处理的情况。
31.当然,也可以根据上述任务参数进行任务分配,例如:根据指定的任务参数散列分布,从而能够保证有相同参数的任务只分发同一个处理器,从而使得在不加锁的情况下,可
以实现对相同参数的任务的互斥处理,另外,在任务量不超过阈值的情况下,可以进行随机分配策略。
32.以上所举例的任务分发策略可以配置于任务处理单元中,根据用户的需求以及实际情况进行设定,对此本发明实施例不作限定。
33.另外,在依据上述第一目标程序对上述任务参数进行任务处理操作,得到第二任务结构体的过程中,可以是基于go语言的反射机制,依据配置的函数名称找到直连处理器需要执行函数,然后调用该函数,进而完成任务处理操作。
34.需要说明的是,在本发明实施例中的第一目标程序和第二目标程序可以由用户进行开发或是设定,处理程序在不同的框架使用场景下不同,本发明实施例的技术方案中处理程序应以go语言开发,并以函数的形式存在,处理程序会在对应节点上的直连处理器上被执行,对应关系在框架配置中进行配置,处理器通过配置中方法名找到对应函数(具体查找方法是go语言的反射机制),直连处理器收到任务数据体后,调用对应函数,完成当前任务处理单元对任务的处理工作,函数返回后,直连处理器将任务数据体传递到下一个任务处理单元。
35.在步骤103中,在经过上述第二任务处理单元,并得到上述第三任务结构体后,若上述流水线中还存在有任务处理单元,则继续将上述第三任务结构体传输至下一任务处理单元,以完成对上述目标任务的任务处理。
36.即本发明实施例的基本流程可以表示为:(1)任务源产生任务,构造任务结构体(任务数据体),传递给第一任务处理单元;(2)收到第一任务结构体的第一任务处理单元执行框架使用者所开发的处理程序;(3)收到第一任务结构体的任务处理单元更新任务数据体中数据,得到第二任务结构体,并将第二任务结构体传递给第二任务处理单元;(4)收到第二任务结构体的第二任务处理单元重复过程(2)(3),直到传递到最后一个任务处理单元;(5)最后一个任务处理单元执行过程(2)后,不再更新任务数据体,亦不向下传递任务数据体,任务处理完成。
37.在本发明实施例中,首先获取来自任务源发送的目标任务,并将目标任务的格式进行转化,以传输至流水线的第一任务处理单元中,接着,第一任务处理单元可以根据目标任务的任务编号对第一任务结构体进行任务处理,具体为调用第一目标程序,以得到第二任务结构体,在流水线包括两个任务处理单元的情况下,继续以任务编号为基础,调用与第二任务处理单元关联的第二目标程序对第二任务结构体进行任务处理,以得到第三任务结构体,从而完成对目标任务的任务处理。本发明实施例实现对目标任务的并发处理,并且支持以任务编号为基础的分发策略对目标任务进行分发,从而提高对目标任务的任务处理效率。
38.可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,包括:在所述任务编号符合第一预设条件的情况下,运用所述分发器将所述第一任务结
构体发送至匹配第一目标数量的直连处理器,所述第一目标数量依据所述任务编号生成;运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第一预设条件表示所述目标任务中包括所述任务编号的个数达到第一预设值。
39.在该实施方案中,首先判断上述任务编号是否符合上述第一预设条件,具体为上述目标任务中包括上述任务编号的个数达到第一预设值,因为上述目标任务中可以包含多个任务,因此上述目标任务中也可以包括多个对应任务的任务编号,然后根据上述任务编号生成第一目标数量,根据上述第一目标数量确定上述直连处理器的数量,最后运用上述直连处理器执行上述第一目标程序对上述第一任务结构体中的任务参数进行更新,得到上述第二任务结构体。通过该实施方案,使得上述流水线可以支持依据策略分发任务,从而满足用户的个性化任务分发的开发需求,进而提高任务处理的效率。
40.在依据上述任务编号生成上述第一目标数量的过程中,可以是确定上述任务编号的个数,例如在上述目标任务中包括三个任务,那么对应上述目标任务的任务编号也是三个,进而确定上述第一目标数量为三,在直连处理器的数量大于三的情况下,可以调用三个直连处理器对第一任务结构体进行任务处理。
41.当然,上述任务编号和上述第一目标数量的关系可以由用户进行自定义设定,例如:若上述任务编号为八个,那么第一目标数量可以为四,对此本发明实施例不作限定。
42.需要说明的是,上述第一阈值也可以由用户根据实际情况和需求进行设定,对此本发明实施例不作限定。
43.可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;在所述获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体之后,所述方法还包括:在所述任务参数符合第二预设条件的情况下,运用所述流水线中的所述分发器将所述目标任务发送至匹配第二目标数量的直连处理器,所述第二目标数量依据所述任务参数生成;运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第二预设条件表示所述目标任务中具有相同参数的任务的个数达到第二预设值。
44.在该实施方案中,首先判断上述任务参数是否符合上述第二预设条件,具体为上述目标任务中具有相同参数的任务的个数达到第二预设值,因为上述目标任务中可以包含多个任务,因此上述目标任务中也可以包括多个对应任务的任务参数,然后根据上述任务参数生成第二目标数量,根据上述第二目标数量确定上述直连处理器的数量,最后运用上述直连处理器执行上述第一目标程序对上述第一任务结构体中的任务参数进行更新,得到上述第二任务结构体。通过该实施方案,使得上述流水线可以支持依据另一种策略进行任务的分发,从而满足用户的个性化任务分发的开发需求,进而提高任务处理的效率。
45.需要说明的是,该策略能够保证有相同参数的任务只分发至同一个直连处理器,在这样在不加锁的情况下,可以实现对相同参数的任务的互斥处理,减小任务处理过程中
故障发生的概率。
46.需要说明的是,上述第二阈值也可以由用户根据实际情况和需求进行设定,对此本发明实施例不作限定。
47.根据不同的分发策略,任务处理单元中直连处理器进行任务处理的流程可以如下表示:(1)第一任务处理单元将任务数据体传递到分发器;(2)分发器根据分发策略,将第一任务结构体(第一任务数据体)传递到对应直连处理器;(3)直连处理器将第一任务结构体作为参数,执行用户所开发的第一目标程序;(4)如果第一目标程序返回任务数据体,直连处理器将第一任务结构体传递给第二任务处理单元。
48.可选地,任务处理单元通过如下方式对任务结构体进行更新处理:在接收到所述目标任务的情况下,获取与所述任务处理单元匹配的任务处理程序,并依据所述任务处理程序确定对应的函数名称;依据所述函数名称查询所述任务处理单元匹配的执行函数;调用所述执行函数对所述任务结构体进行任务处理。
49.在该实施方案中,上述直连处理器的主要作用是执行用户所开发的任务处理程序。在任务处理过程中,需要为每个任务处理单元配置处理程序对应的函数名称,具体是可以实现中是基于go语言的反射机制,依据配置的函数名称找到直连处理器需要执行函数,然后调用该函数,进而完成对目标任务的处理。通过该方法,可以在每一个任务处理单元中调用对应的执行函数对任务结构体进行任务处理操作,提高了任务处理的精准性和高效性。
50.可选地,所述第一任务处理单元还包括定时处理器;在所述获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体之后,所述方法还包括:获取任务定时处理指令,所述任务定时处理指令用于将所述第一任务结构体按照预设时间间隔进行缓存;基于所述任务定时处理指令运用所述直连处理器调用任务数据暂存接口,将所述第一任务结构体暂存至任务数据列表中,所述任务数据列表配置于任务数据缓存单元;在所述第一任务结构体储存至所述任务数据缓存单元的时间达到所述预设时间间隔的情况下,运用所述定时处理器将所述第一任务结构体从所述任务数据缓存单元取出,并发送至定时处理器,所述第一任务结构体由所述定时处理器转发至所述直连处理器;运用所述直连处理器执行所述第一任务处理目标程序对所述第一任务结构体中的任务参数进行更新处理,得到所述第二任务结构体。
51.在该实施方案中,数据缓存单元主要用于存储任务数据,暂存时将任务数据存储在每个任务处理单元独有的任务数据列表中,提供任务数据暂存接口和任务数据列表获取接口,具体为获取任务定时处理指令,并根据上述任务定时处理指令运用上述直连处理器调用任务数据暂存接口,将上述第一任务结构体暂存至任务数据列表中,在上述第一任务结构体储存至上述任务数据缓存单元的时间达到上述预设时间间隔的情况下,运用上述定
时处理器将上述第一任务结构体从上述任务数据缓存单元取出,并发送至定时处理器,最后运用上述直连处理器执行上述第一任务处理目标程序对上述第一任务结构体中的任务参数进行更新处理,得到上述第二任务结构体。通过该方法,可以实现将流水线式任务转化为批量任务,进而满足周期性分批任务处理的场景,满足用户的需求,提高对于目标任务的任务处理效果。
52.对于任务处理单元中定时任务处理的流程可以如下表示:(1)第一任务处理单元将第一任务结构体(第一任务数据体)传递到分发器;(2)分发器根据分发策略,将任务数据体传递到对应直连处理器;(3)直连处理器将任务数据体暂存到任务数据缓存,任务数据会被存储在本任务处理单元独有的任务数据列表中;(4)时间到达定时处理器配置的执行时间,定时处理器将步骤(3)暂存的所有数据取出,组装成任务数据体列表,发送给直连处理器;(5)直连处理器将任务数据体列表作为参数,执行框架使用者所开发的任务处理程序;(6)如果用户任务处理程序返回任务数据体,直连处理器将任务数据体传递给下游任务处理单元。
53.需要说明的是,上述预设时间间隔可以是由用户根据实际数据量和实际情况进行设定,对此本发明实施例不做限定。
54.作为一种可选的实施方式,请参见图2,图2是本发明实施例提供的另一种流水线式任务处理方法的流程示意图,如图2所示,流水线包括多个任务处理单元,任务传递系统将任务传输至流水线,在此之前需要将任务转化成结构统一的任务结构体,接着由第一任务处理单元的分发器将任务结构体(任务数据体)分发至对于的直连处理器中,同时也可以将任务结构体暂存至任务数据缓存中,具体是存储至对应第一任务处理单元的任务数据列表,在直连处理器对任务结构体进行任务处理后,得到第二任务结构体,并传输至下一个任务处理单元,直至最后一个任务处理单元完成任务处理操作。
55.需要说明的是,首先任务源是只负责对接任务传递系统(如kafka、redis、mysql等),因为任务生产系统涉及任务生产逻辑,所以并不包含在本发明实施例的方案中,以常见系统(如kafka、redis、mysql等)作为消息传递系统,对接方式统一。
56.(1)kafka是一种开源消息订阅系统,任务生产系统作为生产者生产任务,任务源模块作为消费者消费任务,任务在kafka中传递时,采用json形式进行序列化,任务源模块引用开源go语言库shopify/sarama与kafka系统通信,收到任务后,json反序列化将任务转换成任务数据体(go语言中的struct),并补充上任务编号。
57.(2)redis是开源的key-value内存缓存系统,支持多种数据结构,作为任务传递系统时,约定使用list数据结构传递任务,任务生产系统使用redis命令rpush将任务放入list中,任务源模块使用lpop命令获取list中的任务,同样任务数据采用json形式进行序列化。任务源模块引用开源go语言库go-redis与redis系统通信,收到任务后,json反序列化将任务转换成任务数据体(go语言中的struct),并补充上任务编号。
58.(3)mysql是开源的关系数据库系统,使用mysql作为任务传递系统时,需要约定好数据库名和数据表名,数据表结构要与任务结构一致,任务生产系统向库表中新增数据,任
务源模块定时查询库表中是否有新增数据,引用开源go语言库jmoiron/sqlx与mysql系统通信,获取一条新任务数据后,更新数据表,标记该条任务已处理,然后将数据转换成任务数据体(go语言中的struct),并补充上任务编号。
59.在图2所示的流程示意图基础上,以拼车场景为例,示例任务结构体(任务数据体)及相关数据的表现形式:拼车订单任务数据体定义例子可以如下表示:go语言版本的任务数据体可以如下表示:typetaskstruct{taskidint64//任务编号useridint64//乘客编号startingpointstring//起点destinationstring//目的地routeresultinterface{}//路线信息matchresult[]int64//拼友乘客编号列表dispatchresultint64//派单司机编号}通过kafka传递任务时的json结构可以如下表示:{“userid”:“乘客编号”,“startingpoint”:“起点”,“destination”:“目的地”}需要说明的是,拼车订单的处理流水线上需要4个节点,因此需要对应4种任务处理单元,可以如下表示:(1)根据起点、目的地,规划适合的行驶路线。对应处理程序的函数名称为routing;(2)根据行驶路线,查询顺路同行拼友。对应处理程序的函数名称为matching;(3)根据拼单信息,匹配司机,进行派单。对应处理程序的函数名称为dispatching;(4)拼单数据统计,统计每分钟新增订单数量。对应处理程序的函数名称为counting;任务源使用消息系统kafka,任务数据缓存使用redis。
[0060]
综上使用本框架进行拼车订单任务处理的配置如下:{“processingunits”:[#任务处理单元配置{“funcname”:“routing”,#规划路线函数“dispatchtype”:“random”#任务随机分发},
{“funcname”:“matching”,#查询顺路拼友函数“dispatchtype”:“hashbyid”#任务编号散列分发},{“funcname”:“dispatching”,#匹配司机派单函数“dispatchtype”:“random”#任务随机分发},{“funcname”:“counting”,#数据统计函数“dispatchtype”:“random”,#任务随机分发“ticker”:“on”,#开启定时任务处理“duration”:60#定时间隔60秒}],“taskorigin”:{#任务源配置“type”:“kafka”,#使用kafka“topic”:“carpoolingorder”#kafkatopic},“taskcache”:{#任务数据缓存配置“type”:“redis”,#使用redis“db”:“0”#redisdb}}请参见图3,图3是本发明实施例提供的一种流水线式任务处理装置的结构图,如图3所示,流水线式任务处理装置300包括:第一处理模块301,用于获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体,所述第一任务结构体包括所述目标任务的任务参数和任务编号;第二处理模块302,用于运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,所述第一目标程序为预先配置于所述第一任务处理单元中的程序,所述流水线中包括至少一个任务处理单元;第三处理模块303,用于在所述流水线包括第二任务处理单元的情况下,将所述第二任务结构体传输至所述第二任务处理单元,运用所述第二任务处理单元按照所述任务编号调用第二目标程序,依据所述第二目标程序对所述第二任务结构体的任务参数进行任务处理操作,得到第三任务结构体,以完成对所述目标任务的任务处理;其中,所述第二目标程序为预先配置于所述第二任务处理单元中的程序。
[0061]
可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;
第二处理模块302包括:发送单元,用于在所述任务编号符合第一预设条件的情况下,运用所述分发器将所述第一任务结构体发送至匹配第一目标数量的直连处理器,所述第一目标数量依据所述任务编号生成;第一处理单元,用于运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第一预设条件表示所述目标任务中包括所述任务编号的个数达到第一预设值。
[0062]
可选地,所述第一任务处理单元包括分发器和至少一个直连处理器;流水线式任务处理装置300还包括:发送模块,用于在所述任务参数符合第二预设条件的情况下,运用所述流水线中的所述分发器将所述目标任务发送至匹配第二目标数量的直连处理器,所述第二目标数量依据所述任务参数生成;第四处理模块,用于运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第二预设条件表示所述目标任务中具有相同参数的任务的个数达到第二预设值。
[0063]
可选地,任务处理单元通过如下方式对任务结构体进行更新处理:在接收到所述目标任务的情况下,获取与所述任务处理单元匹配的任务处理程序,并依据所述任务处理程序确定对应的函数名称;依据所述函数名称查询所述任务处理单元匹配的执行函数;调用所述执行函数对所述任务结构体进行任务处理。
[0064]
可选地,所述第一任务处理单元还包括定时处理器;流水线式任务处理装置300还包括:获取模块,用于获取任务定时处理指令,所述任务定时处理指令用于将所述第一任务结构体按照预设时间间隔进行缓存;缓存模块,用于基于所述任务定时处理指令运用所述直连处理器调用任务数据暂存接口,将所述第一任务结构体暂存至任务数据列表中,所述任务数据列表配置于任务数据缓存单元;第五处理模块,用于在所述第一任务结构体储存至所述任务数据缓存单元的时间达到所述预设时间间隔的情况下,运用所述定时处理器将所述第一任务结构体从所述任务数据缓存单元取出,并发送至定时处理器,所述第一任务结构体由所述定时处理器转发至所述直连处理器;第六处理模块,用于运用所述直连处理器执行所述第一任务处理目标程序对所述第一任务结构体中的任务参数进行更新处理,得到所述第二任务结构体。
[0065]
本发明实施例提供的对话生成装置为能实现上述流水线式任务处理方法的各实施例的各个过程,技术特征一一对应,且能达到相同的技术效果,为避免重复,这里不再赘述。
[0066]
需要说明的是,本发明实施例中的流水线式任务处理装置可以是装置,也可以是
电子设备中的部件、集成电路、或芯片。
[0067]
本发明实施例还提供一种电子设备,参见图4,图4是本发明实施提供的一种电子设备的结构示意图,电子设备包括存储器401、处理器402和存储在存储器401上运行的程序或者指令,该程序或者指令被处理器402执行时可实现图1对应的方法实施例中的任意步骤及达到相同的有益效果,此处不再赘述。
[0068]
其中,处理器402可以是中央处理器(central processing unit,cpu)、专用集成电路(application specific integrated circuit,asic)、现场可编程逻辑门阵列(field programmable gate array,fpga)或图形处理器(graphics processing unit,gpu)。
[0069]
本领域普通技术人员可以理解实现上述实施例方法的全部或者部分步骤是可以通过程序指令相关的硬件来完成,所述的程序可以存储于一可读取介质中。
[0070]
本发明实施例还提供一种可读存储介质,可读存储介质上存储有计算机程序,计算机程序被处理器执行时可实现上述图1对应的对话生成方法实施例中的任意步骤,且能达到相同的技术效果,为避免重复,这里不再赘述。所述的存储介质,如只读存储器(read-only memory,rom)、随机存取存储器(random access memory,ram)、磁碟或者光盘等。
[0071]
本发明实施例中的术语“第一”、“第二”等是用于区别类似的对象,而不必用于描述特定的顺序或先后次序。此外,术语“包括”和“具有”以及他们的任何变形,意图在于覆盖不排他的包含,例如,包含一系列步骤或单元的过程、方法、系统、产品或设备不必限于清楚地列出的那些步骤或单元,而是可包括没有清楚地列出的或对于这些过程、方法、产品或设备固有的其它步骤或单元。此外,本技术中使用“和/或”表示所连接对象的至少其中之一,例如a和/或b和/或c,表示包含单独a,单独b,单独c,以及a和b都存在,b和c都存在,a和c都存在,以及a、b和c都存在的7种情况。
[0072]
需要说明的是,在本文中,术语“包括”、“包含”或者其任何其他变体意在涵盖非排他性的包含,从而使得包括一系列要素的过程、方法、物品或者装置不仅包括那些要素,而且还包括没有明确列出的其他要素,或者是还包括为这种过程、方法、物品或者装置所固有的要素。在没有更多限制的情况下,由语句“包括一个
……”
限定的要素,并不排除在包括该要素的过程、方法、物品或者装置中还存在另外的相同要素。
[0073]
通过以上的实施方式的描述,本领域的技术人员可以清楚地解到上述实施例方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。基于这样的理解,本技术的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质(如rom/ram、磁碟、光盘)中,包括若干指令用以使得一台终端(可以是手机,计算机,服务器,空调器,或者第二终端设备等)执行本技术各个实施例的方法。
[0074]
上面结合附图对本技术的实施例进行描述,但是本技术并不局限于上述的具体实施方式,上述的具体实施方式仅仅是示意性的,而不是限制性的,本领域的普通技术人员在本技术的启示下,在不脱离本技术宗旨和权利要求所保护的范围情况下,还可做出很多形式,均属于本技术的保护之内。

技术特征:
1.一种流水线式任务处理方法,其特征在于,包括:获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体,所述第一任务结构体包括所述目标任务的任务参数和任务编号;运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,所述第一目标程序为预先配置于所述第一任务处理单元中的程序,所述流水线中包括至少一个任务处理单元;在所述流水线包括第二任务处理单元的情况下,将所述第二任务结构体传输至所述第二任务处理单元,运用所述第二任务处理单元按照所述任务编号调用第二目标程序,依据所述第二目标程序对所述第二任务结构体的任务参数进行任务处理操作,得到第三任务结构体,以完成对所述目标任务的任务处理;其中,所述第二目标程序为预先配置于所述第二任务处理单元中的程序。2.根据权利要求1所述的流水线式任务处理方法,其特征在于,所述第一任务处理单元包括分发器和至少一个直连处理器;运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,包括:在所述任务编号符合第一预设条件的情况下,运用所述分发器将所述第一任务结构体发送至匹配第一目标数量的直连处理器,所述第一目标数量依据所述任务编号生成;运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第一预设条件表示所述目标任务中包括所述任务编号的个数达到第一预设值。3.根据权利要求1所述的流水线式任务处理方法,其特征在于,所述第一任务处理单元包括分发器和至少一个直连处理器;在所述获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体之后,所述方法还包括:在所述任务参数符合第二预设条件的情况下,运用所述流水线中的所述分发器将所述目标任务发送至匹配第二目标数量的直连处理器,所述第二目标数量依据所述任务参数生成;运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第二预设条件表示所述目标任务中具有相同参数的任务的个数达到第二预设值。4.根据权利要求1至3任一项所述的流水线式任务处理方法,其特征在于,任务处理单元通过如下方式对任务结构体进行更新处理:在接收到所述目标任务的情况下,获取与所述任务处理单元匹配的任务处理程序,并依据所述任务处理程序确定对应的函数名称;依据所述函数名称查询所述任务处理单元匹配的执行函数;调用所述执行函数对所述任务结构体进行任务处理。
5.根据权利要求2所述的流水线式任务处理方法,其特征在于,所述第一任务处理单元还包括定时处理器;在所述获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体之后,所述方法还包括:获取任务定时处理指令,所述任务定时处理指令用于将所述第一任务结构体按照预设时间间隔进行缓存;基于所述任务定时处理指令运用所述直连处理器调用任务数据暂存接口,将所述第一任务结构体暂存至任务数据列表中,所述任务数据列表配置于任务数据缓存单元;在所述第一任务结构体储存至所述任务数据缓存单元的时间达到所述预设时间间隔的情况下,运用所述定时处理器将所述第一任务结构体从所述任务数据缓存单元取出,并发送至定时处理器,所述第一任务结构体由所述定时处理器转发至所述直连处理器;运用所述直连处理器执行所述第一任务处理目标程序对所述第一任务结构体中的任务参数进行更新处理,得到所述第二任务结构体。6.一种流水线式任务处理装置,其特征在于,包括:第一处理模块,用于获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体,所述第一任务结构体包括所述目标任务的任务参数和任务编号;第二处理模块,用于运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体,所述第一目标程序为预先配置于所述第一任务处理单元中的程序,所述流水线中包括至少一个任务处理单元;第三处理模块,用于在所述流水线包括第二任务处理单元的情况下,将所述第二任务结构体传输至所述第二任务处理单元,运用所述第二任务处理单元按照所述任务编号调用第二目标程序,依据所述第二目标程序对所述第二任务结构体的任务参数进行任务处理操作,得到第三任务结构体,以完成对所述目标任务的任务处理;其中,所述第二目标程序为预先配置于所述第二任务处理单元中的程序。7.根据权利要求6所述的流水线式任务处理装置,其特征在于,所述第一任务处理单元包括分发器和至少一个直连处理器;所述第二处理模块包括:发送单元,用于在所述任务编号符合第一预设条件的情况下,运用所述分发器将所述第一任务结构体发送至匹配第一目标数量的直连处理器,所述第一目标数量依据所述任务编号生成;第一处理单元,用于运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第一预设条件表示所述目标任务中包括所述任务编号的个数达到第一预设值。8.根据权利要求6所述的流水线式任务处理装置,其特征在于,所述第一任务处理单元包括分发器和至少一个直连处理器;所述流水线式任务处理装置还包括:
发送模块,用于在所述任务参数符合第二预设条件的情况下,运用所述流水线中的所述分发器将所述目标任务发送至匹配第二目标数量的直连处理器,所述第二目标数量依据所述任务参数生成;第四处理模块,用于运用所述直连处理器执行所述第一目标程序对所述第一任务结构体中的任务参数进行更新,得到所述第二任务结构体;其中,所述第二预设条件表示所述目标任务中具有相同参数的任务的个数达到第二预设值。9.根据权利要求6至8任一项所述的流水线式任务处理装置,其特征在于,任务处理单元通过如下方式对任务结构体进行更新处理:在接收到所述目标任务的情况下,获取与所述任务处理单元匹配的任务处理程序,并依据所述任务处理程序确定对应的函数名称;依据所述函数名称查询所述任务处理单元匹配的执行函数;调用所述执行函数对所述任务结构体进行任务处理。10.根据权利要求7所述的流水线式任务处理装置,其特征在于,所述第一任务处理单元还包括定时处理器;所述流水线式任务处理装置还包括:获取模块,用于获取任务定时处理指令,所述任务定时处理指令用于将所述第一任务结构体按照预设时间间隔进行缓存;缓存模块,用于基于所述任务定时处理指令运用所述直连处理器调用任务数据暂存接口,将所述第一任务结构体暂存至任务数据列表中,所述任务数据列表配置于任务数据缓存单元;第五处理模块,用于在所述第一任务结构体储存至所述任务数据缓存单元的时间达到所述预设时间间隔的情况下,运用所述定时处理器将所述第一任务结构体从所述任务数据缓存单元取出,并发送至定时处理器,所述第一任务结构体由所述定时处理器转发至所述直连处理器;第六处理模块,用于运用所述直连处理器执行所述第一任务处理目标程序对所述第一任务结构体中的任务参数进行更新处理,得到所述第二任务结构体。11.一种电子设备,其特征在于,包括处理器、存储器及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述计算机程序被所述处理器执行时实现如权利要求1至5中任一项所述的流水线式任务处理方法中的步骤。12.一种可读存储介质,用于存储程序,其特征在于,所述程序被处理器执行时实现如权利要求1至5中任一项所述的流水线式任务处理方法中的步骤。

技术总结
本发明提供一种流水线式任务处理方法、装置、电子设备及存储介质,涉及数据处理技术领域,该方法包括:获取来自任务源发送的目标任务,并对所述目标任务进行格式转换,得到与流水线匹配第一任务结构体;运用所述流水线中的第一任务处理单元按照所述任务编号调用第一目标程序,依据所述第一目标程序对所述任务参数进行任务处理操作,得到第二任务结构体;在所述流水线包括第二任务处理单元的情况下,将所述第二任务结构体传输至所述第二任务处理单元,运用所述第二任务处理单元按照所述任务编号调用第二目标程序,依据所述第二目标程序对所述第二任务结构体的任务参数进行任务处理操作,得到第三任务结构体,以完成对所述目标任务的任务处理。标任务的任务处理。标任务的任务处理。


技术研发人员:王健
受保护的技术使用者:云账户技术(天津)有限公司
技术研发日:2023.08.21
技术公布日:2023/9/23
版权声明

本文仅代表作者观点,不代表航家之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)

航空之家 https://www.aerohome.com.cn/

航空商城 https://mall.aerohome.com.cn/

航空资讯 https://news.aerohome.com.cn/

分享:

扫一扫在手机阅读、分享本文

评论

相关推荐