网络编程—上


1.网络编程基础

见计算机网络,因此略

2.深入BIO与NIO

2.1 BIO

BIO 有的称之为 basic(基本) IO,有的称之为 block(阻塞) IO,主要应用于文件 IO 和网络 IO,这里不再说文件 IO,前置基础资料中有详细说明,本次课程主要讲讲网络 IO。

在 JDK1.4 之前,我们建立网络连接的时候只能采用 BIO,需要先在服务端启动一个ServerSocket,然后在客户端启动 Socket 来对服务端进行通信,默认情况下服务端需要对每个请求建立一个线程等待请求,而客户端发送请求后,先咨询服务端是否有线程响应,如果没有则会一直等待或者遭到拒绝,如果有的话,客户端线程会等待请求结束后才继续执行,这就是阻塞式IO。

接下来通过一个例子复习回顾一下 BIO 的基本用法(基于 TCP)。

package org.example.socket.bio;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BioSocketServer {
    public static void main(String[] args)throws Exception {
        //1、创建ServerSocket对象
        System.out.println("服务端  启动.....");
        System.out.println("初始化端口  9999");
        ServerSocket ss = new ServerSocket(9999);
        while (true){
            //2.监听客户端
            Socket s = ss.accept();
            //3.从连接中取出输入流来接受消息
            InputStream is =s.getInputStream();//阻塞
            byte[] b = new byte[10];
            is.read(b);
            String clientIP = s.getInetAddress().getHostAddress();
            System.out.println(clientIP +"说:"+new String(b).trim());
            //4、从连接中取出输出流并回话
            OutputStream os = s.getOutputStream();
            os.write("没钱".getBytes());
            //关闭
            s.close();

        }

    }
}

上述代码编写了一个服务器端程序,绑定端口号 9999,accept 方法用来监听客户端连接, 如果没有客户端连接,就一直等待,程序会阻塞到这里。

package org.example.socket.bio;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * 客户端程序
 */
public class BioSocketClient {
    public static void main(String[] args)throws Exception {
        while (true){
            //1、创建Socket对象
            Socket s = new Socket("127.0.0.1",9999);
            //2、从连接中取出输出流并发消息
            OutputStream os  = s.getOutputStream();
            System.out.println("请输出:");
            Scanner sc = new Scanner(System.in);
            String msg = sc.nextLine();
            os.write(msg.getBytes());
            //3.从连接中取出输入流并接受回话
            InputStream is = s.getInputStream();//阻塞
            byte[] b = new byte[20];
            is.read(b);
            System.out.println("boos say:"+new String(b).trim());
            //4、关闭连接
            s.close();



        }
    }
}

上述代码编写了一个客户端程序,通过 9999 端口连接服务器端,getInputStream 方法用来等待服务器端返回数据,如果没有返回,就一直等待,程序会阻塞到这里。

造成乱码的原因:读取的信息是按照特定编码读取的字节流信息,读取的时候受到读取数量限制,就有

可能出现读取的不是一个完整的字节数组信息的情况。

2.2 NIO

2.2.1 概述

java.nio 全称 Java Non-Blocking IO,是指 JDK 提供的新 API。

从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO)。新增了许多用于处理输入输出的类,这些类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,新增了满足 NIO 的功能。

NIO 和 BIO 有着相同的目的和作用,但是它们的实现方式完全不同;

  • BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 IO 的效率比流 IO 高很多。

  • NIO 是非阻塞式的,这一点跟 BIO 也很不相同,使用它可以提供非阻塞式的高伸缩性网络。

NIO 主要有三大核心部分:

  • Channel通道

  • Buffer缓冲区

  • Selector选择器

传统的 BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel和 Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。

2.2.2 文件 IO

1.概述和核心 API

缓冲区(Buffer):实际上是一个容器,是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。

Channel 提供从文件、网络读取数据的渠道, 但是读取或写入的数据都必须经由 Buffer

如下图所示:

在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,常用的 Buffer 子类有:

  • ByteBuffer,存储字节数据到缓冲区

  • ShortBuffer,存储短整型数据到缓冲区

  • CharBuffer,存储字符数据到缓冲区

  • IntBuffer,存储整数数据到缓冲区

  • LongBuffer,存储长整型数据到缓冲区

  • DoubleBuffer,存储小数到缓冲区

  • FloatBuffer,存储小数到缓冲区

对于 Java 中的基本数据类型, 都有一个 Buffer 类型与之相对应,最常用的自然是ByteBuffer 类(字节缓冲),该类的主要方法如下所示:

  • public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区

  • public abstract byte[] get(); 从缓冲区获得字节数据

  • public final byte[] array(); 把缓冲区数据转换成字节数组

  • public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量

  • public static ByteBuffer wrap(byte[] array); 把一个现成数组放到缓冲区中使用

  • public final Buffer flip(); 翻转缓冲区,重置位置到初始位置(缓冲区有一个指针从头开始读取数据,读到缓冲区尾部时,可以使用这个方法,将指针重新定位到头)

Channel:类似于 BIO 中的 stream,例如 FileInputStream 对象,用来建立到目标(文件,网络套接字,硬件设备等)的一个连接,但是需要注意:BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的, 既可以用来进行读操作,也可以用来进行写操作。

常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel。

  • FileChannel 用于文件的数据读写

  • DatagramChannel 用于 UDP 的数据读写

  • ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。

这里我们先讲解 FileChannel 类,该类主要用来对本地文件进行 IO 操作,主要方法如下所示:

  • public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中

  • public int write(ByteBuffer src) ,把缓冲区的数据写到通道中

  • public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道中复制数据到当前通道

  • public long transferTo(long position, long count, WritableByteChannel target),把数据从当前通道复制给目标通道

