更新時間:2022-10-13 10:48:59 來源:動力節點 瀏覽1957次
簡單實現一個MQTT協議,需要一個本地服務器。
下載EMQX ,在本地安裝好并運行起來,與Tomcat服務器類似,不過比Tomcat服務器安裝的步驟少且簡單不少。
java實現MQTT協議需要有三個類,一個客戶端類,一個服務端類,還有一個回調函數類。
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
服務端類
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Scanner;
public class ServiceMQTT {
public static final String HOST = "tcp://localhost:1883";
private String ServiceID = "ServiceFirst";
private String topic;
private MqttClient client;
private MqttTopic mqttTopic;
private MqttConnectOptions options;
private String user = "admin";
private String password = "public";
private MqttMessage message;
public ServiceMQTT() throws MqttException {
//創建連接
client = new MqttClient(HOST,ServiceID,new MemoryPersistence());
options = new MqttConnectOptions();
options.setCleanSession(false);
options.setKeepAliveInterval(20);
options.setConnectionTimeout(50);
options.setUserName(user);
options.setPassword(password.toCharArray());
message = new MqttMessage();
}
public void getConnect(){
try {
client.setCallback(new PublishCallBack());
client.connect(options);
mqttTopic = client.getTopic(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void publish(MqttTopic topic, MqttMessage message) throws MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("消息推送的狀態--->"+token.isComplete());
}
public static void main(String[] args) throws MqttException {
ServiceMQTT service = new ServiceMQTT();
Scanner input = new Scanner(System.in);
System.out.print("請輸入消息的主題:");
service.topic = input.next();
System.out.print("請輸入消息的內容:");
String messageVal = input.next();
service.getConnect();
service.message.setQos(1);
service.message.setRetained(true);
service.message.setPayload(messageVal.getBytes());
service.publish(service.mqttTopic,service.message);
System.out.println("消息的保持狀態:"+service.message.isRetained());
}
}
客戶端類
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Scanner;
public class ClientMQTT {
public static final String HOST = "tcp://localhost:1883";
private static final String clientID = "clientFirst";
private String TOPIC;
private MqttClient client;
private MqttConnectOptions options;
private String user = "admin";
private String password = "public";
public void clientStart(){
try {
client = new MqttClient(HOST,clientID,new MemoryPersistence());
options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(10);
options.setConnectionTimeout(50);
options.setUserName(user);
options.setPassword(password.toCharArray());
client.setCallback(new PublishCallBack());
Scanner input = new Scanner(System.in);
System.out.print("請輸入訂閱的主題:");
TOPIC = input.next();
MqttTopic topic = client.getTopic(TOPIC);
//setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
options.setWill(topic,"close".getBytes(),1,true);
client.connect(options);
int[] Qos = {1};
String[] topic1 = {TOPIC};
client.subscribe(topic1,Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws MqttException {
ClientMQTT clientMQTT = new ClientMQTT();
clientMQTT.clientStart();
}
}
回調函數類
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PublishCallBack implements MqttCallback {
public void connectionLost(Throwable throwable) {
//連接斷掉會執行到這里
System.out.println("連接以斷,請重新連接!!!");
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
//subscribe后會執行到這里
System.out.println("消息的主題是:"+s);
System.out.println("消息的Qos是:"+mqttMessage.getQos());
System.out.println("消息的ID是:"+mqttMessage.getId());
System.out.println("消息的內容是:"+new String(mqttMessage.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//publish可以執行到這里
System.out.println("This is deliveryComplete method----->"+iMqttDeliveryToken.isComplete());
}
}
0基礎 0學費 15天面授
有基礎 直達就業
業余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習