emqttd实现消息订阅、发布、退订

emqttd实现消息订阅、发布、退订

1 前言

springboot集成emqttd前请先安装和部署emqttd服务并进行简单的测试。

2 相关配置

1、在pom文件下添加以下maven依赖:

1
2
3
4
5
6
7
8
9
10
11
12
<!-- Spring Integration starter plus the stream and MQTT integration modules used by this example. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2、在.yml文件中进行mqtt的连接配置

1
2
3
4
5
6
7
8
9
# MQTT connection settings, bound to MqttConfig through the "spring.mqtt" prefix.
# The nesting below is required: without it every key becomes a top-level property.
spring:
  mqtt:
    username: admin          # account; not needed when anonymous connections are enabled
    password: public         # password; not needed when anonymous connections are enabled
    host-url: tcp://ip:1883  # TCP address of the MQTT broker
    client-id: test          # client id; every running instance must use a different one
    default-topic: test      # default topic
    timeout: 100             # connection timeout (given to MqttConnectOptions.setConnectionTimeout)
    keepalive: 100           # keep-alive interval (given to MqttConnectOptions.setKeepAliveInterval)

3 springboot-emqttd的整合

1、获取配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.qtone.study.mqtt;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * MQTT connection settings bound from the {@code spring.mqtt} configuration prefix.
 *
 * <p>Besides holding the settings, this class connects the injected
 * {@link MqttPushClient} with them via {@link #getMqttPushClient()}.
 *
 * <p>NOTE(review): {@link MqttPushClient} is already a {@code @Component} and is also
 * returned from a {@code @Bean} method here, and the two classes inject each other.
 * Confirm that the target Spring version tolerates this wiring.
 *
 * @author fenghao
 * @date 2020/3/23
 */
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {

    /** Client that is connected with the settings below. */
    @Autowired
    private MqttPushClient mqttPushClient;

    /** Broker address, passed to the client as its host argument. */
    private String hostUrl;

    /** Client id used when connecting. */
    private String clientId;

    /** User name used when connecting. */
    private String username;

    /** Password used when connecting. */
    private String password;

    /** Default topic. */
    private String defaultTopic;

    /** Connection timeout handed to the client. */
    private int timeout;

    /** Keep-alive value handed to the client (it becomes the keep-alive interval there). */
    private int keepalive;

    /**
     * Connects the injected client using the bound settings and returns that same instance.
     *
     * <p>{@code MqttPushClient.startReconnect()} also calls this method directly to
     * establish a new connection.
     *
     * @return the injected {@link MqttPushClient} after the connect attempt
     */
    @Bean
    public MqttPushClient getMqttPushClient() {
        final MqttPushClient pushClient = this.mqttPushClient;
        pushClient.connect(
                this.hostUrl,
                this.clientId,
                this.username,
                this.password,
                this.timeout,
                this.keepalive);
        return pushClient;
    }

    /** @return the broker address */
    public String getHostUrl() {
        return this.hostUrl;
    }

    /** @param hostUrl the broker address */
    public void setHostUrl(String hostUrl) {
        this.hostUrl = hostUrl;
    }

    /** @return the client id */
    public String getClientId() {
        return this.clientId;
    }

    /** @param clientId the client id */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @return the user name */
    public String getUsername() {
        return this.username;
    }

    /** @param username the user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the password */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the default topic */
    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    /** @param defaultTopic the default topic */
    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    /** @return the connection timeout */
    public int getTimeout() {
        return this.timeout;
    }

    /** @param timeout the connection timeout */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /** @return the keep-alive value */
    public int getKeepalive() {
        return this.keepalive;
    }

    /** @param keepalive the keep-alive value */
    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }
}

2、mqtt推送客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.qtone.study.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * MQTT push client: wraps one shared Paho {@link MqttClient} and offers connect, publish,
 * subscribe, unsubscribe and reconnect operations on it.
 *
 * <p>The underlying client lives in a static field, so all instances of this class operate
 * on the same connection.
 *
 * @author fenghao
 * @date 2020/3/23
 */
