广告位联系
返回顶部
分享到

Netty事件循环主逻辑NioEventLoop的run方法

java 来源:互联网 作者:秩名 发布时间:2022-03-24 22:47:40 人浏览
摘要

Netty事件循环主逻辑 Netty 事件循环主逻辑在NioEventLoop.run 中的 processSelectedKeys函数中 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 4

Netty事件循环主逻辑

Netty 事件循环主逻辑在 NioEventLoop.run 中的 processSelectedKeys函数中

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

protected void run() {

      //主循环不断读取IO事件和task,因为 EventLoop 也是 juc 的 ScheduledExecutorService 实现

        for (;;) {

            try {

                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

                    case SelectStrategy.CONTINUE:

                        continue;

                    case SelectStrategy.SELECT:

                        select(wakenUp.getAndSet(false));

 

                         

 

                        if (wakenUp.get()) {

                            selector.wakeup();

                        }

                        // fall through

                    default:

                }

 

                cancelledKeys = 0;

                needsToSelectAgain = false;

            // IO事件占总执行时间的百分比 */

                final int ioRatio = this.ioRatio;

                if (ioRatio == 100) {

                    try {

                        processSelectedKeys();

                    } finally {

                        // Ensure we always run tasks.

                        runAllTasks();

                    }

                } else {

                    final long ioStartTime = System.nanoTime();

                    try {

                        processSelectedKeys();

                    } finally {

                        // Ensure we always run tasks.

                        final long ioTime = System.nanoTime() - ioStartTime;

                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                    }

                }

            } catch (Throwable t) {

                handleLoopException(t);

            }

            // Always handle shutdown even if the loop processing threw an exception.

            try {

                if (isShuttingDown()) {

                    closeAll();

                    if (confirmShutdown()) {

                        return;

                    }

                }

            } catch (Throwable t) {

                handleLoopException(t);

            }

        }

    }

processSelectedKeys 函数 执行时会判断是否执行优化的版本,即判断 SelectedSelectionKeySet 是否为空。

是否开启优化取决于是否设置了环境变量  io.netty.noKeySetOptimization ,默认是 false 代表开启

1

2

private static final boolean DISABLE_KEYSET_OPTIMIZATION =

            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

原理是通过反射的方式设置 eventLoop绑定的selector中的 selectKeys属性 为 SelectedSelectionKeySet ,好处是不用 迭代  selector.selectedKeys()

初始化 EventLoop

注入时机为初始化 EventLoop 的时候

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

private SelectorTuple openSelector() {

        12      //注入逻辑40

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {

            @Override

            public Object run() {

                try {

                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");

                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

 

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);

                    if (cause != null) {

                        return cause;

                    }

                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);

                    if (cause != null) {

                        return cause;

                    }

 

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);

                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

                    return null;

                } catch (NoSuchFieldException e) {

                    return e;

                } catch (IllegalAccessException e) {

                    return e;

                }

            }

        });

 

        ........78     }

处理读事件

处理读事件主要在processSelectedKey 中 ,分别对 读、写、连接事件进行了处理。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

private void processSelectedKeysOptimized() {

        for (int i = 0; i < selectedKeys.size; ++i) {

            final SelectionKey k = selectedKeys.keys[i];

            // null out entry in the array to allow to have it GC'ed once the Channel close

            // See https://github.com/netty/netty/issues/2363

            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {

                //分别处理每个channel的事件

                processSelectedKey(k, (AbstractNioChannel) a);

            } else {

                @SuppressWarnings("unchecked")

                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

                processSelectedKey(k, task);

            }

            if (needsToSelectAgain) {

                // null out entries in the array to allow to have it GC'ed once the Channel close

                // See https://github.com/netty/netty/issues/2363

                selectedKeys.reset(i + 1);

                selectAgain();

                i = -1;

            }

        }

    }

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

        if (!k.isValid()) {

            final EventLoop eventLoop;

            try {

                eventLoop = ch.eventLoop();

            } catch (Throwable ignored) {

                // If the channel implementation throws an exception because there is no event loop, we ignore this

                // because we are only trying to determine if ch is registered to this event loop and thus has authority

                // to close ch.

                return;

            }

            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop

            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is

            // still healthy and should not be closed.

            // See https://github.com/netty/netty/issues/5125

            if (eventLoop != this || eventLoop == null) {

                return;

            }

            // close the channel if the key is not valid anymore

            unsafe.close(unsafe.voidPromise());

            return;

        }

        try {

            int readyOps = k.readyOps();

            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise

            // the NIO JDK channel implementation may throw a NotYetConnectedException.

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {

                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking

                // See https://github.com/netty/netty/issues/924

                int ops = k.interestOps();

                ops &= ~SelectionKey.OP_CONNECT;

                k.interestOps(ops);

          //处理了连接事件

                unsafe.finishConnect();

            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {

                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write

1

2

3

4

5

6

7

8

9

10

11

12

13

14

//将要写入的buffer flush掉

         ch.unsafe().forceFlush();

           }

 

           // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead

           // to a spin loop

           if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

          //回调 pipeline 上所有的 ChannelInboundHandler 的 fireChannelRead  和 channelReadComplete 函数

               unsafe.read();

           }

       } catch (CancelledKeyException ignored) {

           unsafe.close(unsafe.voidPromise());

       }

   }

