Jakarta Batch 예제

본 장에서는 JEUS에서 Jakarta Batch를 활용한 간략한 애플리케이션에 예제를 설명한다.

1. 개요

본 예제에서는 JEUS에서 Servlet 요청에 대해 배치 Job을 수행하고, 수행 결과를 클라이언트에게 응답으로 주는 간단한 애플리케이션을 소개한다.

다음은 Jakarta Batch를 활용한 서블릿 예제의 War 파일 구조이다.

Example.war
    |--META-INF
    |     |--MANIFEST.MF
    |--WEB-INF
          |--[X]jeus-web-dd.xml
          |--classes
               |--META-INF
                    |--batch-jobs
                    |     |--[X]simple_file_batch.xml
                    |--tmaxsoft
                          |--jbatch
                               |--test
                                    |--[C]jBatchServlet.class
                                    |--handler
                                         |--[C]FileItemReader.class
                                         |--[C]LogWriter.class
                                         |--[C]Person.class
                                         |--[C]StringToPersonProcessor.class

* Legend
- [01]: binary or executable file
- [X] : XML document
- [J] : JAR file
- [T] : Text file
- [C] : Class file
- [V] : jaba source file
- [DD] : deployment dexcriptor

2. DataSource 설정

Jakarta Batch 애플리케이션을 JEUS에서 사용하려면 반드시 Jakarta Batch 작업을 수행할 때 필요한 데이터소스가 정의되어 있어야 한다. 현재 JEUS에서는 Apache derby DB에서만 동작한다.

Windows의 경우 JEUS_HOME/bin/startderby.cmd(Linux는 JEUS_HOME/bin/startderby)를 이용하여 derby를 실행할 수 있다. 데이터소스의 export name으로 "jdbc/batch"를 정의한다.

현재는 Derby만 정상적으로 지원되고 있다. Derby 외의 DB 지원은 정상적으로 검증이 이루어지지 않았으므로 사용을 권하지 않는다.

다음은 데이터소스가 정의된 domain.xml의 예이다.

데이터소스 설정 : <domain.xml>
<domain>
  <data-source>
  ...
    <database>
      <data-source-id>jdbc/batch</data-source-id>
      <data-source-class-name>
           org.apache.derby.jdbc.ClientConnectionPoolDataSource
      </data-source-class-name>
      <data-source-type>ConnectionPoolDataSource</data-source-type>
      <server-name>localhost</server-name>
      <port-number>1527</port-number>
      <database-name>derbyDB</database-name>
      <user>app</user>
      <password>app</password>
      <property>
        <name>ConnectionAttributes</name>
        <type>java.lang.String</type>
        <value>;create=true</value>
      </property>
    </database>
  </data-source>
</domain>

3. jeus-web-dd.xml 설정

Job을 실행시킬 스레드 풀에 대한 설정을 한다. jeus-web-dd.xml에서 batch-thread-pool에 대한 명세를 하지 않는 경우 default 값으로 설정된 스레드 풀이 생성된다. 해당 상세 설명은 컴포넌트 DD 설정을 참조한다.

4. Job 정의

Job을 명세하는 xml에 Job, Step, ItemReader, ItemWriter, ItemProcessor를 등록한다. 해당 Job을 정의한 xml 파일은 META-INF/batch-jobs/ 폴더 아래에 두어야 한다.

War에서 META-INF는 구조적으로 /WEB-INF/classes 하위에 있어야 동작한다.

다음은 /WEB-INF/classes/META-INF/batch-jobs/simple_file_batch.xml 파일의 설정 예이다.

Job 정의 : <simple_file_batch.xml>
<job id="demo" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step1">
        <chunk>
            <reader ref="tmaxsoft.bhkim.test.filebatch.FileItemReader">
                <properties>
                    <property name="filePath" value="#{jobParameters['filePath']}" />
                </properties>
            </reader>
            <processor ref="tmaxsoft.bhkim.test.filebatch.StringToPersonProcessor" />
            <writer ref="tmaxsoft.bhkim.test.filebatch.LogWriter" />
        </chunk>
    </step>
</job>

5. JBatchServlet

Job에 대해서 설정한 xml을 JobOperator에게 전달한다. Job을 실행할 때 필요한 파라미터를 정의해서 같이 operator에게 전달할 수 있다.

다음 예제는 filePath를 key로 하는 프로퍼티로 처리할 데이터를 가지고 있는 personList.txt 파일의 경로를 넘겨주는 예제이다.

JBatchServlet 예제
@WebServlet("/JBatchServlet")
public class JBatchServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private final String FILE_PATH = "META-INF/task/personList.txt";

    public JBatchServlet() {
        super();
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        Properties jobParams = new Properties();
        URL personListURL = Thread.currentThread().getContextClassLoader().getResource(FILE_PATH);
        jobParams.setProperty("filePath", personListURL.getPath());

        JobOperator operator = BatchRuntime.getJobOperator();
        // xml file name without format
        long id = operator.start("simple_file_batch", jobParams);

        waitForEnd(operator, id);

        response.getWriter().println("File Batch is successfully precessed.");
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doGet(request, response);
    }

    public static void waitForEnd(final JobOperator jobOperator, final long id) {
        final Collection<BatchStatus> endStatuses = Arrays.asList(BatchStatus.COMPLETED, BatchStatus.FAILED);
        do {
            try {
                Thread.sleep(100);
            } catch (final InterruptedException e) {
                return;
            }
        } while (!endStatuses.contains(jobOperator.getJobExecution(id).getBatchStatus()));
    }
}

