package com.thhy.staff.utils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.thhy.general.utils.UUIDUtils; import com.thhy.staff.config.EmqxConfig; import com.thhy.staff.modules.biz.face.entity.MqContent; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.ArrayList; import java.util.List; public class MqUtils { public static void createClient(EmqxConfig emqxConfig, List topics, String content){ String broker = "tcp://"+emqxConfig.getHost()+":"+emqxConfig.getPort(); String clientId = UUIDUtils.create(); MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(broker, clientId, persistence); // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(emqxConfig.getUsername()); connOpts.setPassword(emqxConfig.getPassword().toCharArray()); // 保留会话 connOpts.setCleanSession(true); // 设置回调 //client.setCallback(new OnMessageCallback()); // 建立连接 System.out.println("Connecting to broker: " + broker); client.connect(connOpts); System.out.println("Connected"); //System.out.println("Publishing message: " + content); // 订阅 //client.subscribe(subTopic); // 消息发布所需参数 MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(emqxConfig.getQos()); for(String pubTopic : topics){ client.publish(pubTopic, message); } System.out.println("Message published"); client.disconnect(); } catch (MqttException e) { throw new RuntimeException(e); } } public static void main(String[] args) { EmqxConfig emqxConfig = new EmqxConfig(); emqxConfig.setHost("111.30.93.215"); emqxConfig.setPort("1883"); emqxConfig.setQos(1); emqxConfig.setUsername("thhy"); emqxConfig.setPassword("Thhy@123"); List list = new ArrayList<>(); list.add("0A:0C:E1:25:75:2C"); String prefix = "http://111.30.93.212:15002/staff/face/"; String notify = "/sp/notify"; MqContent mqContent = new MqContent("sync_person", prefix+notify); JSONObject jsonObject = new JSONObject(); jsonObject.put("path",prefix+"syncPersonSingle"); JSONObject pathParamJson = new JSONObject(); pathParamJson.put("dev_sno",""); pathParamJson.put("limit",10); pathParamJson.put("offset",0); pathParamJson.put("total",1); List userIds = new ArrayList<>(); userIds.add("02e6d1c4ded41512d0ca3622"); pathParamJson.put("person_list",userIds); pathParamJson.put("person_type","4"); jsonObject.put("path_params",pathParamJson); mqContent.setData(jsonObject); System.out.println(JSON.toJSONString(mqContent)); } }