Backend/Java

Java NIO 알아보기

가은파파 2021. 3. 1. 22:25

Java NIO


> java.nio.file 패키지에서 file I/O, 디렉토리 I/O에 관해서 알아보자.

자바에서 IO를 처리하는 구조

유저영역 : 실행 중인 프로그램이 존재하는 제한된 영역으로 하드웨어에 직접 접근이 불가.
커널영역 : 반대로 하드웨어에 직접 접근이 가능하고 다른 프로세스를 제어할 수 있는 영역을 말함.

이런 그림을 기반으로 기반의 I/O 프로세스를 정의해보면 다음과 같습니다.

※ 자바 I/O 프로세스
1) 프로세스가 커널에 파일 읽기 명령을 내림.
2) 커널은 시스템 콜[read()]을 사용해 디스크 컨트롤러가 물리적 디스크로부터 읽어온 파일 데이터를 커널 영역안 버퍼에 쏜다.
* DMA(Direct Memory Access) : CPU의 도움없이 물리적 디스크에서 커널영역의 버퍼로 데이터를 읽어오는 것
3) 모든 파일 데이터가 버퍼에 복사되면 다시 프로세스 안의 버퍼로 복사한다
4) 프로세스 안의 버퍼의 내용으로 프로그래밍 한다.

첫째로 3)의 과정이 비효율적이다. 커널안에 버퍼 데이터를 프로세스 안으로 다시 복사하기 때문이다. 그렇다면 만약 3)의 과정을 없애고 커널영역에 바로 접근할 수 있다면 어떻게 될까? 만약 이게 가능하다면 우리는 버퍼를 복사하는 CPU를 낭비하지도 GC관리를 따로 하지 않아도 I/O를 사용할 수 있게 됩니다.

두번째로 IO프로세스를 거치는 동안 작업을 요청한 쓰레드가 블록킹 된다. 이것이 더 큰 문제이다. 이것은 I/O의 속도를 늦추는 원인 중 하나이다.

이런 두가지 문제를 버퍼와 채널 등으로 해결 가능하다.

New I/O vs I/O

구분

IO

NIO

입출력 방식

스트림(Stream)방식

채널(Channel)방식

버퍼 방식

넌버퍼

버퍼

비동기 방식

지원 안 함

지원

블로킹/넌블로킹 방식

블로킹 방식만 지원

블로킹/넌 블로킹 모두 지원

* 스트림과 채널

: NIO는 스트림과 달리 채널기반의 입출력방식을 사용하며 단방향이 아닌 양방향으로 입출력이 가능합니다.

* 넌버퍼와 버퍼
: 버퍼를 사용하면 속도가 더 빠릅니다. 버퍼를 저장하기 때문에 데이터의 위치를 이동해가면서 필요한 부분을 읽고 쓸 수 있다는 점이 있다.

* 블로킹과 넌블로킹
NIO는 스트림과 다르게 블로킹/넌블로킹 특징을 모두 가지고 있습니다. 하지만 일반 I/O와는 다르게 NIO의 블로킹은 스레드를 인터럽트함으로써 블로킹에서 빠져나올 수 있습니다. 또한, NIO는 입출력 작업이 준비 된 채널에 한해서 작업 스레드가 처리하기 때문에 넌블러킹 특징도 가지게 됩니다.

NIO 핵심요소

명칭

설명

Buffer

버퍼를 나타내며, 입출력 데이터를 임시로 저장할 때 사용된다.

Charset

캐릭터셋을 나타내며, 바이트 데이터와 문자 데이터를 인코딩/디코딩 할 때 사용된다.

Channel

채널을 나타내며, 데이터가 통과하는 스트림을 의미한다.

Selector

하나의 쓰레드에서 다중의 채널로부터 들어오는 입력 데이터를 처리할 수 있도록 해주는 멀티플렉서이다.

NIO 논블럭킹 입출력의 핵샘 요소이다.

버퍼

NIO의 버퍼 역시 보조스트림에서 사용했던 BufferedInputStream, BufferedOutputStream과 기능과 동일하다. 마찬가지로 바이트를 버퍼에 저장했다가 한번에 출력해주는 역할을 하게 된다.
위 그림과 같이 데이터 타입에 따라 버퍼가 사용하는 메모리의 위치에 따라 종류가 나뉘게 되는 버퍼 클래스이다.

다이렉트/논 다이렉트 버퍼

구분

Direct Buffer

Non Direct Buffer

사용공간

OS의 메모리

JVM 힙 메모리

버퍼의 생성속도

느리다

