1.添加智能蜂箱bee_hive与log的数据信息;

2.搭建netty服务信息(拆包、粘包)
3.解析智能蜂箱设备协议(假接入,设备程序暂未开发完成)
4.新建时间、转码、校验码等相关Util
main
leiguohui 1 month ago
parent aeba220a9a
commit 7b9ffea60b

@ -44,6 +44,18 @@
<artifactId>log4j-api</artifactId> <artifactId>log4j-api</artifactId>
<version>2.17.0</version> <version>2.17.0</version>
</dependency> </dependency>
<!-- 添加Netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.85.Final</version>
</dependency>
<!-- 添加Hutool依赖 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.16</version> <!-- 请使用最新的版本 -->
</dependency>
<!-- https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter --> <!-- https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter -->
<dependency> <dependency>
<groupId>com.baomidou</groupId> <groupId>com.baomidou</groupId>

@ -0,0 +1,101 @@
package com.hive.bee.entity;
import lombok.*;
import java.io.Serializable;
import java.util.*;
import java.time.LocalDateTime;
import java.time.LocalDateTime;
import java.time.LocalDateTime;
import com.baomidou.mybatisplus.annotation.*;
/**
* DO
*
* @author
*/
@TableName("bee_hive")
@KeySequence("bee_hive_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。
@Data
@ToString(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BeeHive implements Serializable {
private static long serialVersionUID = 1L;
/**
*
*/
@TableId
private Long id;
/**
* id
*
* {@link TODO mf_type }
*/
private Long peakFieldId;
/**
*
*/
private String beeHiveNumber;
/**
*
*/
private String beeHiveName;
/**
* mac
*/
private String beeHiveMac;
/**
*
*/
private String beeHivePosition;
/**
* 线
*/
private LocalDateTime lastOnlineTime;
/**
* 线
*/
private Integer onlineType;
/**
*
*/
private String beeHiveWeight;
/**
*
*/
private String currentTemperature;
/**
* 湿
*/
private String currentHumidity;
/**
*
*/
private String currentNoise;
/**
*
*/
private boolean deleted;
/**
* di
*/
private long deviceId;
/**
*
*/
private String currentNumber;
/**
*
*/
private String electricQuantity;
/**
*
*/
@TableField(exist = false)
private LocalDateTime dateUploadTime;
}

@ -0,0 +1,70 @@
package com.hive.bee.entity;
import lombok.*;
import java.io.Serializable;
import java.time.LocalDateTime;
import com.baomidou.mybatisplus.annotation.*;
/**
* DO
*
* @author
*/
@TableName("bee_hive_log")
@KeySequence("bee_hive_log_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。
@Data
@ToString(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BeeHiveLog implements Serializable {
private static long serialVersionUID = 1L;
/**
*
*/
@TableId
private Long id;
/**
* id
*/
private Long peakFieldId;
/**
*
*/
private String beeHiveNumber;
/**
*
*/
private String beeHiveName;
/**
*
*/
private String beeHiveWeight;
/**
*
*/
private String currentTemperature;
/**
* 湿
*/
private String currentHumidity;
/**
*
*/
private String currentNoise;
/**
*
*/
private boolean deleted;
/**
* id
*/
private long beeHiveId;
/**
*
*/
private LocalDateTime dateUploadTime;
}

@ -0,0 +1,16 @@
package com.hive.bee.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hive.bee.entity.BeeHive;
import com.hive.bee.entity.BeeHiveLog;
import org.apache.ibatis.annotations.Mapper;
/**
* Mapper
*/
@Mapper
public interface HiveLogMapper extends BaseMapper<BeeHiveLog> {
}

@ -0,0 +1,15 @@
package com.hive.bee.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hive.bee.entity.BeeHive;
import org.apache.ibatis.annotations.Mapper;
/**
* Mapper
*/
@Mapper
public interface HiveMapper extends BaseMapper<BeeHive> {
}

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hive.bee.mapper.HiveLogMapper">
</mapper>

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hive.bee.mapper.HiveMapper">
</mapper>

@ -0,0 +1,14 @@
package com.hive.bee.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.hive.bee.entity.BeeHive;
import com.hive.bee.entity.BeeHiveLog;
/**
* Service
*/
public interface HiveLogService extends IService<BeeHiveLog> {
}

@ -0,0 +1,13 @@
package com.hive.bee.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.hive.bee.entity.BeeHive;
/**
* Service
*/
public interface HiveService extends IService<BeeHive> {
}

@ -0,0 +1,20 @@
package com.hive.bee.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hive.bee.entity.BeeHive;
import com.hive.bee.entity.BeeHiveLog;
import com.hive.bee.mapper.HiveLogMapper;
import com.hive.bee.mapper.HiveMapper;
import com.hive.bee.service.HiveLogService;
import com.hive.bee.service.HiveService;
import org.springframework.stereotype.Service;
/**
* Service
*/
@Service
public class HiveLogServiceImpl extends ServiceImpl<HiveLogMapper, BeeHiveLog> implements HiveLogService {
}

@ -0,0 +1,17 @@
package com.hive.bee.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hive.bee.entity.BeeHive;
import com.hive.bee.mapper.HiveMapper;
import com.hive.bee.service.HiveService;
import org.springframework.stereotype.Service;
/**
* Service
*/
@Service
public class HiveServiceImpl extends ServiceImpl<HiveMapper, BeeHive> implements HiveService {
}

@ -0,0 +1,89 @@
package com.hive.bee.vo;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.*;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* DO
*
* @author
*/
@TableName("bee_hive")
@KeySequence("bee_hive_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。
@Data
@ToString(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BeeHiveVo implements Serializable {
private static long serialVersionUID = 1L;
/**
* id
*
* {@link TODO mf_type }
*/
private Long peakFieldId;
/**
*
*/
private String beeHiveNumber;
/**
*
*/
private String beeHiveName;
/**
* mac
*/
private String beeHiveMac;
/**
*
*/
private String beeHivePosition;
/**
* 线
*/
private LocalDateTime lastOnlineTime;
/**
* 线
*/
private Integer onlineType;
/**
*
*/
private String beeHiveWeight;
/**
*
*/
private String currentTemperature;
/**
* 湿
*/
private String currentHumidity;
/**
*
*/
private String currentNoise;
/**
* di
*/
private long deviceId;
/**
*
*/
private String currentNumber;
/**
*
*/
private String electricQuantity;
/**
*
*/
private LocalDateTime dateUploadTime;
}

@ -0,0 +1,42 @@
package com.hive.communication.netty.server;
import com.hive.config.netty.NettyConfig;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
/**
* @Author: HIVE - LGH
* @Date: 2024/10/11 15:28
* @Version: V1.0
*/
public class ChannelManager {
// 添加Channel到group和map中
public static void addChannel(Channel channel, String deviceId) {
NettyConfig.group.add(channel);
NettyConfig.portToChannelMap.put(deviceId, channel);
}
// 从group和map中移除Channel
public static void removeChannel(Channel channel) {
NettyConfig.group.remove(channel);
NettyConfig.portToChannelMap.values().removeIf(c -> c.equals(channel));
}
// 关闭指定端口的Channel
public static void closeChannelByPort(int port) {
Channel channel = NettyConfig.portToChannelMap.get(port);
System.out.println(channel.isOpen());
if (channel != null && channel.isOpen()) {
ChannelFuture future = channel.close();
future.addListener(f -> {
if (f.isSuccess()) {
NettyConfig.portToChannelMap.remove(port);
}
});
}
}
// 关闭所有Channel
public static void closeAllChannels() {
NettyConfig.group.close().syncUninterruptibly();
NettyConfig.portToChannelMap.clear();
}
}

@ -0,0 +1,117 @@
package com.hive.communication.netty.server;
import com.hive.communication.netty.server.decoder.DelimiterBasedFrameDecoder;
import com.hive.communication.netty.server.handler.EchoServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
private static final Map<Integer, Channel> portToChannelMap = new HashMap<>();
private static final Map<Integer, ServerBootstrap> portToBootstrapMap = new HashMap<>();
private static final Map<Integer, EventLoopGroup> portToEventLoopGroupMap = new HashMap<>();
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024);
sb.group(group, bossGroup) // 绑定线程池
.channel(NioServerSocketChannel.class) // 指定使用的channel
.localAddress(port)// 绑定监听端口
//保持连接数
.option(ChannelOption.SO_BACKLOG, 128)
//有数据立即发送
.option(ChannelOption.TCP_NODELAY, true)
//保持连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
//处理新连接
.childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("报告");
System.out.println("信息:有一客户端链接到本服务端");
System.out.println("IP:" + ch.localAddress().getHostName());
System.out.println("Port:" + ch.localAddress().getPort());
System.out.println("报告完毕");
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
String startDelimiterStr = "99";
String endDelimiterStr = "0A";
ByteBuf startDelimiter = Unpooled.copiedBuffer(startDelimiterStr, Charset.forName("UTF-8"));
ByteBuf endDelimiter = Unpooled.copiedBuffer(endDelimiterStr, Charset.forName("UTF-8"));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(startDelimiter, endDelimiter));
ch.pipeline().addLast(new EchoServerHandler());
ch.pipeline().addLast(new ByteArrayEncoder());
}
});
ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
System.out.println(EchoServer.class + " 启动正在监听: " + cf.channel().localAddress());
portToChannelMap.put(port, cf.channel());
portToBootstrapMap.put(port, sb);
portToEventLoopGroupMap.put(port, group);
cf.channel().closeFuture().sync(); // 关闭服务器通道
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放线程池资源
group.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public void stopServer(int port) {
Channel channel = portToChannelMap.get(port);
if (channel != null && channel.isOpen()) {
ChannelFuture future = channel.close();
future.addListener(f -> {
if (f.isSuccess()) {
portToChannelMap.remove(port);
// 确保在Channel关闭后处理ServerBootstrap和EventLoopGroup
handleServerBootstrapAndEventLoopGroup(port);
} else {
// 处理关闭Channel失败的情况
}
});
} else {
// 处理Channel不存在或已关闭的情况
handleServerBootstrapAndEventLoopGroup(port);
}
}
private void handleServerBootstrapAndEventLoopGroup(int port) {
ServerBootstrap bootstrap = portToBootstrapMap.get(port);
if (bootstrap != null) {
EventLoopGroup bossGroup = portToEventLoopGroupMap.get(port);
EventLoopGroup workerGroup = bootstrap.childGroup();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
portToBootstrapMap.remove(port);
portToEventLoopGroupMap.remove(port);
} else {
System.out.println("找不到端口" + port);
}
}
}

