IO模型

Unix与java的I/O模型

Unix 下共有五种 I/O 模型:阻塞 I/O、非阻塞 I/O、I/O 多路复用(select、poll、epoll)信号驱动 I/O(SIGIO)和异步 I/O(Posix.1的aio_系列函数),而java除了其中的信号驱动式之外,其他均有支持;

输入操作的两个阶段

理解I/O模型,首先要理解一个输入操作所必须包含的2个阶段:

  1. 等待数据准备好;
  2. 从内核向进程复制数据;

对于套接字上的输入操作,第一步通常涉及等待数据从网络中到达。当所等待的分组到达时,它被复制到内核中的某个缓冲区。第二步就是把数据从内核缓冲区复制到应用进程缓冲区。

IO模型详解

1. 阻塞式IO模型

进程调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲中或者发生错误才返回。这就是阻塞式IO模型的微观图示。

针对阻塞IO模型的传统服务设计则如上图,服务器对每个client连接都会启动一个专门的线程去维护,服务器中的逻辑Handler需要在各自的线程中执行,这种模型对线程的需求较多,面对高并发的场景,会造成CPU资源浪费;原来的tomcat就是这种模式,只不过现在也支持NIO了。

常见写法(服务端):
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @program test
 * @description: bio
 * @author: cys
 * @create: 2020/06/30 16:20
 */
public class BIOServer {

