基本信息
源码名称:TCP/IP数据交互代码(ls-iot-TCP)
源码大小:0.20M
文件格式:.rar
开发语言:Java
更新时间:2021-04-20
   友情提示:(无需注册或充值,赞助后即可获取资源下载链接)

     嘿,亲!知识可是无价之宝呢,但咱这精心整理的资料也耗费了不少心血呀。小小地破费一下,绝对物超所值哦!如有下载和支付问题,请联系我们QQ(微信同号):813200300

本次赞助数额为: 2 元 
   源码介绍
Netty搭建TCP服务器,实现与IOT设备通信

   private ExecutorService executorService1 = new ThreadPoolExecutor(
            10,
            20,
            1L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    private ExecutorService executorService2 = new ThreadPoolExecutor(
            10,
            20,
            1L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());



    private static AtomicInteger atomicInteger = new AtomicInteger(1);

    private static AtomicInteger atomic = new AtomicInteger(1);

    private static AtomicInteger atomic_23 = new AtomicInteger(1);

    private static Lock lock_1 = new ReentrantLock();

    private static Lock lock_2 = new ReentrantLock();

    private static Lock lock_3 = new ReentrantLock();

    private static Lock lock_4 = new ReentrantLock();
    private  static int count = 0;

    //private  final NettyMqttService nettyMqttService;

    // 此处缓存一份设备的最新数据
    Map<Object, Map<String, Object>> lastedEnv = new ConcurrentHashMap<Object, Map<String, Object>>();

    /**
     * 管理一个全局map,保存连接进服务端的通道数量
     */
    private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();

    @Override //数据读取完毕
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //writeAndFlush 是 write flush
        //将数据写入到缓存,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }


    /**
     * @param ctx
     * @author caobing
     * @DESCRIPTION: 有客户端连接服务器会触发此函数
     * @return: void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        lock_1.lock();
        try {
            //获取连接通道唯一标识
            ChannelId channelId = ctx.channel().id();
            //如果map中不包含此连接,就保存连接
            if (CHANNEL_MAP.containsKey(channelId)) {
                log.info("客户端【" channelId "】是连接状态,连接通道数量: " CHANNEL_MAP.size());
            } else {
                //保存连接
                CHANNEL_MAP.put(channelId, ctx);
                log.info("客户端【" channelId "】连接netty服务器");
                log.info("连接通道数量: " CHANNEL_MAP.size());
            }
        } finally {
            lock_1.unlock();
        }
    }

    /**
     * @param ctx
     * @author caobing
     * @DESCRIPTION: 有客户端终止连接服务器会触发此函数
     * @return: void
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        lock_2.lock();
        try {
            ChannelId channelId = ctx.channel().id();
            //包含此客户端才去删除
            if (CHANNEL_MAP.containsKey(channelId)) {
                //删除连接
                CHANNEL_MAP.remove(channelId);
                System.out.println();
                log.info("客户端【" channelId "】退出netty服务器");
                log.info("连接通道数量: " CHANNEL_MAP.size());
            }
        } finally {
            lock_2.unlock();
        }
    }


    /**
     * 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
     * 2. Object msg: 就是客户端发送的数据 默认Object
     * <p>
     * 读取数据实际(这里我们可以读取客户端发送的消息)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lock_3.lock();
        try {
            //log.info("服务器读取线程 " Thread.currentThread().getName() " channle = " ctx.channel());

            //log.info("进入服务端数据:" msg.toString());

//            Channel channel = ctx.channel();
//            //将 msg 转成一个 ByteBuf
//            //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
            ByteBuf buf = (ByteBuf) msg;
            //得到此时客户端的数据长度
            int bytes_length = buf.readableBytes();
//            log.info("此时客户端的数据长度: " bytes_length);
            //组件新的字节数组
            byte[] buffer = new byte[bytes_length];
            buf.readBytes(buffer);
           // final String allData = NettyByteAndStringUtils.byteToHex(buffer);
            //log.info("进入服务端数据:" allData);
            String s = new String(buffer);
            System.out.println("s:" count);
            count ;

}