@ -0,0 +1,15 @@
package com.hive.communication.netty.server;
import com.hive.communication.util.NettyServerUtil;
import com.hive.config.netty.NettyHiveServerConfig;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class NettyServerRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
new EchoServer(NettyServerUtil.getPort()).start();
}
}

@ -0,0 +1,37 @@
package com.hive.communication.netty.server.dataprocessing;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.hive.bee.entity.BeeHive;
import com.hive.bee.entity.BeeHiveLog;
import com.hive.bee.service.HiveLogService;
import com.hive.bee.service.HiveService;
import com.hive.bee.vo.BeeHiveVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.hive.util.ConverterUtil.copySameFields;
@Slf4j
@Component
public class BeeHiveAdd {
@Autowired
private HiveService hiveService;
@Autowired
private HiveLogService hiveLogService;
public void addAndEdit(BeeHive beeHive){
hiveService.getBaseMapper().updateById(beeHive);
BeeHiveLog beeHiveLog = new BeeHiveLog();
BeeHiveVo beeHiveVo = new BeeHiveVo();
//用来去除不需要的字段
copySameFields(beeHive, beeHiveVo);
copySameFields(beeHiveVo, beeHiveLog);
beeHiveLog.setBeeHiveId(beeHive.getId());
hiveLogService.save(beeHiveLog);
}
public BeeHive getOneBeeHive(long deviceId){
QueryWrapper<BeeHive> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda().eq(BeeHive::isDeleted,true).eq(BeeHive::getDeviceId,deviceId);
return hiveService.getBaseMapper().selectOne(queryWrapper);
}
}

