|
|
@ -12,6 +12,7 @@ import io.netty.handler.timeout.ReadTimeoutException;
|
|
|
|
import io.netty.util.AttributeKey;
|
|
|
|
import io.netty.util.AttributeKey;
|
|
|
|
import io.netty.util.CharsetUtil;
|
|
|
|
import io.netty.util.CharsetUtil;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
|
|
|
|
import java.io.UnsupportedEncodingException;
|
|
|
|
import java.io.UnsupportedEncodingException;
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
|
import java.net.SocketAddress;
|
|
|
|
import java.net.SocketAddress;
|
|
|
@ -20,6 +21,7 @@ import java.util.HashMap;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import static com.hive.communication.util.CheckCode.*;
|
|
|
|
import static com.hive.util.DateConvetUtil.getCurrentSeconds;
|
|
|
|
import static com.hive.util.DateConvetUtil.getCurrentSeconds;
|
|
|
|
|
|
|
|
|
|
|
|
@Slf4j
|
|
|
|
@Slf4j
|
|
|
@ -43,33 +45,55 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
|
|
|
|
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
public static void sendActive(ChannelHandlerContext ctx, boolean flag, BeeHive beeHive, boolean notice) {
|
|
|
|
public static void sendActive(ChannelHandlerContext ctx, boolean flag, BeeHive beeHive, boolean notice) {
|
|
|
|
if (flag) {
|
|
|
|
if (flag) {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
// String paddedString = String.format("%4s", code).replace(' ', '0');
|
|
|
|
String start="99";
|
|
|
|
String codes = "注册完回复";
|
|
|
|
String end ="AA";
|
|
|
|
//获取十分钟后的时间秒数,需要转换为16进制
|
|
|
|
StringBuilder stringBuilder = new StringBuilder();
|
|
|
|
getCurrentSeconds();
|
|
|
|
stringBuilder.append(start);
|
|
|
|
log.info("当前时间: {}, 消息: {}", LocalDateTime.now(), codes);
|
|
|
|
stringBuilder.append("8561");
|
|
|
|
// 先将十六进制字符串转换为字节数组
|
|
|
|
stringBuilder.append(beeHive.getDateUploadTimeHex());
|
|
|
|
byte[] messageBytes = HexConversion.hexStringToByteArray(codes);
|
|
|
|
stringBuilder.append("0b01");
|
|
|
|
// 将字节数组封装到ByteBuf中
|
|
|
|
// String paddedString = String.format("%4s", code).replace(' ', '0');
|
|
|
|
ByteBuf messageBuffer = Unpooled.wrappedBuffer(messageBytes);
|
|
|
|
//获取十分钟后的时间秒数,
|
|
|
|
// 通过ChannelHandlerContext发送ByteBuf
|
|
|
|
// 转换为16进制
|
|
|
|
// 关联设备编号与Channel
|
|
|
|
String hexSeconds = Long.toHexString(getCurrentSeconds()).toUpperCase();
|
|
|
|
// sendRequest(ctx.channel(),messageBuffer,devicePointsSensor.getId());
|
|
|
|
stringBuilder.append(hexSeconds);
|
|
|
|
retrySend(ctx, messageBuffer)
|
|
|
|
stringBuilder.append("0000ff000000");
|
|
|
|
.whenComplete((result,error) -> {
|
|
|
|
|
|
|
|
log.info("发送状态: {},当前时间: {}, 消息: {},名称:{},蜂箱id:{}", result,LocalDateTime.now(), codes,beeHive.getBeeHiveName(),beeHive.getId());
|
|
|
|
byte[] dataSum = hexStringToByteArray(stringBuilder.toString());
|
|
|
|
|
|
|
|
// 计算校验码1
|
|
|
|
|
|
|
|
byte checksum1 = calculateChecksum1(dataSum);
|
|
|
|
|
|
|
|
// 计算校验码2
|
|
|
|
|
|
|
|
byte checksum2 = calculateChecksum2(dataSum);
|
|
|
|
|
|
|
|
String checksum1Hex = String.format("%02X", checksum1);
|
|
|
|
|
|
|
|
String checksum2Hex = String.format("%02X", checksum2);
|
|
|
|
|
|
|
|
stringBuilder.append(checksum1Hex);
|
|
|
|
|
|
|
|
stringBuilder.append(checksum2Hex);
|
|
|
|
|
|
|
|
stringBuilder.append(end);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
log.info("当前时间: {}, 消息: {}", LocalDateTime.now(), stringBuilder);
|
|
|
|
|
|
|
|
// 先将十六进制字符串转换为字节数组
|
|
|
|
|
|
|
|
byte[] messageBytes = HexConversion.hexStringToByteArray(stringBuilder.toString());
|
|
|
|
|
|
|
|
// 将字节数组封装到ByteBuf中
|
|
|
|
|
|
|
|
ByteBuf messageBuffer = Unpooled.wrappedBuffer(messageBytes);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 通过ChannelHandlerContext发送ByteBuf
|
|
|
|
|
|
|
|
// 关联设备编号与Channel
|
|
|
|
|
|
|
|
// sendRequest(ctx.channel(),messageBuffer,devicePointsSensor.getId());
|
|
|
|
|
|
|
|
retrySend(ctx, messageBuffer)
|
|
|
|
|
|
|
|
.whenComplete((result, error) -> {
|
|
|
|
|
|
|
|
log.info("发送状态: {},当前时间: {}, 消息: {},名称:{},蜂箱id:{}", result, LocalDateTime.now(), stringBuilder, beeHive.getBeeHiveName(), beeHive.getId());
|
|
|
|
/* if (result) {
|
|
|
|
/* if (result) {
|
|
|
|
// 发送成功
|
|
|
|
// 发送成功
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
// 发送失败
|
|
|
|
// 发送失败
|
|
|
|
}*/
|
|
|
|
}*/
|
|
|
|
});
|
|
|
|
});
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
// 处理异常
|
|
|
|
// 处理异常
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -92,6 +116,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return future;
|
|
|
|
return future;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 功能:发送消息并监听成功与失败进行重试
|
|
|
|
* 功能:发送消息并监听成功与失败进行重试
|
|
|
|
*/
|
|
|
|
*/
|
|
|
@ -99,31 +124,32 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
messageBuffer.retain(); // 增加引用计数
|
|
|
|
messageBuffer.retain(); // 增加引用计数
|
|
|
|
ChannelFuture sendFuture = ctx.writeAndFlush(messageBuffer);
|
|
|
|
ChannelFuture sendFuture = ctx.writeAndFlush(messageBuffer);
|
|
|
|
sendFuture.addListener((ChannelFutureListener) sendFutureListener -> {
|
|
|
|
sendFuture.addListener((ChannelFutureListener) sendFutureListener -> {
|
|
|
|
if (sendFutureListener.isSuccess()) {
|
|
|
|
if (sendFutureListener.isSuccess()) {
|
|
|
|
messageBuffer.release(); // 释放在发送前增加的引用计数
|
|
|
|
messageBuffer.release(); // 释放在发送前增加的引用计数
|
|
|
|
future.complete(true); // 发送成功,将结果设置为 true
|
|
|
|
future.complete(true); // 发送成功,将结果设置为 true
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
if (remainingRetries > 0) {
|
|
|
|
if (remainingRetries > 0) {
|
|
|
|
ctx.executor().schedule(() -> {
|
|
|
|
ctx.executor().schedule(() -> {
|
|
|
|
// 发送失败,但还有剩余重试次数
|
|
|
|
// 发送失败,但还有剩余重试次数
|
|
|
|
if (messageBuffer.isReadable()) {
|
|
|
|
if (messageBuffer.isReadable()) {
|
|
|
|
// 如果messageBuffer仍然可读,重新使用它进行重试
|
|
|
|
// 如果messageBuffer仍然可读,重新使用它进行重试
|
|
|
|
retrySendAsync(ctx,messageBuffer, remainingRetries - 1, future);
|
|
|
|
retrySendAsync(ctx, messageBuffer, remainingRetries - 1, future);
|
|
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
// messageBuffer不可读,需要重新创建或复制一个新的messageBuffer
|
|
|
|
// messageBuffer不可读,需要重新创建或复制一个新的messageBuffer
|
|
|
|
ByteBuf newMessageBuffer = messageBuffer.retainedDuplicate();
|
|
|
|
ByteBuf newMessageBuffer = messageBuffer.retainedDuplicate();
|
|
|
|
retrySendAsync(ctx, newMessageBuffer, remainingRetries - 1, future);
|
|
|
|
retrySendAsync(ctx, newMessageBuffer, remainingRetries - 1, future);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, 2, TimeUnit.SECONDS);
|
|
|
|
}, 2, TimeUnit.SECONDS);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
messageBuffer.release(); // 释放在发送前增加的引用计数
|
|
|
|
messageBuffer.release(); // 释放在发送前增加的引用计数
|
|
|
|
future.complete(false); // 达到最大重试次数,将结果设置为 false
|
|
|
|
future.complete(false); // 达到最大重试次数,将结果设置为 false
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 功能:或缺设备编号等存入pendingRequests等读取是进行匹配
|
|
|
|
* 功能:或缺设备编号等存入pendingRequests等读取是进行匹配
|
|
|
|
*/
|
|
|
|
*/
|
|
|
@ -142,6 +168,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
|
|
|
|
|
|
|
|
return promise;
|
|
|
|
return promise;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 功能:连接成功调用
|
|
|
|
* 功能:连接成功调用
|
|
|
|
*/
|
|
|
|
*/
|
|
|
@ -179,7 +206,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/*****************************************处理收到的消息 *****************************************/
|
|
|
|
/*****************************************处理收到的消息 *****************************************/
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 功能:读取发送过来的信息
|
|
|
|
* 功能:读取发送过来的信息
|
|
|
@ -200,7 +227,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
// System.out.println("接收到的数据sb" + sb.toString());
|
|
|
|
// System.out.println("接收到的数据sb" + sb.toString());
|
|
|
|
// 检查消息是否是心跳请求
|
|
|
|
// 检查消息是否是心跳请求
|
|
|
|
if (sb.toString().equalsIgnoreCase(HEARTBEAT_REQUEST)) {
|
|
|
|
if (sb.toString().equalsIgnoreCase(HEARTBEAT_REQUEST)) {
|
|
|
|
handleHeartbeat(ctx,msg);
|
|
|
|
handleHeartbeat(ctx, msg);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
// 如果不是心跳请求,调用ChannelInboundHandler的下一个处理器
|
|
|
|
// 如果不是心跳请求,调用ChannelInboundHandler的下一个处理器
|
|
|
|
ctx.fireChannelRead(msg);
|
|
|
|
ctx.fireChannelRead(msg);
|
|
|
@ -214,7 +241,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
|
|
|
|
|
|
|
|
private void handleHeartbeat(ChannelHandlerContext ctx, Object msg) {
|
|
|
|
private void handleHeartbeat(ChannelHandlerContext ctx, Object msg) {
|
|
|
|
isHeartbeatReceived = true;
|
|
|
|
isHeartbeatReceived = true;
|
|
|
|
ctx.fireChannelRead(msg);
|
|
|
|
ctx.fireChannelRead(msg);
|
|
|
|
//ctx.fireChannelReadComplete();
|
|
|
|
//ctx.fireChannelReadComplete();
|
|
|
|
// 如果需要,发送心跳响应
|
|
|
|
// 如果需要,发送心跳响应
|
|
|
|
// ctx.writeAndFlush("FE");
|
|
|
|
// ctx.writeAndFlush("FE");
|
|
|
|