본문 바로가기

프로젝트/집안일 관리 시스템

[성능 개선] Batch 작업을 최적화해보자

Spring Batch

1. Spring Batch를 사용한 이유?

매주마다 집안일을 가장 많이 한 멤버를 선정하는 요구사항이 있다.

매주 월요일 새벽시간이 되면 일괄적으로

방을 돌면서 방마다 멤버들의 1주 동안의 집안일 횟수를 조회해서

가장 많이 일을 많이 한 멤버를 선정하도록 했다.

 

많은 데이터를 일괄적으로 처리하기 위해 spring batch를 사용했고 

특정 주기마다 자동으로 실행시키기위해 quartz라는 scheduler를 사용했다.

 

2. Spring Batch란?

대용량 데이터 처리를 위한 spring framework로

주기적이거나 반복적인 작업을 효과적으로 처리하기 위한 기능을 제공한다.

 

다음과 같은 기능을 제공한다.

- transaction 

- retry

- skip

 

3. Spring Batch 용어

Job Launcher job을 실행한다.
Job job은 실행가능한 작업단위이다.
Step job을 구성하는 독립적인 작업 단위로
job은 여러 개의 step으로 구성된다.
Job Repository job과 step의 실행정보를 저장하고 관리한다.
Job Instance job과 job parameter의 조합으로 생성되는 논리적 실행 단위이다.
Job Parameter job instance를 구별하기 위해 사용되는 parameter이다.
Job Execution job을 실행하는 단일 시도로 하나의 job instance는 여러번 실행될 수 있으며
실행마다 job execution이 생성된다.
Step Execution step의 실행 기록이다.
Item Reader 데이터를 검색한다.
Item Processor 데이터를 가공한다.
Item Writer 데이터를 출력한다.

 

4. 메타 테이블

 

BATCH_JOB_INSTANCE 테이블은 Job Instance와 관련된 정보를 저장한다.

JOB_INSTANCE_ID job instance를 식별할 수 있는 값
VERSION optimistic lock을 위해 사용되는 값
JOB_NAME job의 이름
JOB_KEY job의 이름과 job parameter를 hash한 값이다.
이 값을 기준으로 Job Instance의 고유성을 판단한다.

 

BATCH_JOB_EXECUTION 테이블은 Job Execution과 관련된 정보를 저장한다.

JOB_EXECUTION_ID job execution을 식별할 수 있는 값
VERSION optimistic lock을 위해 사용되는 값
JOB_INSTANCE_ID job execution이 속한 job instance를 가리키는 foreign key
CREATE_TIME job execution이 생성된 시각
START_TIME job execution이 실행된 시각
END_TIME job execution이 종료된 시각
STATUS job execution의 상태로
COMPLETED, STARTED, STARTING, STOPPED, FAILED와 같은 값을 가진다.
EXIT_CODE 종료 코드
EXIT_MESSAGE 종료 메시지로 종료된 이유나 에러 메시지를 나타낸다.
LAST_UPDATED job execution이 수정된 시각

 

BATCH_JOB_EXECUTION_PARAMS 테이블은 Job Parameter와 관련된 정보를 저장한다.

JOB_EXECUTION_ID job execution을 가리키는 foreign key
TYPE_CD job parameter의 type 코드로
String, Date, Long, Double과 같은 값을 가진다.
KEY_NAME job parameter의 이름
STRING_VAL TYPE_CD가 String일때 값
DATETIME TYPE_CD가 Date일때 값
LONG_VAL TYPE_CD가 Long일때 값
DOUBLE_VAL TYPE_CD가 Double일때 값
IDENTIFYING job instance의 고유성을 판단할 때 사용할지 안할지 정하는 값으로
Y, N과 같은 값을 가진다.

 

BATCH_JOB_EXECUTION_CONTEXT 테이블은 Job Execution의 상태를 저장하며 재시작을 하는데 사용된다.

JOB_EXECUTION_ID job execution을 가리키는 foreign key
SHORT_CONTEXT 실행 context 정보가 정보가 직렬화되어 저장된다.
SERIALIZED_CONTEXT SHOT_CONTEXT에 저장되기에 큰 정보인 경우 저장된다.

 

BATCH_STEP_EXECUTION 테이블은 Step Execution과 관련된 정보를 저장한다.