@ -0,0 +1,157 @@
package com.hive.communication.netty.server.dataprocessing;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.hive.bee.entity.BeeHive;
import com.hive.communication.util.HexConversion;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.hive.communication.netty.server.handler.EchoServerHandler.sendActive;
import static com.hive.util.DateConvetUtil.isDifferenceMoreThan10Minutes;
/**
*
*
* @Author: hive - LGH
* @Date: 2024/10/11 14:46
* @Version: V1.0
*/
@Slf4j
@Component
public class HandlerDateProcessing {
/**
*
*/
private final AtomicBoolean isScheduled = new AtomicBoolean(false);
// private boolean isScheduled = false;
public void handleResponse(ChannelHandlerContext ctx, StringBuffer sb) throws Exception {
boolean flag = false;
log.info("收到蜂箱数据" + sb);
BeeHive beeHive = dataAnalysis(sb.toString());
if (ObjectUtils.isNotNull(beeHive)) {
BeeHiveAdd beeHiveAdd = new BeeHiveAdd();
beeHiveAdd.addAndEdit(beeHive);
flag = true;
}
// 处理响应逻辑
// 读取数据并获取设备编号
// String deviceId = deviceChannels.get(ctx.channel().id());
// String deviceId = ctx.channel().attr(DEVICE_ID).get();
//数据长度不够则解析异常,需要设备再次上报
/* if (!flag) {
//解析 数据不符合要求,需要设备再次上报
} else {
//变更注册回复协议
}*/
// 创建一个CompletableFuture将在2s后完成
if (flag) {
CompletableFuture<Void> future = new CompletableFuture<>();
//isScheduled.compareAndSet 确保schedule()方法只被调用一次
if (isScheduled.compareAndSet(false, true)) {
ctx.channel().eventLoop().schedule(() -> {
try {
// 正常完成Future
future.complete(null);
} catch (Exception e) {
// 异常完成Future
future.completeExceptionally(e);
} finally {
isScheduled.set(false); // 无论如何都将标志重置为false
}
// }, 4, TimeUnit.HOURS);
}, 2, TimeUnit.SECONDS); // 将延迟时间设置为2s
// isScheduled = true; // 设置标志变量为true表示延迟逻辑已被调度
future.thenAccept(v -> sendActive(ctx, true, beeHive, true));
//ctx.writeAndFlush(messageBuffer)
} else {
//ctx.writeAndFlush();
}
}
}
public BeeHive dataAnalysis(String data) {
BeeHiveAdd beeHiveAdd = new BeeHiveAdd();
BeeHive beeHive = new BeeHive();
for (int i = 0; i < 7; i++) {
Integer startIndex = 0;
Integer endIndex = 0;
switch (i) {
case 0:
//蜂箱id
startIndex += 16;
endIndex += 32;
long deviceId = HexConversion.hexToBigLong(getSubData(data, startIndex, endIndex));
beeHive = beeHiveAdd.getOneBeeHive(deviceId);
if(ObjectUtils.isNull(beeHive)){
return null;
}
beeHive.setDeviceId(deviceId);
break;
case 1:
//时间戳
startIndex += 6;
endIndex += 14;
long date = HexConversion.hexToBigInt(getSubData(data, startIndex, endIndex));
//时间戳大于10分钟则为有效数据
if (!isDifferenceMoreThan10Minutes(Instant.now().getEpochSecond(), date)) {
return null;
}
beeHive.setDateUploadTime(LocalDateTime.ofInstant(Instant.ofEpochSecond(date), ZoneId.systemDefault()));
break;
case 2:
//当前温度值2
startIndex += 36;
endIndex += 40;
beeHive.setCurrentTemperature(String.valueOf(HexConversion.hexadecimal16Conversion(getSubData(data, startIndex, endIndex)) / 10));
//startIndex=4 ,endIndex=10
break;
case 3:
//当前湿度值2
startIndex += 40;
endIndex += 44;
beeHive.setCurrentHumidity(String.valueOf(HexConversion.hexadecimal16Conversion(getSubData(data, startIndex, endIndex))));
//startIndex=4 ,endIndex=10
break;
case 4:
//蜂箱重量4
startIndex += 44;
endIndex += 52;
beeHive.setBeeHiveWeight(String.valueOf(HexConversion.hexDouble2BigDecimal2(getSubData(data, startIndex, endIndex))));
//startIndex=4 ,endIndex=10
break;
case 5:
//蜂进出数量4
startIndex += 52;
endIndex += 60;
beeHive.setCurrentNumber(String.valueOf(HexConversion.hexadecimal16Conversion(getSubData(data, startIndex, endIndex))));
//startIndex=4 ,endIndex=10
break;
case 6:
//声音异常0表示无异常FF表示异常1
startIndex += 60;
endIndex += 62;
beeHive.setCurrentNoise(String.valueOf(getSubData(data, startIndex, endIndex)));
break;
}
}
return beeHive;
}
public String getSubData(String data, Integer startIndex, Integer endIndex) {
return data.substring(startIndex, endIndex);
}
}

