博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty3.10.1:关于TCP粘包问题 及 Encoder&Decoder
阅读量:6188 次
发布时间:2019-06-21

本文共 5223 字,大约阅读时间需要 17 分钟。

hot3.png

上一篇文章,我们只是解释了,在Netty中,从调用socket的read函数,到复制读取的内容到自己的缓冲区里,

到把这个缓冲区封装成一个UpStream事件,到分发给SocketChannel对应的PipeLine,到如何被自定义的类的MessageReceived函数处理

的整个过程。

---------------但是,很多时候,由于TCP的天生字节流特性,导致我们需要解决粘包的问题,其实也就是说

可能某个MessageReceived中的内容不是一个逻辑上完整的内容,想象一下,如果你使用XML格式来定义消息

结果本次读取的内容不是一个完整的XML,因为剩下的内容可能要在下一次MessageReceived中处理,

此时,怎么办?

MessageDecoder就是用来处理这种情况的。

----------------------------------------------那么,在整个系统中,MessageDecoder到底是如何处理的呢?

这就得分析

p.addLast("name1", new Handler1);

 

p.addLast("name2", new Handler2);

 

p.addLast("name3", new Handler3);

道理意味着什么?

其实,上面这3行代码是放在PipelineFactory的getPipeline()函数里的。

然后执行了这3行代码,就形成了一条处理链,从1到3顺序双链表连接。

1<--->2<--->3.

head为1,tail为3.这样的话,上行消息就从1到2到3处理,下行消息就从3到2到1处理。

每当到达一节节点是,根据上行还是下行,判断当前节点是否可以处理本消息而决定是处理还是跳过。

----------------------------------------------------------------------------------------------------------

由于MessageReceived是一个上行消息,在真正被处理前,应该被解码,所以如果hander2是处理完整消息的节点的话。

则handler1则应该是MessageDecoder节点,且这个节点应该可以执行MessageReceived函数。也就是起码应该

extends SimpleChannelUpstreamHandler ,也可以是SimpleChannelHandler 

反正只要你实现了MessageReceived就可以了。

----------------------------------------------------------------------------------------------------------

 

public void sendUpstream(ChannelEvent e) {

        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);

        if (head == null) {

            if (logger.isWarnEnabled()) {

                logger.warn(

                        "The pipeline contains no upstream handlers; discarding: " + e);

            }

 

            return;

        }

 

        sendUpstream(head, e);

    }

 

    void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {

        try {

            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);

        } catch (Throwable t) {

            notifyHandlerException(e, t);

        }

    }

 这上面的代码的意思就是:如果某个PipeLine收到一个上行消息,就从head找到第一个可以处理上行消息的节点

交给它处理,下面我们看看上行节点收到消息后怎么办?

 public void handleUpstream(

            ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

 

        if (e instanceof MessageEvent) {

            messageReceived(ctx, (MessageEvent) e);

        } else if (e instanceof WriteCompletionEvent) {

            WriteCompletionEvent evt = (WriteCompletionEvent) e;

            writeComplete(ctx, evt);

        } else if (e instanceof ChildChannelStateEvent) {

            ChildChannelStateEvent evt = (ChildChannelStateEvent) e;

            if (evt.getChildChannel().isOpen()) {

                childChannelOpen(ctx, evt);

            } else {

                childChannelClosed(ctx, evt);

            }

        } else if (e instanceof ChannelStateEvent) {

            ChannelStateEvent evt = (ChannelStateEvent) e;

            switch (evt.getState()) {

            case OPEN:

                if (Boolean.TRUE.equals(evt.getValue())) {

                    channelOpen(ctx, evt);

                } else {

                    channelClosed(ctx, evt);

                }

                break;

            case BOUND:

                if (evt.getValue() != null) {

                    channelBound(ctx, evt);

                } else {

                    channelUnbound(ctx, evt);

                }

                break;

            case CONNECTED:

                if (evt.getValue() != null) {

                    channelConnected(ctx, evt);

                } else {

                    channelDisconnected(ctx, evt);

                }

                break;

            case INTEREST_OPS:

                channelInterestChanged(ctx, evt);

                break;

            default:

                ctx.sendUpstream(e);

            }

        } else if (e instanceof ExceptionEvent) {

            exceptionCaught(ctx, (ExceptionEvent) e);

        } else {

            ctx.sendUpstream(e);

        }

    }

 从这里,我们已经可以很清楚的知道了,对于每一个上行Event 处理器来说,