STEP_EXECUTION_ID step execution을 식별할 수 있는 값
VERSION optimistic lock을 위해 사용되는 값
STEP_NAME step의 이름
JOB_EXECUTION_ID job execution을 가리키는 foreign key
START_TIME job execution이 실행된 시각
END_TIME job execution이 종료된 시각
STATUS step execution의 상태로
COMPLETED, STARTED, STARTING, STOPPED, FAILED와 같은 값을 가진다.
COMMIT_COUNT transaction이 commit 횟수
READ_COUNT item reader를 통해 읽은 횟수
FILTER_COUNT item processor 처리 도중 필터링되어 제외된 횟수
WRITE_COUNT item writer를 통해 기록된 횟수
READ_SKIP_COUNT item reader 읽기 중 skip된 횟수
WRITE_SKIP_COUNT item writer 쓰기 중 skip된 횟수
PROCESS_SKIP_COUNT item processor 처리 중 skip 된 횟수
EXIT_CODE 종료 코드
ROLLBACK_COUNT transaction이 rollback된 횟수
EXIT_MESSAGE 종료 메시지로 종료된 이유나 에러 메시지를 나타낸다.
LAST_UPDATED step execution이 수정된 시각

 

BATCH_STEP_EXECUTION_CONTEXT 테이블은 Step Execution의 상태를 저장하며 재시작을 하는데 사용된다.

JOB_EXECUTION_ID job execution을 가리키는 foreign key
SHORT_CONTEXT 실행 context 정보가 정보가 직렬화되어 저장된다.
SERIALIZED_CONTEXT SHOT_CONTEXT에 저장되기에 큰 정보인 경우 저장된다.

 

구현

1. Chunk 처리

데이터를 처리하기 위한 방식으로 tasklet 방식과 chunk 방식이 있다.

tasklet 방식은 하나의 작업을 단일 transaction으로 처리하는 방식으로 복잡하지 않은 작업에 사용된다.

chunk 방식은 작업을 chunk라는 묶음으로 나누어서 처리하는 방식으로 각 chunk마다 transaction이 적용된다.

chunk 방식은 대용량 데이터 작업에 적합하다.

이 프로젝트에서는 chunk 방식을 사용했다.

 

chunk 방식을 사용하면 step은 다음과 같이 구성된다.

- ItemReader를 통해 데이터를 읽어온다.

  chunk size가 될 때까지 read 메서드를 호출해서 데이터를 읽어온다.

- ItemProcessor를 통해 데이터를 변환한다.

  데이터 1건 마다 process 메서드를 호출해서 변환한다.

- ItemWriter를 통해 데이터를 기록한다.

  chunk size만큼의 데이터가 모이면 write 메서드를 호출해서 한번에 기록한다.

 

job과 step을 다음과 구성했다.

private final static int CHUNK_SIZE = 100;

@Bean(name = "decideWeeklyTopUserJob")
public Job decideWeeklyTopUserJob() {
    return job.get("decideWeeklyTopUserJob")
            .start(decideWeeklyTopUserStep())
            .build();
}

@Bean
public Step decideWeeklyTopUserStep() {
    return steps.get("decideWeeklyTopUserStep")
            .<Room, List<WeeklyTopUser>>chunk(CHUNK_SIZE)
            .reader(decideWeeklyTopUserReader())
            .processor(decideWeeklyTopUserProcessor())
            .writer(decideWeeklyTopUserListWriter())
            .build();
}

 

2. JpaPagingItemReader

JpaPagingItemReader는 ItemReader 인터페이스의 구현체 중 하나로

JPA paging 방식을 사용해서 데이터를 읽어온다.

paging을 하기 위해서 offset과 limit을 사용한다.

 

한번에 page size만큼 데이터를 조회하고 chunk size가 될때까지 반복한다.

이 프로젝트에서는 page size를 chunk size로 설정해서 한번에 읽어오도록 했다.

@Bean
public JpaPagingItemReader<Room> decideWeeklyTopUserReader() {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("deleteStatus", DeleteStatus.NOT_DELETED);
    return new JpaPagingItemReaderBuilder<Room>()
            .queryString("SELECT r FROM Room r WHERE r.deleteStatus = :deleteStatus")
            .pageSize(CHUNK_SIZE)
            .entityManagerFactory(entityManagerFactory)
            .parameterValues(parameterValues)
            .name("decideWeeklyTopUserReader")
            .build();
}

 

3. ItemProcessor

각 방마다 집안일을 가장 많이 한 멤버 리스트를 선정한다.