@ -0,0 +1,69 @@
package com.hive.communication.netty.server.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @Author: HIVE - LGH
* @Date: 2024/10/11 13:27
* @Version: V1.0
*/
public class CustomDecoder extends ByteToMessageDecoder {
private static final byte HEARTBEAT_FLAG = (byte) 0xfe; // 心跳消息的标识
private int length1;
private int length2;
private byte specialFlag;
public CustomDecoder(int length1, int length2, byte specialFlag) {
this.length1 = length1;
this.length2 = length2;
this.specialFlag = specialFlag;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 检查是否为心跳消息
// 如果是心跳消息,则将其添加到输出列表中
out.add(in.readBytes(in.readableBytes()));
if (in.getByte(in.readerIndex()) == HEARTBEAT_FLAG) {
// 如果是心跳消息,则将其添加到输出列表中
out.add(in.readBytes(in.readableBytes()));
}else {
// 检查特殊标识
if (in.getByte(in.readerIndex()) == specialFlag) {
// 解析长度为length1的消息(水文协议)
if (in.readableBytes() >= length1) {
out.add(in.readBytes(length1));
}
} else {
// 解析长度为length2的消息(modbus协议)
if (in.readableBytes() >= length2) {
out.add(in.readBytes(length2));
}/*else{
// 不满足长度释放已读取的ByteBuf
ReferenceCountUtil.release(in);
}*/
}
}
}
/* private int length; // 消息长度
public CustomDecoder(int length) {
this.length = length;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= length) {
out.add(in.readBytes(length)); // 将指定长度的数据添加到输出列表中
}
}*/
}

@ -0,0 +1,60 @@
package com.hive.communication.netty.server.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @Author: HIVE - LGH
* @Date: 2024/10/11 15:42
* @Version: V1.0
*/
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
private final ByteBuf startDelimiter; // 开始标识字符串
private final ByteBuf endDelimiter; // 结束标识字符串
public DelimiterBasedFrameDecoder(ByteBuf startDelimiter, ByteBuf endDelimiter) {
this.startDelimiter = startDelimiter;
this.endDelimiter = endDelimiter;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
/* // 将ByteBuf转换为Java字符串以便搜索标识符
String data = in.toString(StandardCharsets.US_ASCII);
int startIndex = data.indexOf(startDelimiter);
int endIndex = data.indexOf(endDelimiter);
if (startIndex != -1 && endIndex != -1 && startIndex < endIndex) {
String frame = data.substring(startIndex + startDelimiter.length(), endIndex);
out.add(frame); // 添加提取的数据帧到输出列表中
}*/
// 将ByteBuf转换为byte数组
byte[] startDelim = new byte[startDelimiter.readableBytes()];
startDelimiter.getBytes(startDelimiter.readerIndex(), startDelim);
byte[] endDelim = new byte[endDelimiter.readableBytes()];
endDelimiter.getBytes(endDelimiter.readerIndex(), endDelim);
int startDelimIndex = ByteBufUtil.indexOf(startDelimiter, in);
int endDelimIndex = ByteBufUtil.indexOf(endDelimiter, in);
// 确保找到了开始和结束分隔符
if (startDelimIndex >= 0 && endDelimIndex > startDelimIndex) {
// 提取数据帧
ByteBuf frame = in.slice(startDelimIndex + startDelimiter.readableBytes(), endDelimIndex - (startDelimIndex + startDelimiter.readableBytes()));
// 将String转换为ByteBuf使用UTF-8编码
out.add(frame.retain());
// 更新readerIndex到结束分隔符之后
in.readerIndex(endDelimIndex + endDelimiter.readableBytes());
}
}
}

