0%

netty概念和使用netty搭建WebSocket

简介

netty是java网络编程框架,其架构抽象出的概念很容易使我们针对不同的需求来定制不同的处理器,本文将简介netty的概念和一些网络知识,最后并使用netty来编写一个webScoket聊天室服务器.

netty基本概念

线程模型
阻塞与非阻塞

阻塞与非阻塞是描述进程在访问某个资源时,数据是否准备就绪的的一种处理方式。当数据没有准备就绪时:

阻塞:线程持续等待资源中数据准备完成,直到返回响应结果。

非阻塞:线程直接返回结果,不会持续等待资源准备数据结束后才响应结果。

同步与异步

同步与异步是指访问数据的机制,同步一般指主动请求并等待IO操作完成的方式。

异步则指主动请求数据后便可以继续处理其它任务,随后等待IO操作完毕的通知。

用经典的烧开水理论来解释为

1、普通水壶煮水,站在旁边,主动的看水开了没有?同步的阻塞

2、普通水壶煮水,去干点别的事,每过一段时间去看看水开了没有,水没开就走人。 同步非阻塞

3、响水壶煮水,站在旁边,不会每过一段时间主动看水开了没有。如果水开了,水壶自动通知他。 异步阻塞

4、响水壶煮水,去干点别的事,如果水开了,水壶自动通知他。异步非阻塞

为什么使用异步线程模型?本质上是由于CPU和I/O执行效率的不对等,CPU执行速度很快,而I/0执行的较慢,如果用阻塞去处理,那么就会导致cpu在I/0阻塞时一直在等待I/0的结果,这样导致CPU的浪费

netty使用Reactor线程模型

Reactor是为同步I/O设计的线程模式
其分为三种, 分别为

  • 单线程模型 (单Reactor单线程)
  • 多线程模型 (单Reactor多线程)
  • 主从多线程模型 (多Reactor多线程)

channel:通道,目前可以把channel看做是入站或者出站数据的载体,因此它可以被打开或者关闭—-摘抄自netty实战

Reactor中一个定义了4个事件分表为Accept(接受), Read(读取), Write(写),Connect(连接),各个Channel通过这些事件往Reactor中注册信息,来告诉Reactor,他们下一步想要执行什么事件

单线程模型

按钮
根据Doug Lea 在 《Scalable IO in Java 》中单线程模型中的代码
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
顾名思义就是已一个线程去控制所有I/O的流转,从代码中我们可以看出其只用了一个线程就能控制所有的I/O,其原理是在Reactor抽象出一个Selector,所有的Channel当状态发生变化时中往Selector中注册事件),由Selector循环去调用select()方法去获取这些发生状态变化的事件,然后根据这些事件绑定的执行器如图中的(acceptor,和handler(read,decode….send))一个个去执行这些事件,这样就使用一个线程就能控制多个I/O的执行

缺点:当处理读写任务的线程负载过高后,处理速度下降,事件会堆积,严重的会超时,可能导致客户端重新发送请求,性能越来越差

多线程模型
按钮
如图,多线程和单线程模型的区别就是将之前的handler换成一个线程池去处理,来解决单线程的问题,因为在网络编程中,花的时间较多的是handler,因为要负责解码,处理逻辑,最后在编发发送给客户端,因此有用线程池去处理,而acceptor只是单纯的做一个监测端口然后并注册selector中注册的作用,所以没用用到线程池,一般我们使用netty时只监测一个端口,所以这个模型是netty大量使用的模型

主从多线程模型
和多线程的区别就是acceptor也使用线程池去处理