Room을 넘겨받으면 database 조회를 통해서 WeeklyTopUser 리스트로 변환한다.

@Bean
public ItemProcessor<Room, List<WeeklyTopUser>> decideWeeklyTopUserProcessor() {
    return room -> {
        var lastWeekDate = LocalDate.now().minusWeeks(1);
        var weekFields = WeekFields.ISO;
        var year = lastWeekDate.getYear();
        var weekOfYear = lastWeekDate.get(weekFields.weekOfYear());

        return weeklyStatisticStore.decideAndGetWeeklyTopUserInRoom(room.getId(), year, weekOfYear).stream()
                .map(weeklyTodoCount ->
                        WeeklyTopUser.builder()
                                .userId(weeklyTodoCount.getUserId())
                                .roomId(weeklyTodoCount.getRoomId())
                                .weekOfYear(weeklyTodoCount.getWeekOfYear())
                                .year(weeklyTodoCount.getYear())
                                .build()
                ).collect(Collectors.toList());
    };
}

 

4. JpaItemListWriter

JpaItemWriter는 리스트를 처리하지 못하므로

JpaItemListWriter라는 custom class를 만들고 JpaItemWriter를 상속받아서 write 메서드를 오버라이딩했다.

매개변수로 리스트가 넘어오면 addAll 메서드로 반환되는 리스트에 합치도록 했다. 

@Bean
public JpaItemListWriter<WeeklyTopUser> decideWeeklyTopUserListWriter() {
    JpaItemWriter<WeeklyTopUser> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return new JpaItemListWriter<>(writer, entityManagerFactory);
}

 

성능 개선

1. Reader 성능 개선

기존 JpaPagingItemReader의 경우 데이터를 읽어올때 offset과 limit을 사용한다.

하지만 데이터가 많아질수록 offset이 커지고 offset 만큼의 데이터를 넘어가기위해서 순차탐색을 하는 비용이 발생한다.

JpaCursorItemReader를 사용해서 이러한 비용을 절감할 수 있다.

@Bean
public JpaCursorItemReader<Room> decideWeeklyTopUserCursorReader() {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("deleteStatus", DeleteStatus.NOT_DELETED);

    return new JpaCursorItemReaderBuilder<Room>()
            .queryString("SELECT r FROM Room r WHERE r.deleteStatus = :deleteStatus ORDER BY r.id ASC")
            .entityManagerFactory(entityManagerFactory)
            .parameterValues(parameterValues)
            .name("decideWeeklyTopUserCursorReader")
            .build();
}

 

2. Writer 성능 개선

processor는 단건을 받아서 처리하기 때문에 database 통신시간이 순차적으로 누적이 된다.

writer에서 리스트를 받아서 처리할 수 있기 때문에 기존 processor 로직을 writer로 옮기고

병렬로 처리를 한다면 성능을 높일 수 있다.

 

또한 JpaItemWriter의 경우 데이터베이스에 저장할 때 batch insert가 안되는데

batch insert가 가능한 JdbcBatchItemWriter를 사용하면 성능을 높일 수 있다. 

@Bean
public ItemWriter<Room> decideWeeklyTopUserListWriter(DataSource dataSource) {
    return items -> {
        List<WeeklyTopUser> flatList = items.parallelStream()
                .map(room -> {
                        var lastWeekDate = LocalDate.now().minusWeeks(1);
                        var weekFields = WeekFields.ISO;
                        var year = lastWeekDate.getYear();
                        var weekOfYear = lastWeekDate.get(weekFields.weekOfYear());

                        return weeklyStatisticStore.decideAndGetWeeklyTopUserInRoom(room.getId(), year, weekOfYear).stream()
                            .map(weeklyTodoCount ->
                                WeeklyTopUser.builder()
                                    .userId(weeklyTodoCount.getUserId())
                                    .roomId(weeklyTodoCount.getRoomId())
                                    .weekOfYear(weeklyTodoCount.getWeekOfYear())
                                    .year(weeklyTodoCount.getYear())
                                    .build()
                            ).collect(Collectors.toList());
                }).flatMap(List::stream)
                .collect(Collectors.toList());

        JdbcBatchItemWriter<WeeklyTopUser> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("INSERT INTO weekly_top_user (created_time, modified_time, room_id, user_id, week_of_year, year) VALUES (now(), now(), :roomId, :userId, :weekOfYear, :year)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();

        writer.write(flatList);
    };
}