@ -0,0 +1,293 @@
package com.hive.communication.netty.server.handler;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.hive.bee.entity.BeeHive;
import com.hive.communication.netty.server.dataprocessing.HandlerDateProcessing;
import com.hive.communication.util.HexConversion;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import static com.hive.util.DateConvetUtil.getCurrentSeconds;
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final String HEARTBEAT_REQUEST = "FE";
//private final int reconnectDelay; // 重连延迟时间,单位为秒
private boolean isHeartbeatReceived = false;
// 创建一个固定大小的线程池用于消息处理
// private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 使用AttributeKey定义设备编号属性的键
private static final AttributeKey<String> DEVICE_ID = AttributeKey.valueOf("deviceId");
private static final ConcurrentHashMap<String, ChannelPromise> pendingRequests = new ConcurrentHashMap<>();
/***************************************** 发送消息 *****************************************/
/**
* channelAction
* channel action
*
*/
public static void sendActive(ChannelHandlerContext ctx, boolean flag, BeeHive beeHive, boolean notice) {
if (flag) {
try {
// String paddedString = String.format("%4s", code).replace(' ', '0');
String codes = "注册完回复";
//获取十分钟后的时间秒数需要转换为16进制
getCurrentSeconds();
log.info("当前时间: {}, 消息: {}", LocalDateTime.now(), codes);
// 先将十六进制字符串转换为字节数组
byte[] messageBytes = HexConversion.hexStringToByteArray(codes);
// 将字节数组封装到ByteBuf中
ByteBuf messageBuffer = Unpooled.wrappedBuffer(messageBytes);
// 通过ChannelHandlerContext发送ByteBuf
// 关联设备编号与Channel
// sendRequest(ctx.channel(),messageBuffer,devicePointsSensor.getId());
retrySend(ctx, messageBuffer)
.whenComplete((result,error) -> {
log.info("发送状态: {},当前时间: {}, 消息: {},名称:{}蜂箱id{}", result,LocalDateTime.now(), codes,beeHive.getBeeHiveName(),beeHive.getId());
/* if (result) {
// 发送成功
} else {
// 发送失败
}*/
});
} catch (Exception e) {
// 处理异常
}
}
}
/**
* retrySendAsync
*/
public static CompletableFuture<Boolean> retrySend(ChannelHandlerContext ctx, ByteBuf messageBuffer) {
int maxRetries = 3; // 最大重试次数
CompletableFuture<Boolean> future = new CompletableFuture<>();
try {
retrySendAsync(ctx, messageBuffer, maxRetries, future);
} catch (Exception e) {
//确保异常程序正常下行
e.printStackTrace();
//释放资源
if (messageBuffer.refCnt() > 0) {
messageBuffer.release();
}
future.complete(false);
}
return future;
}
/**
*
*/
private static void retrySendAsync(ChannelHandlerContext ctx, ByteBuf messageBuffer, int remainingRetries, CompletableFuture<Boolean> future) {
messageBuffer.retain(); // 增加引用计数
ChannelFuture sendFuture = ctx.writeAndFlush(messageBuffer);
sendFuture.addListener((ChannelFutureListener) sendFutureListener -> {
if (sendFutureListener.isSuccess()) {
messageBuffer.release(); // 释放在发送前增加的引用计数
future.complete(true); // 发送成功,将结果设置为 true
} else {
if (remainingRetries > 0) {
ctx.executor().schedule(() -> {
// 发送失败,但还有剩余重试次数
if (messageBuffer.isReadable()) {
// 如果messageBuffer仍然可读重新使用它进行重试
retrySendAsync(ctx,messageBuffer, remainingRetries - 1, future);
} else {
// messageBuffer不可读需要重新创建或复制一个新的messageBuffer
ByteBuf newMessageBuffer = messageBuffer.retainedDuplicate();
retrySendAsync(ctx, newMessageBuffer, remainingRetries - 1, future);
}
}, 2, TimeUnit.SECONDS);
} else {
messageBuffer.release(); // 释放在发送前增加的引用计数
future.complete(false); // 达到最大重试次数,将结果设置为 false
}
}
});
}
/**
* pendingRequests
*/
public static ChannelFuture sendRequest(Channel channel, ByteBuf message, String messageId) {
ChannelPromise promise = channel.newPromise();
pendingRequests.put(messageId, promise);
// 将messageId附加到消息中并发送消息
// 这里用逗号分隔messageId和消息体
ByteBuf msgToSend = message.copy().writeBytes((messageId + ",").getBytes(CharsetUtil.UTF_8));
channel.writeAndFlush(msgToSend).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
pendingRequests.remove(messageId);
}
});
return promise;
}
/**
*
*/
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//ChannelManager.addChannel(ctx.channel(),(Integer) localAddress(ctx).get("port"));
//NettyConfig.group.add(ctx.channel());
}
/**
* channelInactive
* channel Inactive
*
*/
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!");
// 关闭流 只将当前的Channel从ChannelGroup中移除不会立即关闭Channel
// NettyConfig.group.remove(ctx.channel());
//关闭当前的Channel并释放相关资源
ctx.close();
}
/**
* @param buf
* @return
* @author Taowd
* TODO
*/
private String getMessage(ByteBuf buf) {
byte[] con = new byte[buf.readableBytes()];
buf.readBytes(con);
try {
return new String(con, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
}
/*****************************************处理收到的消息 *****************************************/
/**
*
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (ObjectUtils.isNotNull(msg)) ;
// StringBuffer sb = null;
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
// String body = new String(req, "UTF-8");
StringBuffer sb = new StringBuffer(req.length * 2);
for (int i = 0; i < req.length; i++) {
sb.append(Character.forDigit((req[i] & 240) >> 4, 16));
sb.append(Character.forDigit(req[i] & 15, 16));
}
// System.out.println("接收到的数据sb" + sb.toString());
// 检查消息是否是心跳请求
if (sb.toString().equalsIgnoreCase(HEARTBEAT_REQUEST)) {
handleHeartbeat(ctx,msg);
} else {
// 如果不是心跳请求调用ChannelInboundHandler的下一个处理器
ctx.fireChannelRead(msg);
if (ObjectUtils.isNotNull(sb) && sb.length() > 4) {
new HandlerDateProcessing().handleResponse(ctx, sb);
}
}
// System.out.println("客户端收到服务器数据111 服务端ip与端口:" + localAddress(ctx));
// System.out.println("客户端收到服务器数据111 客户端ip与端口:" + remoteAddress(ctx));
}
private void handleHeartbeat(ChannelHandlerContext ctx, Object msg) {
isHeartbeatReceived = true;
ctx.fireChannelRead(msg);
//ctx.fireChannelReadComplete();
// 如果需要,发送心跳响应
// ctx.writeAndFlush("FE");
}
/**
*
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 第一种方法写一个空的buf并刷新写出区域。完成后关闭sock channel连接。
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
//ctx.flush();
// 调用父处理器的channelReadComplete方法确保事件能传递到下一个处理器
super.channelReadComplete(ctx);
// 在任何情况下都传递到下一个处理器
//ctx.fireChannelReadComplete();
// ctx.flush();
// 第二种方法在client端关闭channel连接这样的话会触发两次channelReadComplete方法。
// ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。
}
/**
*
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
/* cause.printStackTrace();
ctx.close();*/
if (cause instanceof ReadTimeoutException) {
// 处理超时情况,例如关闭连接或者记录日志等
ctx.close();
} else {
// 其他异常情况的处理
cause.printStackTrace();
ctx.close();
}
}
/**
* ip
*/
public static Map remoteAddress(ChannelHandlerContext ctx) {
SocketAddress clientAddress = ctx.channel().remoteAddress();
// 这里可以进行类型转换,因为通常情况下远程地址是 InetSocketAddress 类型
Map map = new HashMap();
if (clientAddress instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) clientAddress;
String ipAddress = inetSocketAddress.getAddress().getHostAddress();
int port = inetSocketAddress.getPort();
map.put("ipAddress", ipAddress);
map.put("port", port);
return map;
}
return null;
}
/**
* ip
*/
public static Map localAddress(ChannelHandlerContext ctx) {
SocketAddress serverAddress = ctx.channel().localAddress();
Map map = new HashMap();
if (serverAddress instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) serverAddress;
String ipAddress = inetSocketAddress.getAddress().getHostAddress();
int port = inetSocketAddress.getPort();
map.put("ipAddress", ipAddress);
map.put("port", port);
return map;
}
return null;
}
}

