😀
MQTT、EMQX、MQTTX
了解
1:发布者发布一个主题,比如开启车联网的汽车空调,然后会有负载,我们把内容存到负载里面。之后订阅者订阅主题就可以取到内容
2:需要有服务质量,比如连接设备断网了,消息怎么办?
三个级别
Qos0:消息最多传递一次,说白了就是只发一次,发完我就关闭了,有没有收到我不管,如果当时客户端不可用,则丢失这条消息。
Qos1:消息传递至少一次,必须要你收到消息,你断网了没收到,我还会给你发, 直到你收到消息。注意:如果接收端没挂,只是消息堵塞了,那么还会发消息,这个时候接收端就会收到两条消息
Qos2:消息仅传送一次,我发送消息后,等待你给我返回确认,你什么时候确认收到了,我什么时候才会关闭发送消息
如果发布消息/主题订阅/接收消息三方设置的Qos等级不一样, 那么会优先考虑性能(等级最小的优先)
发送消息的时候要订阅主题,那么主题名字需要有命名规则:
比如你要发送汽车相关的,比如 汽车/座椅加热 汽车/开空调…… 那么接收的时候可以用通配符,汽车/* 代表接收有前面汽车的所有消息,可以多层匹配
可以理解为你发快递,发送北京/朝阳区 发到订阅者,然后接收者只要有北京/* 就可以接收到北京/的信息
还有+,这里没写,具体看下面。
延时消息:
用于下单后不支付, 我们可以设置为15分钟后去查看消息,如果为未支付,就关闭订单,恢复库存
发布者发送消息后,接收者15分钟后才能收到消息
怎么用?
在主题前面添加$delayed/DelayInterval/TopicName 最前面的是表示延迟消息,中间的是延迟时间,后面的是主题内容
共享消息:
只要订阅了主题,发送者发送消息后,订阅主题的都能收到消息
比如说有5w台机器,都要发送消息,那么接收端一下子就要接收很多消息,影响性能。这个时候我们可以设置集群,设置多个接收方来接收,接收方1号接收1w条消息,2号接收1w条消息…… ,这样就能提高执行效率了
共享消息是我发送消息后,接收方1号接收消息了,那么2号就不接收了
共享消息分为两种:带群组的 不带群组的
不带群组的:我发送消息1,2,3 接收方1处理1 接受方2处理2 …… 这里分发给谁,可以设置负载均衡,默认的策略是随机
带群组的:比如我订阅消息的有多个,有扣库存的,有通知物流的,我们把这两个分两个组,然后订阅消息。发送方发送消息后,两个组都能收到一条消息
JAVA使用,具体看下面
什么是MQTT?
MQ是异步发送消息 MQTT是物联网发送消息,说白了就是冰箱可以给手机发消息,可以给电脑发消息,设备之间互相发送消息,可以理解为订阅MQTT后,就可以给订阅过的设备发送消息了
连接设备(客户端)
(1)发布信息;
(2)订阅消息;
(3)退订或删除消息;
(4)断开与服务器连接;
中间件(服务端)
(1)接受来自客户端的网络连接;
(2)接受客户端发布的信息;
(3)处理来自客户端的订阅和退订请求;
(4)向订阅的客户转发应用程序消息;
MQTT协议构成
MQTT协议方法
MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作:
(1)CONNECT:客户端连接到服务器
(2)CONNACK:连接确认
(3)PUBLISH:发布消息
(4)PUBACK:发布确认
(5)PUBREC:发布的消息已接收
(6)PUBREL:发布的消息已释放
(7)PUBCOMP:发布完成
(8)SUBSCRIBE:订阅请求
(9)SUBACK:订阅确认
(10)UNSUBSCRIBE:取消订阅
(11)UNSUBACK:取消订阅确认
(12)PINGREQ:客户端发送心跳
(13)PINGRESP:服务端心跳响应
(14)DISCONNECT:断开连接
(15)AUTH:认证
消息服务质量QoS
MQTT 设计了 3 个 QoS 等级:
QoS 0 等级
QoS0:”至多一次”,消息发布完全依赖底层TCP/IP网络。
特点:会发生消息丢失或重复。丢失一次读记录无所谓。
场景:可以接受消息偶尔丢失的场景下可以选择 QoS 0。
举例:APP消息推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
QoS 1 等级
QoS1:”至少一次”,确保消息到达,但消息重复可能会发生。
特点:消息确保至少成功发送和送达,会有消息重试的情况。
场景:物联网大部分场景都是选用 QoS1,它实现了系统资源性能和消息实时性、可靠性最优化
举例:普通消息推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,联网后还会收到消息。
QoS 2 等级
QoS2:”只有一次”,确保消息到达一次。
特点:消息确保至少成功发送和送达,不会有消息重复的情况。
场景:对于不能忍受消息丢失,且不希望收到重复消息,数据完整性与及时性要求较高的场景,可以选择 QoS 2
举例:APP即时消息推送,确保用户收到且只会收到一次。
发布与订阅QoS
对于 QoS 等级,在生产方、MQ服务、消费方都可以分别进行设置
但对于生产方和MQ服务设置等级不一致时,消费方会遵循的原则:以最小的等级为标准(性能优先)
Topic通配符匹配规则
通配符说明
层级分隔符:/
- / 用来分割主题树的每一层,并给主题空间提供分等级的结构。
多层通配符:#
- 多层通配符有可以表示大于等于0的层次。
1 2 3 4 5 6 7 8 9 10
| love/you/# 适配下面路径: love/you love/you/with love/you/with/all love/you/with/all/my/heart love/you/with/all/my/hearts love/#/you 不支持此规则(#只能写在最右侧)
|
单层通配符:+
- 只匹配主题的一层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| love/you/+
适配下面路径: love/you/with love/you/and 不适配下面路径 love/you love/you/with/all love/+/you 支持此规则(可以在中间)
适配下面路径: love/hehe/you love/all/you
|
MQTT思想有了,落地实现是什么?
EMQX
EMQ X Broker 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。
为什么选择EMQ X
- 从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。
- 中国本地的技术支持服务
- 扩展模块和插件,EMQ X 提供了灵活的扩展机制,支持企业的一些定制场景
环境搭建:docker安装
1 2 3 4 5
| docker pull emqx/emqx:v4.1.0
docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0
|
访问控制台:
地址:http://端口号:18083
账号:admin 密码:public
websocket
我们可以在工具中选择websocket来进行消息发送和订阅测试
客户端调试工具
MQTTX
创建连接:
填写连接信息
①:填写连接服务名称,可以自己定义。
②:填写连接EMQ服务的地址,这里写下发虚拟机中的ip
③: 其他选项保持默认,点击连接服务即可
订阅主题
发送和订阅
Java使用
发送方要连接EMQ,接收方也要连接EMQ ,他们连接的话都要有连接协议,连接地址,用户名密码等,连接是一样的,只是发消息和接受消息不一样,所以连接是重复的,我们可以把连接放在一个公共模块中,Maven依赖导入后,两个微服务就都可以使用了。而两个微服务只需要提供配置文件连接EMQX就可以
1:导入坐标依赖
1 2 3 4 5
| <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
|
2:把公共代码写到公共模块中
创建客户端连接对象:
最上面读的一堆参数就是微服务配置文件中配置的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| @Configuration @Component @Data @Slf4j public class MqttConfig { @Value("${mqtt.client.username}") private String username; @Value("${mqtt.client.password}") private String password; @Value("${mqtt.client.serverURI}") private String serverURI; @Value("${mqtt.client.clientId}") private String clientId; @Value("${mqtt.client.keepAliveInterval}") private int keepAliveInterval; @Value("${mqtt.client.connectionTimeout}") private int connectionTimeout;
@Autowired private MqttCallback mqttCallback;
@Bean public MqttClient mqttClient() { try { MqttClient client = new MqttClient(serverURI, clientId, mqttClientPersistence()); client.setManualAcks(true); client.setCallback(mqttCallback); mqttCallback.setMqttClient(client); client.connect(mqttConnectOptions()); return client; } catch (MqttException e) { log.error("emq connect error",e); return null; } }
@Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setConnectionTimeout(connectionTimeout); options.setKeepAliveInterval(keepAliveInterval); options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); return options; }
public MqttClientPersistence mqttClientPersistence() { return new MemoryPersistence(); }
}
|
发消息:
最上面读的一堆参数就是微服务配置文件中配置的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| @Component @Slf4j public class MqttProducer {
@Value("${mqtt.producer.defaultQos}") private int defaultProducerQos; @Value("${mqtt.producer.defaultRetained}") private boolean defaultRetained; @Value("${mqtt.producer.defaultTopic}") private String defaultTopic;
@Autowired private MqttClient mqttClient;
public void send(String payload) { this.send(defaultTopic, payload); }
public void send(String topic, String payload) { this.send(topic, defaultProducerQos, payload); }
public void send(String topic, int qos, String payload) { this.send(topic, qos, defaultRetained, payload); }
public void send(String topic, int qos, boolean retained, String payload) { try { mqttClient.publish(topic, payload.getBytes(), qos, retained); } catch (MqttException e) { log.error("publish msg error.", e); } }
public <T extends Object> void send(String topic, int qos, T msg) throws JsonProcessingException { String payload = JsonUtil.serialize(msg); this.send(topic, qos, payload); } }
|
测试发消息
1 2 3 4 5 6 7
| @Autowired private MqttProducer mqttProducer;
@Test public void test() throws Exception { mqttProducer.send("主题名字/默认是testtopic","Json格式的消息"); }
|
接受消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| @Component @Slf4j public class MqttCallback implements MqttCallbackExtended {
@Value("${mqtt.consumer.consumerTopics}") private List<String> consumerTopics;
@Autowired private MqttService mqttService;
@Override public void connectionLost(Throwable throwable) { log.error("emq error.",throwable); }
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info( "topic:"+topic+" message:"+ new String(message.getPayload()) ); mqttService.processMessage(topic, message);
mqttClient.messageArrivedComplete(message.getId(),message.getQos()); }
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); }
@Override public void connectComplete(boolean b, String s) { if(consumerTopics != null && consumerTopics.size() > 0){ consumerTopics.forEach(t->{ try { log.info(">>>>>>>>>>>>>>subscribe topic:"+t); mqttClient.subscribe(t, 2); } catch (MqttException e) { log.error("emq connect error", e); } }); } }
private MqttClient mqttClient;
public void setMqttClient(MqttClient mqttClient) { this.mqttClient = mqttClient; } }
|
接受消息的话可能有多个接受方,所以另外写个接口和实现类来处理业务逻辑
接口
1 2 3
| public interface MqttService { void processMessage(String topic, MqttMessage message); }
|
实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Component @Slf4j public class MqttServiceImpl implements MqttService {
@Override public void processMessage(String topic, MqttMessage message) { String msgContent = new String(message.getPayload()); log.info("接收到消息:" + msgContent);
} }
|
3:在需要调用EMQX的配置类中配置信息,公共代码里面会扫描每个在微服务配置类写的信息
工单配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| mqtt: client: username: admin #EMQX用户名 password: public #密码 serverURI: tcp: clientId: monitor.task.${random.int[1000,9999]} #客户端id,后面是生成的随机数 keepAliveInterval: 10 #连接保持检查周期 connectionTimeout: 30 #连接超时时间 producer: defaultQos: 2 # 消息质量级别 defaultRetained: false #发消息后是否保留消息 defaultTopic: testtopic/test1 # 默认主题 consumer: consumerTopics: $queue/server/task/#,$share/task/server/vms/status 带分组/不带分组
|
设备配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| mqtt: client: username: admin password: public serverURI: tcp: clientId: monitor.vms.${random.int[1000,9999]} keepAliveInterval: 10 connectionTimeout: 30 producer: defaultQos: 2 defaultRetained: false defaultTopic: testtopic/test1 consumer: consumerTopics: $queue/server/vms/#
|
用户配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| mqtt: client: username: admin password: public serverURI: tcp: clientId: monitor.user.${random.int[1000,9999]} keepAliveInterval: 10 connectionTimeout: 30 producer: defaultQos: 2 defaultRetained: false defaultTopic: testtopic/test1 consumer: consumerTopics: $queue/server/user/#
|