빠르다

버퍼의 크기

크다

작다

I/O 성능

높다

낮다

Use

한번 생성한 뒤 재사용을 할 경우

빈번하게 계속해서 사용해야 할 경우

import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; //Direct와 Non Direct의 속도차이를 비교하는 예제 public class Input { public static void main(String[] args) { // TODO Auto-generated method stub try { // 파일 경로 지정 Path path = Paths.get("C:\\output.txt"); long size = Files.size(path); FileChannel fileChannel = FileChannel.open(path); // Non-Direct Buffer ByteBuffer nondirectbuffer = ByteBuffer.allocate((int) size); // Direct Buffer ByteBuffer directbuffer = ByteBuffer.allocateDirect((int) size); long start, end; start = System.nanoTime(); for (int i = 0; i < 100; i++) { fileChannel.read(nondirectbuffer); nondirectbuffer.flip(); } end = System.nanoTime(); System.out.println("Non-Direct Buffer : " + (end - start) + " ns"); start = System.nanoTime(); for (int i = 0; i < 100; i++) { fileChannel.read(directbuffer); directbuffer.flip(); } end = System.nanoTime(); System.out.println("Direct Buffer : " + (end - start) + " ns"); fileChannel.close(); }catch(Exception e) { e.printStackTrace(); } } } /******************************************** Non-Direct Buffer : 1342700 ns Direct Buffer : 401600 ns ***********************************************/

다이렉트와 논다이렉트의 속도차이 예제이다. 속도 차이가 난다.
Buffer클래스 메서드를 위 샘플과 java api 사이트를 통해 익혀보자.
docs.oracle.com/javase/7/docs/api/

채널

채널은 스트림의 향상된 버전으로 단방향이 아닌 양방향으로 접근이 가능하다. 또한 스트림과 다르게 채널은 비동기적으로 닫고 중단할 수 있다.
아래 채널을 활용한 서버와 클라이언트 샘플 코드이다.

