From c696dc477d33f2d650537ea72df0878c4e1251d5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B0=8F=E9=B1=BC=E5=B9=B2?= <1810377322@163.com>
Date: Mon, 7 Jul 2025 18:13:37 +0800
Subject: [PATCH] =?UTF-8?q?=E7=87=83=E6=B0=94=E6=8A=A5=E8=AD=A6=E5=99=A8?=
=?UTF-8?q?=E8=A7=A3=E6=9E=90\=E5=8A=A8=E7=81=AB=E7=A6=BB=E4=BA=BA?=
=?UTF-8?q?=E8=A7=A3=E6=9E=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
emqx-plugin/pom.xml | 2 +-
.../plugins/emqx/conf/CRC16ModbusUtil.java | 35 ++-
.../plugins/emqx/conf/EventTypeMapper.java | 39 +--
.../emqx/conf/FireSafetyDataParser.java | 110 ++++++++
.../emqx/conf/FireSafetyDataReceiver.java | 187 ++++++++++++++
.../plugins/emqx/conf/GasAlarmDataParser.java | 163 +++++++++---
.../plugins/emqx/conf/GasDetectorParser.java | 44 ++++
.../plugins/emqx/conf/IoTConfigProtocol.java | 111 ++++++--
.../plugins/emqx/conf/MqttDataProcessor.java | 105 ++++++++
.../plugins/emqx/conf/MqttHexProcessor.java | 71 ++++++
.../plugins/emqx/conf/ProtocolTest.java | 180 ++++++++++++-
.../plugins/emqx/service/EmqxPlugin.java | 228 ++++++++++++++---
.../plugins/emqx/service/MqttDevice.java | 240 ++++++++++++++----
pom.xml | 2 +-
14 files changed, 1337 insertions(+), 180 deletions(-)
create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataParser.java
create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataReceiver.java
create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/GasDetectorParser.java
create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttDataProcessor.java
create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttHexProcessor.java
diff --git a/emqx-plugin/pom.xml b/emqx-plugin/pom.xml
index 5f5b31a..54ad1bc 100644
--- a/emqx-plugin/pom.xml
+++ b/emqx-plugin/pom.xml
@@ -5,7 +5,7 @@
iot-iita-plugins
cc.iotkit.plugins
- 2.10.19
+ 2.10.18
4.0.0
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/CRC16ModbusUtil.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/CRC16ModbusUtil.java
index cbb875c..46691e3 100644
--- a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/CRC16ModbusUtil.java
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/CRC16ModbusUtil.java
@@ -6,10 +6,21 @@ public class CRC16ModbusUtil {
public static String calculate(String hexStr) {
byte[] data = hexToByteArray(hexStr);
- int crc = calculateCrc(data);
+ int crc = calculateCrcWithSwap(data);
return String.format("%04X", crc);
}
-
+ private static int calculateCrcWithSwap(byte[] data) {
+ int crc = 0xFFFF; // INIT_VALUE应为0xFFFF
+ for (byte b : data) {
+ crc ^= (b & 0xFF);
+ for (int i = 0; i < 8; i++) {
+ boolean lsb = (crc & 1) == 1;
+ crc >>>= 1;
+ if (lsb) crc ^= 0xA001; // POLY应为0xA001
+ }
+ }
+ return ((crc & 0xFF) << 8) | ((crc >> 8) & 0xFF); // 关键交换逻辑
+ }
private static int calculateCrc(byte[] data) {
int crc = INIT_VALUE;
for (byte b : data) {
@@ -53,12 +64,24 @@ public class CRC16ModbusUtil {
String mid, String cmd, String dataHex
) {
String headerLength = "AA" + type + version + mid + cmd + dataHex +"crc1"+ "55";
- Integer length =headerLength.length()/2;
- String header = "AA" + type + version + length + mid + cmd;
- String crcData = length + mid + cmd + dataHex;
+ //额外加上长度的两个字节
+ Integer length =headerLength.length()/2 + 2;
+ String lengthHex = String.format("%04X", length);
+ String header = "AA" + type + version + lengthHex + mid + cmd;
+ // 转换为2字节16进制(大端序)
+ System.out.println("dataHex+++++++++++++++++++++++++++++++" + dataHex);
+ System.out.println("header+++++++++++++++++++++++++++++++" + header);
+ String crcData = lengthHex + mid + cmd + dataHex;
+ System.out.println("crcData+++++++++++++++++++++++++++++++" + crcData);
String crc = calculate(crcData);
return header + dataHex + crc + "55";
}
-
+ public static byte[] toBigEndianBytes(int length) {
+ if(length > 65535) throw new IllegalArgumentException("长度超过2字节范围");
+ return new byte[] {
+ (byte)((length >> 8) & 0xFF), // 高字节
+ (byte)(length & 0xFF) // 低字节
+ };
+ }
}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/EventTypeMapper.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/EventTypeMapper.java
index 6bb2980..397c573 100644
--- a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/EventTypeMapper.java
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/EventTypeMapper.java
@@ -18,28 +18,31 @@ public class EventTypeMapper {
EVENT_TYPE_MAP.put(8, "节点通讯断网故障");
EVENT_TYPE_MAP.put(9, "探头传感器故障恢复");
EVENT_TYPE_MAP.put(10, "节点通讯断网故障恢复");
- EVENT_TYPE_MAP.put(11, "自检/模块联动");
- EVENT_TYPE_MAP.put(12, "阀门动作");
+ EVENT_TYPE_MAP.put(11, "节点自检");
+ EVENT_TYPE_MAP.put(12, "节点自检完成");
+ EVENT_TYPE_MAP.put(13, "模块联动");
+ EVENT_TYPE_MAP.put(14, "阀门动作");
+ EVENT_TYPE_MAP.put(15, "浓度变化上报");
// 控制器相关事件
- EVENT_TYPE_MAP.put(13, "自检");
- EVENT_TYPE_MAP.put(14, "备电故障");
- EVENT_TYPE_MAP.put(15, "主电故障");
- EVENT_TYPE_MAP.put(16, "主电欠压");
- EVENT_TYPE_MAP.put(17, "控制器复位");
- EVENT_TYPE_MAP.put(18, "控制器开机");
- EVENT_TYPE_MAP.put(19, "主电故障恢复");
- EVENT_TYPE_MAP.put(20, "备电故障恢复");
- EVENT_TYPE_MAP.put(21, "主电欠压恢复");
+ EVENT_TYPE_MAP.put(30, "自检");
+ EVENT_TYPE_MAP.put(31, "备电故障");
+ EVENT_TYPE_MAP.put(32, "主电故障");
+ EVENT_TYPE_MAP.put(33, "主电欠压");
+ EVENT_TYPE_MAP.put(34, "控制器复位");
+ EVENT_TYPE_MAP.put(35, "控制器开机");
+ EVENT_TYPE_MAP.put(36, "主电故障恢复");
+ EVENT_TYPE_MAP.put(37, "备电故障恢复");
+ EVENT_TYPE_MAP.put(38, "主电欠压恢复");
// 动火离人相关事件
- EVENT_TYPE_MAP.put(22, "设备开机");
- EVENT_TYPE_MAP.put(23, "动火离人报警");
- EVENT_TYPE_MAP.put(24, "动火离人报警恢复");
- EVENT_TYPE_MAP.put(25, "设备故障");
- EVENT_TYPE_MAP.put(26, "设备故障恢复");
- EVENT_TYPE_MAP.put(27, "断网故障");
- EVENT_TYPE_MAP.put(28, "断网故障恢复");
+ EVENT_TYPE_MAP.put(50, "设备开机");
+ EVENT_TYPE_MAP.put(51, "动火离人报警");
+ EVENT_TYPE_MAP.put(52, "动火离人报警恢复");
+ EVENT_TYPE_MAP.put(53, "设备故障");
+ EVENT_TYPE_MAP.put(54, "设备故障恢复");
+ EVENT_TYPE_MAP.put(55, "断网故障");
+ EVENT_TYPE_MAP.put(56, "断网故障恢复");
// 正常上报事件
EVENT_TYPE_MAP.put(128, "燃气报警器正常数据上报");
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataParser.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataParser.java
new file mode 100644
index 0000000..245a737
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataParser.java
@@ -0,0 +1,110 @@
+package cc.iotkit.plugins.emqx.conf;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FireSafetyDataParser {
+ // 事件类型常量
+ public static final int EVENT_POWER_ON = 50;
+ public static final int EVENT_FIRE_ALARM = 51;
+ public static final int EVENT_ALARM_RESET = 52;
+ public static final int EVENT_DEVICE_FAIL = 53;
+ public static final int EVENT_FAIL_RESET = 54;
+ public static final int EVENT_NET_DOWN = 55;
+ public static final int EVENT_NET_RECOVER = 56;
+
+ // 数据项ID常量
+ public static final int ITEM_TEMP_DISTANCE = 0x0040;
+ public static final int ITEM_ALARM_DURATION = 0x0041;
+ public static final int ITEM_SOUP_ALARM = 0x0042;
+ public static final int ITEM_VOLUME = 0x0043;
+ public static final int ITEM_SOUP_MODE = 0x0044;
+ public static final int ITEM_ALARM_STATUS = 0x0045;
+ public static final int ITEM_FLAME_DETECT = 0x0046;
+ public static final int ITEM_HUMAN_DETECT = 0x0047;
+ public static final int ITEM_ERROR_CODE = 0x0048;
+
+ public static Map parseData(byte[] rawData) {
+ ByteBuffer buffer = ByteBuffer.wrap(rawData);
+ Map result = new HashMap<>();
+ result.put("dataType", "HOT_WORK_DATA");
+ // 解析事件头(1字节事件类型 + 6字节BCD时间)
+ // int eventType = buffer.get() & 0xFF;
+ int eventType = buffer.getShort() & 0xFFFF;
+ // buffer.getShort() & 0xFFFF
+ result.put("eventType", eventType);
+ result.put("timestamp", GasAlarmDataParser.parseBcdTime(buffer));
+ result.put("eventTypeValue", EventTypeMapper.getEventTypeName(Integer.parseInt(result.get("eventType").toString())));
+ // 解析数据域
+ if (buffer.remaining() > 0) {
+ int dataLength = buffer.get() & 0xFF;
+ while (buffer.remaining() >= 4) { // 每个数据项至少4字节(2字节ID+2字节数据)
+ int itemId = buffer.getShort() & 0xFFFF;
+ int itemValue = buffer.getShort() & 0xFFFF;
+ result.put(getItemName(itemId), parseItemValue(itemId, itemValue));
+ }
+ }
+ return result;
+ }
+
+ /* private static String parseBcdTime(ByteBuffer buffer) {
+ byte[] bcdTime = new byte[6];
+ buffer.get(bcdTime);
+ return String.format("20%02d-%02d-%02d %02d:%02d:%02d",
+ bcdTime[0], bcdTime[1], bcdTime[2], bcdTime[3], bcdTime[4], bcdTime[5]);
+ }*/
+
+ private static String getItemName(int itemId) {
+ switch (itemId) {
+ case ITEM_TEMP_DISTANCE: return "temperatureDistance";
+ case ITEM_ALARM_DURATION: return "alarmDuration";
+ case ITEM_SOUP_ALARM: return "soupAlarmTime";
+ case ITEM_VOLUME: return "volumeLevel";
+ case ITEM_SOUP_MODE: return "soupMode";
+ case ITEM_ALARM_STATUS: return "alarmStatus";
+ case ITEM_FLAME_DETECT: return "flameDetected";
+ case ITEM_HUMAN_DETECT: return "humanDetected";
+ case ITEM_ERROR_CODE: return "errorCode";
+ default: return "unknown_" + Integer.toHexString(itemId);
+ }
+ }
+
+ private static Object parseItemValue(int itemId, int value) {
+ switch (itemId) {
+
+ // 测温距离(0x0040)
+ case ITEM_TEMP_DISTANCE:
+ return new String[]{"2米","2-3米","3-3.5米"}[value-1];
+ // 动火报警时间(0x0041)
+ case ITEM_ALARM_DURATION:
+ return value + "分钟";
+
+ // 煲汤报警时间(0x0042)
+ case ITEM_SOUP_ALARM:
+ return (value > 60) ?
+ (value/60 + "小时" + value%60 + "分钟") :
+ (value + "分钟");
+ // 音量控制(0x0043)
+ case ITEM_VOLUME:
+ return new String[]{"静音","小","中","大"}[value];
+ // 煲汤模式(0x0044)
+ case ITEM_SOUP_MODE:
+ return value == 0 ? "普通模式" : "煲汤模式";
+ // 报警状态(0x0045)
+ case ITEM_ALARM_STATUS:
+ return value == 0 ? "未报警" : "报警中";
+ // 火苗检测(0x0046)
+ case ITEM_FLAME_DETECT:
+ return value == 0 ? "未检测到火苗" : "检测到火苗";
+ // 人体检测(0x0047)
+ case ITEM_HUMAN_DETECT:
+ return value == 0 ? "无人员" : "有人员";
+ // 故障码(预留)
+ case ITEM_ERROR_CODE:
+ return String.format("故障码0x%04X", value);
+ default:
+ return value;
+ }
+ }
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataReceiver.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataReceiver.java
new file mode 100644
index 0000000..2376a2b
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/FireSafetyDataReceiver.java
@@ -0,0 +1,187 @@
+package cc.iotkit.plugins.emqx.conf;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class FireSafetyDataReceiver {
+ public static void main(String[] args) {
+ // 测试16进制报文:17(事件类型) + 250616123456(时间) + 02(数据长度) + 0040(测温距离ID) + 0002(2-3米)
+ String hexData = "17 25 06 16 12 34 56 02 00 40 00 02";
+ Map result = handleHexData(hexData);
+
+ System.out.println("解析结果:" + result);
+ result.forEach((k, v) -> System.out.println(k + ": " + v));
+ }
+ /*public void processDataStream(InputStream inputStream) throws Exception {
+ byte[] header = new byte[HEADER_SIZE];
+ while (inputStream.read(header) == HEADER_SIZE) {
+ int dataLength = inputStream.read();
+ byte[] payload = new byte[dataLength];
+ inputStream.read(payload);
+
+ byte[] fullData = new byte[HEADER_SIZE + 1 + dataLength];
+ System.arraycopy(header, 0, fullData, 0, HEADER_SIZE);
+ fullData[HEADER_SIZE] = (byte)dataLength;
+ System.arraycopy(payload, 0, fullData, HEADER_SIZE+1, dataLength);
+
+ Map parsedData = FireSafetyDataParser.parseData(fullData);
+ handleEvent(parsedData);
+ }
+ }*/
+ public static Map handleHexData(String hexString) {
+ byte[] data = hexStringToBytes(hexString);
+ if (data == null || data.length < 1) {
+ return null;
+ }
+
+ int eventType = Byte.toUnsignedInt(data[0]) + Byte.toUnsignedInt(data[1]);
+ if (eventType == 129) {
+ return handleEvent(data);
+ }
+ return OpenFireHandleEvent(data);
+ }
+
+ private static byte[] hexStringToBytes(String hex) {
+ hex = hex.replaceAll("\\s", "");
+ byte[] bytes = new byte[hex.length() / 2];
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (byte) Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
+ }
+ return bytes;
+ }
+ /*
+ * 动火离人告警事件
+ * */
+ public static Map OpenFireHandleEvent(byte[] data) {
+ Map result= new HashMap<>();
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ result.put("dataType", "HOT_WORK_DATA");
+ int eventType = buffer.getShort() & 0xFFFF;
+ // 解析事件头
+ //int eventType = buffer.get() & 0xFF;
+ result.put("eventType", eventType);
+ result.put("timestamp", GasAlarmDataParser.parseBcdTime(buffer));
+ result.put("eventTypeValue", EventTypeMapper.getEventTypeName(Integer.parseInt(result.get("eventType").toString())));
+ // 完整事件处理
+ /* switch (eventType) {
+ case FireSafetyDataParser.EVENT_POWER_ON:
+ result.put("eventName", "设备开机");
+ // result.put("action", "记录启动日志");
+ break;
+ case FireSafetyDataParser.EVENT_FIRE_ALARM:
+ result.put("eventName", "动火离人报警");
+ //result.put("action", "触发声光报警+推送告警");
+ // parseDataItems(buffer, result);
+ break;
+ case FireSafetyDataParser.EVENT_ALARM_RESET:
+ result.put("eventName", "动火离人报警恢复");
+ // result.put("action", "停止报警+记录处置");
+ break;
+ case FireSafetyDataParser.EVENT_DEVICE_FAIL:
+ result.put("eventName", "设备故障");
+ // result.put("action", "启动自检+通知维护");
+ // parseDataItems(buffer, result);
+ break;
+ case FireSafetyDataParser.EVENT_FAIL_RESET:
+ result.put("eventName", "设备故障恢复");
+ // result.put("action", "更新设备状态");
+ break;
+ case FireSafetyDataParser.EVENT_NET_DOWN:
+ result.put("eventName", "断网故障");
+ // result.put("action", "启用本地存储");
+ break;
+ case FireSafetyDataParser.EVENT_NET_RECOVER:
+ result.put("eventName", "断网故障恢复");
+ // result.put("action", "同步缓存数据");
+ break;
+ default:
+ result.put("eventName", "未知事件");
+ // result.put("action", "需要人工核查");
+ }*/
+ System.out.println("解析后的数据" + result);
+ return result;
+ }
+ /*
+ * 动火离人正常上报数据解析
+ * */
+ public static Map handleEvent(byte[] data) {
+ // Map result = new HashMap<>();
+ Map result= FireSafetyDataParser.parseData(data);
+ /* ByteBuffer buffer = ByteBuffer.wrap(data);
+
+ // 解析事件头
+ int eventType = buffer.get() & 0xFF;
+ result.put("eventType", eventType);
+ result.put("timestamp", GasAlarmDataParser.parseBcdTime(buffer));
+
+ // 完整事件处理
+ switch (eventType) {
+ case FireSafetyDataParser.EVENT_POWER_ON:
+ result.put("eventName", "设备开机");
+ // result.put("action", "记录启动日志");
+ break;
+ case FireSafetyDataParser.EVENT_FIRE_ALARM:
+ result.put("eventName", "动火离人报警");
+ //result.put("action", "触发声光报警+推送告警");
+ // parseDataItems(buffer, result);
+ break;
+ case FireSafetyDataParser.EVENT_ALARM_RESET:
+ result.put("eventName", "动火离人报警恢复");
+ // result.put("action", "停止报警+记录处置");
+ break;
+ case FireSafetyDataParser.EVENT_DEVICE_FAIL:
+ result.put("eventName", "设备故障");
+ // result.put("action", "启动自检+通知维护");
+ // parseDataItems(buffer, result);
+ break;
+ case FireSafetyDataParser.EVENT_FAIL_RESET:
+ result.put("eventName", "设备故障恢复");
+ // result.put("action", "更新设备状态");
+ break;
+ case FireSafetyDataParser.EVENT_NET_DOWN:
+ result.put("eventName", "断网故障");
+ // result.put("action", "启用本地存储");
+ break;
+ case FireSafetyDataParser.EVENT_NET_RECOVER:
+ result.put("eventName", "断网故障恢复");
+ // result.put("action", "同步缓存数据");
+ break;
+ default:
+ result.put("eventName", "未知事件");
+ // result.put("action", "需要人工核查");
+ }*/
+ System.out.println("解析后的数据" + result);
+ return result;
+ }
+ /* private static String parseBcdTime(byte[] data) {
+ return String.format("20%02d-%02d-%02d %02d:%02d:%02d",
+ bcdToInt(data[0]), bcdToInt(data[1]), bcdToInt(data[2]),
+ bcdToInt(data[3]), bcdToInt(data[4]), bcdToInt(data[5]));
+ }
+ private static int bcdToInt(byte b) {
+ return ((b >> 4) & 0x0F)*10 + (b & 0x0F);
+ }*/
+/*
+ private static String parseBcdTime(ByteBuffer buffer) {
+ byte[] timeBytes = new byte[6];
+ buffer.get(timeBytes);
+ return String.format("20%02d-%02d-%02d %02d:%02d:%02d",
+ timeBytes[0], timeBytes[1], timeBytes[2],
+ timeBytes[3], timeBytes[4], timeBytes[5]);
+ }
+*/
+
+ private static void parseDataItems(ByteBuffer buffer, Map result) {
+ if (buffer.remaining() > 0) {
+ int dataLength = buffer.get() & 0xFF;
+ while (buffer.remaining() >= 4) {
+ int itemId = buffer.getShort() & 0xFFFF;
+ int value = buffer.getShort() & 0xFFFF;
+ result.put("item_" + Integer.toHexString(itemId), value);
+ }
+ }
+ }
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/GasAlarmDataParser.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/GasAlarmDataParser.java
index 6a6a3d8..d0b9210 100644
--- a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/GasAlarmDataParser.java
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/GasAlarmDataParser.java
@@ -2,10 +2,20 @@ package cc.iotkit.plugins.emqx.conf;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.chrono.IsoChronology;
+import java.time.format.DateTimeFormatter;
+import java.time.format.ResolverStyle;
import java.util.*;
public class GasAlarmDataParser {
public static void main(String[] args) {
+ // 测试数据:250707131758 -> 对应字节数组 {0x25, 0x07, 0x07, 0x13, 0x17, 0x58}
+ byte[] testData = new byte[]{0x25, 0x07, 0x07, 0x13, 0x17, 0x58};
+ ByteBuffer buffer = ByteBuffer.wrap(testData);
+
+ String result = parseBcdTime(buffer);
+ System.out.println("解析结果: " + result);
// 节点相关事件测试数据 (1-12)
// 节点相关事件测试数据 (1-12) - 数值部分改为低位在前
String[] nodeEvents = {
@@ -63,10 +73,10 @@ public class GasAlarmDataParser {
System.out.println("\n=== " + groupName + "测试 ===");
for (String hexData : testCases) {
try {
- List