注意 

NioServerSocketChannel 和 NioSocketChannel 都是 同样的 处理逻辑, 不同的是  前者 只关注 OP_ACCEPT 和 OP_READ事件, 后者 关注  OP_READ、OP_WRITE、OP_CONNECT事件

当NioServerSocketChannel 发生 OP_ACCEPT事件时 会 触发

 AbstractNioChannel.NioUnsafe.read ->  NioSctpServerChannel.doReadMessages(List<Object>)  -> ServerBootstrapAcceptor.channelRead ,

将受到的 NioSocketChannel 注册到 childEventLoop 。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://www.cnblogs.com/ironroot/p/8575134.html
相关文章
  • SpringBoot自定义错误处理逻辑介绍

    SpringBoot自定义错误处理逻辑介绍
    1. 自定义错误页面 将自定义错误页面放在 templates 的 error 文件夹下,SpringBoot 精确匹配错误信息,使用 4xx.html 或者 5xx.html 页面可以打印错误
  • Java实现手写一个线程池的代码

    Java实现手写一个线程池的代码
    线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和
  • Java实现断点续传功能的代码

    Java实现断点续传功能的代码
    题目实现:网络资源的断点续传功能。 二、解题思路 获取要下载的资源网址 显示网络资源的大小 上次读取到的字节位置以及未读取的字节
  • 你可知HashMap为什么是线程不安全的
    HashMap 的线程不安全 HashMap 的线程不安全主要体现在下面两个方面 在 jdk 1.7 中,当并发执行扩容操作时会造成环形链和数据丢失的情况 在
  • ArrayList的动态扩容机制的介绍

    ArrayList的动态扩容机制的介绍
    对于 ArrayList 的动态扩容机制想必大家都听说过,之前的文章中也谈到过,不过由于时间久远,早已忘却。 所以利用这篇文章做做笔记,加
  • JVM基础之字节码的增强技术介绍

    JVM基础之字节码的增强技术介绍
    字节码增强技术 在上文中,着重介绍了字节码的结构,这为我们了解字节码增强技术的实现打下了基础。字节码增强技术就是一类对现有字
  • Java中的字节码增强技术

    Java中的字节码增强技术
    1.字节码增强技术 字节码增强技术就是一类对现有字节码进行修改或者动态生成全新字节码文件的技术。 参考地址 2.常见技术 技术分类 类
  • Redis BloomFilter布隆过滤器原理与实现

    Redis BloomFilter布隆过滤器原理与实现
    Bloom Filter 概念 布隆过滤器(英语:Bloom Filter)是1970年由一个叫布隆的小伙子提出的。它实际上是一个很长的二进制向量和一系列随机映射
  • Java C++算法题解leetcode801使序列递增的最小交换次

    Java C++算法题解leetcode801使序列递增的最小交换次
    题目要求 思路:状态机DP 实现一:状态机 Java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Solution { public int minSwap(int[] nums1, int[] nums2) { int n
  • Mybatis结果集映射与生命周期介绍

    Mybatis结果集映射与生命周期介绍
    一、ResultMap结果集映射 1、设计思想 对简单的语句做到零配置,对于复杂一点的语句,只需要描述语句之间的关系就行了 2、resultMap的应用场
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计