代码提交

This commit is contained in:
chy
2026-03-23 13:48:34 +08:00
commit 4dbe762e2c
1880 changed files with 200143 additions and 0 deletions

61
lidee-mq/pom.xml Normal file
View File

@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>iotlidee</artifactId>
<groupId>iot.lidee</groupId>
<version>3.8.5</version>
</parent>
<artifactId>lidee-mq</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>iot.lidee</groupId>
<artifactId>lidee-protocol-base</artifactId>
</dependency>
<dependency>
<groupId>iot.lidee</groupId>
<artifactId>lidee-protocol-collect</artifactId>
</dependency>
<dependency>
<groupId>iot.lidee</groupId>
<artifactId>sip-server</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-core</artifactId>
<version>${liteflow.version}</version>
</dependency>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-spring</artifactId>
<version>${liteflow.version}</version>
</dependency>
<dependency>
<groupId>iot.lidee</groupId>
<artifactId>lidee-notify-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,22 @@
package iot.lidee.mq.producer;
import iot.lidee.common.core.mq.DeviceReportBo;
import iot.lidee.mqttclient.IEmqxMessageProducer;
import org.springframework.stereotype.Component;
/**
* @author bill
*/
@Component
public class EmqxMessageProducer implements IEmqxMessageProducer {
@Override
public void sendEmqxMessage(String topicName, DeviceReportBo deviceReportBo) {
if (topicName.contains("property/post")){
MessageProducer.sendPublishMsg(deviceReportBo);
} else {
MessageProducer.sendOtherMsg(deviceReportBo);
}
}
}

View File

@@ -0,0 +1,54 @@
package iot.lidee.mq.producer;
import iot.lidee.common.core.mq.DeviceReportBo;
import iot.lidee.common.core.mq.DeviceStatusBo;
import iot.lidee.common.core.mq.DeviceTestReportBo;
import iot.lidee.common.core.mq.MQSendMessageBo;
import iot.lidee.common.core.mq.ota.OtaUpgradeBo;
import iot.lidee.iot.model.modbus.ModbusPollJob;
import iot.lidee.mq.queue.*;
import iot.lidee.mq.queue.*;
/**
*设备消息生产者 ,设备的消息发送通道
* @author bill
*/
public class MessageProducer {
/*发送设备获取属性消息到队列*/
public static void sendPropFetch(ModbusPollJob deviceJob){
DevicePropFetchQueue.offer(deviceJob);
}
/*发送设备服务下发消息到队列*/
public static void sendFunctionInvoke(MQSendMessageBo bo){
FunctionInvokeQueue.offer(bo);
}
/*发送设备上报消息到队列*/
public static void sendPublishMsg(DeviceReportBo bo){
DeviceReportQueue.offer(bo);
}
public static void sendOtherMsg(DeviceReportBo bo){
DeviceOtherQueue.offer(bo);
}
public static void sendStatusMsg(DeviceStatusBo bo){
DeviceStatusQueue.offer(bo);
}
/**
* 设备调试通道
* @param bo
*/
public static void sendDeviceTestMsg(DeviceTestReportBo bo){
DeviceTestQueue.offer(bo);
}
/*发送OTA消息到队列*/
public static void sendOtaUpgradeMsg(OtaUpgradeBo bo) {
OtaUpgradeQueue.offer(bo);
}
/*发送设备回复消息到队列*/
public static void sendDeviceReplyMsg(DeviceReportBo bo) {
DeviceReplyQueue.offer(bo);
}
}

View File

@@ -0,0 +1,38 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.ota.OtaUpgradeDelayTask;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.DelayQueue;
/**
* OTA延迟升级队列
*
* @author gsb
* @date 2022/10/26 10:51
*/
@Slf4j
public class DelayUpgradeQueue {
/**
* 使用springboot的 DelayQueue实现延迟队列(OTA对单个设备延迟升级提高升级容错率)
*/
private static DelayQueue<OtaUpgradeDelayTask> queue = new DelayQueue<>();
public static void offerTask(OtaUpgradeDelayTask task) {
try {
queue.offer(task);
} catch (Exception e) {
log.error("OTA任务推送异常", e);
}
}
public static OtaUpgradeDelayTask task() {
try {
return queue.take();
} catch (Exception exception) {
log.error("=>OTA任务获取异常");
return null;
}
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.DeviceReportBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author gsb
* @date 2022/10/10 10:13
*/
public class DeviceOtherQueue {
private static final LinkedBlockingQueue<DeviceReportBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(DeviceReportBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static DeviceReportBo take(){
return queue.take();
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.iot.model.modbus.ModbusPollJob;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 设备属性获取存储列队
* @author gsb
* @date 2022/10/11 8:29
*/
public class DevicePropFetchQueue {
private static final LinkedBlockingQueue<ModbusPollJob> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(ModbusPollJob deviceJob){
queue.offer(deviceJob);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static ModbusPollJob take(){
return queue.take();
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.DeviceReportBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 设备消息回调队列 {@link DeviceReportBo}
* @author gsb
* @date 2022/10/10 10:15
*/
public class DeviceReplyQueue {
private static final LinkedBlockingQueue<DeviceReportBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(DeviceReportBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static DeviceReportBo take(){
return queue.take();
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.DeviceReportBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author gsb
* @date 2022/10/10 10:13
*/
public class DeviceReportQueue {
private static final LinkedBlockingQueue<DeviceReportBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(DeviceReportBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static DeviceReportBo take(){
return queue.take();
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.DeviceStatusBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 设备消息缓存队列 添加{@link DeviceStatusBo} 消息
* @author gsb
* @date 2022/10/10 9:59
*/
public class DeviceStatusQueue {
private static final LinkedBlockingQueue<DeviceStatusBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(DeviceStatusBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static DeviceStatusBo take(){
return queue.take();
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.DeviceTestReportBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author bill
*/
public class DeviceTestQueue {
private static final LinkedBlockingQueue<DeviceTestReportBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(DeviceTestReportBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static DeviceTestReportBo take(){
return queue.take();
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.MQSendMessageBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 服务下发队列 处理{@link MQSendMessageBo}
* @author gsb
* @date 2022/10/10 10:11
*/
public class FunctionInvokeQueue {
private static final LinkedBlockingQueue<MQSendMessageBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(MQSendMessageBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static MQSendMessageBo take(){
return queue.take();
}
}

View File

@@ -0,0 +1,25 @@
package iot.lidee.mq.queue;
import iot.lidee.common.core.mq.ota.OtaUpgradeBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* OTA升级列队 {@link OtaUpgradeBo}
* @author gsb
* @date 2022/10/10 10:30
*/
public class OtaUpgradeQueue {
private static final LinkedBlockingQueue<OtaUpgradeBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(OtaUpgradeBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static OtaUpgradeBo take(){
return queue.take();
}
}