EventGroup(事件组)

TCP是通过两个事件组来管理 线程数以及事件的,都是通过EventGroup这个类创建,一个叫ioEventGroup,一个叫workerEventGroup, 每个事件组里都有若干个EventRunner负责执行事件。 每个EventRunner 都各自对应一条任务链。 当自己的任务链空了以后,可以去其他EventRunner的任务链窃取任务。

ioEventGroup用来管理Selector,一个端口对应一个EventRunner里的一个事件,通常要监听几个端口 就设置几个EventRunner

workerEventGroup用来管理具体的事件,一个连接对应一个EventRunner,一个EventRunner对应多个连接,通常根据CPU核心数来合理的设置EventRunner的数量

整体模型如下图所示:

avatar

所以,在使用的时候 我们需要创建两个事件组,一个ioEventGroup,一个workerEventGroup

创建ioEventGroup

  • 第一个参数是允许监听几个端口,第二个参数是线程池
  • 一般 将第一个参数与线程池的核心线程数保持一致即可
EventGroup ioEventGroup = new EventGroup(2, Executors.newCachedThreadPool());

创建workerEventGroup

  • 第一个参数是同时有几个EventRunner在消费IO事件,第二个参数是线程池
  • 一般 将第一个参数与线程池的核心线程数保持一致即可
  • 第一个参数建议根据CPU的核心数合理设置,在服务器资源能撑得住的前提下越大越好
EventGroup workerEventGroup = new EventGroup(2, Executors.newCachedThreadPool());

// 当EventRunner里没任务的时候,是否允许去其他EventRunner里窃取任务,默认YES
workerEventGroup.setSteal(EventEnum.STEAL.YES);

创建服务

EventGroup ioEventGroup = new EventGroup(2, Executors.newCachedThreadPool());
EventGroup workerEventGroup = new EventGroup(10, Executors.newCachedThreadPool());

// 当前EventRunner没任务的时候,允许从其他EventRunner窃取任务
workerEventGroup.setSteal(EventEnum.STEAL.YES);

TCPServer tcpServer = Magician
            .createTCPServer(ioEventGroup, workerEventGroup) // 创建一个TCP服务
            .protocolCodec(new HttpProtocolCodec(tcpServerConfig)) // 设置http解码器,这句可以省略,默认就是http
            .httpHandler("/", new DemoHandler()) // 设置一个handler,用来处理业务逻辑

tcpServer.bind(8080);
tcpServer.bind(8088);

其他配置

在实际场景下,肯离不开个性化配置

TCPServerConfig tcpServerConfig = new TCPServerConfig();
tcpServerConfig.setFileSizeMax(1024*1024*1024); //单个文件限制大小
tcpServerConfig.setSizeMax(1024*1024*1024); // 所有文件限制大小
tcpServerConfig.setReadSize(1024); // 缓冲区大小

扩展协议解析

目前 只内置了http和websocket解码器,所以暂时只支持这两个协议,如果想扩展协议的的话,可以这么做

实现ProtocolCodec

public class exProtocol implements ProtocolCodec<Object> {

    /**
     * 解码
     * @param worker
     * @return
     * @throws Exception
     */
    Object codecData(Worker worker) throws Exception{
        // 获取协议报文
        ByteArrayOutputStream outputStream = worker.getOutputStream();
        if(outputStream == null || outputStream.size() < 1){
            return null;
        }

        // 按照报文格式 自行解析
        // 在解析的过程中判断报文是否完整
        // 如果不完整就返回null,一定要返回null
        // 如果完整的话,返回你希望丢给handler的数据

        // 在获取完整报文后必须这样,这是用来将已使用的报文去除的
        worker.skipOutputStream(本报文的字节长度);
    }

    /**
     * 解码完毕后,如果是一个完整的报文,则自动调用这个方法执行handler
     * @param t
     * @throws Exception
     */
    void handler(Object t) throws Exception{
        // 如果codecData方法 返回的数据不是null,则会自动执行这个方法
        // 这个方法的参数就是codecData方法的返回值
        // 在这个方法里调用handler即可
    }
}

results matching ""

    No results matching ""