사용자 제어 서비스
본 장에서는 ProObject 애플리케이션에서 사용자 제어 서비스(User Control Service)를 사용하기 위한 방법에 대하여 설명한다.
1. 개요
사용자 제어 서비스(User Control Service), 이하 UCS)는 프로그램의 처리 흐름을 미들웨어가 아닌 사용자가 직접 제어할 수 있도록 하는 자사의 Tmax 고유의 프로그램 형태를 말한다. 보통 데몬 형태로 기동되어 다른 대외기관과의 연동 작업 및 클라이언트로 정보전달 등을 UCS 형태의 프로그램들이 처리하게 되는데, 이 기능을 ProObject에서도 지원한다.
하지만 Tmax와는 다르게 절차형 프로그래밍 방식이 아닌 이벤트 드리븐 아키텍처를 위해 NIO 프로그래밍 방식으로 개발해야 되며, 별도의 프로세스가 아닌 Service Worker Thread를 확장한 UCS Worker Thread가 사용자의 코드를 수행하도록 만들었다. 즉, ProObject에서 UCS란 특정 Service Worker Thread를 사용자가 직접 제어할 수 있도록 하는 것을 의미한다.
2. 아키텍처
UCS Worker Thread는 Service Worker Thread를 확장한 스레드이기 때문에 서비스 레이어로써 역할을 하는 동시에 UCS 레이어의 역할도 수행한다.
UCS 레이어는 보통 FEP와 통신하는 영역인데 FEP는 Front End Processor의 약자로 흔히 금융권에서 대외 통신 시스템 또는 대외기관을 의미한다. 이 때 UCS Worker Thread는 대외기관이 클라이언트로 연결되기를 기다리는 서버 역할을 한다. 그래서 클라이언트로부터 채널 이벤트를 감지하기 위해 내부에 selector를 가지고 있다.
사용자는 selector가 감지한 채널 이벤트를 처리하기 위한 ChannelAcceptHandler와 ChannelEventHandler를 개발해서 selector에 등록해야 한다. 자세한 내용은 개발 방법을 참고한다.
3. 개발 방법
본 절에서는 ProObject와 대외기관과의 통신 처리를 UCS로 처리하는 과정에 대해서 설명한다.
3.1. 개발 시나리오
다음은 ProObject와 대외기관과의 통신 처리를 UCS로 처리한 샘플 코드의 시나리오를 도식화한 설명이다.
-
클라이언트 → 서비스 → FEP
외부 클라이언트의 서비스 요청이 FEP까지 전달되어 응답을 받는 경우의 시나리오이다.
클라이언트 → 서비스 → FEP로 서비스 요청 개발 시나리오-
(0) 외부 클라이언트가 UCS 서비스를 호출한다.
-
(1)(2) UCS에게 서비스를 스케줄링해서 서비스를 수행한다.
-
(3) 서비스 코드 안에서 연결된 FEP의 채널로 write를 수행하고 리턴 메소드의 수행을 대기한다.
-
(4)(5) FEP로부터 응답이 오기전까지는 서비스의 응답을 내보낼 수 없으므로 스레드를 놓지 않겠다고 시그널을 보내고 ACK을 받는다.
-
(6) 실제로 네트워크를 통해 데이터를 전송한다.
-
(7) FEP가 ProObject에게 데이터를 전송하면 ChannelEventHandler의 onRead()가 수행된다.
-
(8)(9) 받은 데이터를 파싱해서 서비스 레이어로 전달하면 SO 내에 정의한 리턴 메소드를 수행한다.
-
(10) 서비스의 응답을 내보내기 위해 EM에게 시그널을 전달한다.
-
(11)(12)(13) 이벤트 핸들러가 응답 이벤트를 처리하고 완료 ACK 을 전달하면 UCS가 다시 서비스를 처리할 수 있는 상태가 되었음을 알린다.
-
(14) 클라이언트에게 서비스의 응답을 전달한다.
-
-
FEP → 서비스
FEP가 서비스를 요청하고 응답을 돌려받는 경우의 시나리오다.
FEP → 서비스로 서비스 요청 개발 시나리오-
(1) FEP가 채널을 통해 데이터를 전송한다.
-
(2) ChannelEventHandler의 onRead 메소드가 수행되어 데이터를 파싱한 후 서비스를 호출한다.
-
(3) EM에게 서비스 요청 이벤트를 전달한다.
-
(4)(5) 서비스 요청 이벤트를 처리한 다음 UCS에게 서비스를 스케줄링한다.
-
(6)(7) 서비스를 수행한 다음 EM에게 응답 이벤트를 전달한다.
-
(8)(9) 응답 이벤트를 처리하고 ACK을 전달한다.
-
(10) 스레드를 놓기 전에 FEP가 요청한 서비스임을 확인하고 FEP 채널로 write를 수행한다.
-
(11)(12) 실제로 네트워크를 통해 데이터를 전송하고 나면 스레드를 놓고 다시 서비스를 처리할 수 있는 상태가 되었음을 알린다.
-
-
랑데뷰 케이스
Main 서비스가 호출된 상태에서 Sub 서비스의 호출을 랑데뷰(대기)하고 있다가 Sub 서비스가 수행되고 나면 Main 서비스가 재수행 되는 시나리오다.
아래의 도식에서 Main 서비스는 클라이언트가 호출해서 UCS1 스레드에서 수행되고, Sub 서비스는 FEP가 호출해서 UCS2 스레드에서 수행된다고 이해하면 된다. 랑데뷰는 기본적으로 스레드를 놓기 때문에 다시 스케줄링 되었을 때 기존의 스레드에서 다시 수행된다는 보장이 없다. 하지만 UCS 서비스가 랑데뷰를 할 경우에는 동일한 스레드에 스케줄링된다.
랑데뷰 케이스 개발 시나리오-
(0) ~ (6) Main 서비스가 수행되면서 Sub 를 기다리기 위한 랑데뷰 콜을 한 뒤에 FEP에게 데이터를 전송하는 과정이다. 이때 랑데뷰 콜이므로 스레드는 놓는다. (동기 송신과 비슷)
-
(7) ~ (12) FEP가 Sub 서비스를 호출해서 UCS2 스레드가 서비스를 수행하는 과정이다. (동기 수신과 비슷)
-
(13) EM에게 응답 이벤트를 전달한다.
-
(14) EM은 Sub 서비스의 응답 이벤트를 처리하면서 랑데뷰 작업도 처리한다.
-
(15) 응답 이벤트를 모두 처리했다고 ACK을 전달한다.
-
(16) ~ (18) FEP에게 응답을 전송하고 나면 스레드(UCS2)를 놓고 다시 서비스를 처리할 수 있는 상태가 되었음을 알린다.
-
(19) ~ (25) Main 서비스가 다시 스케줄링되어 리턴 메소드까지 수행되면 스레드를 다시 서비스를 처리할 수 있는 상태로 만들고 서비스의 응답을 클라이언트에게 전송한다.
-
3.2. 리소스 구현
개발 시나리오에서 설명한 UCS 기능을 사용하려면 사용자는 3가지 종류의 리소스를 구현해야 한다.
3.2.1. 채널 핸들러(Channel Handler)
채널 핸들러는 클라이언트로부터 발생한 채널 이벤트를 어떻게 처리할 것인지 사용자가 정의할 수 있도록 제공하는 인터페이스다. 사용자는 두 가지의 채널 핸들러를 개발해야 하는데, 클라이언트의 연결 요청을 처리하기 위한 ChannelAcceptHandler와 연결 후에 읽기/쓰기 이벤트를 처리할 ChannelEventHandler를 구현해야 한다.
다음은 ChannelAcceptHandler 샘플 코드이다.
// ChannelAcceptHandler 샘플 코드 public class UCSChannelAcceptHandler extends ChannelAcceptHandler { private ProObjectLogger logger = ChannelEventLogger.getLogger(); private ServerSocketChannel serverSocketChannel; private UCSWorkerThread workerThread; public UCSChannelAcceptHandler(UCSWorkerThread workerThread) { this.workerThread = workerThread; } @Override public void init() throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); } @Override public void accept(int port, Object additionalInfo) { try { String acceptAddress = getAcceptAddress(); if(acceptAddress == null) { serverSocketChannel.bind(new InetSocketAddress(port)); } else { serverSocketChannel.bind(new InetSocketAddress(acceptAddress, port)); } workerThread.registerChannel(serverSocketChannel, this, SelectionKey.OP_ACCEPT); } catch (IOException e) { close(); throw new RuntimeException(e); } } @Override public void onAccept(AbstractSelectableChannel serverChannel, AbstractSelectableChannel clientChannel) { try { SocketChannel socketChannel = (SocketChannel) clientChannel; socketChannel.configureBlocking(false); UCSChannelEventHandler channelEventHandler = new UCSChannelEventHandler(socketChannel, workerThread); channelEventHandler.init(SelectionKey.OP_READ); workerThread.setChannelEventHandler(channelEventHandler); logger.fine("[UCSChannelAcceptHandler] onAccept success"); } catch (Throwable e) { logger.severe("[UCSChannelAcceptHandler] Establishing connection is failed...", e); } } @Override public void close() { if(logger.isLoggable(Level.INFO)) { logger.info("[UCSChannelAcceptHandler] close"); } if(serverSocketChannel != null && serverSocketChannel.isRegistered()) { try { serverSocketChannel.close(); workerThread.unregisterChannel(serverSocketChannel); serverSocketChannel = null; } catch (IOException e) { logger.severe("[UCSChannelAcceptHandler][{0}] Channel can not be closed!", this.getClass().getSimpleName(), e); } if(logger.isLoggable(Level.INFO)) { logger.info("[UCSChannelAcceptHandler] Server Channel is closed."); } } } }
ChannelAcceptHandler를 구현할 때는 반드시 init, accept, onAccept, close 메소드들을 오버라이드해야 한다.
다음은 메소드들을 오버라이드할 때 주의사항이다.
-
accept 메소드 안에서 workerThread 객체의 registerChannel API를 사용해서 selector에 서버 소켓과 ChannelAcceptHandler 객체를 등록해야 한다.
-
클라이언트가 실제로 연결되면 호출되는 onAccept 메소드안에서 ChannelEventHandler 객체를 생성한 다음 초기화와 setChannelEventHandler 메소드 호출하는것을 잊으면 안된다.
-
close 메소드가 호출되었을 때 selector에 등록된 서버 소켓을 제거해줘야 한다.
다음은 ChannelEventHandler 샘플 코드이다.
// ChannelEventHandler 샘플 코드 public class UCSChannelEventHandler extends ChannelEventHandler { private UCSWorkerThread workerThread; private ByteBuffer readBuffer; private ByteBuffer writeBuffer; private Map<SourceChannel, String> timeoutChannelMap; private Pipe pollingChannel; private Pipe waitingChannel; public UCSChannelEventHandler(SocketChannel socketChannel, UCSWorkerThread workerThread) throws IOException { setChannel(socketChannel); this.workerThread = workerThread; readBuffer = ByteBuffer.allocate(1024); writeBuffer = ByteBuffer.allocate(1024); timeoutChannelMap = new HashMap<>(); } @Override public void init(int interestOps) throws IOException { setInterestOps(interestOps); try { SocketChannel socket = (SocketChannel)getChannel(); if(socket.isRegistered() == false) { workerThread.registerChannel(socket, this, interestOps); // * 주기적인 health check를 위한 처리 pollingChannel = Pipe.open(); pollingChannel.source().configureBlocking(false); workerThread.registerChannel(pollingChannel.source(), this, interestOps); workerThread.registerTimeout(pollingChannel.source(), 10*1000L); timeoutChannelMap.put(pollingChannel.source(), "POLLING"); // * write timeout을 위한 처리 waitingChannel = Pipe.open(); waitingChannel.source().configureBlocking(false); workerThread.registerChannel(waitingChannel.source(), this, interestOps); timeoutChannelMap.put(waitingChannel.source(), "WAITING"); getLogger().info("[UCSChannelEventHandler] init() success"); } } catch(Throwable e) { getLogger().severe("[UCSChannelEventHandler] is failed to initialize!", e); if(getChannel() != null) { try { getChannel().close(); } catch(Throwable ignore) {} } } } @Override public void onRead(AbstractSelectableChannel channel) throws IOException { SocketChannel socketChannel = (SocketChannel)channel; try { if(readBuffer.hasRemaining()) { if(socketChannel.read(readBuffer) < 0) { close(); return; } } readBuffer.flip(); String data = new String(readBuffer.array(), readBuffer.position(), readBuffer.limit()); // * UCS 수신 if(getLogger().isLoggable(Level.INFO)) { getLogger().info("[UCSChannelEventHandler] onRead() : {0}", data); } /** * Client로부터 read했을 때의 * user logic 작성 */ if(data.equalsIgnoreCase("efgh")) { // * 클라이언트 -> 서비스 -> FEP 시나리오 (SVC -> UCS -> FEP -> UCS -> SVC) ServiceManager.ucsCall(data); } else if (data.equalsIgnoreCase("ijkl")) { // * FEP -> 서비스 시나리오 (FEP -> UCS -> SVC -> UCS -> FEP) StringDataObject input = new StringDataObject(); input.setValue("sync/receive"); input.setDesc("mnop"); RequestContext requestContext = ServiceManager.getServiceRequestContext("test.main.UCSCommonInterfaceCall"); Header header = new Header(); header.set("TEST", "test"); requestContext.getRequest().setHeader(header); ServiceManager.ucsCall(requestContext, "test.main.UCSCommonInterfaceCall", input); } else if (data.equalsIgnoreCase("uvwx")) { // * 랑데뷰 시나리오 // * SVC1 -> UCS -> FEP1 // * FEP2 -> UCS -> SVC2 -> SVC1 랑데뷰 -> UCS -> FEP StringDataObject input = new StringDataObject(); input.setValue("matchingKey"); ServiceManager.ucsCall("test.main.UCSRendezvousSub", input); } else if (data.equalsIgnoreCase("sticky")) { // * 전문 역전 방지 (Sticky Service) 검증 시나리오 StringDataObject input = new StringDataObject(); input.setValue(data); ServiceManager.ucsCall("test.main.UCSCommonInterfaceCall", input); } readBuffer.clear(); } catch(Exception e) { close(); if(getLogger().isLoggable(Level.SEVERE)) { getLogger().severe("[UCSChannelEventHandler] onRead() fail..", e); } } } @Override public void onWrite(AbstractSelectableChannel channel, Object data, UserContext userContext, long requestId) throws IOException { String logMessage = null; if(data instanceof String) { // 실제로 보낼 데이터 조립 writeBuffer.put(((String) data).getBytes()); logMessage = data.toString(); } else if (data instanceof DataObject) { DataObject dobj = (DataObject)data; String value = dobj.get("value"); writeBuffer.put(value.getBytes()); logMessage = value.toString(); } if(getLogger().isLoggable(Level.INFO)) { getLogger().info("[UCSChannelEventHandler] onWrite() : {0}", logMessage); } workerThread.interestOpsChannel(channel, SelectionKey.OP_WRITE); } @Override public void onWritable(AbstractSelectableChannel channel) { SocketChannel socketChannel = (SocketChannel)channel; try { writeBuffer.flip(); if(socketChannel.write(writeBuffer) < 0) { close(); } writeBuffer.clear(); workerThread.interestOpsChannel(socketChannel, interestOps); workerThread.registerTimeout(waitingChannel.source(), 10*1000L); } catch(Exception e) { close(); } } @Override public void onTimeout(AbstractSelectableChannel channel) throws Exception { String value = timeoutChannelMap.get(channel); if(getLogger().isLoggable(Level.INFO)) { getLogger().info("[UCSChannelEventHandler] onTimeout() : {0}", value); } switch (value) { case "POLLING": if(pollingChannel != null) { // * FEP로 POLL 전문을 write 하면 됨 workerThread.registerTimeout(pollingChannel.source(), 10*1000L); } break; case "WAITING": if(waitingChannel != null) { // * write했을 때 timeout이 발생한 경우에 대한 처리 로직 } break; default: break; } } @Override public void onClose(AbstractSelectableChannel channel) throws Exception { if(getLogger().isLoggable(Level.INFO)) { getLogger().info("[UCSChannelEventHandler] onClose()"); } if(channel != null && channel.isRegistered()) { workerThread.unregisterChannel(channel); channel.close(); setChannel(null); workerThread.unregisterChannel(pollingChannel.source()); try { pollingChannel.source().close(); pollingChannel.sink().close(); } catch(Throwable ignore) {} pollingChannel = null; workerThread.unregisterChannel(waitingChannel.source()); try { waitingChannel.source().close(); waitingChannel.sink().close(); } catch(Throwable ignore) {} waitingChannel = null; if(getLogger().isLoggable(Level.INFO)) { getLogger().info("[UCSChannelEventHandler] Channel is closed."); } } } @Override public void onError(AbstractSelectableChannel channel, Throwable error) { if(getLogger().isLoggable(Level.SEVERE)) { getLogger().severe("[UCSChannelEventHandler] onError()", error); } } }
ChannelEventHandler는 클라이언트 채널의 OP_READ, OP_WRITE 같은 채널 이벤트를 감지해서 송/수신을 처리하기 위해 onRead, onWrite, onWritable 메소드를 구현해야 한다. 그 전에 init 메소드를 통해 ChannelEventHandler를 초기화하는 작업을 해야하는데, 보통 UCS Thread의 selector에 클라이언트 채널을 등록하는 작업을 한다.
-
onRead 메소드는 FEP로부터 데이터를 수신하는 영역이므로 수신한 데이터를 파싱해서 분기하는 로직을 작성하면 된다. 샘플 코드와 도식화한 시나리오를 보고 참고한다.
-
전문 역전 방지 검증 시나리오는 FEP → 서비스 시나리오와 거의 동일한 과정이라 별도로 도식화하지 않았다. UCSCommonInterfaceCall 서비스에서 어떻게 전문 역전을 방지하는지 "sticky" 부분을 참고하면 된다.
-
onWrite 메소드는 FEP에게 데이터를 어떻게 전송할 것인지를 구현하는 영역이다. 샘플 코드에서는 전송할 데이터의 byte[] 를 ByteBuffer에 put하고 채널의 interestOps를 OP_WRITE로 변경해서 selector가 onWritable 메소드가 실행되도록 최적화했다. 따라서 실제로 클라이언트 소켓에 데이터를 write하는 로직은 onWritable 메소드에 구현되어 있다. 샘플 코드에서는 실제로 write를 수행한 다음에 다시 interestOps 를 초기 값으로 업데이트 해서 selector가 계속 onWritable을 호출하는 것을 방지했으므로 참고한다. 만약에 타임 아웃에 대한 처리가 필요하다면 onTimeout 메소드를 참고한다. 샘플 코드에서는 특정시간을 주기로 타임아웃을 발생시켜서 FEP에게 주기적으로 health check 메시지를 보내거나 FEP에게 데이터를 전송해놓고 응답이 오지 않으면 타임아웃 처리를 할 수 있도록 되어 있다.
-
onClose는 채널의 연결이 끊기면 호출되는 메소드이므로 selector로부터 채널을 제거하고 그 밖의 리소스들을 정리해주는 로직을 작성해야 한다.
3.2.2. 스레드(Thread)
위에서 개발한 채널 핸들러들을 사용하려면 UCSWorkerThread 의 selector에 등록해야 하는데, 이를 위해서 사용자는 UCSWorkerThread를 확장한 사용자 정의 스레드를 개발해야 한다. 특수한 경우가 아니라면 run 메소드는 오버라이드하면 안된다.
public class UCSMainThread extends UCSWorkerThread { public UCSMainThread(ServiceWorkerThreadPool threadPool, ServiceGroupManager serviceManager, ThreadGroup threadGroup, int threadId, int priority) throws Throwable { super(threadPool, serviceManager, threadGroup, threadId, priority); UCSChannelAcceptHandler acceptHandler = new UCSChannelAcceptHandler(this); setChannelAcceptHandler(acceptHandler); acceptHandler.init(); int serverPort = serviceManager.getUCSInfoManager().getPort(threadPool.getName()); ChannelEventLogger.getLogger().info("===== UCSMainThread -> port : {0} =====", serverPort); acceptHandler.accept(serverPort, null); ContextDataObject context = serviceManager.getUCSInfoManager().getDetail(threadPool.getName()); ChannelEventLogger.getLogger().info("===== UCSMainThread -> detail : {0} =====", context); } }
샘플 코드처럼 스레드의 생성자에서는 개발한 ChannelAcceptHandler 객체를 생성해서 초기화한 다음 accept 메소드를 호출해야 한다. 이 때 accep 메소드에서는 서버 소켓이 바인드할 포트 번호를 입력해야 한다. 만약에 포트 번호 값을 고정으로 사용한다면 상수값을 직접 넣어줘도 되지만, UCS Thread 관리 화면을 통해서 설정한 특정 포트 번호를 사용하고 싶다면 샘플 코드처럼 UCSInfoManager의 getPort(String threadPoolName) API를 사용하면 된다.
UCSInfoManager 객체는 서비스 그룹별로 관리되기 때문에 serviceGroupManager를 통해서 접근하면 된다. 포트 번호 이외에도 UCS Thread 관리 화면에서 설정한 Detail 정보를 사용하려면 UCSInfoManager의 getDetail(String threadPoolName) API를 사용한다. Detail 정보는 json 형태로 관리되기 때문에 데이터를 key, value로 get할 수 있는 ContextDataObject 타입으로 반환된다. 이때 반환되는 ContextDataObject 객체는 readOnly이기 때문에 set API를 호출하면 예외가 발생하므로 주의한다.
3.2.3. 서비스(Service)
클라이언트 → 서비스 → FEP 시나리오는 UCSCommonInterfaceCall 서비스의 sync/send 부분을 참고하고 FEP → 서비스 시나리오는 sync/receive 부분을 참고한다. 그 밖에 전문 역전 방지 검증 시나리오는 sticky 부분을 참고하면 된다.
서비스 연동 API 중에서 Sticky Service 설정을 통해 전문 역전 현상을 방지했다. Sticky Service에 대한 자세한 설명은 Sticky Service 설정을 참고한다.
//샘플 ServiceObject public class UCSCommonInterfaceCall implements ServiceObject<StringDataObject, StringDataObject>{ @Override public StringDataObject service(StringDataObject input) throws Throwable { String value = input.getValue(); StringDataObject output = null; switch(value) { case "sync/send" : //* rcall과 비슷하게 동작한다고 생각하면 됨 ServiceManager.ucsCall("abcd", "response", "response"); break; case "sync/receive" : Header header = (Header) ServiceManager.getCurrentRequestContext().getRequest().getHeader(); output = new StringDataObject(); output.setValue("header > " + header.get("TEST") + " > " + input.getDesc()); break; case "sticky" : int count = 3; WaitObject[] waitObjects = new WaitObject[count]; for(int i=0;i < count;i++) { StringDataObject stickyInput = new StringDataObject(); stickyInput.setValue(String.valueOf(i)); stickyInput.setDesc("3"); waitObjects[i] = ServiceManager.acall("test.main.StickyServiceCall", stickyInput, "ucs"); } WaitObject and = WaitObject.And(waitObjects); try { WaitObject[] result = (WaitObject[])and.get(); String outputMessage = "callSequence;threadId;time"; for(int i = 0; i < result.length; i++) { StringDataObject stickyOutput = (StringDataObject)result[i].get(); outputMessage = outputMessage + " > " + stickyOutput.getValue(); } output = new StringDataObject(); output.setValue(outputMessage); } catch(Throwable e) { if(logger.isLoggable(Level.WARNING)) { logger.warning("[UCSCommonInterfaceCall] StickyServiceCall gave me a exception!", e); } throw e; } default : break; } return output; } @RCallReturn public Object response(Object result) { StringDataObject dtoOut = new StringDataObject(); dtoOut.setValue(result.toString()); return dtoOut; } } //샘플 ServiceExecutor public class UCSCommonInterfaceCallExecutor extends ServiceExecutor { public UCSCommonInterfaceCallExecutor() { serviceObject = new UCSCommonInterfaceCall(); } @SuppressWarnings("unchecked") @Override public Object execute(Object input, String methodName) throws Throwable { switch(methodName) { case "response" : return ((UCSCommonInterfaceCall)serviceObject).response(input); default : return serviceObject.service(input); } } }
전문역전방지 검증 시나리오는 StickyServiceCall 서비스 코드를 참고한다.
//샘플 ServiceObject public class StickyServiceCall implements ServiceObject<StringDataObject, StringDataObject> { @Override public StringDataObject service(StringDataObject input) throws Throwable { String time = String.valueOf(System.currentTimeMillis()); String callSequence = input.getValue(); String threadId = ((ServiceWorkerThread)Thread.currentThread()).getThreadId(); int sleep = StringUtil.isEmpty(input.getDesc()) ? 1 : Integer.parseInt(input.getDesc()); Thread.sleep(sleep * 1000); StringBuilder builder = new StringBuilder(); builder.append(callSequence).append(";").append(threadId).append(";").append(time); StringDataObject output = new StringDataObject(); output.setValue(builder.toString()); output.setDesc("callSequence;threadId;time"); if(ServiceLogger.getLogger().isLoggable(Level.INFO)) { ServiceLogger.getLogger().info("[StickyServiceCall] output : {0}", output); } return output; } }
StickyServiceCall은 특정 쓰레드에 할당되어 서비스가 순차적으로 수행되는 것을 보장하는지 검증하기 위한 테스트 서비스다. 내부에서 사용하는 파라미터들에 대한 설명은 다음과 같다.
Parameter Name | 설명 |
---|---|
callSequence |
호출된 순서를 확인한다. |
threadId |
동일한 쓰레드가 서비스를 처리하는지 확인한다. |
time |
실행 된 시점을 확인한다. |
sleep |
서비스의 종료시간을 조절한다. 서비스 간에 실행 된 시점이 얼마 차이가 나지 않으면 구분하기 어렵기 때문에 사용한다. |
랑데뷰 시나리오는 UCSRendezvousMain와 UCSRendezvousSub 서비스 코드를 참고한다.
//샘플 ServiceObject public class UCSRendezvousMain implements ServiceObject<StringDataObject, StringDataObject> { private String result; @Override public StringDataObject service(StringDataObject input) throws Throwable { result = input.getValue() + " > SM.RENDEZVOUS"; WaitObject wo = ServiceManager.rendezvous(input.getValue(), "test.main.UCSRendezvousMain" , "test.main.UCSRendezvousSub"); ServiceManager.ucsCall(wo, "qrst", "result", "result"); return null; } @RCallReturn public Object result(WaitObject waitObject) throws Throwable { StringDataObject callOutput = (StringDataObject)((WaitObject[])waitObject.get())[0].get(); result += " > " + callOutput.getValue() + " > UCS_MAIN"; StringDataObject output = new StringDataObject(); output.setValue(result); return output; } } //샘플 ServiceExecutor public class UCSRendezvousMainExecutor extends ServiceExecutor { public UCSRendezvousMainExecutor() { serviceObject = new UCSRendezvousMain(); } @Override public Object execute(Object input, String methodName) throws Throwable { switch (methodName) { case BaseServiceExecutor.DEFAULT_SERVICE_METHOD: return ((UCSRendezvousMain)serviceObject).service((StringDataObject)input); case "result": return ((UCSRendezvousMain)serviceObject).result((WaitObject)input); } throw new Exception(); } }
//샘플 ServiceObject public class UCSRendezvousSub implements ServiceObject<StringDataObject, StringDataObject> { @Override public StringDataObject service(StringDataObject input) throws Throwable { String value = input.getValue(); ServiceManager.rendezvous(input.getValue(), "test.main.UCSRendezvousMain" , "test.main.UCSRendezvousSub"); StringDataObject output = new StringDataObject(); output.setValue(value + " > UCS_SUB"); return output; } } //샘플 ServiceExecutor public class UCSRendezvousSubExecutor extends ServiceExecutor { public UCSRendezvousSubExecutor() { serviceObject = new UCSRendezvousSub(); } @SuppressWarnings("unchecked") @Override public Object execute(Object input, String methodName) throws Throwable { return serviceObject.service(input); } }
3.3. UCS 설정
사용자 정의 스레드와 UCS 서비스를 개발했다면 사용할 Application/ServiceGroup의 servicegroup.xml에 스레드 풀 설정을 해야 한다.
// test.main의 servicegroup.xml <?xml version="1.0" encoding="UTF-8" standalone="yes"?> <ns19:service-group xmlns:ns2="http://www.tmax.co.kr/proobject/message" xmlns:ns4="http://www.tmax.co.kr/proobject/dataobject" xmlns:ns3="http://www.tmax.co.kr/proobject/application/runtime" xmlns:ns6="http://www.tmax.co.kr/proobject/dto" xmlns:ns31="http://www.tmax.co.kr/proobject/serverConfig" xmlns:ns5="http://www.tmax.co.kr/proobject/resource" xmlns:ns30="http://www.example.org/testData" xmlns:ns8="http://www.tmax.co.kr/proobject/mapping" xmlns:ns7="http://www.tmax.co.kr/proobject/dto/validator" xmlns:ns13="http://www.tmax.co.kr/proobject/queryobject" xmlns:ns9="http://www.tmax.co.kr/proobject/flow" xmlns:ns12="http://www.tmax.co.kr/proobject/dataobjectfactory" xmlns:ns11="http://www.tmax.co.kr/proobject/bizobject" xmlns:ns10="http://www.tmax.co.kr/proobject/aspectj" xmlns:ns32="http://www.tmax.co.kr/proobject/probuilder_config" xmlns:ns17="http://www.tmax.co.kr/proobject/serviceobject-automatic" xmlns:ns16="http://www.tmax.co.kr/proobject/jobobject" xmlns:ns15="http://www.tmax.co.kr/proobject/taskobject" xmlns:ns14="http://www.tmax.co.kr/proobject/serviceobject" xmlns:ns19="http://www.tmax.co.kr/proobject/servicegroup" xmlns:ns18="http://www.tmax.co.kr/proobject/sourcecode" xmlns:ns20="http://www.tmax.co.kr/proobject/testcase" xmlns:ns24="http://www.tmaxsoft.co.kr/proobject/testOperation" xmlns:ns23="http://www.tmax.co.kr/proobject/restriction_codes" xmlns:ns22="http://www.example.org/externalObjectConfig" xmlns:ns21="http://www.tmaxsoft.co.kr/proobject/testsuite" xmlns:ns28="http://www.tmax.co.kr/proobject/siteConfig" xmlns:ns27="http://www.tmaxsoft.co.kr/proobject/testresult" xmlns:ns26="http://www.tmax.co.kr/proobject/contents" xmlns:ns25="http://www.tmax.co.kr/proobject/prominer" xmlns:ns29="http://www.tmax.co.kr/proobject/property"> <ns19:group-name>main</ns19:group-name> ... <ns19:thread-pool min="1" core="1" max="1"> <ns19:name>ucsMainPool</ns19:name> <ns19:thread-class>com.tmax.proobject.test.runtime.application.test. servicegroup.main.lib.ucs.thread.UCSMainThread</ns19:thread-class> </ns19:thread-pool> <ns19:thread-pool min="1" core="1" max="1"> <ns19:name>ucsSubPool</ns19:name> <ns19:thread-class>com.tmax.proobject.test.runtime.application.test. servicegroup.main.lib.ucs.thread.UCSSubThread</ns19:thread-class> </ns19:thread-pool> ... <ns19:service-object> <ns19:name>UCSCommonInterfaceCall</ns19:name> <ns19:class-name>com.tmax.proobject.test.runtime.application.test. servicegroup.main.service.call.ucs.UCSCommonInterfaceCall</ns19:class-name> <ns19:input-dto>com.tmax.proobject.test.runtime.application.test. dto.StringDataObject</ns19:input-dto> <ns19:output-dto>com.tmax.proobject.test.runtime.application.test. dto.StringDataObject</ns19:output-dto> <ns19:service-type>COMPLEX</ns19:service-type> <ns19:is-list-input>false</ns19:is-list-input> <ns19:is-list-output>false</ns19:is-list-output> <ns19:service-thread-pool>ucsMainPool</ns19:service-thread-pool> </ns19:service-object> <ns19:service-object> <ns19:name>UCSRendezvousMain</ns19:name> <ns19:class-name>com.tmax.proobject.test.runtime.application.test. servicegroup.main.service.call.ucs.UCSRendezvousMain</ns19:class-name> <ns19:input-dto>com.tmax.proobject.test.runtime.application.test. dto.StringDataObject</ns19:input-dto> <ns19:output-dto>com.tmax.proobject.test.runtime.application.test. dto.StringDataObject</ns19:output-dto> <ns19:service-type>COMPLEX</ns19:service-type> <ns19:is-list-input>false</ns19:is-list-input> <ns19:is-list-output>false</ns19:is-list-output> <ns19:service-thread-pool>ucsMainPool</ns19:service-thread-pool> </ns19:service-object> <ns19:service-object> <ns19:name>UCSRendezvousSub</ns19:name> <ns19:class-name>com.tmax.proobject.test.runtime.application.test. servicegroup.main.service.call.ucs.UCSRendezvousSub</ns19:class-name> <ns19:input-dto>com.tmax.proobject.test.runtime.application.test. dto.StringDataObject</ns19:input-dto> <ns19:output-dto>com.tmax.proobject.test.runtime.application.test. dto.StringDataObject</ns19:output-dto> <ns19:service-type>COMPLEX</ns19:service-type> <ns19:is-list-input>false</ns19:is-list-input> <ns19:is-list-output>false</ns19:is-list-output> <ns19:service-thread-pool>ucsSubPool</ns19:service-thread-pool> </ns19:service-object> </ns19:service-group>
현재 버전에서는 스레드 풀의 min/core/max 어트리뷰트를 모두 1로 설정해야 한다. 만약에 다른 값을 주면 의도치 않은 문제가 생길 수 있으므로 주의한다. thread-pool Element 의 하위 Element는 아래의 설명을 참고한다.
Element Name | 설명 |
---|---|
name |
사용자 정의 스레드를 담을 스레드 풀의 이름이다. |
thread-class |
사용자 정의 스레드의 class name이다. |
3.4. 예제
개발 시나리오에서 도식화한 시나리오를 테스트하려면 아래의 샘플 코드를 참고한다.
public class FEPMock implements Runnable { private String poServerIp = "192.168.15.28"; private int client1_port = 9997; private SocketChannel client1; private int client2_port = 9998; private SocketChannel client2; private ByteBuffer readBuffer; private ByteBuffer writeBuffer; public FEPMock() { readBuffer = ByteBuffer.allocate(1024); writeBuffer = ByteBuffer.allocate(1024); } public static void main(String[] args) throws InterruptedException, IOException { FEPMock mock = new FEPMock(); Thread thread = new Thread(mock); thread.start(); System.out.println("start FEP..."); thread.join(); System.out.println("end FEP..."); } public String read(SocketChannel channel) throws IOException { channel.read(readBuffer); readBuffer.flip(); String data = new String(readBuffer.array(), readBuffer.position(), readBuffer.limit()); readBuffer.clear(); return data; } public void write(SocketChannel channel, String data) throws IOException { writeBuffer.put(data.getBytes()); writeBuffer.flip(); channel.write(writeBuffer); writeBuffer.clear(); } @Override public void run() { try { client1 = SocketChannel.open(); InetSocketAddress address1 = new InetSocketAddress(poServerIp, client1_port); client1.connect(address1); System.out.println("[Client1]connect success and wait 3000 ms"); Thread.sleep(3000); /* ========================== */ System.out.println("===> SVC(UCSCommonInterfaceCall) 호출 > UCS > FEP(Client1) > UCS > SVC 응답"); { System.out.println("서비스 호출을 대기중... -> serviceName : test.main.UCSCommonInterfaceCall, input : {\"dto\" : {\"value\" : \"sync/send\"}}"); String readData = read(client1); System.out.println("[Client1] PO로 부터 수신 : " + readData); // abcd String sendData = "efgh"; write(client1, sendData); System.out.println("[Client1] PO에게 송신 : " + sendData); // efgh System.out.println("UCSCommonInterfaceCall의 기대하는 응답... -> output : {\"dto\" : {\"value\" : \"efgh\"}}"); } System.out.println("===> FEP(Client1) > UCS > SVC(UCSCommonInterfaceCall) 수행 > UCS > FEP(Client1)"); { String sendData = "ijkl"; write(client1, sendData); System.out.println("[Client1] PO에게 송신 : " + sendData); // ijkl String readData = read(client1); System.out.println("[Client1] PO에게 수신: " + readData); // mnop } System.out.println("===> SVC(UCSRendezvousMain) > UCS > FEP(Client1)"); { System.out.println("서비스 호출을 대기중... -> serviceName : test.main.UCSRendezvousMain, input : {\"dto\" : {\"value\" : \"matchingKey\"}}"); String readData = read(client1); System.out.println("[Client1] PO로 부터 수신 : " + readData); // qrst } System.out.println("===> FEP(Client2) > UCS > SVC(UCSRendezvousSub) > UCSRendezvousMain 랑데뷰 완료"); client2 = SocketChannel.open(); InetSocketAddress address2 = new InetSocketAddress(poServerIp, client2_port); client2.connect(address2); System.out.println("[Client2]connect success and wait 3000 ms"); Thread.sleep(3000); { String sendData = "uvwx"; write(client2, sendData); System.out.println("[Client2] PO에게 송신 : " + sendData); // uvwx String readData = read(client2); System.out.println("[Client2] PO에게 수신: " + readData); // matchkingKey > UCS_SUB System.out.println("UCSRendezvousMain의 기대하는 응답... -> output : {\"dto\" : {\"value\" : \"matchingKey > SM.RENDEZVOUS > matchingKey > UCS_SUB > UCS_MAIN\"}}"); } Thread.sleep(3000); System.out.println("===> FEP(Client1) > UCS > SVC(StickyServiceCall) 호출한 순서대로 서비스 수행 > UCS > FEP(Client1)"); { String sendData = "sticky"; write(client1, sendData); System.out.println("[Client1] PO에게 송신 : " + sendData); // sticky String readData = read(client1); System.out.println("[Client1] PO에게 수신: " + readData); } /* ========================== */ Thread.sleep(3000); client1.close(); client2.close(); System.out.println("Client1, Client2 Channel close"); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
4. UCS 관리
본 절에서는 운영 중에 있는 스레드를 제어하거나 UCS 정보들을 관리하기 위한 시스템 서비스들의 사용법을 설명한다. 이 서비스들은 시스템 앱인 ProManager Ops에서 호출하는 것이 일반적이므로 사용자가 직접 호출하는 것은 주의해야 한다.
4.1. 스레드 제어
스레드 제어는 UCS 스레드를 중지시키거나 기동시키는 기능이다. 아래의 예시는 pomain이라는 서버의 test 애플리케이션의 main 서비스 그룹에 배포된 ucsMainPool에 있는 UCS 스레드를 종료시키는 예시이다. 다시 재기동 하고 싶다면 command를 running으로 변경해서 호출하면 된다.
만약에 UCS 서비스 실행 중에 스레드 중지 요청을 하게 되면 어떻게 될까? 해당 스레드는 서비스가 끝난 다음에 중지하게 된다. 이 때 스레드의 상태는 RUNNING에서 TERMINATING 상태가 되며, 서비스가 종료된 다음에서야 비로소 TERMINATED 상태가 된다.
HTTP Method : POST URL : http://IP:PORT/proobject/ucs/UCSThreadControl?action=Service INPUT : { "dto" : { "serverName" : "pomain", "application" : "test", "serviceGroup" : "main", "poolName" : "ucsMainPool", "command" : "terminate" } } OUTPUT : { "dto" : { "count" : 1, "output" : "The state of the thread has been updated. - TERMINATED" } }
-
서비스의 입력 DTO의 파라미터 (UCSThreadControlInfo)
Input Parameter 설명 serverName
서버 이름이다. (proobject.xml의 server-name 엘리먼트)
application
애플리케이션 이름이다.
serviceGroup
서비스 그룹 이름이다.
poolName
스레드 풀 이름이다.
command
요청할 명령어이다. (대소문자 구분 안함)
-
RUNNING
-
TERMINATE
isTerminating (boolean)
스레드 종료 상태(TERMINATING/TERMINATED)를 나타내는 변수이다. 상태의 종류는 다음과 같다.
-
RUNNING
-
TERMINATING
-
TERMINATED
-
-
서비스의 출력 DTO의 파라미터 (UCSServiceResult)
Output Parameter 설명 count (int)
테이블에 업데이트된 row 수이다.
output
서비스 결과에 따른 메시지이다.
타입이 명시되어 있지 않은 파라미터들은 모두 문자열(String) 타입을 의미한다. |
4.2. UCS 정보 관리
UCS 정보란 특정 스레드가 사용하는 서버 포트 번호, 서버 포트 설명, 사용자가 정의한 세부 항목 그리고 스레드의 상태 같은 정보들을 통틀어서 부르는 말이다. 이런 정보들을 관리할 수 있도록 Create/Retrieve/Update/Delete 서비스를 제공한다. CRUD로 추가되거나 변경된 정보들은 엔진이 바로 사용할 수 있도록 메모리에서 관리하는데, UCS 스레드에서 관리 정보들을 사용하려면 UCSInfoManager를 통해 접근할 수 있다.
4.2.1. UCSInfoCreate
UCSInfoCreate는 UCS 정보를 생성하는 시스템 서비스이다.
다음은 호출 예제이다.
HTTP Method : POST URL : http://IP:PORT/proobject/ucs/UCSInfo INPUT : { "dto" : { "serverName" : "pomain", "poolId" : "abcd", "port" : 9997, "portDesc" : "sample server port", "detail" : "{\"key\" : \"value\"}", "status" : "RUNNING" } } OUTPUT : { "dto" : { "count" : 1, "output" : "Insert Success" } }
-
서비스의 입력 DTO의 파라미터 (UCSManagementInfo)
Input Parameter 설명 serverName
서버 이름이다. (proobject.xml의 server-name 엘리먼트)
poolId
스레드 풀 식별자이다.
port (int)
서버 포트 번호이다.
portDesc
서버 포트 번호에 대한 설명ㅇ;디/
detail
세부 항목이다. 사용자가 원하는 데이터를 JSON String 형태로 입력한다.
status
스레드의 상태이다. (RUNNING, TERMINATED)
-
서비스의 출력 DTO의 파라미터 (UCSServiceResult)
Output Parameter 설명 count (int)
테이블에 추가된 row 수이다.
output
서비스 결과에 따른 메시지이다.
타입이 명시되어 있지 않은 파라미터들은 모두 문자열(String) 타입을 의미한다. |
4.2.2. UCSInfoGet
UCSInfoGet는 UCS 정보를 조회하는 시스템 서비스이다.
RESTful 표준에 따라 조회 서비스는 HTTP GET 메소드를 사용하기 때문에 서비스의 입력 DTO의 파라미터는 아래와 같이 URL에 설정한다. 필수적으로 설정해야 하는 파라미터는 path로 구분해서 입력하고 선택적으로 필터링하기 위한 파라미터는 쿼리 스트링으로 설정하면 된다.
HTTP Method : GET URL : http://IP:PORT/proobject/ucs/serverName/pomain/application/test/serviceGroup/main/UCSInfo?poolId=abcd&port=9997&status=RUNNING OUTPUT : { "dto" : { "info" : [{"poolId" : "abcd", "serverName" : "pomain", "application" : "test", "serviceGroup" : "main", "poolName" : "ucsMainPool", "threadClassName" : "...", "threadMinCount" : 1, "threadCoreCount" : 1, "threadMaxCount" : 1, "status" : "RUNNING", "port" : 9997, "portDesc" : "sample server port", "detail" : "{ \"key\" : \"value\" }" }] } }
-
서비스의 입력 DTO의 파라미터 (UCSManagementInfo)
Input Parameter 설명 serverName
서버 이름이다. (proobject.xml의 server-name 엘리먼트)
application
애플리케이션 이름이다.
serviceGroup
서비스 그룹 이름이다.
poolId
스레드 풀 식별자이다. (optional)
port (int)
서버 포트 번호이다. (optional)
status
스레드의 현재 상태이다. (optional)
-
서비스의 출력 DTO의 파라미터 (UCSManagementInfos)
Output Parameter 설명 info (ArrayList)
모든 조건에 일치하는 UCS 정보 리스트이다.
타입이 명시되어 있지 않은 파라미터들은 모두 문자열(String) 타입을 의미한다. |
4.2.3. UCSInfoUpdate
UCSInfoUpdate는 UCS 정보를 수정하는 시스템 서비스이다.
업데이트할 특정 스레드 풀의 UCS 관리 정보를 수정할 때 사용한다. 아래는 pomain 서버의 test.main 에 배포된 ucsMainPool 스레드 풀의 port, portDesc, detail 정보를 업데이트 하는 예제이다.
HTTP Method : PUT URL : http://IP:PORT/proobject/ucs/UCSInfo INPUT : { "dto" : { "serverName" : "pomain", "application" : "test", "serviceGroup" : "main", "poolName" : "ucsMainPool", "port" : 9998, "portDesc" : "update sample server port", "detail" : "{\"updateKey\" : \"updateValue\"}" } } OUTPUT : { "dto" : { "count" : 1, "output" : "Update Success" } }
-
서비스의 입력 DTO의 파라미터 (UCSManagementInfo)
Input Parameter 설명 serverName
서버 이름이다. (proobject.xml의 server-name 엘리먼트)
application
애플리케이션 이름이다.
serviceGroup
서비스 그룹 이름이다.
poolName
스레드 풀 이름이다.
port (int)
서버 포트 번호이다.
portDesc
서버 포트 번호 설명이다.
detail
세부 항목이다. 사용자가 원하는 데이터를 JSON String 형태로 입력한다.
-
서비스의 출력 DTO의 파라미터 (UCSServiceResult)
Output Parameter 설명 count (int)
테이블에 업데이트된 row 수이다.
output
서비스 결과에 따른 메시지이다.
타입이 명시되어 있지 않은 파라미터들은 모두 문자열(String) 타입을 의미한다. |
4.2.4. UCSInfoDelete
UCSInfoDelete는 UCS 정보를 삭제하는 시스템 서비스다.
다음은 호출 예제이다.
HTTP Method : Delete URL : http://IP:PORT/proobject/ucs/UCSInfo INPUT : { "dto" : { "serverName" : "pomain", "application" : "test", "serviceGroup" : "main", "poolName" : "ucsMainPool" } } OUTPUT : { "dto" : { "count" : 1, "output" : "Delete Success" } }
-
서비스의 입력 DTO의 파라미터 (UCSManagementInfo)
Input Parameter 설명 serverName
서버 이름이다. (proobject.xml의 server-name 엘리먼트)
application
애플리케이션 이름이다.
serviceGroup
서비스 그룹 이름이다.
poolName
스레드 풀 이름이다.
-
서비스의 출력 DTO의 파라미터 (UCSServiceResult)
Output Parameter 설명 count (int)
테이블에서 삭제된 row 수이다.
output
서비스 결과에 따른 메시지이다.
타입이 명시되어 있지 않은 파라미터들은 모두 문자열(String) 타입을 의미한다. |
5. UCS 관련 테이블
다음은 UCS 관리 기능에 필요한 정보를 저장하는 UCS 스레드 풀 테이블과 UCS 관리 정보 테이블의 각 컬럼에 대한 설명이다.
-
PO_UCS_THREAD_POOL
UCS 스레드 풀 테이블이다. 모든 컬럼은 NOT NULL 제약 조건을 가지며, 세 개의 컬럼(APPLICATION, SERVICEGROUP, POOL_NAME)에 유니크 제약 조건이 걸려있다.
컬럼 설명 POOL_ID
스레드 풀 식별자이다. (PK)
APPLICATION
스레드 풀을 사용하는 애플리케이션명이다.
SERVICEGROUP
스레드 풀을 사용하는 서비스그룹명이다.
POOL_NAME
서비스그룹 메타에 설정하는 스레드 풀의 이름이다.
THREAD_CLASS
서비스그룹 메타에 설정하는 스레드 풀의 클래스명이다.
THREAD_MIN
서비스그룹 메타에 설정하는 스레드 풀의 최소 스레드 수이다.
THREAD_CORE
서비스그룹 메타에 설정하는 스레드 풀의 코어 스레드 수이다.
THREAD_MAX
서비스그룹 메타에 설정하는 스레드 풀의 최대 스레드 수이다.
-
PO_UCS_INFO
UCS 관리 정보 테이블이다.
컬럼 설명 SERVER_NAME
RTE의 서버 이름이다. (PK)
POOL_ID
스레드 풀 식별자이다. (PK)
PORT
서버 포트 번호이다. (NOT NULL)
PORT_DESC
서버 포트 설명이다.
DETAIL
세부 항목이다.
STATUS
스레드의 상태이다. (NOT NULL)