@ -0,0 +1,95 @@
package com.hive.communication.netty.server.heartbeat;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* @Author: HIVE - LGH
* @Date: 2024/10/11 15:24
* @Version: V1.0
*/
public class GatewayHeartbeatHandler extends IdleStateHandler {
// 用于记录网关的在线状态
private boolean isOnline = false;
// 用于记录上一次设置为在线状态的时间戳
private long lastOnlineTimestamp = System.currentTimeMillis();
private static final long ONLINE_STATUS_INTERVAL = TimeUnit.HOURS.toMillis(1); // 1小时
public GatewayHeartbeatHandler(int readerIdleTimeSeconds) {
super(readerIdleTimeSeconds, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
if (evt.state() == IdleState.READER_IDLE) {
// 如果心跳超时,设置为离线,并关闭连接
setOffline(ctx);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (isHeartbeatMessage(msg)) {
// 接收到心跳
// long currentTime = System.currentTimeMillis();
// || (currentTime - lastOnlineTimestamp > ONLINE_STATUS_INTERVAL)
if (!isOnline) {
// 如果网关之前不在线或者超过了更新间隔,设置为在线
setOnline(ctx);
// lastOnlineTimestamp = currentTime;
}
// 确保调用此方法以重置空闲状态监测
// ReferenceCountUtil.release(msg); // 如果msg是ByteBuf类型释放它
// super.channelRead(ctx, msg);
// ctx.fireChannelRead(msg);
// 忽略心跳消息不传递给下一个handler
}
//else {
// 不是心跳消息传递给下一个handler
super.channelRead(ctx, msg);
// }
// 此调用保证IdleStateHandler能够重置其读取计时器
// ctx.fireChannelRead(msg);
}
private boolean isHeartbeatMessage(Object msg) {
// 判断消息是否为心跳消息
if (msg instanceof ByteBuf) {
ByteBuf data = (ByteBuf) msg;
// 检查数据包是否足够长
if (data.readableBytes() >= 1) {
// 标记readerIndex的当前位置之后可以重置
data.markReaderIndex();
// 读取第一个字节
byte firstByte = data.readByte();
// 重置readerIndex到标记的位置这样不会影响后续的读取操作
data.resetReaderIndex();
// 检查第一个字节是否为心跳包的标志
return (firstByte == (byte) 0xFE || firstByte == (byte) 0xfe);
}
}
return false;
}
private void setOnline(ChannelHandlerContext ctx) {
// 更新网关状态为在线
System.out.println("设备上线");
isOnline = true;
// 实现更新状态的逻辑,比如更新数据库记录
}
private void setOffline(ChannelHandlerContext ctx) {
// 更新网关状态为离线
System.out.println("设备离线");
isOnline = false;
// 实现更新状态的逻辑,比如更新数据库记录
ctx.close(); // 关闭连接
}
}

@ -0,0 +1,54 @@
package com.hive.communication.netty.server.launch;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
/**
*
*
*/
public class TimeServer {
public static void main(String[] args) {
TimeServer.run(8021);
TimeServer.run(8023);
TimeServer.run(8024);
}
private static void run(int port){
// EventLoop 代替原来的 ChannelFactory
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new TimeServerHandler(),
new WriteTimeoutHandler(10),
//控制写入超时10秒构造参数10表示如果持续10秒钟都没有数据写了那么就超时。
new ReadTimeoutHandler(10)
);
}
}).option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