2.案例

接下来我们通过 NIO 实现几个案例,分别演示一下本地文件的读、写和复制操作,并和 BIO 做个对比。

1.往本地文件中写数据

/**
 * 写入文件
 * @throws Exception
 */
public static void writeTxt()throws Exception{
    //1、创造输出流
    FileOutputStream fos = new FileOutputStream("D:\\workspace\\java_example\\java_case\\src\\main\\resources\\basic.txt");
    //2、从流中得到一个通道
    FileChannel fc = fos.getChannel();
    //3、提供一个缓存区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //4、往缓冲区存入数据
    Scanner sc = new Scanner(System.in);
    buffer.put(sc.nextLine().getBytes());
    //5、翻转缓冲区
    buffer.flip();
    //6、把缓冲区写到通道中
    fc.write(buffer);
    fos.close();
}

NIO 中的通道是从输出流对象里通过 getChannel 方法获取到的,该通道是双向的,既可以读,又可以写。在往通道里写数据之前,必须通过 put 方法把数据存到 ByteBuffer 中,然后通过通道的 write 方法写数据。在 write 之前,需要调用 flip 方法翻转缓冲区,把内部重置到初始位置,这样在接下来写数据时才能把所有数据写到通道里。

flip()方法的作用:翻转缓冲区,在缓冲区里有一个指针从头(pos)写到尾(lim)。默认的pos是缓冲区内元素size,lim是缓冲区大小。当从缓冲区向通道去写时,是从pos位置去写,写到lim,这样就得不到数据。所以要将pos=lim,pos=0再写。

2.从本地文件中读数据

/**
 * 读取文件
 * @throws Exception
 */
public static void readTxt()throws Exception{
    File file = new File("D:\\workspace\\java_example\\java_case\\src\\main\\resources\\basic.txt");
    //1、 创建输入流
    FileInputStream fis = new FileInputStream(file);
    //2、得到一个通道
    FileChannel fc = fis.getChannel();
    //3、准备一个缓冲区
    ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
    fc.read(buffer);
    byte[] array = buffer.array();
    String readLine = new String(array);
    System.out.println(readLine);
    fis.close();
}

上述代码从输入流中获得一个通道,然后提供 ByteBuffer 缓冲区,该缓冲区的初始容量和文件的大小一样,最后通过通道的 read 方法把数据读取出来并存储到了 ByteBuffer 中。

3.复制文件

通过 BIO 复制文件

/**
 * BIO复制文件
 * @throws Exception
 */
public static void BioToCopyFile()throws Exception{
    FileInputStream fis = new FileInputStream("D:\\workspace\\java_example\\java_case\\src\\main\\resources\\basic.txt");
    FileOutputStream fos = new FileOutputStream("D:\\workspace\\java_example\\java_case\\src\\main\\resources\\basic_copy_bio.txt");
    byte[] b =new byte[1024];
    while (true){
        int res = fis.read(b);
        if(res==-1){
            break;
        }
        fos.write(b,0,res);
    }
    fis.close();
    fos.close();
}

上述代码分别通过输入流和输出流实现了文件的复制,是传统的BIO实现。

通过 NIO 复制相同的文件

/**
     * Nio复制文件
     * @throws Exception
     */
    public static void NioToCopyFile()throws Exception{
        //1、创建两个流对象
        FileInputStream fis = new FileInputStream("D:\\workspace\\java_example\\java_case\\src\\main\\resources\\basic.txt");
        FileOutputStream fos = new FileOutputStream("D:\\workspace\\java_example\\java_case\\src\\main\\resources\\basic_copy_nio.txt");
        //2、得到两个通道
        FileChannel sourceFileCopy = fis.getChannel();
        FileChannel destFileCopy = fos.getChannel();
        //3、复制
        destFileCopy.transferFrom(sourceFileCopy,0,sourceFileCopy.size());
        //4、关闭
        fis.close();
        fos.close();
    }

上述代码分别从两个流中得到两个通道,sourceCh 负责读数据,destCh 负责写数据,然后直接调用transferFrom 方法一步到位实现了文件复制。

2.2.3 网络 IO

1)概述

学习 NIO 主要就是进行网络 IO,Java NIO 中的网络通道是非阻塞 IO 的实现,基于事件驱动,非常适用于服务器需要维持大量连接,但是数据交换量不大的情况,例如:Web服务器、RPC、即时通信…

在 Java 中编写 Socket 服务器,通常有以下几种模式:

  • 一个客户端连接用一个线程

    • 优点:程序编写简单

    • 缺点:如果连接非常多,分配的线程也会非常多,服务器可能会因为资源耗尽而崩溃。

  • 把每个客户端连接交给一个拥有固定数量线程的连接池

    • 优点:程序编写相对简单, 可以处理大量的连接。

    • 缺点:线程的开销非常大,连接如果非常多,排队现象会比较严重。

  • 使用 Java 的 NIO,用非阻塞的 IO 方式处理

    • 优点:这种模式可以用一个线程,处理大量的客户端连接

    • 缺点:代码复杂度较高,不易理解

2)Selector选择器

能够检测多个注册的通道上是否有事件发生(读、写、连接),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接。这样使得只有在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并且避免了多线程之间的上下文切换导致的开销

该类的常用方法如下所示:

  • public static Selector open(),得到一个选择器对象

  • public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的

  • SelectionKey 加入到内部集合中并返回,参数用来设置超时时间

  • public Set selectedKeys(),从内部集合中得到所有的 SelectionKey

3)SelectionKey

代表了 Selector 和网络通道的注册关系

一共四种(就是连接事件)

  • int OP_ACCEPT:有新的网络连接可以 accept,值为 16

  • int OP_CONNECT:代表连接已经建立,值为 8

  • int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4