主要接口和概念
  • channel:通道,目前可以把channel看做是入站或者出站数据的载体,因此它可以被打开或者关闭
  • ChannelHandler:Netty使用不同的事件来通知我们状态的改变或者是操作的状态;这使我们能够基于已经发生的事件来触发适当的动作,举处理心跳的IdleStateHandler为例,在这个类初始化时会初始化一个定时器,如果在一段时间内没有触发消失这会在定时器中触发事件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private void initialize(ChannelHandlerContext ctx) {
    switch(this.state) {
    case 1:
    case 2:
    return;
    default:
    this.state = 1;
    this.initOutputChanged(ctx);
    this.lastReadTime = this.lastWriteTime = this.ticksInNanos();
    if (this.readerIdleTimeNanos > 0L) {
    this.readerIdleTimeout = this.schedule(ctx, new IdleStateHandler.ReaderIdleTimeoutTask(ctx), this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }

    if (this.writerIdleTimeNanos > 0L) {
    this.writerIdleTimeout = this.schedule(ctx, new IdleStateHandler.WriterIdleTimeoutTask(ctx), this.writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }

    if (this.allIdleTimeNanos > 0L) {
    this.allIdleTimeout = this.schedule(ctx, new IdleStateHandler.AllIdleTimeoutTask(ctx), this.allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }

    }
    }

以下是定时任务的代码,在IdleStateHandler.this.channelIdle(ctx, event);中netty发出了事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final class ReaderIdleTimeoutTask extends IdleStateHandler.AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}

protected void run(ChannelHandlerContext ctx) {
long nextDelay = IdleStateHandler.this.readerIdleTimeNanos;
if (!IdleStateHandler.this.reading) {
nextDelay -= IdleStateHandler.this.ticksInNanos() - IdleStateHandler.this.lastReadTime;
}

if (nextDelay <= 0L) {
IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, IdleStateHandler.this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = IdleStateHandler.this.firstReaderIdleEvent;
IdleStateHandler.this.firstReaderIdleEvent = false;

try {
IdleStateEvent event = IdleStateHandler.this.newIdleStateEvent(IdleState.READER_IDLE, first);
IdleStateHandler.this.channelIdle(ctx, event);
} catch (Throwable var6) {
ctx.fireExceptionCaught(var6);
}
} else {
IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}

}
}

发出事件,由后续由我们自行实现的channel在userEventTriggered方法中捕获这个事件并做处理

1
2
3
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
  • ChannelHandler:分为入站和出站分别对应其的两个子接口ChannelInboundHandler,ChannelOutboundHandler,入站事件主要处理读事件,这里做一个类比,Http服务器主要做的是处理读事件,在客户单连接到服务器后,服务器需要对接受到的信息进行解码,http协议的解释,最后再进行逻辑的处理,最后将请求返回回去这就是读请求,而客户端需要处理写的请求,将对象序列化,并以一定的协议传输出去,这是写请求。

  • EventLoopGroup为netty封装的线程池模型

服务端代码

服务端代码如下,代码中做了注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class ServiceMain {

public static void main(String[] args)throws Exception {
//编写一个acceptor线程池,主要监测端口和分发连接,因为我们只监测一个端口所以采用的是Reactor的多线程模型(只用一个线程去处理连接)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//定义一个出来ChannelHandler的线程池,不指定参数线程池数量为cpu数*2
EventLoopGroup workerGroup = new NioEventLoopGroup();
//定义一个保存所有channel连接的容器
ChannelGroup group=new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
try {
ServiceMain serviceMain=new ServiceMain();
ServerBootstrap serverBootstrap=new ServerBootstrap();
//绑定Reactor线程池和ChannelHandler线程池
Channel channel =serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline=socketChannel.pipeline();
//添加http协议的解码器和编译器
pipeline.addLast(new HttpServerCodec());
//添加http聚合器,因为在tcp传输中tcp协议会根据发送包的大小对数据进行分包,所以这里要将分的包组合成一个完成的http请求
pipeline.addLast(new HttpObjectAggregator(65536));
//webscoket对应的处理器,用户处理webscoket的握手和协议升级
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//主要处理websocket发送的消息
pipeline.addLast( serviceMain.new TextHandler(group));
}
})
.bind(new InetSocketAddress(9000)).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println(channelFuture.isSuccess());
}
}).sync().channel();
channel.closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}


//将用户进入聊天室发起的消息进行分发
private class TextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{


private ChannelGroup group;

public TextHandler(ChannelGroup group) {
this.group = group;
}


//如果触发了webscoket握手信息则往聊天室类发消息通知用户进入聊天室
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt==WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){
group.writeAndFlush("新用户加入房间");
group.add(ctx.channel());
}
super.userEventTriggered(ctx, evt);
}

//用户号发送消息时将消息在聊天室内广播
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

group.writeAndFlush(textWebSocketFrame.retain());
}
}







}

网页端代码

用h5发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<!DOCTYPE html>
<meta charset="utf-8" />
<title>WebSocket Test</title>
<script language="javascript"type="text/javascript">
alert(2222)
var ws = new WebSocket("ws://127.0.0.1:9000/ws");

ws.onopen = function(evt) {
console.log("Connection open ...");
ws.send("Hello WebSockets!");
};

ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);

};

ws.onclose = function(evt) {
console.log("Connection closed.");
};

function send() {
ws.send("message.................")
}
</script>
<input type="button" value="send" onclick="send()">
</html>