@ -0,0 +1,26 @@
package com.hive.communication.netty.server.launch;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
//ChannelHandlerContext通道处理上下文
@Override
public void channelActive(final ChannelHandlerContext ctx) throws InterruptedException { // (1)
while (true) {
ByteBuf time = ctx.alloc().buffer(4); //为ByteBuf分配四个字节
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
ctx.writeAndFlush(time);
Thread.sleep(200);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

@ -0,0 +1,35 @@
package com.hive.communication.util;
public class CheckCode {
// 计算字符串的校验码(异位校验)
public static int calculateParity(String input, boolean oddParity) {
int parity = 0;
for (char ch : input.toCharArray()) {
parity ^= ch;
}
// 根据需要调整奇偶校验位
int parityBit = Integer.bitCount(parity) % 2;
if (oddParity) {
parityBit = parityBit == 0 ? 1 : 0; // 奇校验如果0变成1如果1变成0
}
return parityBit;
}
// 计算16进制字符串的校验码
public static int calculateChecksum(String hexString) {
int sum = 0;
// 每两个字符作为一个16进制数进行解析
for (int i = 0; i < hexString.length(); i += 2) {
String hexPair = hexString.substring(i, i + 2);
int hexValue = Integer.parseInt(hexPair, 16);
sum += hexValue;
}
// 保留最后8位
int checksum = sum & 0xFF; // 0xFF表示8位全1的掩码
return checksum;
}
}

@ -0,0 +1,97 @@
package com.hive.communication.util;
import java.math.BigDecimal;
import java.math.BigInteger;
/**
*
*/
public class HexConversion {
/**
* BigDecimal2
* @param hex
* @return
*/
public static BigDecimal hexFloat2BigDecimal(String hex) {
float value = Float.intBitsToFloat((int) Long.parseLong(hex, 16));
BigDecimal bd = new BigDecimal(Float.toString(value));
return bd.setScale(2, BigDecimal.ROUND_DOWN);
}
/**
* 使
* @param cardNum
* @return
*/
public static boolean judgeContainsStr(String cardNum) {
// String regex1 = ".*[a-zA-z].*";
String regex=".*[a-zA-z].*";
return cardNum.matches(regex);
}
/**
* BigDecimal2
* @param hex
* @return
*/
public static BigDecimal hexDouble2BigDecimal2(String hex) {
double value = Double.longBitsToDouble(Long.valueOf(hex,16).longValue());
BigDecimal bd = BigDecimal.valueOf(value);
return bd.setScale(2, BigDecimal.ROUND_DOWN);
}
/**
* ()
* @param hex
* @return Integer
*/
public static Integer hexToBigInt(String hex){
return new BigInteger(hex,16).intValue();
}
/**
* ()
* @param hex
* @return long
*/
public static long hexToBigLong(String hex){
return Long.parseLong(hex, 16);
}
/**
* hex
* @param s
* @return String
*/
public static byte[] hexStringToByteArray(String s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ Character.digit(s.charAt(i+1), 16));
}
return data;
}
/**
* ()
* @param hexadecimalStr
* @return String
*/
public static int hexadecimal16Conversion(String hexadecimalStr) {
int getDataDecimal = 0;//转化得到的目标数据
//16进制代表数据 4位数字
if (hexadecimalStr.length() == 4) {
int bit1Num = Integer.parseInt(hexadecimalStr.substring(0, 1), 16);//获取第一位。判断是正数还是负数
if (bit1Num < 8) { //小于8是正数
getDataDecimal = Integer.parseInt(hexadecimalStr, 16);
} else { //负数
hexadecimalStr = "FFFF" + hexadecimalStr; //先不全八位
getDataDecimal = new BigInteger(hexadecimalStr, 16).intValue();
}
return getDataDecimal;
}
return 0;
}
}

@ -0,0 +1,15 @@
package com.hive.communication.util;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServerUtil {
private static int port;
public static void setPort(int port) {
NettyServerUtil.port = port;
}
public static int getPort() {
return port;
}
}

