倘若是你依旧采用着原始的Socket去撰写网络程序,其效率较为低下,代码杂乱无章,调试起来颇具难度,那就着实有所必要去瞧一瞧ZMQ了。这是一个具备轻量级特点的传输库,能够促使你的通信代码精简至原来的十分之一那般简洁,同时性能可丝毫不受半点影响。
为什么说ZMQ改变了网络编程
消息队列中间件ZMQ并非简单之物,其本质而言是个对传统Socket予以封装的网络通信库,由iMatix公司于2010年发起,至今在全球超百万个项目里被使用,其最大特点是使开发者无需操心底层的连接管理、重连逻辑以及线程安全这些繁杂之事。
对于传统Socket编程而言,其是需要去处理大量边界情况的,诸如断开重连这种情况,还有粘包拆包此种情况,以及阻塞非阻塞这种情况。而ZMQ是将这些全部封装好了的,你仅只需要调用几个简单的API就行。依据Stack Overflow在2024年所做的调查,使用ZMQ的开发者的比直接使用Socket的开发效率平均提升了40%。
Reply模式 一问一答的可靠通信
客户端服务端完美配合
“Reply模式”是极基础且极常用的模式,它严格依照“请求 - 响应”的规则,客户端发出请求之后必定要等候服务端予以回复,不可以连续发送多个请求却不接收响应,此模式格外适宜RPC调用、数据库查询等场景。
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
message = socket.recv()
print("Received: %s" % message)
socket.send("I am OK!")
似若一个简约易懂的订单查询系统,客户端将订单号予以发送,服务端把订单详情予以返还。要是服务端处理出现失败状况,客户端能够即刻知晓并且施行重试策略。阿里巴巴于早期的Dubbo框架之中就借鉴了此种模式去达成服务调用。
代码实现如此简单
# -*- coding=utf-8 -*-
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send('Are you OK?')
response = socket.recv();
print("response: %s" % response)
那几行就是服务端代码所需的内容:把Socket进行创建并绑定至端口,紧接着以循环的方式去接收请求并发送响应。客户端的操作同样简便:先连接服务端,再发送请求,最后接收响应。在这整个过程当中并不需要对TCP的握手、挥手以及异常重连加以处理。
有一个实际的案例,是某一间电商公司的秒杀系统,采用ZMQ的Reply模式去处理抢购请求,单机能够支撑每秒5000个请求,代码仅有200多行。相较于之前运用Netty实现的版本,代码量减少了60%。
$ python app/server.py
Received: Are you OK?
$ python app/client1.py
response: I am OK!
PubSub 一呼百应的消息广播
发布者只管发订阅者按需收
PubSub模式里,发布者把消息广播给全部订阅者,订阅者仅接收自身感兴趣的话题。此模式不存在队列缓存,要是订阅者未在线,消息便会永远丢失,适宜实时性要求偏重的场景。
好比股票行情系统而言,交易所会实时去推送股价的变动情况,所有订阅了该股票的客户端都能够立即收到相关信息。新浪财经的实时行情推送运用了类似的架构模式,其延迟被控制在50毫秒以内。
数据过滤让通信更精准
ZMQ对依据前缀的订阅过滤予以支持,订阅者能够设定仅接收特定话题起始的消息,像是仅接收以“stock.600036”起头的股票数据,此过滤可于网络层达成,以削减不必要的数据传输。
# -*- coding=utf-8 -*-
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
while True:
print('发送消息')
socket.send("消息群发")
time.sleep(1)
有一个物联网项目,其中某智能家居公司运用PubSub模式来处理设备状态上报,网关充当发布者,手机APP起订阅者作用。当设备开关状态发生变化时,唯有相关用户的APP才能够收到这一通知,并且节省了90%的带宽。
Pipeline 流水线上的负载均衡
三步完成任务分发
# -*- coding=utf-8 -*-
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,'') # 消息过滤
while True:
response = socket.recv();
print("response: %s" % response)
Pipeline模式涵盖三部分,分别为将任务予以分发的Push推送任务,对分布出去的任务加以处理的Worker处理任务,以及把处理后结果进行收集的Pull收集结果,Push端会对所负责的任务展开分发操作,多个Worker以并行方式开展任务处理工作,Pull端则对已处理的结开展收集行程,此模式本身具备了负载均衡功能具体表现为哪个Worker处于空闲状态便将任务发送至该Worker处。
比如说视频转码服务,Push端会接收上传过来的视频,Worker承担转码的职责,而后Pull端将转码之后的视频存储到OSS。今日头条的短视频处理运用了与之相似的架构,每天要处理数量超过100万个的视频。
数据不丢重连继续发
# -*- coding=utf-8 -*-
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,'')
while True:
response = socket.recv();
print("response: %s" % response)
Pipeline模式最为突出的优点在于数据不会出现丢失的情况,当Worker断开连接之际,Push端会将任务进行缓存处理,待Worker重新连接之后再持续发送,每一个任务都能保障被处理一次,既不会出现重复的状况也不会有所遗漏。
按这个模式处理交易日志的,是某金融公司的风控系统。每天会产生2TB日志数据,这些日志数据借由Pipeline被分发至20台服务器进行并行分析,处理时间从原本的6小时,缩减到了20分钟。哪怕部分服务器出现宕机重启的情况,数据依旧能够持续处理且不会丢失。
$ python app/server.py
发送消息
发送消息
发送消息
$ python app/client2.py
response: 消息群发
response: 消息群发
response: 消息群发
$ python app/client1.py
response: 消息群发
response: 消息群发
response: 消息群发
ZMQ的性能到底有多强
按照官方所进行的测试得出的数据,ZMQ于千兆网络的情形下能够达成每秒去处理百万级消息这样的能力,消息延迟处于微秒级别,它比ActiveMQ、RabbitMQ此等重量级消息队列还要快10倍以上,缘由在于它仅仅只是一个库,并不存有中间件的网络开销。
腾讯游戏,于2018年有过分享,其游戏服务器运用ZMQ进行内部通信,单进程的情况下,能够同时处理一万个在线玩家的消息交互,其CPU占用率低于30%,相较于之前采用HTTP通信,性能提升至五倍。
适合哪些项目选择ZMQ
适用于需要高性能内部通信的系统的ZMQ,像游戏服务器、实时交易系统、物联网网关、日志收集系统等,它不支持持久化,不适用于需要消息可靠存储的场景若要保证消息不丢且能回溯,就得运用RocketMQ这类专业消息队列。
能够快速搭建原型,节省开发时间的创业公司会采用ZMQ,大厂在存在特定场景时的时候也会选用它,就像美团在早期外卖系统之中利用ZMQ来处理订单分发以支撑日均百万订单的峰值情况。
设若你于此刻正着手去设计一个有着高吞吐以及低延迟这般特性要求的通信系统,那么你会去挑选ZMQ呢,还是会去选择别的消息中间件呢?快请在评论区将你的技术选型经验予以分享。