서버 사이드

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; public class Server { static ServerSocketChannel serverSocketChannel = null; public static void main(String[] args) { try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(true); serverSocketChannel.bind(new InetSocketAddress(10000)); while(true) { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("connected : " + socketChannel.getRemoteAddress()); //클라이언트로 부터 입/출력받기 Charset charset = Charset.forName("UTF-8"); ByteBuffer byteBuffer = ByteBuffer.allocate(128); socketChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("received Data : " + charset.decode(byteBuffer).toString()); byteBuffer = charset.encode("hello, My Client !"); socketChannel.write(byteBuffer); System.out.println("Sending Success"); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 


클라이언트

import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; public class Client { static SocketChannel socketChannel = null; public static void main(String [] args) { try { //SocketChannel을 생성하고 몇 가지 설정을 한다. socketChannel = SocketChannel.open(); socketChannel.configureBlocking(true); //서버 연결 socketChannel.connect(new InetSocketAddress("localhost",10000)); Charset charset = Charset.forName("UTF-8"); //서버에 입출력 ByteBuffer byteBuffer = charset.encode("Hello Server !"); socketChannel.write(byteBuffer); byteBuffer = ByteBuffer.allocate(128); socketChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("received Data : " + charset.decode(byteBuffer).toString()); //소켓닫기 if(socketChannel.isOpen()) { socketChannel.close(); } }catch(Exception e) { e.printStackTrace(); } } }

채널에 대한 간단한 예제로 서버와 클라이언트간의 입출력을 보여주는 예제입니다.

셀렉터를 사용하지 않는 Blocking방식으로 서버는 accept()매서드가 호출되는 순간부터 클라이언트를 기다려야만 하는 결과를 낳게 됩니다. 다시말하면 이는 클라이언트 하나당 하나의 스레드를 할당해줘야한다는 말과 같은데 이는 매우 비효율적인 방법이다. 그래서 우리는 이 방법을 셀렉터를 사용하여 넌블럭킹되게 변경해 주어야한다.

셀렉터

셀렉터는 하나의 쓰레드에서 다수의 채널을 처리할 수 있는 기술을 의미한다. 멀티플렉스 IO를 할 수 있게 만드는 기술로 이를 통해 좀 더 적은 cpu와 자원을 소모할 수 있게 된다.

비슷한 컨셉의 서버와 클라이언트간의 입출력을 보여주는 셀럭터 클래스를 이용한 샘플이다.

서버사이드

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashSet; import java.util.Iterator; import java.util.Set; public class NonBlockingServer { static ServerSocketChannel serverSocketChannel = null; public static void main(String[] args) { // 연결된 클라이언트를 관리할 컬렉션 Set<SocketChannel> allClient = new HashSet<>(); try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(10000)); // 채널 관리자(Selector) 생성 및 채널 등록 Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("----------서버 완료----------"); // 입출력 시 사용할 바이트버퍼 생성 ByteBuffer inputBuf = ByteBuffer.allocate(1024); ByteBuffer outputBuf = ByteBuffer.allocate(1024); // 클라이언트 접속 시작 while (true) { selector.select(); // 이벤트 발생할 때까지 스레드 블로킹 // 발생한 이벤트를 모두 Iterator에 담아줌 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); // 발생한 이벤트들을 담은 Iterator의 이벤트를 하나씩 순서대로 처리함 while (iterator.hasNext()) { // 현재 순서의 처리할 이벤트를 임시 저장하고 Iterator에서 지워줌 SelectionKey key = iterator.next(); iterator.remove(); // 연결 요청중인 클라이언트를 처리할 조건문 작성 if (key.isAcceptable()) { // 연결 요청중인 이벤트이므로 해당 요청에 대한 소켓 채널을 생성해줌 ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel clientSocket = server.accept(); // Selector의 관리를 받기 위해서 논블로킹 채널로 바꿔줌 clientSocket.configureBlocking(false); // 연결된 클라이언트를 컬렉션에 추가 allClient.add(clientSocket); // 아이디를 입력받기 위한 출력을 해당 채널에 해줌 clientSocket.write(ByteBuffer.wrap("아이디를 입력해주세요 : ".getBytes())); // 아이디를 입력받을 차례이므로 읽기모드로 셀렉터에 등록해줌 clientSocket.register(selector, SelectionKey.OP_READ, new ClientInfo()); // 읽기 이벤트(클라이언트 -> 서버)가 발생한 경우 } else if (key.isReadable()) { // 현재 채널 정보를 가져옴 (attach된 사용자 정보도 가져옴) SocketChannel readSocket = (SocketChannel) key.channel(); ClientInfo info = (ClientInfo) key.attachment(); // 채널에서 데이터를 읽어옴 try { readSocket.read(inputBuf); // 만약 클라이언트가 연결을 끊었다면 예외가 발생하므로 처리 } catch (Exception e) { key.cancel(); // 현재 SelectionKey를 셀렉터 관리대상에서 삭제 allClient.remove(readSocket); // Set에서도 삭제 // 서버에 종료 메세지 출력 String end = info.getID() + "님의 연결이 종료되었습니다.\n"; System.out.print(info.getID() + "님의 연결이 종료되었습니다.\n"); // 자신을 제외한 클라이언트에게 종료 메세지 출력 outputBuf.put(end.getBytes()); for(SocketChannel s : allClient) { if(!readSocket.equals(s)) { outputBuf.flip(); s.write(outputBuf); } } outputBuf.clear(); continue; } // 현재 아이디가 없을 경우 아이디 등록 if (info.isID()) { // 현재 inputBuf의 내용 중 개행문자를 제외하고 가져와서 ID로 넣어줌 inputBuf.limit(inputBuf.position() - 2); inputBuf.position(0); byte[] b = new byte[inputBuf.limit()]; inputBuf.get(b); info.setID(new String(b)); // 서버에 출력 String enter = info.getID() + "님이 입장하셨습니다.\n"; System.out.print(enter); outputBuf.put(enter.getBytes()); // 모든 클라이언트에게 메세지 출력 for(SocketChannel s : allClient) { outputBuf.flip(); s.write(outputBuf); } inputBuf.clear(); outputBuf.clear(); continue; } // 읽어온 데이터와 아이디 정보를 결합해 출력한 버퍼 생성 inputBuf.flip(); outputBuf.put((info.getID() + " : ").getBytes()); outputBuf.put(inputBuf); outputBuf.flip(); for(SocketChannel s : allClient) { if (!readSocket.equals(s)) { s.write(outputBuf); outputBuf.flip(); } } inputBuf.clear(); outputBuf.clear(); } } } } catch (IOException e) { e.printStackTrace(); } } } // 접속한 사용자의 ID를 가진 클래스 class ClientInfo { // 아직 아이디 입력이 안된 경우 true private boolean idCheck = true; private String id; // ID가 들어있는지 확인 boolean isID() { return idCheck; } // ID를 입력받으면 false로 변경 private void setCheck() { idCheck = false; } // ID 정보 반환 String getID() { return id; } // ID 입력 void setID(String id) { this.id = id; setCheck(); } }

클라이언트 사이드

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; public class NonBlockingClient { public static void main(String[] args) { Thread systemIn; // 서버 IP와 포트로 연결되는 소켓채널 생성 try (SocketChannel socket = SocketChannel.open (new InetSocketAddress("localhost", 10000))) { // 모니터 출력에 출력할 채널 생성 WritableByteChannel out = Channels.newChannel(System.out); // 버퍼 생성 ByteBuffer buf = ByteBuffer.allocate(1024); // 출력을 담당할 스레드 생성 및 실행 systemIn = new Thread(new SystemIn(socket)); systemIn.start(); while (true) { socket.read(buf); // 읽어서 버퍼에 넣고 buf.flip(); out.write(buf); // 모니터에 출력 buf.clear(); } } catch (IOException e) { System.out.println("서버와 연결이 종료되었습니다."); } } } //입력을 담당하는 클래스 class SystemIn implements Runnable { SocketChannel socket; // 연결된 소켓 채널과 모니터 출력용 채널을 생성자로 받음 SystemIn(SocketChannel socket) { this.socket = socket; } @Override public void run() { // 키보드 입력받을 채널과 저장할 버퍼 생성 ReadableByteChannel in = Channels.newChannel(System.in); ByteBuffer buf = ByteBuffer.allocate(1024); try { while (true) { in.read(buf); // 읽어올때까지 블로킹되어 대기상태 buf.flip(); socket.write(buf); // 입력한 내용을 서버로 출력 buf.clear(); } } catch (IOException e) { System.out.println("채팅 불가."); } } }

결과

블럭킹 예제와 조금 많이 달리지긴 했지만 같은 주제인 채팅을 이용한 예제입니다. 둘 중 논블러킹을 이용한 소스는 서버쪽 소스이며, 클라이언트는 위 예제와 동일한 블러킹된 소스입니다. 이제 서버쪽 소스를 살펴보겠습니다.

먼저 크게 보면 SelectionKey를 등록할 때 OP_ACCEPT, OP_READ를 통해 구분하여 등록하는 것을 알 수 있습니다. 이는 SelectionKey 클래스가 가지고 있는 상수로 의미하는 바는 아래와 같습니다.

상수

유형

OP_ACCEPT

접속요청에 대한 이벤트를 감지

OP_CONNECT

소켓 통신이 허용된 이벤트를 감지

OP_READ

읽기 가능한 상태인지 감지

(해당 채널의 소켈에 데이터가 수신된 이벤트 발생 감지)

OP_WRITE

쓰기 가능한 상태인지 감지

(연결이 되어 있으면 항상 쓰기가 가능한 상태로 인식)

이렇게 상수를 통해 등록된 SelectionKey에 대한 값은 다음과 같은 함수를 통해 구분지어 체크하게 됩니다.

SelectionKey 상태 확인 메소드

설명

boolean isAcceptable()

이벤트가 발생한 채널의 감지유형이 OP_ACCEPT인 경우

boolean isReadable()

이벤트가 발생한 채널의 감지유형이 OP_READ인 경우

boolean isWritable()

이벤트가 발생한 채널의 감지유형이 OP_WRITE인 경우

boolean isConnectable()

이벤트가 발생한 채널의 감지유형이 OP_CONNECT인 경우

boolean isValid

현재 채널이 유효한 경우

소스에서는 isAcceptable()과 isReadable()를 통해 구분하여 데이터를 처리함을 확인할 수 있습니다.

다시 소스로 돌아가 살펴보겠습니다. 만약 처음 클라이언트로 부터 접속요청이 오면 서버는 isAcceptable()를 통해 아이디를 받는 로직을 타게 됩니다. 이후 아이디를 입력받으면 SelectionKey에 해당 채널(클라이언트)과 클라이언트 정보를 OP_READ형태로 같이 등록해줍니다.

이후 해당 채널에 대해 이벤트가 발생하면 OP_READ형태로 날아올 것이고, 서버는 이를 isReadable()로 감지하여 해당 로직을 수행하게 됩니다. 물론 이벤트는 iterator에 위해 순차적으로 진행되게 됩니다.


출처 : blog.naver.com/swoh1227/222244309304

'Backend > Java' 카테고리의 다른 글

멀티쓰레드 프로그래밍  (0) 2021.03.04
[Java] 멀티스레드 와 멀티 프로세스  (0) 2021.03.03
Java I/O (Input / Output)  (0) 2021.02.21
JAVA 상속  (0) 2021.02.20
annotations  (0) 2021.02.06