该类的常用方法如下所示:

  • public abstract Selector selector(),得到与之关联的 Selector 对象

  • public abstract SelectableChannel channel(),得到与之关联的通道

  • public final Object attachment(),得到与之关联的共享数据

  • public abstract SelectionKey interestOps(int ops),设置或改变监听事件

  • public final boolean isAcceptable(),是否可以 accept

  • public final boolean isReadable(),是否可以读

  • public final boolean isWritable(),是否可以写

4)ServerSocketChannel

用来在服务器端监听新的客户端 Socket 连接

常用方法如下所示:

  • public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道

  • public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号

  • public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式, 取值 false 表示采用非阻塞模式

  • public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象

  • public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件

5)SocketChannel

网络 IO 通道,具体负责进行读写操作

NIO 总是把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。

常用方法如下所示:

  • public static SocketChannel open(),得到一个 SocketChannel 通道

  • public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式, 取值 false 表示采用非阻塞模式

  • public boolean connect(SocketAddress remote),连接服务器

  • public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成连接操作

  • public int write(ByteBuffer src),往通道里写数据

  • public int read(ByteBuffer dst),从通道里读数据

  • public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择器并设置监听事件,最后一个参数可以设置共享数据

  • public final void close(),关闭通道

服务器端有一个选择器对象,服务器的ServerSocketChannel对象也要注册给selector,它的accept方法负责接收客户端的连接请求。有一个客户端连接过来,服务端就会建立一个通道。Selector会监控所有注册的通道,检查这些通道中是否有事件发生【连接、断开、读、写等事件】,如果某个通道有事件发生则做相应的处理。

2.2.4 NIO案例:客户端与服务器之间通信

API 学习完毕后,接下来我们使用 NIO 开发一个入门案例,实现服务器端和客户端之间的数据通信(非阻塞)。

上面代码用 NIO 实现了一个服务器端程序,能不断接受客户端连接并读取客户端发过来的数据