@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    /** Pause before each reconnect attempt, in seconds. */
    private static final long RECONNECT_DELAY_SECONDS = 2;

    @Autowired
    private PushCallback pushCallback;

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * Returns the shared client, logging an error when none has been created yet.
     *
     * @param action short description of the attempted operation, used in the error message
     * @return the shared client, or {@code null} when no client has been created
     */
    private static MqttClient clientFor(String action) {
        MqttClient current = getClient();
        if (current == null) {
            logger.error("mqtt client is not initialised, cannot {}", action);
        }
        return current;
    }

    /**
     * Creates a new client with in-memory persistence, stores it as the shared client and
     * connects it with clean session disabled.
     *
     * <p>Failures are logged and not rethrown, so callers must check the connection state
     * themselves if they need to know whether the connect succeeded.
     *
     * @param host      broker address (ip + port)
     * @param clientID  client id
     * @param username  user name; skipped when {@code null} (anonymous connection)
     * @param password  password; skipped when {@code null} (anonymous connection)
     * @param timeout   connection timeout, handed to {@code setConnectionTimeout}
     * @param keepalive keep-alive interval, handed to {@code setKeepAliveInterval}
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        try {
            MqttClient newClient = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            if (username != null) {
                options.setUserName(username);
            }
            // A missing password is legitimate for anonymous connections; do not dereference it.
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            // Publish the new client before connecting, as the reconnect loop inspects it.
            MqttPushClient.setClient(newClient);
            newClient.setCallback(pushCallback);
            newClient.connect(options);
        } catch (Exception e) {
            logger.error("mqtt connect failed, host={}, clientId={}", host, clientID, e);
        }
    }

    /**
     * Publishes a message with QoS 0 and the retained flag off.
     *
     * @param topic       topic to publish to
     * @param pushMessage message body
     */
    public void publish(String topic, String pushMessage) {
        publish(0, false, topic, pushMessage);
    }

    /**
     * Publishes a message and waits for the delivery to complete.
     *
     * <p>Publish failures are logged and not rethrown. Nothing is sent when the shared
     * client has not been created or the topic cannot be obtained.
     *
     * @param qos         quality of service (0, 1 or 2)
     * @param retained    whether the message is retained
     * @param topic       topic to publish to
     * @param pushMessage message body
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttClient current = clientFor("publish");
        if (current == null) {
            return;
        }
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = current.getTopic(topic);
        if (null == mTopic) {
            logger.error("topic not exist");
            // Without a topic there is nothing to publish on.
            return;
        }
        try {
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass of MqttException and is covered here.
            logger.error("mqtt publish failed, topic={}", topic, e);
        }
    }

    /**
     * Logs the subscriber name and subscribes to a topic with QoS 2.
     *
     * @param name  subscriber name, used for logging only
     * @param topic topic to subscribe to
     */
    public void subscribe(String name, String topic) {
        logger.info("{}开始订阅主题{}", name, topic);
        subscribe(topic, 2);
    }

    /**
     * Subscribes to a topic with the given QoS. Failures are logged and not rethrown.
     *
     * @param topic topic to subscribe to
     * @param qos   quality of service for the subscription
     */
    public void subscribe(String topic, int qos) {
        MqttClient current = clientFor("subscribe");
        if (current == null) {
            return;
        }
        try {
            current.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("mqtt subscribe failed, topic={}, qos={}", topic, qos, e);
        }
    }

    /**
     * Logs the subscriber name and unsubscribes from a topic. Failures are logged and not
     * rethrown.
     *
     * @param name  subscriber name, used for logging only
     * @param topic topic to unsubscribe from
     */
    public void unSubscribe(String name, String topic) {
        logger.info("{}取消订阅主题{}", name, topic);
        MqttClient current = clientFor("unsubscribe");
        if (current == null) {
            return;
        }
        try {
            current.unsubscribe(topic);
        } catch (MqttException e) {
            logger.error("mqtt unsubscribe failed, topic={}", topic, e);
        }
    }

    /**
     * Reconnects until the shared client reports that it is connected.
     *
     * <p>Each attempt waits {@value #RECONNECT_DELAY_SECONDS} seconds and then asks
     * {@link MqttConfig#getMqttPushClient()} to connect again. Because
     * {@link #connect} logs its failures instead of throwing, success is decided by
     * checking the connection state after the attempt. The loop stops when the thread is
     * interrupted.
     */
    public synchronized void startReconnect() {
        MqttClient current = MqttPushClient.getClient();
        if (current != null && current.isConnected()) {
            logger.info("mqtt已经连接,无需重连");
            return;
        }
        while (true) {
            logger.info("mqtt开始尝试重连");
            try {
                TimeUnit.SECONDS.sleep(RECONNECT_DELAY_SECONDS);
                mqttConfig.getMqttPushClient();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up so the thread can shut down.
                Thread.currentThread().interrupt();
                logger.error("mqtt reconnect interrupted, giving up", e);
                return;
            } catch (Exception e) {
                logger.error("mqtt重连失败,继续重连中", e);
                continue;
            }
            MqttClient reconnected = MqttPushClient.getClient();
            if (reconnected != null && reconnected.isConnected()) {
                logger.info("mqtt重连成功");
                return;
            }
            logger.error("mqtt重连失败,继续重连中");
        }
    }
}

