当前位置:网站首页>NIO之Channel详解
NIO之Channel详解
2022-07-21 05:03:00 【普通人zzz~】
FileChannel
一、通道(channel)介绍
Channel是一个对象,作用是用于源节点和目标节点的连接,在java NIO中负责缓冲区数据的传递。Channel本身不存储数据,因此需要配合缓冲区进行传输。
二、主要实现类
主要的实现类有如下四个: FileChannel, SocketChannel, ServerSocketChannel, DatagramChannel
,
三、获取通道
- Java针对支持通道的类提供了getChannel()方法
本地IO | 网络IO |
---|---|
FileInputStream/FileOutputStream | Socket |
RandomAccessFile | ServerSocket |
无 | DatagramSocket |
- 在JDK1.7中的NIO.2针对各个通道提供了静态方法open()
- 在JDK1.7中的NIO.2的Files工具类的newByteChannel()
public static void main(String[] args) throws Exception {
// 1本地IO获取通道
FileInputStream inputStream = new FileInputStream(resourceFilePath);
FileOutputStream outputStream = new FileOutputStream(targetFilePath);
FileChannel inChannel = inputStream.getChannel();
FileChannel outChannel = outputStream.getChannel();
// 2.通过open方法获取
FileChannel.open(Paths.get(resourceFilePath), StandardOpenOption.READ);
FileChannel.open(Paths.get(targetFilePath),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
}
四、案例
4.1 FileChannel-文件复制
public class FileNioTest {
public static void main(String[] args) {
String resourceFilePath = "D:\\test\\copyFileTest.txt";
String targetFilePath = "D:\\test\\copyFileTest-1.txt";
// 复制文件
// copyFile1(resourceFilePath, targetFilePath);
// copyFile2(resourceFilePath, targetFilePath);
copyFile3(resourceFilePath, targetFilePath);
}
private static void copyFile1(String resourceFilePath, String targetFilePath) {
// 使用FileChannel配合缓冲区实现文件复制的功能
try (
FileInputStream inputStream = new FileInputStream(resourceFilePath);
FileOutputStream outputStream = new FileOutputStream(targetFilePath);
FileChannel inChannel = inputStream.getChannel();
FileChannel outChannel = outputStream.getChannel();
) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (inChannel.read(buffer) != -1) {
// 切换读模式
buffer.flip();
outChannel.write(buffer);
// 清空
buffer.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void copyFile2(String resourceFilePath, String targetFilePath) {
// 内存映射文件的方式实现文件复制
try (
FileChannel inChannel = FileChannel.open(Paths.get(resourceFilePath), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get(targetFilePath),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
) {
MappedByteBuffer inMap = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMap = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
byte[] b = new byte[inMap.limit()];
// 从磁盘文件中获取数据写入到b字节数组中
inMap.get(b);
// 将b字节数组中的数据写入到磁盘文件中
outMap.put(b);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void copyFile3(String resourceFilePath, String targetFilePath) {
// Channel-to-channel方式实现复制:零拷贝方式
try (
FileChannel inChannel = FileChannel.open(Paths.get(resourceFilePath), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get(targetFilePath),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
) {
// inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.2 SocketChannel、ServerSocketChannel
案例:通过NIO实现多人聊天室
4.1 服务端Server
public class Server {
public void start() throws IOException {
// 创建 selector
Selector selector = Selector.open();
// 创建ServerSocketChannel,并绑定监听端口
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
// 将Channel设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 将Channel注册到Selector上,监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("-------------服务端启动成功---------------");
while (true) {
// 循环调用Selector的select方法,检测就绪情况
int i = selector.select();
if (i == 0) {
continue;
}
// 调用selectedKeys方法获取就绪channel集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 判断就绪事件种类,调用业务处理方法
if (selectionKey.isAcceptable()) {
acceptable(serverSocketChannel, selector);
} else if (selectionKey.isWritable()) {
System.out.println("----------Writable");
} else if (selectionKey.isReadable()) {
readable(selectionKey, selector);
} else if (selectionKey.isConnectable()) {
System.out.println("----------Connectable");
}
}
}
}
/** * 可读事件处理方法 * * @param selectionKey * @param selector * @throws IOException */
private void readable(SelectionKey selectionKey, Selector selector) throws IOException {
// 从 selectionKey中获取已经就绪的channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 创建 byteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
StringBuffer sb = new StringBuffer();
// 获取客户端的消息
while (socketChannel.read(byteBuffer) > 0) {
byteBuffer.flip();
sb.append(Charset.forName("utf-8").decode(byteBuffer));
byteBuffer.clear();
}
System.out.println(sb.toString());
// 将channel再次注册到selector上,监听它的可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
if (sb.toString().length() != 0) {
// 广播给其他客户端
broadCast(selector, socketChannel, sb.toString());
}
}
/** * 处理连接事件 * * @param serverSocketChannel * @param selector * @throws IOException */
private void acceptable(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
// 获取要连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 将Channel设置为非阻塞模式
socketChannel.configureBlocking(false);
// 将Channel注册到Selector上,监听连接事件
socketChannel.register(selector, SelectionKey.OP_READ);
// 返回消息给客户端
socketChannel.write(Charset.forName("utf-8").encode("建立连接成功!"));
}
/** * 广播给其他客户端 * * @param selector * @param socketChannel 当前客户端对象 * @param sb 消息 */
private void broadCast(Selector selector, SocketChannel socketChannel, String sb) {
// 获取所有连接的 SelectionKey
Set<SelectionKey> keys = selector.keys();
keys.forEach(key -> {
Channel channel = key.channel();
// 剔除当前客户端
if (channel instanceof SocketChannel && channel != socketChannel) {
try {
// 发送消息
((SocketChannel) channel).write(Charset.forName("utf-8").encode(sb));
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) throws IOException {
new Server().start();
}
}
4.2 客户端Client
public class ClientA {
private String name;
public ClientA(String name) {
this.name = name;
}
public void start() throws IOException {
// 连接服务器
SocketChannel socketChannel = SocketChannel.open(
new InetSocketAddress("127.0.0.1", 9999)
);
// 接收服务端响应消息
Selector selector = Selector.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
ClientThread thread = new ClientThread();
thread.setSelector(selector);
new Thread(thread).start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String sb = scanner.nextLine();
socketChannel.write(Charset.forName("utf-8").encode(this.name + ":" + sb.trim()));
}
}
public static void main(String[] args) throws IOException {
new ClientA("普通人").start();
}
}
ClientThread
public class ClientThread implements Runnable {
private Selector selector;
public void setSelector(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true){
int i = selector.select();
if (i == 0){
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isReadable()){
readable(selectionKey);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/** * 可读事件处理方法 * @param selectionKey * @throws IOException */
private void readable(SelectionKey selectionKey) throws IOException {
// 从 selectionKey中获取已经就绪的channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 创建 byteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
StringBuffer sb = new StringBuffer();
// 获取客户端的消息
while (socketChannel.read(byteBuffer) > 0){
byteBuffer.flip();
sb.append(Charset.forName("utf-8").decode(byteBuffer));
}
// 将channel再次注册到selector上,监听它的可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(sb.toString());
}
}
4.3 DatagramChannel
SocketChannel 创建的是 TCP 连接,DatagramChannel 则创建的是 UDP 连接。
创建 DatagramChannel 的模式和创建其他 socket 通道是一样的:调用静态的 open( )
方法来创建一个新实例。新 DatagramChannel 会有一个可以通过调用 socket( )
方法获取的对等 DatagramSocket 对象。DatagramChannel对象既可以充当服务器(监听者)也可以充当客户端(发送者)。
DatagramChannel channel = DatagramChannel.open( );
DatagramSocket socket = channel.socket( );
socket.bind (new InetSocketAddress (9999));
DatagramChannel 是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据净荷。与面向流的的 socket 不同,DatagramChannel 可以发送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址SocketAddress)。
// 打开 DatagramChannel
DatagramChannel channel = DatagramChannel.open();
InetSocketAddress sendAddress = new InetSocketAddress(9999);
// 绑定
channel.bind(sendAddress);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 接收
while (true) {
buffer.clear();
SocketAddress socketAddress = channel.receive(buffer);
buffer.flip();
System.out.println(Charset.forName("UTF-8").decode(buffer));
}
1. 打开DatagramChannel
通过 9999 端口接收 UDP 报文数据
DatagramChannel server = DatagramChannel.open();
server.socket().bind(new InetSocketAddress(9999));
2. 接收数据
通过 DatagramChannel.receive()
接收 UDP 报文数据
public static void testReceive() throws IOException {
// 打开 DatagramChannel
DatagramChannel receiveChannel = DatagramChannel.open();
InetSocketAddress sendAddress = new InetSocketAddress(9999);
// 绑定
receiveChannel.bind(sendAddress);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 接收
while (true) {
buffer.clear();
SocketAddress socketAddress = receiveChannel.receive(buffer);
buffer.flip();
System.out.println(Charset.forName("UTF-8").decode(buffer));
}
}
3. 发送数据
通过 DatagramChannel.send()
发送 UDP 报文数据
public static void testSend() throws IOException {
// 打开 DatagramChannel
DatagramChannel sendChannel = DatagramChannel.open();
InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1", 9999);
// 发送
ByteBuffer buffer = ByteBuffer.wrap(String.format("client send:%s", System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
sendChannel.send(buffer, sendAddress);
System.out.println("send success");
}
4. 连接
UDP 不存在真正意义上的连接,这里的连接是向特定服务地址用 read() / write()
接收/发送 数据包
public static void testConnect() throws IOException {
// 打开DatagramChannel
DatagramChannel connChannel = DatagramChannel.open();
// 绑定
connChannel.bind(new InetSocketAddress(9999));
// 连接
connChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
// write方法
connChannel.write(ByteBuffer.wrap(String.format("test connect:%s", System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)));
// buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear();
connChannel.read(buffer);
buffer.flip();
System.out.println(Charset.forName("UTF-8").decode(buffer));
}
}
注意:read()
和 write()
只有在 connect()
后才能使用,不然会抛 NotYetConnectedException
异常,用 read()
接收时,如果没有接收包,会抛 PortUnreachableException
异常。
参考:https://blog.csdn.net/qq_38526573/article/details/89207100
边栏推荐
- Lst list deletion and modification
- 汇编语言大作业
- Projet d'intégration du cadre SSM
- Automatic packet capturing tool web scraper
- 解决Oracle数据库查询单表排序顺序错误问题之一
- aws的postgreSQL是去掉了hba吗?
- Detailed explanation and use of swagger
- 2022.7.18DAY609
- MySQL DML (data manipulation language)
- From March to June, after summary, more than 200 pages of true question notes and detailed explanations (including core test sites and 6 major factories)
猜你喜欢
Gradually understand the deep belief network
Flutter obtains longitude and latitude through geolocator positioning plug-in and calls Gaode peripheral information interface
【无标题】
实时跟踪用户管理操作
Quartz创建定时任务(入门)
Projet d'intégration du cadre SSM
解决Oracle数据库查询单表排序顺序错误问题之一
Pycharm创建SQLite数据库
Detailed explanation and use of swagger
Okaleido tiger NFT is about to log in to binance NFT platform, and the future market continues to be optimistic
随机推荐
逐步理解深度信念网络
Fluent customizes form and encapsulates form components
MySQL之DQL(数据查询语言)-常见函数
Corn表达式用法
fastJson数据类型中,解析JSONObject出现$ref: “$.list[0]“问题
URLs and URIs
【GeoServer二次开发】基于GeoServer-Manager的REST自动部署模块开发
数据库设计 数据库系统概论(第五版)
如何成功拿到蚂蚁,京东,小米,腾讯等大厂的offer
LeetCode 76. 最小覆盖子串-滑动窗口法
Jenkins插件开发——提供对外访问接口
QT notes - qmetaenum class
MySQL之DDL(数据定义语言)
1451 - Cannot delete or update a parent row 具有多个外键约束 删除子表数据行的问题
Reasons why fastjson @jsonfield format does not take effect
Flutter通过Geolocator定位插件获经纬度调用高德周边信息接口
Paper reading (61):multi instance attention network for few shot learning
论文阅读 (61):Multi-Instance Attention Network for Few-Shot Learning
1451 - cannot delete or update a parent row has multiple foreign key constraints to delete the data rows of the sub table
629. K inverse pairs array