diff --git a/pom.xml b/pom.xml index 347b725..93f0247 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,18 @@ log4j-api 2.17.0 + + + io.netty + netty-all + 4.1.85.Final + + + + cn.hutool + hutool-all + 5.7.16 + com.baomidou diff --git a/src/main/java/com/hive/bee/entity/BeeHive.java b/src/main/java/com/hive/bee/entity/BeeHive.java new file mode 100644 index 0000000..d36ee10 --- /dev/null +++ b/src/main/java/com/hive/bee/entity/BeeHive.java @@ -0,0 +1,101 @@ +package com.hive.bee.entity; + +import lombok.*; + +import java.io.Serializable; +import java.util.*; +import java.time.LocalDateTime; +import java.time.LocalDateTime; +import java.time.LocalDateTime; +import com.baomidou.mybatisplus.annotation.*; + +/** + * 蜂箱 DO + * + * @author 芋道源码 + */ +@TableName("bee_hive") +@KeySequence("bee_hive_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。 +@Data +@ToString(callSuper = true) +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BeeHive implements Serializable { + private static long serialVersionUID = 1L; + + /** + * 主键 + */ + @TableId + private Long id; + /** + * 蜂场id + * + * 枚举 {@link TODO mf_type 对应的类} + */ + private Long peakFieldId; + /** + * 蜂箱编号 + */ + private String beeHiveNumber; + /** + * 蜂箱名称 + */ + private String beeHiveName; + /** + * mac号 + */ + private String beeHiveMac; + /** + * 蜂箱位置 + */ + private String beeHivePosition; + /** + * 最后在线时间 + */ + private LocalDateTime lastOnlineTime; + /** + * 在线状态 + */ + private Integer onlineType; + /** + * 蜂箱重量 + */ + private String beeHiveWeight; + /** + * 当前温度 + */ + private String currentTemperature; + /** + * 当前湿度 + */ + private String currentHumidity; + /** + * 当前噪音 + */ + private String currentNoise; + /** + * 是否删除 + */ + private boolean deleted; + /** + * 设备di + */ + private long deviceId; + /** + * 蜜蜂数量 + */ + private String currentNumber; + /** + * 电量 + */ + private String electricQuantity; + + /** + * 数据上传时间 + */ + @TableField(exist = false) + private LocalDateTime dateUploadTime; + +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/entity/BeeHiveLog.java b/src/main/java/com/hive/bee/entity/BeeHiveLog.java new file mode 100644 index 0000000..7fd6b02 --- /dev/null +++ b/src/main/java/com/hive/bee/entity/BeeHiveLog.java @@ -0,0 +1,70 @@ +package com.hive.bee.entity; + +import lombok.*; + +import java.io.Serializable; +import java.time.LocalDateTime; + +import com.baomidou.mybatisplus.annotation.*; + +/** + * 蜂箱日志 DO + * + * @author 智能蜂箱 + */ +@TableName("bee_hive_log") +@KeySequence("bee_hive_log_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。 +@Data +@ToString(callSuper = true) +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BeeHiveLog implements Serializable { + private static long serialVersionUID = 1L; + /** + * 主键 + */ + @TableId + private Long id; + /** + * 蜂场id + */ + private Long peakFieldId; + /** + * 蜂箱编号 + */ + private String beeHiveNumber; + /** + * 蜂箱名称 + */ + private String beeHiveName; + /** + * 重量 + */ + private String beeHiveWeight; + /** + * 温度 + */ + private String currentTemperature; + /** + * 湿度 + */ + private String currentHumidity; + /** + * 噪音 + */ + private String currentNoise; + /** + * 是否删除 + */ + private boolean deleted; + /** + * 蜂箱id + */ + private long beeHiveId; + + /** + * 数据上传时间 + */ + private LocalDateTime dateUploadTime; +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/mapper/HiveLogMapper.java b/src/main/java/com/hive/bee/mapper/HiveLogMapper.java new file mode 100644 index 0000000..de625e8 --- /dev/null +++ b/src/main/java/com/hive/bee/mapper/HiveLogMapper.java @@ -0,0 +1,16 @@ +package com.hive.bee.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.hive.bee.entity.BeeHive; +import com.hive.bee.entity.BeeHiveLog; +import org.apache.ibatis.annotations.Mapper; + +/** + * 蜂箱历史数据 Mapper + */ +@Mapper +public interface HiveLogMapper extends BaseMapper { + + + +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/mapper/HiveMapper.java b/src/main/java/com/hive/bee/mapper/HiveMapper.java new file mode 100644 index 0000000..5bb2b4f --- /dev/null +++ b/src/main/java/com/hive/bee/mapper/HiveMapper.java @@ -0,0 +1,15 @@ +package com.hive.bee.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.hive.bee.entity.BeeHive; +import org.apache.ibatis.annotations.Mapper; + +/** + * 蜂箱 Mapper + */ +@Mapper +public interface HiveMapper extends BaseMapper { + + + +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/mapper/xml/HiveLogMapper.xml b/src/main/java/com/hive/bee/mapper/xml/HiveLogMapper.xml new file mode 100644 index 0000000..6f52cc1 --- /dev/null +++ b/src/main/java/com/hive/bee/mapper/xml/HiveLogMapper.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/main/java/com/hive/bee/mapper/xml/HiveMapper.xml b/src/main/java/com/hive/bee/mapper/xml/HiveMapper.xml new file mode 100644 index 0000000..bdad5a0 --- /dev/null +++ b/src/main/java/com/hive/bee/mapper/xml/HiveMapper.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/main/java/com/hive/bee/service/HiveLogService.java b/src/main/java/com/hive/bee/service/HiveLogService.java new file mode 100644 index 0000000..ddd6b3d --- /dev/null +++ b/src/main/java/com/hive/bee/service/HiveLogService.java @@ -0,0 +1,14 @@ +package com.hive.bee.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.hive.bee.entity.BeeHive; +import com.hive.bee.entity.BeeHiveLog; + +/** + * 蜂箱历史数据 Service 接口 + */ +public interface HiveLogService extends IService { + + + +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/service/HiveService.java b/src/main/java/com/hive/bee/service/HiveService.java new file mode 100644 index 0000000..dc7337f --- /dev/null +++ b/src/main/java/com/hive/bee/service/HiveService.java @@ -0,0 +1,13 @@ +package com.hive.bee.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.hive.bee.entity.BeeHive; + +/** + * 蜂箱 Service 接口 + */ +public interface HiveService extends IService { + + + +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/service/impl/HiveLogServiceImpl.java b/src/main/java/com/hive/bee/service/impl/HiveLogServiceImpl.java new file mode 100644 index 0000000..5d6f739 --- /dev/null +++ b/src/main/java/com/hive/bee/service/impl/HiveLogServiceImpl.java @@ -0,0 +1,20 @@ +package com.hive.bee.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.hive.bee.entity.BeeHive; +import com.hive.bee.entity.BeeHiveLog; +import com.hive.bee.mapper.HiveLogMapper; +import com.hive.bee.mapper.HiveMapper; +import com.hive.bee.service.HiveLogService; +import com.hive.bee.service.HiveService; +import org.springframework.stereotype.Service; + + +/** + * 蜂箱历史数据 Service 实现类 + */ +@Service +public class HiveLogServiceImpl extends ServiceImpl implements HiveLogService { + + +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/service/impl/HiveServiceImpl.java b/src/main/java/com/hive/bee/service/impl/HiveServiceImpl.java new file mode 100644 index 0000000..7b6a6fa --- /dev/null +++ b/src/main/java/com/hive/bee/service/impl/HiveServiceImpl.java @@ -0,0 +1,17 @@ +package com.hive.bee.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.hive.bee.entity.BeeHive; +import com.hive.bee.mapper.HiveMapper; +import com.hive.bee.service.HiveService; +import org.springframework.stereotype.Service; + + +/** + * 蜂箱 Service 实现类 + */ +@Service +public class HiveServiceImpl extends ServiceImpl implements HiveService { + + +} \ No newline at end of file diff --git a/src/main/java/com/hive/bee/vo/BeeHiveVo.java b/src/main/java/com/hive/bee/vo/BeeHiveVo.java new file mode 100644 index 0000000..067dc14 --- /dev/null +++ b/src/main/java/com/hive/bee/vo/BeeHiveVo.java @@ -0,0 +1,89 @@ +package com.hive.bee.vo; + +import com.baomidou.mybatisplus.annotation.KeySequence; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.*; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * 蜂箱 DO + * + * @author 芋道源码 + */ +@TableName("bee_hive") +@KeySequence("bee_hive_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。 +@Data +@ToString(callSuper = true) +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BeeHiveVo implements Serializable { + private static long serialVersionUID = 1L; + /** + * 蜂场id + * + * 枚举 {@link TODO mf_type 对应的类} + */ + private Long peakFieldId; + /** + * 蜂箱编号 + */ + private String beeHiveNumber; + /** + * 蜂箱名称 + */ + private String beeHiveName; + /** + * mac号 + */ + private String beeHiveMac; + /** + * 蜂箱位置 + */ + private String beeHivePosition; + /** + * 最后在线时间 + */ + private LocalDateTime lastOnlineTime; + /** + * 在线状态 + */ + private Integer onlineType; + /** + * 蜂箱重量 + */ + private String beeHiveWeight; + /** + * 当前温度 + */ + private String currentTemperature; + /** + * 当前湿度 + */ + private String currentHumidity; + /** + * 当前噪音 + */ + private String currentNoise; + /** + * 设备di + */ + private long deviceId; + /** + * 蜜蜂数量 + */ + private String currentNumber; + /** + * 电量 + */ + private String electricQuantity; + + /** + * 数据上传时间 + */ + private LocalDateTime dateUploadTime; + +} \ No newline at end of file diff --git a/src/main/java/com/hive/communication/netty/server/ChannelManager.java b/src/main/java/com/hive/communication/netty/server/ChannelManager.java new file mode 100644 index 0000000..51ac153 --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/ChannelManager.java @@ -0,0 +1,42 @@ +package com.hive.communication.netty.server; + +import com.hive.config.netty.NettyConfig; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; + +/** + * @Author: HIVE - LGH + * @Date: 2024/10/11 15:28 + * @Version: V1.0 + */ +public class ChannelManager { + + // 添加Channel到group和map中 + public static void addChannel(Channel channel, String deviceId) { + NettyConfig.group.add(channel); + NettyConfig.portToChannelMap.put(deviceId, channel); + } + // 从group和map中移除Channel + public static void removeChannel(Channel channel) { + NettyConfig.group.remove(channel); + NettyConfig.portToChannelMap.values().removeIf(c -> c.equals(channel)); + } + // 关闭指定端口的Channel + public static void closeChannelByPort(int port) { + Channel channel = NettyConfig.portToChannelMap.get(port); + System.out.println(channel.isOpen()); + if (channel != null && channel.isOpen()) { + ChannelFuture future = channel.close(); + future.addListener(f -> { + if (f.isSuccess()) { + NettyConfig.portToChannelMap.remove(port); + } + }); + } + } + // 关闭所有Channel + public static void closeAllChannels() { + NettyConfig.group.close().syncUninterruptibly(); + NettyConfig.portToChannelMap.clear(); + } +} diff --git a/src/main/java/com/hive/communication/netty/server/EchoServer.java b/src/main/java/com/hive/communication/netty/server/EchoServer.java new file mode 100644 index 0000000..cccb259 --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/EchoServer.java @@ -0,0 +1,117 @@ +package com.hive.communication.netty.server; + +import com.hive.communication.netty.server.decoder.DelimiterBasedFrameDecoder; +import com.hive.communication.netty.server.handler.EchoServerHandler; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.bytes.ByteArrayEncoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +public class EchoServer { + + private final int port; + + public EchoServer(int port) { + this.port = port; + } + + private static final Map portToChannelMap = new HashMap<>(); + private static final Map portToBootstrapMap = new HashMap<>(); + private static final Map portToEventLoopGroupMap = new HashMap<>(); + + public void start() { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup group = new NioEventLoopGroup(); + try { + ServerBootstrap sb = new ServerBootstrap(); + sb.option(ChannelOption.SO_BACKLOG, 1024); + sb.group(group, bossGroup) // 绑定线程池 + .channel(NioServerSocketChannel.class) // 指定使用的channel + .localAddress(port)// 绑定监听端口 + //保持连接数 + .option(ChannelOption.SO_BACKLOG, 128) + //有数据立即发送 + .option(ChannelOption.TCP_NODELAY, true) + //保持连接 + .childOption(ChannelOption.SO_KEEPALIVE, true) + //处理新连接 + .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作 + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + System.out.println("报告"); + System.out.println("信息:有一客户端链接到本服务端"); + System.out.println("IP:" + ch.localAddress().getHostName()); + System.out.println("Port:" + ch.localAddress().getPort()); + System.out.println("报告完毕"); + + ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); + String startDelimiterStr = "99"; + String endDelimiterStr = "0A"; + ByteBuf startDelimiter = Unpooled.copiedBuffer(startDelimiterStr, Charset.forName("UTF-8")); + ByteBuf endDelimiter = Unpooled.copiedBuffer(endDelimiterStr, Charset.forName("UTF-8")); + ch.pipeline().addLast(new DelimiterBasedFrameDecoder(startDelimiter, endDelimiter)); + ch.pipeline().addLast(new EchoServerHandler()); + ch.pipeline().addLast(new ByteArrayEncoder()); + + } + }); + ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定 + System.out.println(EchoServer.class + " 启动正在监听: " + cf.channel().localAddress()); + portToChannelMap.put(port, cf.channel()); + portToBootstrapMap.put(port, sb); + portToEventLoopGroupMap.put(port, group); + cf.channel().closeFuture().sync(); // 关闭服务器通道 + } catch (Exception e) { + e.printStackTrace(); + } finally { + //释放线程池资源 + group.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + public void stopServer(int port) { + Channel channel = portToChannelMap.get(port); + if (channel != null && channel.isOpen()) { + ChannelFuture future = channel.close(); + future.addListener(f -> { + if (f.isSuccess()) { + portToChannelMap.remove(port); + // 确保在Channel关闭后处理ServerBootstrap和EventLoopGroup + handleServerBootstrapAndEventLoopGroup(port); + } else { + // 处理关闭Channel失败的情况 + } + }); + } else { + // 处理Channel不存在或已关闭的情况 + handleServerBootstrapAndEventLoopGroup(port); + } + } + + private void handleServerBootstrapAndEventLoopGroup(int port) { + ServerBootstrap bootstrap = portToBootstrapMap.get(port); + if (bootstrap != null) { + EventLoopGroup bossGroup = portToEventLoopGroupMap.get(port); + EventLoopGroup workerGroup = bootstrap.childGroup(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + portToBootstrapMap.remove(port); + portToEventLoopGroupMap.remove(port); + } else { + System.out.println("找不到端口" + port); + } + } + + +} \ No newline at end of file diff --git a/src/main/java/com/hive/communication/netty/server/NettyServerRunner.java b/src/main/java/com/hive/communication/netty/server/NettyServerRunner.java new file mode 100644 index 0000000..6eebceb --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/NettyServerRunner.java @@ -0,0 +1,15 @@ +package com.hive.communication.netty.server; + +import com.hive.communication.util.NettyServerUtil; +import com.hive.config.netty.NettyHiveServerConfig; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +@Component +public class NettyServerRunner implements CommandLineRunner { + + @Override + public void run(String... args) throws Exception { + new EchoServer(NettyServerUtil.getPort()).start(); + } +} diff --git a/src/main/java/com/hive/communication/netty/server/dataprocessing/BeeHiveAdd.java b/src/main/java/com/hive/communication/netty/server/dataprocessing/BeeHiveAdd.java new file mode 100644 index 0000000..fbbd17c --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/dataprocessing/BeeHiveAdd.java @@ -0,0 +1,37 @@ +package com.hive.communication.netty.server.dataprocessing; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.hive.bee.entity.BeeHive; +import com.hive.bee.entity.BeeHiveLog; +import com.hive.bee.service.HiveLogService; +import com.hive.bee.service.HiveService; +import com.hive.bee.vo.BeeHiveVo; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import static com.hive.util.ConverterUtil.copySameFields; + +@Slf4j +@Component +public class BeeHiveAdd { + @Autowired + private HiveService hiveService; + @Autowired + private HiveLogService hiveLogService; + public void addAndEdit(BeeHive beeHive){ + hiveService.getBaseMapper().updateById(beeHive); + BeeHiveLog beeHiveLog = new BeeHiveLog(); + BeeHiveVo beeHiveVo = new BeeHiveVo(); + //用来去除不需要的字段 + copySameFields(beeHive, beeHiveVo); + copySameFields(beeHiveVo, beeHiveLog); + beeHiveLog.setBeeHiveId(beeHive.getId()); + hiveLogService.save(beeHiveLog); + } + public BeeHive getOneBeeHive(long deviceId){ + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.lambda().eq(BeeHive::isDeleted,true).eq(BeeHive::getDeviceId,deviceId); + return hiveService.getBaseMapper().selectOne(queryWrapper); + } +} diff --git a/src/main/java/com/hive/communication/netty/server/dataprocessing/HandlerDateProcessing.java b/src/main/java/com/hive/communication/netty/server/dataprocessing/HandlerDateProcessing.java new file mode 100644 index 0000000..d8ccd52 --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/dataprocessing/HandlerDateProcessing.java @@ -0,0 +1,157 @@ +package com.hive.communication.netty.server.dataprocessing; + +import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.hive.bee.entity.BeeHive; +import com.hive.communication.util.HexConversion; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.hive.communication.netty.server.handler.EchoServerHandler.sendActive; +import static com.hive.util.DateConvetUtil.isDifferenceMoreThan10Minutes; + +/** + * 数据处理 + * + * @Author: hive - LGH + * @Date: 2024/10/11 14:46 + * @Version: V1.0 + */ +@Slf4j +@Component +public class HandlerDateProcessing { + + + /** + * 处理响应数据 + */ + private final AtomicBoolean isScheduled = new AtomicBoolean(false); + + // private boolean isScheduled = false; + public void handleResponse(ChannelHandlerContext ctx, StringBuffer sb) throws Exception { + boolean flag = false; + log.info("收到蜂箱数据" + sb); + BeeHive beeHive = dataAnalysis(sb.toString()); + if (ObjectUtils.isNotNull(beeHive)) { + BeeHiveAdd beeHiveAdd = new BeeHiveAdd(); + beeHiveAdd.addAndEdit(beeHive); + flag = true; + } + // 处理响应逻辑 + // 读取数据并获取设备编号 + // String deviceId = deviceChannels.get(ctx.channel().id()); + // String deviceId = ctx.channel().attr(DEVICE_ID).get(); + //数据长度不够则解析异常,需要设备再次上报 + /* if (!flag) { + //解析 数据不符合要求,需要设备再次上报 + } else { + + //变更注册回复协议 + }*/ + // 创建一个CompletableFuture,将在2s后完成 + if (flag) { + CompletableFuture future = new CompletableFuture<>(); + //isScheduled.compareAndSet 确保schedule()方法只被调用一次 + if (isScheduled.compareAndSet(false, true)) { + ctx.channel().eventLoop().schedule(() -> { + try { + // 正常完成Future + future.complete(null); + } catch (Exception e) { + // 异常完成Future + future.completeExceptionally(e); + } finally { + isScheduled.set(false); // 无论如何都将标志重置为false + } + // }, 4, TimeUnit.HOURS); + }, 2, TimeUnit.SECONDS); // 将延迟时间设置为2s + // isScheduled = true; // 设置标志变量为true,表示延迟逻辑已被调度 + future.thenAccept(v -> sendActive(ctx, true, beeHive, true)); + //ctx.writeAndFlush(messageBuffer) + + } else { + //ctx.writeAndFlush(); + } + } + } + + public BeeHive dataAnalysis(String data) { + BeeHiveAdd beeHiveAdd = new BeeHiveAdd(); + BeeHive beeHive = new BeeHive(); + for (int i = 0; i < 7; i++) { + Integer startIndex = 0; + Integer endIndex = 0; + switch (i) { + case 0: + //蜂箱id + startIndex += 16; + endIndex += 32; + long deviceId = HexConversion.hexToBigLong(getSubData(data, startIndex, endIndex)); + beeHive = beeHiveAdd.getOneBeeHive(deviceId); + if(ObjectUtils.isNull(beeHive)){ + return null; + } + beeHive.setDeviceId(deviceId); + break; + case 1: + //时间戳 + startIndex += 6; + endIndex += 14; + long date = HexConversion.hexToBigInt(getSubData(data, startIndex, endIndex)); + //时间戳大于10分钟则为有效数据 + if (!isDifferenceMoreThan10Minutes(Instant.now().getEpochSecond(), date)) { + return null; + } + beeHive.setDateUploadTime(LocalDateTime.ofInstant(Instant.ofEpochSecond(date), ZoneId.systemDefault())); + break; + case 2: + //当前温度值2 + startIndex += 36; + endIndex += 40; + beeHive.setCurrentTemperature(String.valueOf(HexConversion.hexadecimal16Conversion(getSubData(data, startIndex, endIndex)) / 10)); + //startIndex=4 ,endIndex=10 + break; + case 3: + //当前湿度值2 + startIndex += 40; + endIndex += 44; + beeHive.setCurrentHumidity(String.valueOf(HexConversion.hexadecimal16Conversion(getSubData(data, startIndex, endIndex)))); + //startIndex=4 ,endIndex=10 + break; + case 4: + //蜂箱重量4 + startIndex += 44; + endIndex += 52; + beeHive.setBeeHiveWeight(String.valueOf(HexConversion.hexDouble2BigDecimal2(getSubData(data, startIndex, endIndex)))); + //startIndex=4 ,endIndex=10 + break; + case 5: + //蜂进出数量4 + startIndex += 52; + endIndex += 60; + beeHive.setCurrentNumber(String.valueOf(HexConversion.hexadecimal16Conversion(getSubData(data, startIndex, endIndex)))); + //startIndex=4 ,endIndex=10 + break; + case 6: + //声音异常0表示无异常FF表示异常1 + startIndex += 60; + endIndex += 62; + beeHive.setCurrentNoise(String.valueOf(getSubData(data, startIndex, endIndex))); + break; + } + } + return beeHive; + } + + public String getSubData(String data, Integer startIndex, Integer endIndex) { + return data.substring(startIndex, endIndex); + } + +} diff --git a/src/main/java/com/hive/communication/netty/server/decoder/CustomDecoder.java b/src/main/java/com/hive/communication/netty/server/decoder/CustomDecoder.java new file mode 100644 index 0000000..720e7ee --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/decoder/CustomDecoder.java @@ -0,0 +1,69 @@ +package com.hive.communication.netty.server.decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +/** + * @Author: HIVE - LGH + * @Date: 2024/10/11 13:27 + * @Version: V1.0 + */ +public class CustomDecoder extends ByteToMessageDecoder { + private static final byte HEARTBEAT_FLAG = (byte) 0xfe; // 心跳消息的标识 + private int length1; + private int length2; + private byte specialFlag; + + public CustomDecoder(int length1, int length2, byte specialFlag) { + this.length1 = length1; + this.length2 = length2; + this.specialFlag = specialFlag; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + // 检查是否为心跳消息 + // 如果是心跳消息,则将其添加到输出列表中 + out.add(in.readBytes(in.readableBytes())); + + if (in.getByte(in.readerIndex()) == HEARTBEAT_FLAG) { + // 如果是心跳消息,则将其添加到输出列表中 + out.add(in.readBytes(in.readableBytes())); + }else { + // 检查特殊标识 + if (in.getByte(in.readerIndex()) == specialFlag) { + // 解析长度为length1的消息(水文协议) + if (in.readableBytes() >= length1) { + out.add(in.readBytes(length1)); + } + } else { + // 解析长度为length2的消息(modbus协议) + if (in.readableBytes() >= length2) { + out.add(in.readBytes(length2)); + }/*else{ + // 不满足长度,释放已读取的ByteBuf + ReferenceCountUtil.release(in); + }*/ + } + } + } + /* private int length; // 消息长度 + + public CustomDecoder(int length) { + this.length = length; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (in.readableBytes() >= length) { + out.add(in.readBytes(length)); // 将指定长度的数据添加到输出列表中 + } + + + }*/ + + +} diff --git a/src/main/java/com/hive/communication/netty/server/decoder/DelimiterBasedFrameDecoder.java b/src/main/java/com/hive/communication/netty/server/decoder/DelimiterBasedFrameDecoder.java new file mode 100644 index 0000000..c8e7225 --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/decoder/DelimiterBasedFrameDecoder.java @@ -0,0 +1,60 @@ +package com.hive.communication.netty.server.decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +/** + * @Author: HIVE - LGH + * @Date: 2024/10/11 15:42 + * @Version: V1.0 + */ +public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { + + private final ByteBuf startDelimiter; // 开始标识字符串 + private final ByteBuf endDelimiter; // 结束标识字符串 + + public DelimiterBasedFrameDecoder(ByteBuf startDelimiter, ByteBuf endDelimiter) { + this.startDelimiter = startDelimiter; + this.endDelimiter = endDelimiter; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + /* // 将ByteBuf转换为Java字符串以便搜索标识符 + String data = in.toString(StandardCharsets.US_ASCII); + + int startIndex = data.indexOf(startDelimiter); + int endIndex = data.indexOf(endDelimiter); + + if (startIndex != -1 && endIndex != -1 && startIndex < endIndex) { + String frame = data.substring(startIndex + startDelimiter.length(), endIndex); + out.add(frame); // 添加提取的数据帧到输出列表中 + }*/ + + // 将ByteBuf转换为byte数组 + byte[] startDelim = new byte[startDelimiter.readableBytes()]; + startDelimiter.getBytes(startDelimiter.readerIndex(), startDelim); + + byte[] endDelim = new byte[endDelimiter.readableBytes()]; + endDelimiter.getBytes(endDelimiter.readerIndex(), endDelim); + + int startDelimIndex = ByteBufUtil.indexOf(startDelimiter, in); + int endDelimIndex = ByteBufUtil.indexOf(endDelimiter, in); + + // 确保找到了开始和结束分隔符 + if (startDelimIndex >= 0 && endDelimIndex > startDelimIndex) { + // 提取数据帧 + ByteBuf frame = in.slice(startDelimIndex + startDelimiter.readableBytes(), endDelimIndex - (startDelimIndex + startDelimiter.readableBytes())); + // 将String转换为ByteBuf,使用UTF-8编码 + out.add(frame.retain()); + // 更新readerIndex到结束分隔符之后 + in.readerIndex(endDelimIndex + endDelimiter.readableBytes()); + } + } + + +} diff --git a/src/main/java/com/hive/communication/netty/server/handler/EchoServerHandler.java b/src/main/java/com/hive/communication/netty/server/handler/EchoServerHandler.java new file mode 100644 index 0000000..faf16e8 --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/handler/EchoServerHandler.java @@ -0,0 +1,293 @@ +package com.hive.communication.netty.server.handler; + + +import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.hive.bee.entity.BeeHive; +import com.hive.communication.netty.server.dataprocessing.HandlerDateProcessing; +import com.hive.communication.util.HexConversion; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.util.AttributeKey; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import java.io.UnsupportedEncodingException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; + +import static com.hive.util.DateConvetUtil.getCurrentSeconds; + +@Slf4j +public class EchoServerHandler extends ChannelInboundHandlerAdapter { + + + private static final String HEARTBEAT_REQUEST = "FE"; + //private final int reconnectDelay; // 重连延迟时间,单位为秒 + private boolean isHeartbeatReceived = false; + // 创建一个固定大小的线程池用于消息处理 + // private final ExecutorService executorService = Executors.newFixedThreadPool(10); + private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + // 使用AttributeKey定义设备编号属性的键 + private static final AttributeKey DEVICE_ID = AttributeKey.valueOf("deviceId"); + private static final ConcurrentHashMap pendingRequests = new ConcurrentHashMap<>(); + + /***************************************** 发送消息 *****************************************/ + /** + * channelAction + * channel 通道 action 活跃的 + * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据 + */ + + public static void sendActive(ChannelHandlerContext ctx, boolean flag, BeeHive beeHive, boolean notice) { + if (flag) { + try { + // String paddedString = String.format("%4s", code).replace(' ', '0'); + String codes = "注册完回复"; + //获取十分钟后的时间秒数,需要转换为16进制 + getCurrentSeconds(); + log.info("当前时间: {}, 消息: {}", LocalDateTime.now(), codes); + // 先将十六进制字符串转换为字节数组 + byte[] messageBytes = HexConversion.hexStringToByteArray(codes); + // 将字节数组封装到ByteBuf中 + ByteBuf messageBuffer = Unpooled.wrappedBuffer(messageBytes); + // 通过ChannelHandlerContext发送ByteBuf + // 关联设备编号与Channel + // sendRequest(ctx.channel(),messageBuffer,devicePointsSensor.getId()); + retrySend(ctx, messageBuffer) + .whenComplete((result,error) -> { + log.info("发送状态: {},当前时间: {}, 消息: {},名称:{},蜂箱id:{}", result,LocalDateTime.now(), codes,beeHive.getBeeHiveName(),beeHive.getId()); + /* if (result) { + // 发送成功 + } else { + // 发送失败 + }*/ + }); + } catch (Exception e) { + // 处理异常 + } + } + } + + /** + * 功能:调用retrySendAsync并返回结果 + */ + public static CompletableFuture retrySend(ChannelHandlerContext ctx, ByteBuf messageBuffer) { + int maxRetries = 3; // 最大重试次数 + CompletableFuture future = new CompletableFuture<>(); + try { + retrySendAsync(ctx, messageBuffer, maxRetries, future); + } catch (Exception e) { + //确保异常程序正常下行 + e.printStackTrace(); + //释放资源 + if (messageBuffer.refCnt() > 0) { + messageBuffer.release(); + } + future.complete(false); + } + return future; + } + /** + * 功能:发送消息并监听成功与失败进行重试 + */ + private static void retrySendAsync(ChannelHandlerContext ctx, ByteBuf messageBuffer, int remainingRetries, CompletableFuture future) { + messageBuffer.retain(); // 增加引用计数 + ChannelFuture sendFuture = ctx.writeAndFlush(messageBuffer); + sendFuture.addListener((ChannelFutureListener) sendFutureListener -> { + if (sendFutureListener.isSuccess()) { + messageBuffer.release(); // 释放在发送前增加的引用计数 + future.complete(true); // 发送成功,将结果设置为 true + } else { + if (remainingRetries > 0) { + ctx.executor().schedule(() -> { + // 发送失败,但还有剩余重试次数 + if (messageBuffer.isReadable()) { + // 如果messageBuffer仍然可读,重新使用它进行重试 + retrySendAsync(ctx,messageBuffer, remainingRetries - 1, future); + + } else { + // messageBuffer不可读,需要重新创建或复制一个新的messageBuffer + ByteBuf newMessageBuffer = messageBuffer.retainedDuplicate(); + retrySendAsync(ctx, newMessageBuffer, remainingRetries - 1, future); + } + }, 2, TimeUnit.SECONDS); + } else { + messageBuffer.release(); // 释放在发送前增加的引用计数 + future.complete(false); // 达到最大重试次数,将结果设置为 false + + } + } + }); + } + /** + * 功能:或缺设备编号等存入pendingRequests等读取是进行匹配 + */ + public static ChannelFuture sendRequest(Channel channel, ByteBuf message, String messageId) { + ChannelPromise promise = channel.newPromise(); + pendingRequests.put(messageId, promise); + // 将messageId附加到消息中,并发送消息 + // 这里用逗号分隔messageId和消息体 + ByteBuf msgToSend = message.copy().writeBytes((messageId + ",").getBytes(CharsetUtil.UTF_8)); + channel.writeAndFlush(msgToSend).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + pendingRequests.remove(messageId); + } + }); + + return promise; + } + /** + * 功能:连接成功调用 + */ + public void channelActive(ChannelHandlerContext ctx) throws Exception { + //ChannelManager.addChannel(ctx.channel(),(Integer) localAddress(ctx).get("port")); + //NettyConfig.group.add(ctx.channel()); + } + + /** + * channelInactive + * channel 通道 Inactive 不活跃 + * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据 + */ + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!"); + // 关闭流 只将当前的Channel从ChannelGroup中移除,不会立即关闭Channel + // NettyConfig.group.remove(ctx.channel()); + //关闭当前的Channel并释放相关资源 + ctx.close(); + } + + /** + * @param buf + * @return + * @author Taowd + * TODO 此处用来处理收到的数据中含有中文的时 出现乱码的问题 + */ + private String getMessage(ByteBuf buf) { + byte[] con = new byte[buf.readableBytes()]; + buf.readBytes(con); + try { + return new String(con, "UTF-8"); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + return null; + } + } + /*****************************************处理收到的消息 *****************************************/ + + /** + * 功能:读取发送过来的信息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (ObjectUtils.isNotNull(msg)) ; + // StringBuffer sb = null; + ByteBuf buf = (ByteBuf) msg; + byte[] req = new byte[buf.readableBytes()]; + buf.readBytes(req); + // String body = new String(req, "UTF-8"); + StringBuffer sb = new StringBuffer(req.length * 2); + for (int i = 0; i < req.length; i++) { + sb.append(Character.forDigit((req[i] & 240) >> 4, 16)); + sb.append(Character.forDigit(req[i] & 15, 16)); + } + // System.out.println("接收到的数据sb" + sb.toString()); + // 检查消息是否是心跳请求 + if (sb.toString().equalsIgnoreCase(HEARTBEAT_REQUEST)) { + handleHeartbeat(ctx,msg); + } else { + // 如果不是心跳请求,调用ChannelInboundHandler的下一个处理器 + ctx.fireChannelRead(msg); + if (ObjectUtils.isNotNull(sb) && sb.length() > 4) { + new HandlerDateProcessing().handleResponse(ctx, sb); + } + } + // System.out.println("客户端收到服务器数据111 服务端ip与端口:" + localAddress(ctx)); + // System.out.println("客户端收到服务器数据111 客户端ip与端口:" + remoteAddress(ctx)); + } + + private void handleHeartbeat(ChannelHandlerContext ctx, Object msg) { + isHeartbeatReceived = true; + ctx.fireChannelRead(msg); + //ctx.fireChannelReadComplete(); + // 如果需要,发送心跳响应 + // ctx.writeAndFlush("FE"); + } + + /** + * 功能:读取完毕客户端发送过来的数据之后的操作 + */ + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // 第一种方法:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。 + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); + //ctx.flush(); + // 调用父处理器的channelReadComplete方法,确保事件能传递到下一个处理器 + super.channelReadComplete(ctx); + // 在任何情况下都传递到下一个处理器 + //ctx.fireChannelReadComplete(); + // ctx.flush(); + // 第二种方法:在client端关闭channel连接,这样的话,会触发两次channelReadComplete方法。 + // ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。 + } + + /** + * 功能:服务端发生异常的操作 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + /* cause.printStackTrace(); + ctx.close();*/ + if (cause instanceof ReadTimeoutException) { + // 处理超时情况,例如关闭连接或者记录日志等 + ctx.close(); + } else { + // 其他异常情况的处理 + cause.printStackTrace(); + ctx.close(); + } + } + + /** + * 功能:获取客户端端口与ip + */ + public static Map remoteAddress(ChannelHandlerContext ctx) { + SocketAddress clientAddress = ctx.channel().remoteAddress(); +// 这里可以进行类型转换,因为通常情况下远程地址是 InetSocketAddress 类型 + Map map = new HashMap(); + if (clientAddress instanceof InetSocketAddress) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) clientAddress; + String ipAddress = inetSocketAddress.getAddress().getHostAddress(); + int port = inetSocketAddress.getPort(); + map.put("ipAddress", ipAddress); + map.put("port", port); + return map; + } + return null; + } + + /** + * 功能:获取服务端端口与ip + */ + public static Map localAddress(ChannelHandlerContext ctx) { + SocketAddress serverAddress = ctx.channel().localAddress(); + Map map = new HashMap(); + if (serverAddress instanceof InetSocketAddress) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) serverAddress; + String ipAddress = inetSocketAddress.getAddress().getHostAddress(); + int port = inetSocketAddress.getPort(); + map.put("ipAddress", ipAddress); + map.put("port", port); + return map; + } + return null; + } + + +} diff --git a/src/main/java/com/hive/communication/netty/server/heartbeat/GatewayHeartbeatHandler.java b/src/main/java/com/hive/communication/netty/server/heartbeat/GatewayHeartbeatHandler.java new file mode 100644 index 0000000..1fc41cf --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/heartbeat/GatewayHeartbeatHandler.java @@ -0,0 +1,95 @@ +package com.hive.communication.netty.server.heartbeat; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.concurrent.TimeUnit; + +/** + * @Author: HIVE - LGH + * @Date: 2024/10/11 15:24 + * @Version: V1.0 + */ +public class GatewayHeartbeatHandler extends IdleStateHandler { + + // 用于记录网关的在线状态 + private boolean isOnline = false; + // 用于记录上一次设置为在线状态的时间戳 + private long lastOnlineTimestamp = System.currentTimeMillis(); + + private static final long ONLINE_STATUS_INTERVAL = TimeUnit.HOURS.toMillis(1); // 1小时 + + public GatewayHeartbeatHandler(int readerIdleTimeSeconds) { + super(readerIdleTimeSeconds, 0, 0, TimeUnit.SECONDS); + } + + @Override + protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { + if (evt.state() == IdleState.READER_IDLE) { + // 如果心跳超时,设置为离线,并关闭连接 + setOffline(ctx); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (isHeartbeatMessage(msg)) { + // 接收到心跳 + // long currentTime = System.currentTimeMillis(); + // || (currentTime - lastOnlineTimestamp > ONLINE_STATUS_INTERVAL) + if (!isOnline) { + // 如果网关之前不在线或者超过了更新间隔,设置为在线 + setOnline(ctx); + // lastOnlineTimestamp = currentTime; + } + // 确保调用此方法以重置空闲状态监测 + // ReferenceCountUtil.release(msg); // 如果msg是ByteBuf类型,释放它 + // super.channelRead(ctx, msg); + // ctx.fireChannelRead(msg); + // 忽略心跳消息,不传递给下一个handler + } + //else { + // 不是心跳消息,传递给下一个handler + super.channelRead(ctx, msg); + // } + // 此调用保证IdleStateHandler能够重置其读取计时器 + // ctx.fireChannelRead(msg); + } + + private boolean isHeartbeatMessage(Object msg) { + // 判断消息是否为心跳消息 + if (msg instanceof ByteBuf) { + ByteBuf data = (ByteBuf) msg; + // 检查数据包是否足够长 + if (data.readableBytes() >= 1) { + // 标记readerIndex的当前位置,之后可以重置 + data.markReaderIndex(); + // 读取第一个字节 + byte firstByte = data.readByte(); + // 重置readerIndex到标记的位置,这样不会影响后续的读取操作 + data.resetReaderIndex(); + // 检查第一个字节是否为心跳包的标志 + return (firstByte == (byte) 0xFE || firstByte == (byte) 0xfe); + } + } + return false; + } + + private void setOnline(ChannelHandlerContext ctx) { + // 更新网关状态为在线 + System.out.println("设备上线"); + isOnline = true; + // 实现更新状态的逻辑,比如更新数据库记录 + } + + private void setOffline(ChannelHandlerContext ctx) { + // 更新网关状态为离线 + System.out.println("设备离线"); + isOnline = false; + // 实现更新状态的逻辑,比如更新数据库记录 + ctx.close(); // 关闭连接 + } +} diff --git a/src/main/java/com/hive/communication/netty/server/launch/TimeServer.java b/src/main/java/com/hive/communication/netty/server/launch/TimeServer.java new file mode 100644 index 0000000..c2b848c --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/launch/TimeServer.java @@ -0,0 +1,54 @@ +package com.hive.communication.netty.server.launch; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; + +/** + * 用于发送测试数据的服务端 + * + */ +public class TimeServer { + + public static void main(String[] args) { + TimeServer.run(8021); + TimeServer.run(8023); + TimeServer.run(8024); + + } + + private static void run(int port){ + // EventLoop 代替原来的 ChannelFactory + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new TimeServerHandler(), + new WriteTimeoutHandler(10), + //控制写入超时10秒构造参数10表示如果持续10秒钟都没有数据写了,那么就超时。 + new ReadTimeoutHandler(10) + ); + } + }).option(ChannelOption.SO_KEEPALIVE, true); + ChannelFuture f = serverBootstrap.bind(port).sync(); + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/hive/communication/netty/server/launch/TimeServerHandler.java b/src/main/java/com/hive/communication/netty/server/launch/TimeServerHandler.java new file mode 100644 index 0000000..ab9295c --- /dev/null +++ b/src/main/java/com/hive/communication/netty/server/launch/TimeServerHandler.java @@ -0,0 +1,26 @@ +package com.hive.communication.netty.server.launch; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class TimeServerHandler extends ChannelInboundHandlerAdapter { + + //ChannelHandlerContext通道处理上下文 + @Override + public void channelActive(final ChannelHandlerContext ctx) throws InterruptedException { // (1) + + while (true) { + ByteBuf time = ctx.alloc().buffer(4); //为ByteBuf分配四个字节 + time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); + ctx.writeAndFlush(time); + Thread.sleep(200); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/main/java/com/hive/communication/util/CheckCode.java b/src/main/java/com/hive/communication/util/CheckCode.java new file mode 100644 index 0000000..7503b8f --- /dev/null +++ b/src/main/java/com/hive/communication/util/CheckCode.java @@ -0,0 +1,35 @@ +package com.hive.communication.util; + +public class CheckCode { + + // 计算字符串的校验码(异位校验) + public static int calculateParity(String input, boolean oddParity) { + int parity = 0; + for (char ch : input.toCharArray()) { + parity ^= ch; + } + // 根据需要调整奇偶校验位 + int parityBit = Integer.bitCount(parity) % 2; + if (oddParity) { + parityBit = parityBit == 0 ? 1 : 0; // 奇校验:如果0变成1,如果1变成0 + } + return parityBit; + } + // 计算16进制字符串的校验码 + public static int calculateChecksum(String hexString) { + int sum = 0; + + // 每两个字符作为一个16进制数进行解析 + for (int i = 0; i < hexString.length(); i += 2) { + String hexPair = hexString.substring(i, i + 2); + int hexValue = Integer.parseInt(hexPair, 16); + sum += hexValue; + } + + // 保留最后8位 + int checksum = sum & 0xFF; // 0xFF表示8位全1的掩码 + + return checksum; + } + +} diff --git a/src/main/java/com/hive/communication/util/HexConversion.java b/src/main/java/com/hive/communication/util/HexConversion.java new file mode 100644 index 0000000..1c03984 --- /dev/null +++ b/src/main/java/com/hive/communication/util/HexConversion.java @@ -0,0 +1,97 @@ +package com.hive.communication.util; + +import java.math.BigDecimal; +import java.math.BigInteger; +/** + * 进制转换 + */ +public class HexConversion { + + + /** + * 十六进制单精度浮点数,转BigDecimal,保留2为小数,截掉多余小数位 + * @param hex + * @return + */ + public static BigDecimal hexFloat2BigDecimal(String hex) { + float value = Float.intBitsToFloat((int) Long.parseLong(hex, 16)); + BigDecimal bd = new BigDecimal(Float.toString(value)); + return bd.setScale(2, BigDecimal.ROUND_DOWN); + } + /** + * 该方法主要使用正则表达式来判断字符串中是否包含字母 + * @param cardNum 待检验的原始卡号 + * @return 返回是否包含 + */ + public static boolean judgeContainsStr(String cardNum) { + + // String regex1 = ".*[a-zA-z].*"; + + String regex=".*[a-zA-z].*"; + return cardNum.matches(regex); + } + + /** + * 十六进制双精度浮点数,转BigDecimal,保留2为小数,截掉多余小数位 + * @param hex + * @return + */ + public static BigDecimal hexDouble2BigDecimal2(String hex) { + double value = Double.longBitsToDouble(Long.valueOf(hex,16).longValue()); + BigDecimal bd = BigDecimal.valueOf(value); + return bd.setScale(2, BigDecimal.ROUND_DOWN); + } + /** + * 十六进制转十进制(只支持正数) + * @param hex + * @return Integer + */ + public static Integer hexToBigInt(String hex){ + return new BigInteger(hex,16).intValue(); + } + + /** + * 十六进制转十进制(只支持正数) + * @param hex + * @return long + */ + public static long hexToBigLong(String hex){ + return Long.parseLong(hex, 16); + } + + + /** + * hex 转为数组 + * @param s + * @return String + */ + public static byte[] hexStringToByteArray(String s) { + int len = s.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + + Character.digit(s.charAt(i+1), 16)); + } + return data; + } + /** + * 十六进制转十进制(负数也可以转) + * @param hexadecimalStr + * @return String + */ + public static int hexadecimal16Conversion(String hexadecimalStr) { + int getDataDecimal = 0;//转化得到的目标数据 +//16进制代表数据 4位数字 + if (hexadecimalStr.length() == 4) { + int bit1Num = Integer.parseInt(hexadecimalStr.substring(0, 1), 16);//获取第一位。判断是正数还是负数 + if (bit1Num < 8) { //小于8是正数 + getDataDecimal = Integer.parseInt(hexadecimalStr, 16); + } else { //负数 + hexadecimalStr = "FFFF" + hexadecimalStr; //先不全八位 + getDataDecimal = new BigInteger(hexadecimalStr, 16).intValue(); + } + return getDataDecimal; + } + return 0; + } +} diff --git a/src/main/java/com/hive/communication/util/NettyServerUtil.java b/src/main/java/com/hive/communication/util/NettyServerUtil.java new file mode 100644 index 0000000..0ae0b85 --- /dev/null +++ b/src/main/java/com/hive/communication/util/NettyServerUtil.java @@ -0,0 +1,15 @@ +package com.hive.communication.util; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class NettyServerUtil { + private static int port; + public static void setPort(int port) { + NettyServerUtil.port = port; + } + public static int getPort() { + return port; + } + +} diff --git a/src/main/java/com/hive/config/MinioConfiguration.java b/src/main/java/com/hive/config/MinioConfiguration.java index c56defb..5c6ee28 100644 --- a/src/main/java/com/hive/config/MinioConfiguration.java +++ b/src/main/java/com/hive/config/MinioConfiguration.java @@ -9,7 +9,7 @@ import org.springframework.context.annotation.Configuration; public class MinioConfiguration { @Autowired - private com.coffee.config.MinioProperties properties; + private com.hive.config.MinioProperties properties; @Bean public MinioClient minioClient() { diff --git a/src/main/java/com/hive/config/netty/NettyConfig.java b/src/main/java/com/hive/config/netty/NettyConfig.java new file mode 100644 index 0000000..08746da --- /dev/null +++ b/src/main/java/com/hive/config/netty/NettyConfig.java @@ -0,0 +1,17 @@ +package com.hive.config.netty; + +import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class NettyConfig { + /** + * 存储每一个客户端接入进来时的channel对象 + */ + public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + public static final Map portToChannelMap = new ConcurrentHashMap<>(); +} diff --git a/src/main/java/com/hive/config/netty/NettyHiveServerConfig.java b/src/main/java/com/hive/config/netty/NettyHiveServerConfig.java new file mode 100644 index 0000000..563882e --- /dev/null +++ b/src/main/java/com/hive/config/netty/NettyHiveServerConfig.java @@ -0,0 +1,19 @@ +package com.hive.config.netty; + +import com.hive.communication.util.NettyServerUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +@Slf4j +@Configuration +public class NettyHiveServerConfig { + @Value(value = "${hive-server.port}") + private int port; + @Bean + public void initServer(){ + NettyServerUtil.setPort(port); + } +} diff --git a/src/main/java/com/hive/controller/ExcelController.java b/src/main/java/com/hive/controller/ExcelController.java deleted file mode 100644 index 57a6b51..0000000 --- a/src/main/java/com/hive/controller/ExcelController.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.hive.controller; - -import com.hive.common.AjaxResult; -import com.hive.entity.XieZhuan; -import com.hive.service.XieZhuanService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.*; -import org.springframework.web.multipart.MultipartFile; - -import java.io.IOException; -import java.util.List; - -@RestController -public class ExcelController { - - @Autowired - private XieZhuanService xieZhuanService; - - @PostMapping("/importFile") - private AjaxResult importFile(@RequestParam("file") MultipartFile file) throws IOException { - - xieZhuanService.importFile(file); - return AjaxResult.success("导入成功"); - } - - @GetMapping("/getXZList") - private AjaxResult getXZList(){ - List list = xieZhuanService.getXZList(); - return AjaxResult.success("查询成功",list); - } - - -} diff --git a/src/main/java/com/hive/entity/XieZhuan.java b/src/main/java/com/hive/entity/XieZhuan.java deleted file mode 100644 index 5933b45..0000000 --- a/src/main/java/com/hive/entity/XieZhuan.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.hive.entity; - -import com.alibaba.excel.annotation.ExcelProperty; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import lombok.Data; - -@Data -@TableName("xiezhuan") -public class XieZhuan { - - @TableId(type= IdType.AUTO) - private Long id; - - @ExcelProperty("省代码") - private String provinceCode; - - @ExcelProperty("省公司") - private String provinceCompanny; - - @ExcelProperty("数据日期") - private String dataDate; - - @ExcelProperty("省侧系统原因导致携转业务失败量") - private String failCount; - - @ExcelProperty("携转业务总量") - private String businessCount; - - @ExcelProperty("携转业务接口成功率") - private String successRate; - -} diff --git a/src/main/java/com/hive/mapper/XieZhuanMapper.java b/src/main/java/com/hive/mapper/XieZhuanMapper.java deleted file mode 100644 index 80d2542..0000000 --- a/src/main/java/com/hive/mapper/XieZhuanMapper.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.hive.mapper; - -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.coffee.entity.XieZhuan; -import org.apache.ibatis.annotations.Mapper; - -@Mapper -public interface XieZhuanMapper extends BaseMapper { -} diff --git a/src/main/java/com/hive/service/XieZhuanService.java b/src/main/java/com/hive/service/XieZhuanService.java deleted file mode 100644 index 364d69c..0000000 --- a/src/main/java/com/hive/service/XieZhuanService.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.hive.service; - -import com.baomidou.mybatisplus.extension.service.IService; -import com.hive.entity.XieZhuan; -import org.springframework.web.multipart.MultipartFile; - -import java.io.IOException; -import java.util.List; - -public interface XieZhuanService extends IService { - - - void importFile(MultipartFile file) throws IOException; - - List getXZList(); -} diff --git a/src/main/java/com/hive/service/impl/XieZhuanImpl.java b/src/main/java/com/hive/service/impl/XieZhuanImpl.java deleted file mode 100644 index 4d94bb6..0000000 --- a/src/main/java/com/hive/service/impl/XieZhuanImpl.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.hive.service.impl; - -import com.alibaba.excel.EasyExcel; -import com.alibaba.excel.context.AnalysisContext; -import com.alibaba.excel.event.AnalysisEventListener; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.coffee.entity.XieZhuan; -import com.coffee.mapper.XieZhuanMapper; -import com.hive.service.XieZhuanService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.web.multipart.MultipartFile; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import java.util.UUID; - -@Service -public class XieZhuanImpl extends ServiceImpl implements XieZhuanService { - - @Autowired - XieZhuanMapper xieZhuanMapper; - - private static final String UPLOAD_DIR = "C:\\Users\\Lenovo\\Desktop\\"; - - @Override - public void importFile(MultipartFile file) throws IOException { - - byte[] bytes = file.getBytes(); - Path path = Paths.get(UPLOAD_DIR); - - // 创建目录(如果不存在) - Files.createDirectories(path); - - String fileName = file.getOriginalFilename(); - - // 存储文件 - Files.write(path.resolve(fileName), bytes); - - String filePath = UPLOAD_DIR + fileName; - readExcel(filePath, XieZhuan.class, new CustomExcelListener(fileName)); - } - - @Override - public List getXZList() { - return xieZhuanMapper.selectList(null); - } - - class CustomExcelListener extends AnalysisEventListener { - - private String filename; - - public CustomExcelListener() { - } - - public CustomExcelListener(String filename) { - this.filename = filename; - } - - - @Override - public void invoke(T object, AnalysisContext context) { - - switch (filename) { - case "携转业务成功率日指标.xlsx": - xieZhuanMapper.insert((XieZhuan) object); - break; - } - System.out.println("解析数据:" + object); - } - - @Override - public void doAfterAllAnalysed(AnalysisContext context) { - System.out.println("读取" + filename + "文件并存入数据库结束======"); - } - - } - - - - - public static void readExcel(String path, Class clazz, AnalysisEventListener listener) { - EasyExcel.read(path, clazz, listener).sheet().doRead(); - } - -} diff --git a/src/main/java/com/hive/util/ConverterUtil.java b/src/main/java/com/hive/util/ConverterUtil.java new file mode 100644 index 0000000..a321904 --- /dev/null +++ b/src/main/java/com/hive/util/ConverterUtil.java @@ -0,0 +1,30 @@ +package com.hive.util; +import com.hive.bee.entity.BeeHive; +import com.hive.bee.entity.BeeHiveLog; + +import java.lang.reflect.Field; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; + +public class ConverterUtil { + + public static void copySameFields(T source, U target) { + Field[] sourceFields = source.getClass().getDeclaredFields(); + Field[] targetFields = target.getClass().getDeclaredFields(); + for (Field sourceField : sourceFields) { + for (Field targetField : targetFields) { + if (sourceField.getName().equals(targetField.getName()) && + sourceField.getType().equals(targetField.getType())) { + try { + sourceField.setAccessible(true); + targetField.setAccessible(true); + targetField.set(target, sourceField.get(source)); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } + } + } + } +} diff --git a/src/main/java/com/hive/util/DateConvetUtil.java b/src/main/java/com/hive/util/DateConvetUtil.java new file mode 100644 index 0000000..fa3b5cb --- /dev/null +++ b/src/main/java/com/hive/util/DateConvetUtil.java @@ -0,0 +1,58 @@ +package com.hive.util; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public class DateConvetUtil { + + public long dateStrToSecond(String dateTimeStr) { + // 定义日期时间格式 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + // 解析日期时间字符串为 LocalDateTime + LocalDateTime localDateTime = LocalDateTime.parse(dateTimeStr, formatter); + + // 转换为时间戳(秒数) + long timestamp = localDateTime + .atZone(ZoneId.systemDefault()) // 获取默认时区 + .toInstant() // 转换为 Instant + .getEpochSecond(); // 转换为秒数 + //.toEpochMilli() 毫秒 + return timestamp; + } + + public String secondToDateStr(long timestampInSeconds) { + // 使用 Instant 解析秒级时间戳 + Instant instant = Instant.ofEpochSecond(timestampInSeconds); + // 转换为 LocalDateTime + LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + // 定义输出格式 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + // 格式化为字符串 + String formattedDateTime = dateTime.format(formatter); + return formattedDateTime; + } + + /** + * 判断两个秒数之间的差异是否大于10分钟 + * + * @param seconds1 第一个时间的秒数 + * @param seconds2 第二个时间的秒数 + * @return 如果差异大于10分钟返回true,否则返回false + */ + public static boolean isDifferenceMoreThan10Minutes(long seconds1, long seconds2) { + long difference = Math.abs(seconds1 - seconds2); // 计算绝对差异 + return difference >= 600; // 10分钟等于600秒 + } + + public static long getCurrentSeconds() { + // 获取当前时间的秒数 + long currentSeconds = Instant.now().getEpochSecond(); + // 加10分钟 (10分钟 = 600秒) + long newSeconds = currentSeconds + Duration.ofMinutes(10).getSeconds(); + return newSeconds; + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e47626b..02efe8a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,9 +1,9 @@ spring: datasource: driverClassName: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://127.0.0.1:3306/wd_jsh?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false + url: jdbc:mysql://192.168.10.31:7536/hive?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false username: root - password: 123456 + password: a8EYUSoT8wHbuRkX # 文件上传 servlet: multipart: @@ -43,4 +43,5 @@ minio: secret-key: minioadmin # 密钥密码 bucket-name: works # 默认的Bucket名称 secure: false # 是否使用HTTPS(如果使用HTTPS,请设置为true) - +hive-server: + port: 888