只要是MessageEvent,就会调用messageReceived函数。

而这个函数正是我们之前自定义的@Override函数。

------------------------------------------------------那么MessageDecoder是如何嵌入进去的呢?

这里我们以继承了FrameDecoder的类为例子讲解!

 

@Override

    public void messageReceived(

            ChannelHandlerContext ctx, MessageEvent e) throws Exception {

 

        Object m = e.getMessage();//获取消息

        if (!(m instanceof ChannelBuffer)) {

            ctx.sendUpstream(e);

            return;

        }

 

        ChannelBuffer input = (ChannelBuffer) m;//转换成ChannelBuffer对象

        if (!input.readable()) {

            return;

        }

 

        if (cumulation == null) {

            try {

//根据需要决定是否需要缓存本次消息,然后再把复制好的消息传给callDecode函数

                // the cumulation buffer is not created yet so just pass the input to callDecode(...) method

                callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());

            } finally {

                updateCumulation(ctx, input);

            }

        } else {

            input = appendToCumulation(input);

            try {

                callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());

            } finally {

                updateCumulation(ctx, input);

            }

        }

    }

 其实说白了,就是FrameDecoder会在本地缓存数据,然后每次来了新消息之后,就把自己的老数据+新数据合并成目前的数据

然后传给自定义类的decode函数。

如下是一个自定义的Decoder

 

public class MyFrameDecoder extends  {

 

         @Override

   protected Object decode( ctx,

                           ,

                            buf) throws Exception { 

     if (buf.readableBytes() < 4) {     

        return null;

     } 

     buf.markReaderIndex(); 

      int length = buf.readInt(); 

     if (buf.readableBytes() < length) {

         buf.resetReaderIndex(); 

        return null;

     }  

      frame = buf.readBytes(length);   

     return frame;

   }

 }

这个比较简单,就不多说了。

------------------------------------------------------

那么,结合上一篇文章,我们这里就知道如何利用MessageDecoder和MessageHandler来处理一个消息了。

至于编码Encoder,就可以继承OneToOneEncoder即可。

最后写入到对应的socketChannel的队列里。

 

至于其他心跳检测等内容,可以参考。

 

 

 

@Override

protected Object encode(ChannelHandlerContext ctx, Channel channel,

Object msgObj) throws Exception {

 

ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();

Message msg = (Message) msgObj;

// version

byte[] version = new byte[1];

version[0] = msg.getVersion();

buffer.writeBytes(version);

// length

buffer.writeInt(0);// 数据长度

// content

byte[] bytes = null;

String str = msg.getContent();

try {

bytes = str.getBytes("UTF-8");

catch (UnsupportedEncodingException e) {

logger.error("", e);

}

buffer.writeBytes(bytes);// 数据体

// 修正长度

buffer.setInt(1, buffer.writerIndex());// 重置长度

//ok

logger.debug("send message=====" + str.toString());

return buffer;

 

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载于:https://my.oschina.net/qiangzigege/blog/393280

你可能感兴趣的文章
Linux相关知识
查看>>
我的友情链接
查看>>
[转载] 信息系统项目管理师视频教程——25 战略管理
查看>>
【Java】Java中的时间日期处理
查看>>
在windows下配置apache以cgi方式支持python
查看>>
Oracle 服务器端客户端字符集设置对应用程序的影响
查看>>
MCQ消息丢失排查
查看>>
搭建Nginx + Memcached + Tomcat 集群记录
查看>>
iOS状态栏颜色修改
查看>>
关于生活!
查看>>
manila newton源码安装
查看>>
红灯不是用来闯的
查看>>
win10的kms激活命令
查看>>
SecureCRT启用日志和设置卷屏行数
查看>>
数学基础——进制转换
查看>>
linux samba服务器
查看>>
配置yum源2:配置本地yum源,服务器不联网
查看>>
linux下mysql主主互备
查看>>
软件详细设计说明书
查看>>
利用awk自身变量NR和FNR来处理多个文件
查看>>