package org.example.socket.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class NioSocketClient {

    public static void main(String[] args)throws Exception {
        //1、得到一个网络通道
        SocketChannel channel = SocketChannel.open();
        //2、设置非阻塞方式
        channel.configureBlocking(false);
        //3、提供服务器端的IP地址和端口号
        InetSocketAddress address = new InetSocketAddress("127.0.0.1",9999);
        //4、连接服务器端,如果用connect()方法连接服务器不成功,则用finishConnect()方法进行连接
        if(!channel.connect(address)){
            //因为连接需要花事件,所以用while一直区尝试连接。在连接服务端时还可以做别的事情,体现非阻塞
            while(!channel.finishConnect()){
            //nio作为非阻塞式的优势,如果服务器没有响应(不启动服务端),客户端不会阻塞,最后会报错,客户端尝试链接服务器连不上
                System.out.println("Clinet:连接服务端的同时,还可以干别的一些事情");

            }
        }
        //5、得到一个缓冲区并存入数据
        String msg = "123asd";
        ByteBuffer writeBuf = ByteBuffer.wrap(msg.getBytes());
        //6、发送数据
        channel.write(writeBuf);
        //阻止客户端停止,否则服务端也会停止
        System.in.read();



    }



}
package org.example.socket.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class NioSocketServer {
    public static void main(String[] args)throws Exception {
        //1.开启一个ServerSocketChannel通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //2、开启一个Selector选择器
        Selector selector = Selector.open();
        //3、绑定9999端口
        System.out.println("服务器端启动");
        System.out.println("服务器初始化端口:9999");
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //4、配置非阻塞方式
        serverSocketChannel.configureBlocking(false);
//        5、Selector选择器注册ServerSocketChannel通道,绑定连接操作
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //6、循环执行:监听连接事件及读取数据操作
        while (true){
            //6.1 监控客户端连接
            //selecto.select()方法返回的是客户端的通道数,如果为0,则说明没有客户端连接。
            //nio非阻塞式的优势
            if(selector.select(2000)==0){
                System.out.println("Server:客户端暂时无连接,处理其它业务");
                continue;
            }
            //6.2 得到SelectionKey,判断通道里的事件
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            //遍历所有SelectionKey
            while (keyIterator.hasNext()){
                SelectionKey key = keyIterator.next();
                //客户端先连接上,处理事件,然后客户端会向服务端发消息,再读取客户端的数据事件
                if(key.isAcceptable()){//客户端连接请求事件
                    System.out.println("OP_ACCEPT");
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    //注册通道,将通道交给selector选择器进行监控
                    //参数01-选择器
                    //参数02-服务器要监控读事件,客户端发送send数据,服务端read数据
                    //参数03-客户端传过来的数据要放在缓冲区
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                if(key.isReadable()){//读取客户端数据事件
                    //数据在通道中,先得到通道
                    SocketChannel channel = (SocketChannel) key.channel();
                    //取到一个缓冲区,nio读写数据都是基于缓冲区
                    ByteBuffer readBuffer = (ByteBuffer) key.attachment();
                    //从通道中将客户端发来的数据读到缓冲区
                    channel.read(readBuffer);
                    System.out.println(new String(readBuffer.array()));
//                    Charset charset = StandardCharsets.UTF_8;
//                    String receivedData = charset.decode(readBuffer).toString();
//                    System.out.println("客户端发送过来的数据:"+receivedData);
                }
                //6.3 手动从集合中移除当前key,防止重复处理
                keyIterator.remove();
            }
        }
    }
}

上面代码通过 NIO 实现了一个客户端程序,连接上服务器端后发送了一条数据

2.2.5 网络聊天室V1.0

接下来我们用 NIO 实现一个多人聊天案例,具体代码如下所示

package org.example.socket.nio.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

/**
 * NIO 实现一个多人聊天案例
 */
public class NioChatSocketServer {
    //监听通道
    private ServerSocketChannel listenerChannel;
    //选择器对象
    private Selector selector;
    //服务器端口
    private static final int PROT = 9999;

    public NioChatSocketServer() {
        try {
            //1、开启Socket监听通道
            listenerChannel = ServerSocketChannel.open();
            //2、开启选择器
            selector = Selector.open();
            listenerChannel.bind(new InetSocketAddress(PROT));
            //4、设置为非阻塞模型
            listenerChannel.configureBlocking(false);
            //5. 将选择器绑定到监听通道并监听accpet事件
            listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("真人网络聊天室 启动..........");
            System.out.println("真人网络聊天室 初始化端口 9999..........");
            System.out.println("真人网络聊天室 初始化网络ip地址..........");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void start()throws Exception{
        try {
            while (true){//不停的监控
                if(selector.select(2000)==0){
                    System.out.println("Server:没有客户端连接");
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if(key.isAcceptable()){//连接事件
                        SocketChannel accept = listenerChannel.accept();
                        accept.configureBlocking(false);
                        accept.register(selector,SelectionKey.OP_READ);
                        System.out.println(accept.getRemoteAddress().toString().substring(1)+"上线了......");

                    }
                    if(key.isReadable()){//读取数据事件
                        readMsg(key);
                    }
                    iterator.remove();
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //读取客户端发来的消息并广播出去
    public void readMsg(SelectionKey key)throws Exception{
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count = channel.read(buffer);
        if(count>0){
            String msg = new String(buffer.array());
            printInfo(msg);
            //全员广播消息
            broadCast(channel,msg);
        }
    }

    //给所有客户端发消息
    public void broadCast(SocketChannel except, String msg) throws IOException {
        System.out.println("服务器广播消息了......");
        for(SelectionKey key:selector.keys()){
            Channel targetChannel = key.channel();
            if(targetChannel instanceof SocketChannel && targetChannel!=except){
                SocketChannel destChannel = (SocketChannel) targetChannel;
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                destChannel.write(buffer);
            }
        }
    }

    private void printInfo(String str) { //往控制台打印消息
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("[" + sdf.format(new Date()) + "] -> " + str);
    }

    public static void main(String[] args)throws Exception {
        /**
         * 指定jar包中的某一个类启动
         * java -cp java_case-1.0-SNAPSHOT.jar org.example.socket.nio.chat.NioChatSocketServer
         */
        new NioChatSocketServer().start();
    }

}

上述代码使用 NIO 编写了一个聊天程序的服务器端,可以接受客户端发来的数据,并能把数据广播给所有客户端。

package org.example.socket.nio.chat;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

//聊天程序客户端
public class NioChatSocketClient {
    private final String HOST="192.168.0.46";
    private int PORT = 9999;
    private SocketChannel socketChannel;//网络通道
    private String userName;

    public NioChatSocketClient()throws Exception{
        //1.得到一个网络通道
        socketChannel = SocketChannel.open();
        //2.设置非阻塞方式
        socketChannel.configureBlocking(false);
        //3.提供服务器端的IP地址和端口号
        InetSocketAddress address = new InetSocketAddress(HOST, PORT);
        //4. 连接服务器端
        if(!socketChannel.connect(address)){
            while (!socketChannel.finishConnect()){//nio作为非阻塞式的优势
                System.out.println("Client:连接服务器端的同时,可以去处理其它业务");

            }
        }
        //5、得到客户端IP地址和端口信息,作为聊天用户名使用
        userName = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println("-------------client("+userName+") is ready------------");
    }
    //向服务器发送数据
    public void sendMsg(String msg)throws Exception{
        if(msg.equalsIgnoreCase("bye")){
            socketChannel.close();
        }
        msg = userName+"说:"+msg;
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        socketChannel.write(buffer);
    }

    //从服务器端接受数据
    private void receiveMsg()throws Exception{
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int size = socketChannel.read(buffer);
        if(size>0){
            String msg = new String(buffer.array());
            System.out.println(msg.trim());
        }
    }

}

上述代码通过 NIO 编写了一个聊天程序的客户端,可以向服务器端发送数据,并能接收服务器广播的数据。

public static void main(String[] args) throws Exception{
        NioChatSocketClient chatSocketClient = new NioChatSocketClient();
        new Thread(()->{
            //监听服务器消息
            while (true){
                try {
                    chatSocketClient.receiveMsg();
                    Thread.sleep(2000);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String message = scanner.nextLine();
            chatSocketClient.sendMsg(message);
        }




}

上述代码运行了聊天程序的客户端,该代码运行一次就是一个聊天客户端,可以同时运行多个聊天客户端。在一个聊天客户端中发送消息,会广播给所有其他聊天客户端。客户端互相发送消息,需要提前将服务端启动。

支持局域网聊天,也支持网络聊天。使用内网穿透将本机9999端口映射到公网121.199.163.228的9999端口,即可实现网络群聊。

2.3 AIO 编程

2.3.1 概念

JDK 7 引入了 Asynchronous IO,即 AIO,叫做异步不阻塞的 IO,也可以叫做NIO2。在进行 IO 编程中,常用到两种模式:Reactor模式 和 Proactor模式。

  • NIO采用 Reactor 模式,当有事件触发时,服务器端得到通知,进行相应的处理。

  • AIO采用 Proactor模式,引入异步通道的概念, 简化了程序编写,一个有效的请求才启动一个线程,它的特点是先由操作系统完成后,才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

2.3.2 IO对比总结

IO 的方式通常分为几种:同步阻塞的 BIO、同步非阻塞的 NIO、异步非阻塞的 AIO。

  • BIO 方式:适用于连接数目比较小且固定的架构

    • 这种方式对服务器资源要求比较高,并发局限于应用中

    • JDK1.4 以前的唯一选择,但程序直观简单易理解

    • 同步阻塞:食堂排队取餐:中午去食堂吃饭,排队等着,啥都干不了,到你了选餐,付款,然后找位子吃饭

  • NIO 方式:适用于连接数目多且连接比较短(轻操作)的架构

    • 比如:聊天服务器,并发局限于应用中,编程比较复杂

    • JDK1.4 开始支持

    • 同步非阻塞:下馆子:点完餐,就去商场玩儿了。玩一会儿,就回饭馆问一声:好了没

  • AIO 方式:使用于连接数目多且连接比较长(重操作)的架构

    • 比如:相册服务器,充分调用 OS 参与并发操作,编程比较复杂

    • JDK7 开始支持。

    • 异步非阻塞:海底捞外卖火锅,打电话订餐。海底捞会说,我们知道您的位置,一会给您送过来,请您安心工作。

3.Netty核心技术

3.1 概述

Netty 是一个被广泛使用的,基于NIO的 Java 网络应用编程框架,Netty框架可以帮助开发者快速、简单的实现客户端和服务端的网络应用程序。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 利用 Java 语言的NIO网络编程的能力,并隐藏其背后的复杂性,从而提供一个易用的 API,基于这些API,我们可以快速编写出一个客户端/服务器网络应用程序。

Netty 成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行的开源项目(Apache Cassandra、Apache Storm、Elasticsearch、Dubbo等等),都利用其强大的对于网络抽象的核心代码实现网络通信。

特点:

  • API简单易用:支持阻塞和非阻塞式的socket

  • 基于事件模型:可扩展性和灵活性更强

  • 高度定制化的线程模型:支持单线程和多线程

  • 高通吐、低延迟、资源占用率低

  • 完整支持SSL和TLS

  • 学习难度低

应用场景:

  • 互联网行业:分布式系统远程过程调用,高性能的RPC框架

  • 游戏行业:大型网络游戏高性能通信

  • 大数据:Hadoop的高性能通信和序列化组件 Avro 的 RPC 框架

3.2 线程模型

3.2.1 单线程模型

服务端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,咱们前面的 NIO 案例就属于这种模型。

3.2.2 线程池模型

服务端用一个线程专门处理客户端连接请求,用一个线程池负责 IO 操作。在绝大多数场景下,该模型都能满足网络编程需求。

3.2.3 Netty 线程模型

各组件间的关系

  • Netty 抽象出两组线程池:BossGroup、WorkerGroup

    • BossGroup 专门负责接收客户端连接

    • WorkerGroup 专门负责网络读写操作

    • BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup,相当于一个事件循环组

  • NioEventLoopGroup 可以有多个线程,即含有多个NioEventLoop

  • NioEventLoop 表示一个不断循环的执行处理任务的线程

    • 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue

      • Selector 上可以注册监听多个 NioChannel,也就是监听Socket网络通信

      • 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上

      • 每个 NioChannel 都绑定有一个自己的 ChannelPipeline

    • NioEventLoop 内部采用串行化(Pipeline)设计:责任链模式

      • 消息读取 ==> 解码 ==> 处理(handlers) ==> 编码 ==> 发送,始终由IO线程NioEventLoop 负责

一个Client连接的执行流程

  1. Boss的NioEventLoop 循环执行步骤:

    1. 轮询 accept 事件

    2. 处理 accept 事件:

      • 与client建立连接,生成NioSocketChannel ,并将其注册到某个worker的NIOEventLoop的 selector
    3. 处理任务队列的任务 , 即 runTasks

  2. Worker的NIOEventLoop 循环执行步骤:

    1. 轮询read、write 事件

    2. 在对应NioSocketChannel中,处理业务相关操作(ChannelHandler)

    3. 处理任务队列的任务,即 runTasks

  3. 每个Worker的NioEventLoop 处理业务时会使用管道Pipeline。Pipeline中包含了 Channel,通过管道可以获取到对应Channel,Channel 中维护了很多的Handler处理器。

3.3 核心API

1.ServerBootstrap 和 Bootstrap

  • ServerBootstrap 是 Netty 中的服务端启动助手,通过它可以完成服务端的各种配置;

  • Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。

常用方法:

  • 服务端ServerBootstrap

    • ServerBootstrap group(parentGroup , childGroup), 该方法用于设置两个EventLoopGroup,连接线程组和工作线程组

    • public B channel(Class<? extends C> channelClass),该方法用来设置服务端或客户端通道的实现类型

    • public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置

    • public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收通道添加配置

    • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义handler)

    • public ChannelFuture bind(int inetPort) ,该方法用于设置占用端口号

  • 客户端Bootstrap

    • public B group(EventLoopGroup group) ,该方法用来设置客户端的 EventLoopGroup

    • public B channel(Class<? extends C> channelClass),该方法用来设置服务端或客户端通道的实现类型

    • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用来配置连接服务端地址信息,host:port

2.EventLoopGroup(Boss\WorkerGroup)

在 Netty 服务端编程中,一般需要提供两个 EventLoopGroup: ①BossEventLoopGroup专门负责接收客户端连接、②WorkerEventLoopGroup专门负责网络读写操作。

  • Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。

  • EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。

  • EventLoopGroup 本质是一组 EventLoop,池化管理的思想

通常一个服务端口即一个ServerSocketChannel 对应一个Selector 和一个EventLoop 线程,BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO处理。

如下图所示:

  • BossEventLoopGroup 通常是单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例

  • Boss的EventLoop 不断轮询 Selector 将连接事件分离出来,通常是 OP_ACCEPT 事件, 然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup

  • WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的事件进行处理。

常用方法:

  • public NioEventLoopGroup(),构造方法

  • public Future<?> shutdownGracefully(),断开连接,关闭线程

3.ChannelHandler 及其实现类

ChannelHandler 接口定义了许多事件处理的方法,我们通过重写这些方法实现业务功能。API 关系如下图所示:

我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法:

  • channelActive(ChannelHandlerContext ctx),通道就绪事件

  • channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件

  • channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件

  • exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件

4.ChannelPipeline

ChannelPipeline是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链(责任链模式)。

01-事件传递到ChannelPipeline中的第一个ChannelHandler
02-ChannelHandler使用分配的ChannelHandlerContext将事件传递给ChannelPipeline中的下一个ChannelHander

上图绿线的线串起来的就是Pipeline,它包含3个处理不同业务的ChannelHandler,依次通过这三个ChannelHandler。因为这3个ChannelHandler不知道彼此。所以要用ChannelHandlerContext上下文来说明,ChannelHandlerContext包含ChannelHandler、Channel、pipeline的信息。

  • ChannelPipeline addFirst(ChannelHandler… handlers),把业务处理类(handler)添加到Pipeline链中的第一个位置

  • ChannelPipeline addLast(ChannelHandler… handlers),把业务处理类(handler)添加到Pipeline链中的最后一个位置

5.ChannelHandlerContext

ChannelHandlerContext是事件处理器上下文对象, Pipeline链中的实际处理节点。 每个处理节点

ChannelHandlerContext 中包含一个具体的事件处理器 ChannelHandler , 同时ChannelHandlerContext 中也绑定了对应的 Pipeline 和 Channel 的信息,方便对ChannelHandler进行调用。

常用方法:

  • ChannelFuture close(),关闭通道

  • ChannelOutboundInvoker flush(),刷新

  • ChannelFuture writeAndFlush(Object msg) ,将数据写到ChannelPipeline中当前ChannelHandler 的下一个 ChannelHandler 开始处理(出栈交给下一个handler将继续处理)。

6.ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是Socket 的标准化参数而非 Netty 的独创。

常配参数:

  1. ChannelOption.SO_BACKLOG:用来初始化服务器可连接队列大小,对应 TCP/IP 协议 listen 函数中的 backlog 参数。

    • 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
    • 如果请求连接过多,服务端将不能及时处理,多余连接放在队列中等待,backlog 参数指定了等待队列大小。
  2. ChannelOption.SO_KEEPALIVE ,连接是否一直保持(是否长连接)。

7.ChannelFuture

ChannelFuture表示 Channel 中异步 IO 操作的未来结果,在 Netty 中异步IO操作都是直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 IO 操作的处理状态。Netty异步非阻塞处理事件,如果事件很费时,会通过Future异步处理,不会阻塞。

常用方法:

  • Channel channel(),返回当前正在进行IO操作的通道

  • ChannelFuture sync(),等待异步操作执行完毕

8.Unpooled 类

Unpooled 是 Netty 提供的一个专门用来操作缓冲区的工具类

常用方法:

  • ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)

3.4 Netty案例:客户端与服务器之间通信

目标:API 学习完毕后,接下来我们使用 Netty 开发一个网络应用程序,实现服务端和客户端之间的数据通信。

实现步骤:

  1. 导入依赖坐标

  2. 编写Netty服务端程序:配置线程组,配置自定义业务处理类,绑定端口号,然后启动Server,等待Client连接

  3. 编写服务端-业务处理类Handler:继承 ChannelInboundHandlerAdapter,并分别重写了三个方法

    • 读取事件

    • 读取完成事件

    • 异常捕获事件

  4. 编写客户端程序:配置了线程组,配置了自定义的业务处理类,然后启动Client,连接Server。

  5. 编写客户端-业务处理类:继承 ChannelInboundHandlerAdapter ,并分别重写了2个方法

    • 通道就绪事件

    • 读取事件

1.导入依赖

<dependencies>
    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.8.Final</version>
    </dependency>
</dependencies>

上述代码在 pom 文件中引入了 netty 的坐标

2.服务端程序Server

package org.example.socket.nio.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //1、创建一个线程组,接收客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //2、创建一个线程组:处理网络操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //3、创建服务端启动助手来配置参数
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup,workerGroup)//4、设置两个线程组
                .channel(NioServerSocketChannel.class)//5、使用NioServerSocketChannel作为服务端通道的实现
                .option(ChannelOption.SO_BACKLOG,128)//6、设置线程队列中的等待连接的个数
                .childOption(ChannelOption.SO_KEEPALIVE,true)//7、保持活动连接状态
                .childHandler(new ChannelInitializer<SocketChannel>() {//8、创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //9、王pipeline链中添加自定义的handler类
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        System.out.println("...........服务端启动中 init port:9999");
        ChannelFuture cf = b.bind(9999).sync();//10. 绑定端口 bind方法是异步的sync方法是同步阻塞的
        System.out.println("............服务端   启动成功..........");
        //11、关闭通道,关闭线程组
        cf.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

上述代码编写了一个服务端程序,配置了线程组,配置了自定义业务处理类,绑定端口号,然后启动Server,等待Client连接。

3.服务端-业务处理类ServerHandler

package org.example.socket.nio.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;


//服务端的业务处理类
public class NettyServerHandler extends ChannelInboundHandlerAdapter {


    //读取数据事件,msg就客户端发过来的数据
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        System.out.println("Server:"+ctx);
        //用缓冲区接受数据
        ByteBuf buffer = (ByteBuf) msg;
        //转换成字符串
        System.out.println("client msg:"+buffer.toString(CharsetUtil.UTF_8));
    }

    //数据读取完毕事件,读取完客户端数据后回复客户端
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        //Unpooled.copiedBuffer获取到缓冲区
        //第一个参数是向客户端传的字符串
        ctx.writeAndFlush(Unpooled.copiedBuffer("学习Netty",CharsetUtil.UTF_8));
    }
    //异常发生事件
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //异常时关闭ctx,ctx是相关信息的汇总,关闭它其它的也就关闭了。
        ctx.close();
    }
}

上述代码定义了一个服务端业务处理类,继承 ChannelInboundHandlerAdapter,并分别重写了三个方法。

4.客户端程序Client

package org.example.socket.nio.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //1、创建一个线程组
        EventLoopGroup group = new NioEventLoopGroup();
        //2、创建客户端的启动助手,完成相关配置
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)//3、设置线程组
                .channel(NioSocketChannel.class)//4、设置客户端通道实现类
                .handler(new ChannelInitializer<SocketChannel>() {//5、创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //6、往Pipeline链中添加自定义的handler
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        System.out.println("........客户端 准备就绪 msg发射");

        //7、启动客户端去连接服务端 connect方法是异步的 sync方法是同步阻塞的
        ChannelFuture cf = bootstrap.connect("127.0.0.1", 9999).sync();
        //8、关闭连接(异步非阻塞)
        cf.channel().closeFuture().sync();


    }
}

上述代码编写了一个Client程序,配置了线程组,配置了自定义的业务处理类,然后启动Client,连接Server。

5.客户端-业务处理类ClientHandler

package org.example.socket.nio.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutorGroup;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Java工程师", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("server msg:"+buf.toString(CharsetUtil.UTF_8));
    }
}

上述代码自定义了一个Client业务处理类,继承 ChannelInboundHandlerAdapter ,并分别重写了2个方法

3.5 网络聊天室V2.0

先看效果:demo02chat

刚才通过 Netty 实现了一个基础案例,基本了解了 Netty 的 API 和运行流程。接下来,我们在上个案例的基础上,再实现一个Netty版本的多人聊天案例。

实现步骤:

  1. 编写聊天程序服务端:配置线程组,配置编解码器,配置自定义业务处理类,绑定端口号,然后启动Server,等待Client连接

  2. 编写服务端-业务处理类Handler:

    • 当通道就绪时,输出上线

    • 当通道未就绪时,输出离线

    • 当通道发来数据时,读取数据,进行广播

  3. 编写聊天程序客户端:配置了线程组,配置编解码器,配置了自定义的业务处理类,然后启动Client,连接Server。

    • 连接服务端成功后,获取客户端与服务端建立的Channel

    • 获取系统键盘输入,将用户输入信息通过Channel发送给服务端

  4. 编写客户端-业务处理类:

    • 读取事件:监听服务端广播消息

1.聊天服务端程序ChatServer

package org.example.socket.nio.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * 聊天程序服务端netty版本
 */
public class NettyChatServer {
    private int port;//服务端端口号

    public NettyChatServer(int port) {
        this.port = port;
    }

    public void run(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //往pipeline中添加一个解码器
                    pipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
                    //往pipeline链中添加一个编码器
                    pipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
                    //往pipeline链中添加自定义的handler(业务处理类)
                    pipeline.addLast(new NettyChatServerHandler());
                }
            });
            System.out.println("基于Netty的网络真人聊天室 Server 启动..........");
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("基于Netty的网络真人聊天室 Server 关闭.........");
        }
    }

    public static void main(String[] args) {
        new NettyChatServer(9999).run();
    }

}