FILE_PATH에서 정의하고 있는 파일의 내용은 다음과 같다.

Name1,27,Computer Science
Name2,22,English Education
Name3,23,Electronic Engineering

6. ItemReader

FileItemReader 예제를 살펴보면 ItemReader를 구현하면서 4개의 함수 open, close, readItem, checkpointInfo를 구현해야 한다. ItemReader에서는 item 단위로 파일에 접근해서 처리해야 하고, 본 예제에서는 item의 단위를 파일의 한 라인으로 정의하고 있다.

open 함수의 매개변수로 Serializable을 넘겨주는데, 이것은 Checkpoint를 확인하는 기능으로 이전에 비정상적으로 종료되어 다시 시작하는 경우에 마지막으로 기록된 Checkpoint에서 실행되도록 도와주는 기능이다.

Jakarta Batch 애플리케이션을 작성할 때 Servlet, EJB 외에 순수 Batch 로직에서 injection이 동작하지 않으므로 주의해야 한다.

다음 예제에서는 readItem을 할 때마다 recordNumber를 기록해뒀다가, 나중에 작업이 비정상 종료되어 다시 실행되었을 때 파일을 해당 recordNumber부터 시작할 수 있도록 건너뛰는 작업을 open에서 수행한다.

FileItemReader 예제
public class FileItemReader implements ItemReader {

    @Inject
    @BatchProperty
    private String filePath; // JobProperties in xml setting is injected to the field

    private BufferedReader reader = null;

    private int recordNumber;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        if (filePath == null)
            throw new RuntimeException("Can't find any input");

        final File file = new File(filePath);
        if (!file.exists())
            throw new RuntimeException("A file '" + filePath + "' doesn't exist");

        reader = new BufferedReader(new FileReader(file));

        if (checkpoint != null) {
            assert (checkpoint instanceof Integer);
            recordNumber = (Integer) checkpoint;

            // Pass over latest checkpoint record
            for (int i = 1; i < recordNumber; i++)
                reader.readLine();
        }
    }

    @Override
    public void close() throws Exception {
        if (reader != null)
            reader.close();
    }

    @Override
    public Object readItem() throws Exception {
        Object item = reader.readLine();

        // checkpoint line update
        recordNumber++;
        return item;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return recordNumber;
    }

}

7. ItemWriter

LogWriter 예제는 ItemWriter를 구현하면서 4개의 함수 open, close, wirteItems, checkpointInfo를 구현한다. writeItem에서 처리해야할 item을 벌크(리스트)로 넘겨 받게 된다.

Jakarta Batch 애플리케이션을 작성할 때 Servlet, EJB 외에 순수 Batch 로직에서 injection이 동작하지 않으므로 주의해야 한다.

다음 예제에서는 간단하게 넘겨 받은 item 리스트에 대해서 내부에서 정의한 logger로 출력하는 작업만 수행한다.

ItemWriter 예제
public class LogWriter implements ItemWriter {
    private static final Logger logger = Logger.getLogger(LogWriter.class.getSimpleName());
    int writeRecordNumber; //

    @Override
    public void open(Serializable checkpoint) throws Exception {
    }

    @Override
    public void close() throws Exception {
    }


    @Override
    public void writeItems(List<Object> items) throws Exception {
        // simply print passed item to logger
        for (Object o: items) {
            logger.info("writeItems > " + o.toString());
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}

8. ItemProcessor

ItemProcessor는 ItemReader에서 읽어들인 데이터를 가공하는 작업을 수행한다.

Jakarta Batch 애플리케이션을 작성할 때 Servlet, EJB 외에 순수 Batch 로직에서 injection이 동작하지 않으므로 주의해야 한다.

다음 예제에서는 읽어들인 파일의 라인 문자열을 item 파라미터로 전달받았고, 문자열을 가공하여 Person 객체로 저장한다. 후에 특정 주기가 되면 가공된 정보들이 ItemWriter로 전달된다.

ItemProcessor 예제
public class StringToPersonProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        final String[] line = String.class.cast(item).split(",");

        if (line == null || line.length != 3)
            return null;

        // name, age, department
        return new Person(line[0], Integer.parseInt(line[1]), line[2]);
    }
}

Processor 내에서 사용할 Person 객체는 다음과 같이 간단하게 정의된다.

Person 객체
public class Person {
    private String name;
    private int age;
    private String department;

    public Person(String name, int age, String department) {
        this.name = name;
        this.age = age;
        this.department = department;
    }

    @Override
    public String toString() {
        return name + "," + age + "," + department;
    }
}