当前位置:文档之家› zeromq的工作原理及使用

zeromq的工作原理及使用

zeromq的工作原理及使用一、ZeroMQ使用1.1ZeroMQ概述ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。

ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。

ZeroMQ看起来想一个可嵌入的网络库,但其作用就像是一个并发框架。

它为你提供了各种传输工具,如进程内,进程间,TCP和组播中进行原子消息传递的套接字。

你可以使用各种模式实现N对N的套接字连接,这些模式包括发布订阅,请求应答,扇出模式,管道模式。

它的速度足够快,因此可以充当集群产品的结构,他的异步IO模型提供了可扩展的多核应用程序,用异步消息来处理任务。

它虽然是以C为源码进行开发,但是可以绑定多种语言。

1.2请求/应答模式说到“请求-应答”模式,不得不说的就是它的消息流动模型以及数据包装模型。

消息流动模型指的是该模式下,必须严格遵守“一问一答”的方式。

发出消息后,若没有收到回复,再发出第二条消息时就会抛出异常。

同样的,对于Rep也是,在没有接收到消息前,不允许发出消息。

基于此构成“一问一答”的响应模式。

1.2.1服务端import timeimport zmqcontext=zmq.Context()socket=context.socket(zmq.REP)socket.bind("tcp://*:5555")while True:#Wait for next request from clientmessage=socket.recv()print("Received request:%s"%message)#Do some'work'time.sleep(1)#Send reply back to clientsocket.send(b"World")1.2.2客户端import zmqcontext=zmq.Context()#Socket to talk to serverprint("Connecting to hello world server…")socket=context.socket(zmq.REQ)socket.connect("tcp://localhost:5555")#Do10requests,waiting each time for a responsefor request in range(10):print("Sending request%s…"%request)socket.send(b"Hello")#Get the reply.message=socket.recv()print("Received reply%s[%s]"%(request,message))1.3发布/订阅模式“发布-订阅”模式下,“发布者”绑定一个指定的地址,例如“192.168.55.210:5556”,“订阅者”连接到该地址。

该模式下消息流是单向的,只允许从“发布者”流向“订阅者”。

且“发布者”只管发消息,不理会是否存在“订阅者”。

上图只是“发布-订阅”的最基本的模型,一个“发布者”可以拥有多个订阅者,同样的,一个“订阅者”也可订阅多个发布者。

下面给出“发布-订阅”模型的样例程序:1.3.1发布者import zmqfrom random import randrangecontext=zmq.Context()socket=context.socket(zmq.PUB)socket.bind("tcp://*:5556")while True:zipcode=randrange(1,100000)temperature=randrange(-80,135)relhumidity=randrange(10,60)socket.send_string("%i%i%i"%(zipcode,temperature, relhumidity))1.3.2订阅者import sysimport zmq#Socket to talk to servercontext=zmq.Context()socket=context.socket(zmq.SUB)print("Collecting updates from weather server…") socket.connect("tcp://localhost:5556")#Subscribe to zipcode,default is NYC,10001zip_filter=sys.argv[1]if len(sys.argv)>1else"10001"#Python2-ascii bytes to unicode strif isinstance(zip_filter,bytes):zip_filter=zip_filter.decode('ascii')socket.setsockopt_string(zmq.SUBSCRIBE,zip_filter)#Process5updatestotal_temp=0for update_nbr in range(5):string=socket.recv_string()zipcode,temperature,relhumidity=string.split() total_temp+=int(temperature)print("Average temperature for zipcode'%s'was%d F"%( zip_filter,total_temp/(update_nbr+1)))1.4管道模式“管道模式”一般用于任务分发与结果收集,由一个任务发生器来产生任务,“公平”的派发到其管辖下的所有worker,完成后再由结果收集器来回收任务的执行结果。

注:在ZeroMQ中并没有绝对的服务端与客户端之分,所有的数据接收与发送都是以连接为单位的,只区分ZeroMQ定义的类型。

就像套接字绑定地址时,可以使用“bind”,也可以使用“connect”,只是通常我们将理解中的服务端“bind”到一个地址,而理解中的客户端“connec”到该地址。

1.4.1任务发生器import zmqimport randomimport timetry:raw_inputexcept NameError:#Python3raw_input=inputcontext=zmq.Context()#Socket to send messages onsender=context.socket(zmq.PUSH)sender.bind("tcp://*:5557")#Socket with direct access to the sink:used to syncronize start of batchsink=context.socket(zmq.PUSH)sink.connect("tcp://localhost:5558")print("Press Enter when the workers are ready:")_=raw_input()print("Sending tasks to workers…")#The first message is"0"and signals start of batchsink.send(b'0')#Initialize random number generatorrandom.seed()#Send100taskstotal_msec=0for task_nbr in range(100):#Random workload from1to100msecsworkload=random.randint(1,100)total_msec+=workloadsender.send_string(u'%i'%workload)print("Total expected cost:%s msec"%total_msec)#Give0MQ time to delivertime.sleep(1)1.4.2Workerimport sysimport timeimport zmqcontext=zmq.Context()#Socket to receive messages onreceiver=context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557")#Socket to send messages tosender=context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558")#Process tasks foreverwhile True:s=receiver.recv()#Simple progress indicator for the viewer sys.stdout.write('.')sys.stdout.flush()#Do the worktime.sleep(int(s)*0.001)#Send results to sinksender.send(b'')1.4.3结果收集器import sysimport timeimport zmqcontext=zmq.Context()#Socket to receive messages onreceiver=context.socket(zmq.PULL) receiver.bind("tcp://*:5558")#Wait for start of batchs=receiver.recv()#Start our clock nowtstart=time.time()#Process100confirmationsfor task_nbr in range(100):s=receiver.recv()if task_nbr%10==0:sys.stdout.write(':')else:sys.stdout.write('.')sys.stdout.flush()#Calculate and report duration of batchtend=time.time()print("Total elapsed time:%d msec"%((tend-tstart)*1000))二、ZeroMQ特点2.1嵌入式消息组件与rabbitMQ,ActiveMQ有很大的不同,如果说rabbitMQ已经近乎是一个小型操作系统,那么ZeroMQ 就像是一个嵌入在操作系统内的一个组件,说白了ZeroMQ就是一组jar包,直接嵌入到项目中就可以运行,它不需要一台独立的服务器来承载整个消息系统。

相关主题