上述代码通过 Netty 编写了一个服务端程序。

注意:我往 Pipeline 链中添加了处理字符串的编码器和解码器,它们加入到 Pipeline 链中后会自动工作,使得服务端读写字符串数据时更加方便,不用人工处理 编解码操作。

2.服务端业务处理类ChatServerHandler

package org.example.socket.nio.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;

/**
 * 当通道就绪时,输出上线
 * 当通道未就绪时,输出离线
 * 当通道发来数据时,读取数据,进行广播
 */
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {

    protected static List<Channel> channels = new ArrayList<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel inChannel = ctx.channel();
        channels.add(inChannel);
        System.out.println(" [Server]:"+inChannel.remoteAddress().toString().substring(1)+"上线");
    }
    //通道未就绪
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel inChannel = ctx.channel();
        channels.remove(inChannel);
        System.out.println("[Server]:"+inChannel.remoteAddress().toString().substring(1)+"离线");
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        Channel inChannel = ctx.channel();
        System.out.println("s = "+s);
        for (Channel channel:channels) {
            if(channel!=inChannel){
                channel.writeAndFlush("["+inChannel.remoteAddress().toString().substring(1)+"]"+"说:"+s+"\n");
            }
        }
    }


}

上述代码通过继承 SimpleChannelInboundHandler 类自定义了一个服务端业务处理类,并在该类中重写了四个方法。

  • 当通道就绪时,输出上线

  • 当通道未就绪时,输出离线

  • 当通道发来数据时,读取数据,进行广播