    //线程池机制
    //1. 创建一个线程池
    //2. 如果有客户端连接了,创建一个线程与之通讯(单独写一个方法)
    public static void main(String[] args) throws IOException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器启动了");
        while (true) {
            //监听,等待客户端连接
            final Socket socket = serverSocket.accept();
            System.out.println("有客户端连接");
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    private static void handler(Socket socket) {
        byte[] bytes = new byte[1024];
        try (InputStream inputStream = socket.getInputStream();) {
            int read = 0;
            while ((read = inputStream.read(bytes)) != -1) {
                System.out.println(new String(bytes, 0, read));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

2.非阻塞IO

img

  • 前三次调用时,数据未准备好,内核就会立即返回一个错误码(EWOULDBLOCK),此时请求线程不会被阻塞;
  • 第四次时,数据准备好,它被复制到应用进程缓冲区,recvfrom成功返回。
  • 由此可见:请求线程将不断请求内核,查看数据是否准备好。这种轮询操作,一样会消耗大量的CPU资源,所以在java的实现中,会采用同时支持I/O复用的方式支持非阻塞。
  • 3.I/O复用模型

    img

    如图I/O复用模型将阻塞点由真正的I/O系统调用转移到了对select、poll或者epoll系统函数的调用上。单纯看这个微观图,有可能还会觉得与阻塞I/O区别不大,甚至于我们多增加了I/O调用还增加了性能损耗。其实不然,使用select以后最大的优势是用户可以在一个线程内同时处理多个连接的I/O请求

    java NIO实现一个聊天的功能(服务端)
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    
    /**
     * @program test
     * @description:
     * @author: cys
     * @create: 2020/07/02 10:37
     */
    public class GroupChatServer {
    
        private Selector selector;
        private ServerSocketChannel listenChannel;
        private static final int port = 6667;
    
        public GroupChatServer() {
            try {
                selector = Selector.open();
                listenChannel = ServerSocketChannel.open();
                listenChannel.socket().bind(new InetSocketAddress(port));
                listenChannel.configureBlocking(false);
                listenChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        public void listen() {
            try {
                while (true) {
                    if (selector.select(2000) > 0) {
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey key = iterator.next();
                            if (key.isAcceptable()) {//连接事件
                                SocketChannel socketChannel = listenChannel.accept();
                                socketChannel.configureBlocking(false);
                                socketChannel.register(selector, SelectionKey.OP_READ);
                            }
                            if (key.isReadable()) {//读取事件,即通道可读了
                                read(key);
                            }
                            iterator.remove();
                        }
                    } else {
                        System.out.println("等待。。。。");
                    }
                }
            } catch (IOException e) {
    
            } finally {
    
            }
        }
    
        //读取客户端消息
        private void read(SelectionKey selectionKey) {
    
            SocketChannel socketChannel = null;
            try {
                //定义一个SocketChannel
                socketChannel = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int count = socketChannel.read(buffer);
                if (count > 0) {//读取到了数据
                    String str = new String(buffer.array());
                    System.out.println("from 客户端" + str);
                    //向其他的客户端转发消息
                    sendInfoToOtherClients(str, socketChannel);
                }
    
            } catch (IOException e) {
                try {
                    System.out.println(socketChannel.getRemoteAddress() + "离线了");
                    selectionKey.cancel();
                    socketChannel.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            } finally {
            }
        }
    
        private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
            System.out.println("服务器转发消息中");
            //遍历所有注册到Selector上的channel,并排除self
            for (SelectionKey key : selector.keys()) {
                Channel targetChannel = key.channel();
                if (targetChannel instanceof SocketChannel && targetChannel != self) {
                    SocketChannel desc = (SocketChannel) targetChannel;
                    //将msg存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    desc.write(buffer);
                }
            }
        }
    
        public static void main(String[] args) {
            new GroupChatServer().listen();
        }
    }
    

    信号驱动式I/O模型

    img

    如图,我们也可以用信号,让内核在描述符就绪时发送SIGIO信号通知我们。我们称这种模型为信号驱动式I/O(signal-driven I/O).

    5.异步I/O模型

    img

    它由POSIX规范定义,工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。与信号驱动式I/O不同的是:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。

    区别

    img

    如图,前四种模型都属于同步I/O模型,因为其中真正的 I/O操作将阻塞进程。而异步I/O是完全不会阻塞请求I/O的。目前 Windows 下通过 IOCP 实现了真正的异步 I/O。而在 Linux 系统下,目前 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 IO 复用模型模式为主。

    参考: ​ https://cloud.tencent.com/developer/article/1677776

    2021-05-23 21:50

    Netty知识点

    Netty简介

    Netty编程模型

    深入了解Netty底层原理

    I/O模型

  • 阻塞式_I/O

    应用进程被阻塞,直到数据复制到应用进程缓冲区中才返回。

  • 非阻塞式_I/O(NIO)

    应用进程执行系统调用之后,内核返回一个错误码。应用进程可以继续执行,但是需要不断的执行系统调用来获知 I/O 是否完成,这种方式称为轮询(polling)。

  • I/O_多路复用(IO_multiplexing)

    IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

  • select
  • poll
  • epoll
  • LT模式(level_trigger)
  • ET模式(edge_trigger)
  • 信号驱动式_I/O(SIGIO)

    应用进程使用 sigaction 系统调用,内核立即返回,应用进程可以继续执行,也就是说等待数据阶段应用进程是非阻塞的。内核在数据到达时向应用进程发送 SIGIO 信号,应用进程收到之后在信号处理程序中调用 recvfrom 将数据从内核复制到应用进程中。

  • 异步_I/O(AIO)

    进行 aio_read 系统调用会立即返回,应用进程继续执行,不会被阻塞,内核会在所有操作完成之后向应用进程发送信号。

  • Reactor模型

    相关教程

    更多

    Hadoop core之IO

    包:org.apache.Hadoop.io AbstractMapWritable:抽象类,是MapWritable、SortedMapWritable的基类。提供了序列化和反序列化的能力。ClassID和Class互为键值对,其中除去基本的类,如ArrayWritable、BooleanWritable、BytesWritable等基本类型,ID从-127开始,ID最大值为byte.Max_v

    关于java io流关闭的问题

    RT,我想问的是,如果不关闭流,会出现什么样不同的情况?  希望大家能多多的给出不同的答案,很想知道!  我个人的分数真的很少,所以不好意思。但是我希望达人们 能一起交流交流  问题补充:  达人们很多啊。看到答案,真的很感谢!

    Java 流(Stream)、文件(File)和IO

    Java 流(Stream)、文件(File)和IO    Java.io包几乎包含了所有操作输入、输出需要的类。所有这些流类代表了输入源和输出目标。  Java.io包中的流支持很多种格式,比如:基本类型、对象、本地化字符集等等。  一个流可以理解为一个数据的序列。输入流表示从一个源读取数据,输出流

    论 NoSQL 的数据模型

    本文内容是对《NoSQL Data Modeling Techniques》一文的简单概述,原文对NoSQL的几种数据模型进行了详细深入的讨论。是了解NoSQL数据模型不过错过的全面资料。 NoSQL的一些非功能性的特性,比如扩展性、性能以及一致性的讨论,目前已经有很多。而对于NoSQL产品内部数据模型相关的知识一直比较欠缺,本文就希望能够系统地对NoSQL数据模型进行一些探讨。 我们大致先将No

    Hadoop系列-IPC模型

    IPC         实现RPC的一种方法,具有快速、简单的特点。 它不像Sun公司提供的标准RPC包,基于Java序列化。  IPC无需创建网络stubs和skeletons。  IPC中的方法调用要求参数和返回值的数据类型必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。接口方法应该只抛出IOException异常。      使用模型

    css盒子模型

    盒子模型解决页面的布局问题 块级标签: 占的是一行.  行内标签: 占行内的一部分. 不能嵌套 块级标签. 块级: div p ol   行内: span font a 示例: <html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8&quo

    iotop命令实时监听磁盘IO

    如果你知道有程序在磨你的硬盘,但是你又不能确定是哪一个程序在磨你的硬盘,那么就用 iotop来帮助你吧。在Ubuntu里安装命令是: sudo apt-get install iotop,安装好之后在终端输入:iotop就可以了

    Jackson树模型JsonNode

    
                                

    Java:IO/NIO篇,读写属性文件(properties)

    1. 描述  尝试用多种方法读取属性文件。   测试打印系统属性; 测试读取、写入用户属性文件; 测试读取类库中的属性文件。  2. 示范代码    package com.clzhang.sample.io;import java.io.*;   import java.util.*; import org.junit.Test;/** * 属性文件测试类, * 1.测试打印系统属性; * 2.

    Mapreduce 读取Hbase,写入hbase IO 不均衡问题

    硬件环境:h46、h47、h48 三个节点 2cpu 4核 共8个核心 14G 内存 软件环境: 三台机器分别部署Hadoop、hbase 并同时作为datanode 和 tasktracker regionserver、HQuorumPeer; H46同时为Namenode、Jobtracker 和HMaster 和HQuorumPeer 出现问题:跑mapreduce 时使用 iostat 1

    SSH中使用了模型驱动

    我在使用了模型驱动,比如登陆操作,我写了一个POJO:  public class Login {  private int id;  private String loginName;  private String passWord;  private String role;  。。。。  而在页面中使用  <table width="100%" height=&q

    如何构建高效的storm计算模型

    计算机制简介 Storm采用流式计算的模型,和shell类似让数据在一个个“管道”中进行处理。   Spout负责从数据源拉取数据,相当于整个系统的生产者。 Bolt负责消费数据并将tuple发送给下一个计算单元。Bolt可以接受多个spout和bolt的数据。 每个spout,bolt可以设置并行度excuter相当于多进程,每个excuter可以设置多个task  shuffle groupi

    Hadoop权威指南学习(二)——HDFS & Hadoop IO

    HDFS设计: 以流式数据访问模式来存储超大文件,“一次写入,多次读取”; HDFS为高数据吞吐量应用优化的,低延迟的方位需求应选择HBase; 文件系统的元数据存储在namenode的内存中,所能存储的文件总数受限于内存容量; HDFS的块(block)默认为64M(块大的目的为了最小化寻址开销,从磁盘传输时间可明显大于定位时间),以块存储而非文件可简化存储系统的设计 HDFS只是Hadoop文

    Storm数据流模型的分析及讨论

    转自:http://www.cnblogs.com/panfeng412/archive/2012/07/29/storm-stream-model-analysis-and-discussion.html  Storm数据流模型的分析及讨论       本文首先介绍了Storm的基本概念和数据流模型,然后结合一个典型应用场景来说明Storm支持Topology之间数据流订阅的必要性,最后对比了S

    基于Hadoop云模型及相关云技术研究

      云计算关键技术   云计算是由企业界开始发展,然后才进入学术界引起重视的,这与网格计算相反。经过对迄今为止的云计算相关学术论文进行统计分析后,显示学术界对于云计算的研究主要集中在云技术关键技术方面。云计算研究的关键技术包括虚拟机、安全管理、数据管理、云监测、能耗管理和计算模型等。云计算的计算模型是研究如何针对某类应用特点提出效率更高的编程方式,目前云计算模型众多,而Hadoop是一个开源的分布

    最新教程

    更多

    java线程状态详解(6种)

    java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞

    redis从库只读设置-redis集群管理

    默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave.  127.0.0.1:6382> set k3 111  (error) READONLY You can't write against a read only slave. 如果你要开启从库

    Netty环境配置

    netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

    Netty基于流的传输处理

    ​在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据

    Netty入门实例-使用POJO代替ByteBuf

    使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

    Netty入门实例-时间服务器

    Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

    Netty入门实例-编写服务器端程序

    channelRead()处理程序方法实现如下

    Netty开发环境配置

    最新版本的Netty 4.x和JDK 1.6及更高版本

    电商平台数据库设计

    电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表

    HttpClient 上传文件

    我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。

    MongoDB常用命令

    查看当前使用的数据库    > db    test  切换数据库   > use foobar    switched to db foobar  插入文档    > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new

    快速了解MongoDB【基本概念与体系结构】

    什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。

    windows系统安装MongoDB

    安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我

    Spring boot整合MyBatis-Plus 之二:增删改查

    基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId;
    import com.baomidou.mybatisplus.annotations.TableName;
    import com.baom

    分布式ID生成器【snowflake雪花算法】

    基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案