3、 消费监听类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.qtone.study.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * MQTT callback: logs incoming messages and delivery completions, and triggers a reconnect
 * when the connection is lost.
 *
 * @author fenghao
 * @date 2020/3/23
 */
@Component
public class PushCallback implements MqttCallback {
    // Use this class as the logger category so its output is attributed to the callback.
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);

    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * Logs the cause of the connection loss and starts reconnecting.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.error("连接断开,正在尝试重连 -> ", throwable);
        mqttPushClient.startReconnect();
    }

    /**
     * Logs the completion state reported by the delivery token.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("消息发送成功 -> {}", iMqttDeliveryToken.isComplete());
    }

    /**
     * Logs topic, QoS and payload of a received message.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message
     * @throws Exception declared by the callback interface; not thrown explicitly here
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("=======接收消息主题 : {}", topic);
        logger.info("=======接收消息Qos : {}", mqttMessage.getQos());
        logger.info("=======接收消息内容 : {}", new String(mqttMessage.getPayload()));
    }
}

4、 测试消息发布、订阅、退订

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.qtone.study.controller;

import com.qtone.study.mqtt.MqttPushClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

/**
 * Test endpoints for publishing, subscribing and unsubscribing.
 *
 * <p>{@code @ResponseBody} is applied at class level so that the returned {@code "ok"}
 * is written as the response body instead of being resolved as a view name.
 *
 * @author fenghao
 * @date 2020/3/23
 */
@Controller
@ResponseBody
@RequestMapping("/")
public class TestController {

    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * Subscribes to the topic and then publishes ten QoS 2, non-retained test messages to it.
     *
     * @param name  caller name, included in the log output and the message text
     * @param topic topic to subscribe and publish to
     * @return the literal {@code "ok"}
     */
    @GetMapping(value = "/publishTopic")
    public String publishTopic(final String name, final String topic) {
        mqttPushClient.subscribe(name, topic);
        System.out.println("================name : " + name);
        for (int i = 1; i <= 10; i++) {
            mqttPushClient.publish(2, false, topic, "测试name: " + name + "推送第" + i + "条消息");
        }
        return "ok";
    }

    /**
     * Subscribes to a topic.
     *
     * @param name  caller name, used for logging by the client
     * @param topic topic to subscribe to
     * @return the literal {@code "ok"}
     */
    @GetMapping(value = "/subscribe")
    public String subscribe(String name, String topic) {
        mqttPushClient.subscribe(name, topic);
        return "ok";
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param name  caller name, used for logging by the client
     * @param topic topic to unsubscribe from
     * @return the literal {@code "ok"}
     */
    @GetMapping(value = "/unSubscribe")
    public String unSubscribe(String name, String topic) {
        mqttPushClient.unSubscribe(name, topic);
        return "ok";
    }
}

4 测试成果

1、进入后台连接服务器

2、订阅“test/test”主题

3、接口发送相关信息进行测试(http://127.0.0.1:8080/publishTopic?name=qtone&topic=test/test)

4、后台接收到相应的信息

此时项目中的客户端也订阅了test/test下的主题,因此也会收到相应的消息。

更多详细信息:EMQ X 中文文档

EMQ X企业版中文文档

#

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×