3.聊天程序客户端ChatClient

package org.example.socket.nio.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Scanner;

//聊天程序客户端
public class NettyChatClient {
    private final String host; //服务端IP地址
    private final int port; //服务端端口号
    public NettyChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void run(){
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //往pipeline链中添加一个解码器
                    pipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
                    //往pipeline链中添加一个编码器
                    pipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
                    //往pipeline链中添加自定义的handler 业务处理类
                    pipeline.addLast(new NettyChatChatClientHandler());
                }
            });

            ChannelFuture cf = bootstrap.connect(host,port).sync();
            Channel channel = cf.channel();
            System.out.println("------ "+channel.localAddress().toString().substring(1)+"------");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg+"\r\n");
            }
            cf.channel().closeFuture().sync();

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyChatClient("192.168.0.46",9999).run();
    }
}

上述代码通过 Netty 编写了一个客户端程序。客户端同样需要配置编解码器

4.客户端业务处理类ChatClientHandler

package org.example.socket.nio.chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

//自定义一个客户端业务处理类
public class NettyChatChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim());
    }
}

上述代码通过继承 SimpleChannelInboundHandler 自定义了一个客户端业务处理类,重写了一个方法用来读取服务端发过来的数据。

3.6 编码和解码

  • 为什么要编解码呢?因为计算机数据传输的是二进制的字节数据

  • 解码:字节数据 –> 字符串(字符数据)

  • 编码 :字符串(字符数据)–> 字节数据