@ -9,7 +9,7 @@ import org.springframework.context.annotation.Configuration;
public class MinioConfiguration { public class MinioConfiguration {
@Autowired @Autowired
private com.coffee.config.MinioProperties properties; private com.hive.config.MinioProperties properties;
@Bean @Bean
public MinioClient minioClient() { public MinioClient minioClient() {

@ -0,0 +1,17 @@
package com.hive.config.netty;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class NettyConfig {
/**
* channel
*/
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static final Map<String, Channel> portToChannelMap = new ConcurrentHashMap<>();
}

@ -0,0 +1,19 @@
package com.hive.config.netty;
import com.hive.communication.util.NettyServerUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Slf4j
@Configuration
public class NettyHiveServerConfig {
@Value(value = "${hive-server.port}")
private int port;
@Bean
public void initServer(){
NettyServerUtil.setPort(port);
}
}

@ -1,33 +0,0 @@
package com.hive.controller;
import com.hive.common.AjaxResult;
import com.hive.entity.XieZhuan;
import com.hive.service.XieZhuanService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.util.List;
@RestController
public class ExcelController {
@Autowired
private XieZhuanService xieZhuanService;
@PostMapping("/importFile")
private AjaxResult importFile(@RequestParam("file") MultipartFile file) throws IOException {
xieZhuanService.importFile(file);
return AjaxResult.success("导入成功");
}
@GetMapping("/getXZList")
private AjaxResult getXZList(){
List<XieZhuan> list = xieZhuanService.getXZList();
return AjaxResult.success("查询成功",list);
}
}

@ -1,34 +0,0 @@
package com.hive.entity;
import com.alibaba.excel.annotation.ExcelProperty;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("xiezhuan")
public class XieZhuan {
@TableId(type= IdType.AUTO)
private Long id;
@ExcelProperty("省代码")
private String provinceCode;
@ExcelProperty("省公司")
private String provinceCompanny;
@ExcelProperty("数据日期")
private String dataDate;
@ExcelProperty("省侧系统原因导致携转业务失败量")
private String failCount;
@ExcelProperty("携转业务总量")
private String businessCount;
@ExcelProperty("携转业务接口成功率")
private String successRate;
}

@ -1,9 +0,0 @@
package com.hive.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.coffee.entity.XieZhuan;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface XieZhuanMapper extends BaseMapper<XieZhuan> {
}

@ -1,16 +0,0 @@
package com.hive.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.hive.entity.XieZhuan;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.util.List;
public interface XieZhuanService extends IService<XieZhuan> {
void importFile(MultipartFile file) throws IOException;
List<XieZhuan> getXZList();
}

@ -1,90 +0,0 @@
package com.hive.service.impl;
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.coffee.entity.XieZhuan;
import com.coffee.mapper.XieZhuanMapper;
import com.hive.service.XieZhuanService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.UUID;
@Service
public class XieZhuanImpl extends ServiceImpl<XieZhuanMapper, XieZhuan> implements XieZhuanService {
@Autowired
XieZhuanMapper xieZhuanMapper;
private static final String UPLOAD_DIR = "C:\\Users\\Lenovo\\Desktop\\";
@Override
public void importFile(MultipartFile file) throws IOException {
byte[] bytes = file.getBytes();
Path path = Paths.get(UPLOAD_DIR);
// 创建目录(如果不存在)
Files.createDirectories(path);
String fileName = file.getOriginalFilename();
// 存储文件
Files.write(path.resolve(fileName), bytes);
String filePath = UPLOAD_DIR + fileName;
readExcel(filePath, XieZhuan.class, new CustomExcelListener<XieZhuan>(fileName));
}
@Override
public List<XieZhuan> getXZList() {
return xieZhuanMapper.selectList(null);
}
class CustomExcelListener<T> extends AnalysisEventListener<T> {
private String filename;
public CustomExcelListener() {
}
public CustomExcelListener(String filename) {
this.filename = filename;
}
@Override
public void invoke(T object, AnalysisContext context) {
switch (filename) {
case "携转业务成功率日指标.xlsx":
xieZhuanMapper.insert((XieZhuan) object);
break;
}
System.out.println("解析数据:" + object);
}
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
System.out.println("读取" + filename + "文件并存入数据库结束======");
}
}
public static <T> void readExcel(String path, Class<T> clazz, AnalysisEventListener<T> listener) {
EasyExcel.read(path, clazz, listener).sheet().doRead();
}
}

@ -0,0 +1,30 @@
package com.hive.util;
import com.hive.bee.entity.BeeHive;
import com.hive.bee.entity.BeeHiveLog;
import java.lang.reflect.Field;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
public class ConverterUtil {
public static <T, U> void copySameFields(T source, U target) {
Field[] sourceFields = source.getClass().getDeclaredFields();
Field[] targetFields = target.getClass().getDeclaredFields();
for (Field sourceField : sourceFields) {
for (Field targetField : targetFields) {
if (sourceField.getName().equals(targetField.getName()) &&
sourceField.getType().equals(targetField.getType())) {
try {
sourceField.setAccessible(true);
targetField.setAccessible(true);
targetField.set(target, sourceField.get(source));
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
}
}

@ -0,0 +1,58 @@
package com.hive.util;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class DateConvetUtil {
public long dateStrToSecond(String dateTimeStr) {
// 定义日期时间格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 解析日期时间字符串为 LocalDateTime
LocalDateTime localDateTime = LocalDateTime.parse(dateTimeStr, formatter);
// 转换为时间戳(秒数)
long timestamp = localDateTime
.atZone(ZoneId.systemDefault()) // 获取默认时区
.toInstant() // 转换为 Instant
.getEpochSecond(); // 转换为秒数
//.toEpochMilli() 毫秒
return timestamp;
}
public String secondToDateStr(long timestampInSeconds) {
// 使用 Instant 解析秒级时间戳
Instant instant = Instant.ofEpochSecond(timestampInSeconds);
// 转换为 LocalDateTime
LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
// 定义输出格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 格式化为字符串
String formattedDateTime = dateTime.format(formatter);
return formattedDateTime;
}
/**
* 10
*
* @param seconds1
* @param seconds2
* @return 10truefalse
*/
public static boolean isDifferenceMoreThan10Minutes(long seconds1, long seconds2) {
long difference = Math.abs(seconds1 - seconds2); // 计算绝对差异
return difference >= 600; // 10分钟等于600秒
}
public static long getCurrentSeconds() {
// 获取当前时间的秒数
long currentSeconds = Instant.now().getEpochSecond();
// 加10分钟 (10分钟 = 600秒)
long newSeconds = currentSeconds + Duration.ofMinutes(10).getSeconds();
return newSeconds;
}
}

@ -1,9 +1,9 @@
spring: spring:
datasource: datasource:
driverClassName: com.mysql.cj.jdbc.Driver driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/wd_jsh?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false url: jdbc:mysql://192.168.10.31:7536/hive?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false
username: root username: root
password: 123456 password: a8EYUSoT8wHbuRkX
# 文件上传 # 文件上传
servlet: servlet:
multipart: multipart:
@ -43,4 +43,5 @@ minio:
secret-key: minioadmin # 密钥密码 secret-key: minioadmin # 密钥密码
bucket-name: works # 默认的Bucket名称 bucket-name: works # 默认的Bucket名称
secure: false # 是否使用HTTPS如果使用HTTPS请设置为true secure: false # 是否使用HTTPS如果使用HTTPS请设置为true
hive-server:
port: 888

Loading…
Cancel
Save