嘿,亲!知识可是无价之宝呢,但咱这精心整理的资料也耗费了不少心血呀。小小地破费一下,绝对物超所值哦!如有下载和支付问题,请联系我们QQ(微信同号):813200300
本次赞助数额为: 2 元微信扫码支付:2 元
请留下您的邮箱,我们将在2小时内将文件发到您的邮箱
Netty搭建TCP服务器,实现与IOT设备通信
private ExecutorService executorService1 = new ThreadPoolExecutor(
10,
20,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
private ExecutorService executorService2 = new ThreadPoolExecutor(
10,
20,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
private static AtomicInteger atomicInteger = new AtomicInteger(1);
private static AtomicInteger atomic = new AtomicInteger(1);
private static AtomicInteger atomic_23 = new AtomicInteger(1);
private static Lock lock_1 = new ReentrantLock();
private static Lock lock_2 = new ReentrantLock();
private static Lock lock_3 = new ReentrantLock();
private static Lock lock_4 = new ReentrantLock();
private static int count = 0;
//private final NettyMqttService nettyMqttService;
// 此处缓存一份设备的最新数据
Map<Object, Map<String, Object>> lastedEnv = new ConcurrentHashMap<Object, Map<String, Object>>();
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
@Override //数据读取完毕
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/**
* @param ctx
* @author caobing
* @DESCRIPTION: 有客户端连接服务器会触发此函数
* @return: void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
lock_1.lock();
try {
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接,就保存连接
if (CHANNEL_MAP.containsKey(channelId)) {
log.info("客户端【" channelId "】是连接状态,连接通道数量: " CHANNEL_MAP.size());
} else {
//保存连接
CHANNEL_MAP.put(channelId, ctx);
log.info("客户端【" channelId "】连接netty服务器");
log.info("连接通道数量: " CHANNEL_MAP.size());
}
} finally {
lock_1.unlock();
}
}
/**
* @param ctx
* @author caobing
* @DESCRIPTION: 有客户端终止连接服务器会触发此函数
* @return: void
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
lock_2.lock();
try {
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除
if (CHANNEL_MAP.containsKey(channelId)) {
//删除连接
CHANNEL_MAP.remove(channelId);
System.out.println();
log.info("客户端【" channelId "】退出netty服务器");
log.info("连接通道数量: " CHANNEL_MAP.size());
}
} finally {
lock_2.unlock();
}
}
/**
* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
* 2. Object msg: 就是客户端发送的数据 默认Object
* <p>
* 读取数据实际(这里我们可以读取客户端发送的消息)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lock_3.lock();
try {
//log.info("服务器读取线程 " Thread.currentThread().getName() " channle = " ctx.channel());
//log.info("进入服务端数据:" msg.toString());
// Channel channel = ctx.channel();
// //将 msg 转成一个 ByteBuf
// //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
//得到此时客户端的数据长度
int bytes_length = buf.readableBytes();
// log.info("此时客户端的数据长度: " bytes_length);
//组件新的字节数组
byte[] buffer = new byte[bytes_length];
buf.readBytes(buffer);
// final String allData = NettyByteAndStringUtils.byteToHex(buffer);
//log.info("进入服务端数据:" allData);
String s = new String(buffer);
System.out.println("s:" count);
count ;
}