1.概述

我们在编写网络应用程序的时候需要注意 codec (编解码器),因为数据在网络中传输的都是二进制字节码数据,而我们拿到的目标数据往往不是字节码数据。因此在发送数据时就 需要编码,接收数据时就需要解码。

codec 的组成部分有两个:decoder(解码器)和 encoder(编码器)。

  • encoder 负责把业务数据转换成字节码数据
  • decoder 负责把字节码数据转换成业务数据

其实 Java 的序列化技术就可以作为 codec 去使用,但是它的硬伤太多:

  1. 无法跨语言,这应该是 Java 序列化最致命的问题了

  2. 序列化后的体积太大,是二进制编码的 5 倍多

  3. 序列化性能太低

Netty 自身提供了一些 编解码器,如下:

  • StringEncoder对字符串数据进行编码

  • ObjectEncoder对 Java 对象进行编码

Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,但其内部使用的仍是 Java 序列化技术,所以在某些场景下不适用。对于 POJO 对象或各种业务对象要实现编码和解码,我们需要更高效更强的技术。

2.Google 的 Protobuf

Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,特点如下:

  • 支持跨平台、多语言(支持目前大多数语言,例如 C++、C#、Java、python 等)

  • 高性能,高可靠性

  • 使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述,然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件

在使用 Netty 开发时,经常会结合 Protobuf 作为 codec (编解码器)去使用,具体用法如下所示。

使用步骤:

  1. 第一步:将传递数据的实体类生成【基于构建者模式设计】

  2. 第二步:配置编解码器

  3. 第三步:传递数据使用生成后的实体类

3.导入 protobuf 依赖

在 pom 导入 protobuf 的坐标

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.6.1</version>
</dependency>

4.proto 文件

假设我们要处理的数据是图书信息,那就需要为此编写 proto 文件

syntax = "proto3";
    option java_outer_classname = "BookMessage";
    message Book{
        int32 id = 1;
        string name = 2;
    }

5.生成 Java 类

通过 protoc.exe 根据描述文件生成 Java 类

cd C:\protoc-3.6.1-win32\bin

执行以下命令:

protoc --java_out=. Book.proto

会生成BookMessage.java

把生成的 BookMessage.java 拷贝到项目中

6.客户端

package org.example.socket.nio.encoder;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.example.socket.nio.netty.NettyClientHandler;

public class NettyEncoderDecoderClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast("encoder",new ProtobufEncoder());
                socketChannel.pipeline().addLast(new NettyEncoderDecoderClientHandler());
            }
        });
        // 启动客户端
        ChannelFuture cf = bootstrap.connect("127.0.0.1", 9998).sync();
        // 5等待连接关闭
        cf.channel().closeFuture().sync();
    }
}

