만든이: 송지훈
소속: JavaCafe 부시샵
email: johnleen@hanmail.net
서버의 성능 향상을 위해 Pooling 기법을 도입하고 Command 패턴과 런타임 동적 로딩을 이용해 멈추지 않는 서버 만들기
이
번 호에서는 서버의 성능 향상을 위해 고려해야 할 점들을 살펴보고 그 개선 방법들에 대해 언급할 것이다. 그리고 그 중 일부를
지난 시간에 만들었던 간단한 채팅 서버에 적용시켜 볼 것이다. 그럼 이 기사를 통해 상용 서버 프로그램 수준에 한걸음 더 다가서
보자.
ThreadPool 과 ByteBufferPool 로 서버에 날개를 달자
모바일 프로그램이
제한된 시스템 사양에서 만족할 만한 속도를 얻기 위해 코드를 최적화 시키듯이 네트워크 프로그래밍도 효율을 중시하기 때문에 코딩시
고려해야할 점들이 많다. 효율과 확장성, 안정성을 고려해야하는 네트워크 프로그래밍은 다른 프로그램에 비해 기술 집약적인 형태의
성능향상을 고려한 코드들로 이루어져야 한다. 이밖에도 네트워크라는 곳이 주무대이다 보니 예기치 못한 다양한 상황에 대비해야하는
것도 프로그래머 입장에선 골치 아픈 일이다. 지면관계상 이 글을 통해 서버 프로그래밍에서 고려해야할 모든 것들을 다 언급하지는
못할 것이다. 하지만 전 시간에 언급했듯이 효율적인 쓰레드 운영을 위한 ThreadPool, 버퍼의 효율적인 운영과 파일을
메모리로 사용하기 위한 ByteBufferPool을 중심으로 설명해 나갈 것이다. 또 Command 패턴과 자바의 언어적 특징인
런타임 동적 로딩을 이용해 기능 확장시 서버를 재부팅 하지 않아도 되는 기법을 설명할 것이다. 이외에 기타 필요하다고 생각되는
부분들에 대해서도 추가적인 설명과 코드를 소개해 나갈 것이다. 그럼 우리가 지난 시간에 만들었던 서버를 업그레이드 시켜보자.
효율적인 Thread 운영-관리를 위한 ThreadPool
동
시에 수많은 요청을 처리하기 위해선 무엇보다도 효율적(적은 메모리 사용, 빠른 처리속도) 으로 서버를 만드는 것이 가장
중요하다. 쓰레드를 필요할 때마다 그때그때 만들어서 사용하게 되면 쓰레드 자체를 생성하는 것도 꽤(서버 프로그램 입장에선)
시간이 걸리는 느린 작업이고 쓰레드라는 객체가 자주 생성-해제 되는 상황 때문에 가비지 컬렉터(Garbage
Collector)가 빈번하게 호출될 수 있기 때문이다.
그래서 이런 문제점을 해결하기 위한 방법으로 GoF 가
쓴 명서 Design Pattern 에 소개된 Object Pool 패턴을 이용할 것이다. 생성할 객체가 너무 시간이 오래
걸리거나 많은 경우 등으로 인한 문제점이 있을 때 <그림 1> 과 같이 그 객체를 큐(컬렉션 객체)에 넣어놓고
재사용하는 것이 이 패턴의 핵심 원리다. 패턴 이름을 보고 짐작할 수 있겠지만 Object 는 범용적인 형태이다. 객체지향
언어에서 최상위의 표현단위가 바로 Object 아닌가...
따라서 우리는 “Thread“를 재사용하려고 하기 때문에
ObjectPool Pattern 을 구체적으로 적용한 ThreadPool 이라는 클래스를 만들어서 사용할 것이다.
ObjectPool Pattern 의 다른 적용 예로는 jsp/servlet 으로 웹프로그래밍을 할때 DB 접속에 흔히 사용하는
ConnectionPool 이 있다.
<그림 1> ThreadPool 도식도
어
떤 기술을 공부할 때 항상 생각해야 하는 것들 중 하나가 바로 그 기술의 장단점과 최적의 적용 가능 분야 등을 정확하게 파악하는
것이다. 그래야만 좀 더 그 기술에 대한 이해도도 높일 수 있고 그 기술이 필요한 곳에 최적화 시켜 사용할 수 있기 때문이다.
그러므로 우선 ThreadPool을 만들기 전에 이 ThreadPool을 만들어 사용할 때 얻을 수 있는 장점이 구체적으로 무엇이 있는지 알아보자. 장점을 안다면 당연히 어느 곳에, 어떻게 쓰여야 할지도 쉽게 알 수 있을 것이다.
첫
번째 이점으로는 쓰레드를 재사용함으로써 가비지 컬렉터의 호출을 좀 더 줄일 수 있다는 것이다. 쓰레드의 잦은 생성 소멸로 인해
생성될 가비지 컬렉션(Garbage Collection)의 대상을 줄임으로써 가비지 컬렉터의 호출을 줄이고 이로 인해 퍼포먼스에
악영향을 줄 수 있는 요소를 작게나마 사전에 예방한다는 것이다. 지난 회에서도 얘기했었지만 가비지 컬렉터로 메모리를 수거하는
것은 상당히 느린 작업이다. (1회 기사에서 설명했었다. 기억이 안난다면 다시 찾아보자)
두 번째 이점은 쓰레드를
새로 생성하지 않고 이미 생성된 쓰레드를 가져다가 쓰기 때문에 쓰레드를 새로 생성하는 것에 비해 속도가 빠르다는 것이다.
메모리상에 존재하는 쓰레드를 그냥 가져오는 것이 당연히 새로운 쓰레드를 메모리에 할당해서 가져다 쓰는 것보다 빠를 것이다.
앞서도 언급했지만 쓰레드는 생성 시간이 결코 짧지 않다.
세 번째 이점은 ThreadPool에 있는 적절히 설정된
개수(초기 생성할 쓰레드의 개수와 생성할 수 있는 최대의 쓰레드 개수)의 쓰레드만을 사용함으로써 너무 많은 쓰레드 생성에 의한
시스템 성능저하나 최악의 경우 OutOfMemoryException 을 피할 수 있다는 것이다. (1회 기사에서 각각의 쓰레드는
자신만의 CPU와 스택 영역(메모리)을 사용한다고 했다. 즉, 쓰레드 자체가 메모리를 소비한다는 말이다)
<source
1> 은 ThreadPool 클래스의 코드이다. ThreadPool은 프로그램의 초기화 때 지정한 개수 만큼의 쓰레드를
미리 만들어서 큐(선입선출 큐 : FIFO Queue)에 넣어둔다. 그리고 필요할 때마다 큐에 접근해서 쓰레드를 꺼내서 사용하고
사용이 다 끝나면 다시 큐에 저장해서 나중에 다시 재사용하도록 하는 것이다. 그러나 큐에 접근해서 쓰레드를 꺼내려고 하는데,
만약 큐에 대기중인 쓰레드가 없다면 현재 생성된 쓰레드의 개수를 확인하고 생성할 수 있는 최대의 쓰레드 개수를 넘지 않았다면
새로 생성해서 건네주도록 할 것이다. 여기서 생성할 수 있는 최대의 쓰레드 개수에 도달했다면 사용중인 쓰레드가 큐에 반환되기를
기다렸다가 건네 줄 것이다. 또한 큐는 현재 대기중인 쓰레드의 개수가 초기에 생성한 쓰레드 개수보다 클 경우 큐로 반환되는
쓰레드를 보관하지 않고 폐기할 것이다. wait 변수는 큐에 대기중인 쓰레드가 없을 경우 생성할 수 있는 최대의 쓰레드 개수를
넘지 않았을 경우 바로 생성해서 건네줄지 아니면 큐에 사용이 끝난 쓰레드가 들어오기를 기다릴지를 결정한다. 여기서 주의 깊게
봐야할 점은 synchronized 키워드이다. 동기화 문제가 발생하지 않도록 synchronized를 사용하되 효율을 위해
사용 블록을 최소화 시켜야 한다.
<source 1> ThreadPool.java
public class ThreadPool {
private static final int MAX_POOLSIZE = 15;
private static int poolSize = 5;
private final ArrayList queue = new ArrayList();
private boolean wait = false
private int total = 0;
private int index = 0;
private AdvancedNioServer server;
public ThreadPool(AdvancedNioServer server) {
this(poolSize, server);
this.server = server;
}
public ThreadPool(int size, AdvancedNioServer server) {
poolSize = size;
for (index = 0; index < poolSize; index++) {
WorkerThread thread = new WorkerThread(this, server);
thread.setName("Worker" + (index + 1));
thread.start();
queue.add(thread);
total++;
}
}
public WorkerThread getThread() {
WorkerThread worker = null
if (queue.size() > 0) {
synchronized (queue) {
worker = (WorkerThread) queue.remove(0);
}
} else {
if (wait) {
return waitQueue();
} else {
if (index < MAX_POOLSIZE) {
worker = new WorkerThread(this, server);
worker.setName("Worker" + (index + 1));
worker.start();
total++;
return worker;
} else {
return waitQueue();
}
}
}
return worker;
}
private synchronized WorkerThread waitQueue() {
while (queue.isEmpty()) {
try {
queue.wait();
} catch (InterruptedException ignored) {
}
}
return (WorkerThread) queue.remove(0);
}
public void putThread(WorkerThread thread) {
if (queue.size() >= poolSize) {
thread = null;
--index;
} else {
synchronized (queue) {
queue.add(thread);
queue.notify();
}
}
}
public boolean isWait() { return wait; }
public void setWait(boolean wait) { this.wait = wait; }
}
ByteBuffer buf = ByteBuffer.allocateDirect(4096);
readCount = sc.read(buf);
if (readCount < 0) {
room.removeElement(sc);
sc.close();
}
buf.flip();
broadcast(buf);
buf.clear();
buf = null;
<source 2> ByteBufferPool.java
public class ByteBufferPool {
private static final int MEMORY_BLOCKSIZE = 4096;
private static final int FILE_BLOCKSIZE = 10240;
private final ArrayList memoryQueue = new ArrayList();
private final ArrayList fileQueue = new ArrayList();
private boolean wait = false;
public ByteBufferPool(int memorySize, int fileSize, File file) throws IOException {
if (memorySize > 0)
initMemoryBuffer(memorySize);
if (fileSize > 0)
initFileBuffer(fileSize, file);
}
private void initMemoryBuffer(int size) {
int bufferCount = size / MEMORY_BLOCKSIZE;
size = bufferCount * MEMORY_BLOCKSIZE;
ByteBuffer directBuf = ByteBuffer.allocateDirect(size);
divideBuffer(directBuf, MEMORY_BLOCKSIZE, memoryQueue);
}
private void initFileBuffer(int size, File f) throws IOException {
int bufferCount = size / FILE_BLOCKSIZE;
size = bufferCount * FILE_BLOCKSIZE;
RandomAccessFile file = new RandomAccessFile(f, "rw");
try {
file.setLength(size);
ByteBuffer fileBuffer = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0L, size);
divideBuffer(fileBuffer, FILE_BLOCKSIZE, fileQueue);
} finally {
file.close();
}
}
private void divideBuffer(ByteBuffer buf, int blockSize, ArrayList list) {
int bufferCount = buf.capacity() / blockSize;
int position = 0;
for (int i = 0; i < bufferCount; i++) {
int max = position + blockSize;
buf.limit(max);
list.add(buf.slice());
position = max;
buf.position(position);
}
}
public ByteBuffer getMemoryBuffer() {
return getBuffer(memoryQueue, fileQueue);
}
public ByteBuffer getFileBuffer() {
return getBuffer(fileQueue, memoryQueue);
}
private ByteBuffer getBuffer(ArrayList firstQueue, ArrayList secondQueue) {
ByteBuffer buffer = getBuffer(firstQueue, false);
if (buffer == null) {
buffer = getBuffer(secondQueue, false);
if (buffer == null) {
if (wait)
buffer = getBuffer(firstQueue, true);
else
buffer = ByteBuffer.allocate(MEMORY_BLOCKSIZE);
}
}
return buffer;
}
private ByteBuffer getBuffer(ArrayList queue, boolean wait) {
synchronized (queue) {
if (queue.isEmpty()) {
if (wait) {
try {
queue.wait();
} catch (InterruptedException e) {
return null;
}
} else {
return null;
}
}
return (ByteBuffer) queue.remove(0);
}
}
public void putBuffer(ByteBuffer buffer) {
if (buffer.isDirect()) {
switch (buffer.capacity()) {
case MEMORY_BLOCKSIZE :
putBuffer(buffer, memoryQueue);
break;
case FILE_BLOCKSIZE :
putBuffer(buffer, fileQueue);
break;
}
}
}
private void putBuffer(ByteBuffer buffer, ArrayList queue) {
buffer.clear();
synchronized(queue) {
queue.add(buffer);
queue.notify();
}
}
public synchronized void setWait(boolean wait) { this.wait = wait; }
public synchronized boolean isWait() { return wait; }
}
<?xml version='1.0' encoding='UTF-8'?>
<request>
<command>MessageCommand</command>
<message>안녕하세요~!</message>
</request>
<?xml version='1.0' encoding='UTF-8'?>
<response>
<message>네. 반갑습니다.</message>
</response>
public class HelloWorld {
public static void main(String[] args) {
System.out.println("Hello, world!");
}
}
Class cls = Class.forName("java.lang.String");
Object obj = cls.newInstance();
String s = (String) obj;
Charset charset = Charset.forName("UTF-8");
CharsetEncoder encoder = charset.newEncoder();
// 인코드 후 리턴되는 버퍼는 non-direct 버퍼임에 주의.
ByteBuffer encodedBuffer = encoder.encode(buffer);
Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(buffer);
String result = charBuffer.toString();
<source 3> WorkerThread.java
public class WorkerThread extends Thread {
...
private void requestProcess(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = bufferPool.getMemoryBuffer();
int count;
AbstractCommand command = null;
count = sc.read(buffer);
buffer.flip();
if (count < 0) {
server.removeUser(sc);
sc.close();
bufferPool.putBuffer(buffer);
key.selector().wakeup();
server.info("클라이언트가 접속을 종료했습니다.");
return;
}
charBuffer = decoder.decode(buffer);
String temp = charBuffer.toString();
byte[] bb = temp.substring(temp.indexOf("\r\n\r\n")).trim().getBytes("UTF-8");
String[] param = parsingXML(bb);
if (param == null) {
finish(buffer);
return;
}
if (server.isContainCommand(param[0])) {
command = server.getCommand(param[0]);
} else {
try {
command = (AbstractCommand) Class.forName(CommandPath + param[0]).newInstance();
} catch (Exception e) {
server.info("명령을 수행하는 커맨드 클래스가 존재하는지 확인해보세요.");
finish(buffer);
return;
}
if (command == null) {
finish(buffer);
return;
} else {
server.putCommand(param[0], command);
}
}
buffer.clear();
command.execute(server, buffer, param[1]);
finish(buffer);
}
private void finish(ByteBuffer buffer) {
bufferPool.putBuffer(buffer);
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
key.selector().wakeup();
}
private String[] parsingXML(byte[] xml) {
String[] param = null;
ArrayList list = null;
if (xml == null)
return param;
in = new ByteArrayInputStream(xml);
try {
handler = SaxHandler.getInstance();
parser.parse(in, handler);
list = handler.getContents();
} catch (SAXException e) {
server.log(Level.WARNING, "WorkerThread/parsingXML()", e);
} catch (IOException ex) {
server.log(Level.WARNING, "WorkerThread/parsingXML()", ex);
}
param = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
param[i] = (String) list.get(i);
}
handler.clearSaxHandler();
return param;
}
}
SSISO Community