上述代码在编写客户端程序时,要向 Pipeline 链中添加 ProtobufEncoder 编码器对象。

7.客户端业务类

package org.example.socket.nio.encoder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyEncoderDecoderClientHandler extends
        ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        BookMessage.Book book= BookMessage.Book.newBuilder().setId(1).setName("天王盖地虎").build();
        ctx.writeAndFlush(book);
    }
}

上述代码在往服务端发送图书(POJO)时就可以使用生成的 BookMessage 类搞定,非常方便

8.服务端

package org.example.socket.nio.encoder;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.example.socket.nio.netty.NettyServerHandler;

public class NettyEncoderDecoderServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup pgroup = new NioEventLoopGroup();//线程组,用来处理网络事件处理(接受客户端连接)
        EventLoopGroup cgroup = new NioEventLoopGroup();//线程组,用来处理网络事件处理(接受客户端连接)
        ServerBootstrap b = new ServerBootstrap();
        b.group(pgroup,cgroup).channel(NioServerSocketChannel.class)//注册服务端channel
                .option(ChannelOption.SO_BACKLOG,128)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("decoder",new ProtobufDecoder(BookMessage.Book.getDefaultInstance()));
                        socketChannel.pipeline().addLast(new NettyEncoderDecoderServerHandler());
                    }
                });
        ChannelFuture cf = b.bind(9998).sync();
        System.out.println(".........Server is Starting");
        cf.channel().closeFuture().sync();
        pgroup.shutdownGracefully();
        cgroup.shutdownGracefully();

    }
}

上述代码在编写服务端程序时,要向 Pipeline 链中添加 ProtobufDecoder 解码器对象。

9.服务端业务类

package org.example.socket.nio.encoder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyEncoderDecoderServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        BookMessage.Book book = (BookMessage.Book) msg;
        System.out.println("客户端 msg:"+book.getName()+"   ID:"+book.getId());
    }
}

上述代码在服务端接收数据时,直接就可以把数据转换成 POJO 使用,很方便。


文章作者: Gustavo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC 4.0 许可协议。转载请注明来源 Gustavo !
评论
  目录