주노 님의 블로그

[기술 구현]스프링 배치를 활용한 정산 로직 최적화 본문

카테고리 없음

[기술 구현]스프링 배치를 활용한 정산 로직 최적화

juno0432 2024. 11. 21. 16:59

두괄식 요약

스케줄러를 사용하던 정산 로직을

스프링 배치로 전환하여 안정성과 속도를 개선하였습니다.

8만 건 기준 38분 4초가 걸리던 처리 시간을

JpaPagingItemReader를 사용하여 1분 11초까지 줄였습니다.

 

내용

기존 정산 로직은 스케줄러를 사용하고 있었습니다.

 

@Service
@RequiredArgsConstructor
public class CalculationService {

    private static final long ONE_MONTH = 1L;
    private static final String MONTHLY_AT_3_AM_ON_1ST = "0 0 3 1 * ?";

    private final CalculationRepository calculationRepository;
    private final PaymentRepository paymentRepository;
    private final PointRepository pointRepository;

    @Scheduled(cron = MONTHLY_AT_3_AM_ON_1ST)
    public void calculate() {
        LocalDateTime now = LocalDateTime.now()
                .withDayOfMonth(1)
                .withHour(0)
                .withMinute(0)
                .withSecond(0)
                .withNano(0);
        List<Payment> payments = paymentRepository.findPaymentsForCalculation(
                APPROVED,
                now.minusMonths(ONE_MONTH),
                now
        );

        List<User> users = payments.stream()
                .map(Payment::getEvent)
                .map(Event::getUser)
                .toList();

        Map<User, Point> pointMap = pointRepository.findPointsByUserIn(users).stream()
                .collect(Collectors.toMap(Point::getUser, point -> point));

        Map<User, Long> calculationMap = new HashMap<>();
        payments.forEach(payment -> {
            Event event = payment.getEvent();
            User user = event.getUser();
            Point point = pointMap.get(user);

            point.addPoint(payment.getOriginalAmount());
            event.calculate();
            calculationMap.put(
                    user,
                    calculationMap.getOrDefault(user, 0L) + payment.getOriginalAmount()
            );
        });

        List<Calculation> calculations = calculationMap.entrySet().stream()
                .map(entry -> Calculation.create(entry.getKey(), entry.getValue()))
                .toList();

        calculationRepository.saveAll(calculations);
    }
}

 

 

위 방식의 아쉬운 점은

실패 시 재시도 처리 로직이 없다는 것이다.

 

package com.sparta.spotlightspacescheduler.core.calculation.service;

import static com.sparta.spotlightspacescheduler.core.payment.domain.PaymentStatus.APPROVED;

import com.sparta.spotlightspacescheduler.core.calculation.domain.Calculation;
import com.sparta.spotlightspacescheduler.core.calculation.dto.CalculationProcessResponseDto;
import com.sparta.spotlightspacescheduler.core.calculation.repository.CalculationRepository;
import com.sparta.spotlightspacescheduler.core.event.domain.Event;
import com.sparta.spotlightspacescheduler.core.payment.domain.Payment;
import com.sparta.spotlightspacescheduler.core.payment.repository.PaymentRepository;
import com.sparta.spotlightspacescheduler.core.point.point.domain.Point;
import com.sparta.spotlightspacescheduler.core.point.point.repository.PointRepository;
import com.sparta.spotlightspacescheduler.core.user.domain.User;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.data.domain.Sort;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * First Spring Batch version of the monthly settlement job.
 *
 * <p>Reads last month's APPROVED payments page by page with a {@link RepositoryItemReader}.
 * The processor looks up each host's {@link Point}. For every payment, the writer adds the
 * amount to the point, calls {@code Event#calculate()} and saves one {@link Calculation} row.
 * A chunk is retried up to 5 attempts on {@link TransientDataAccessException}.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class CalculationBatch {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final CalculationRepository calculationRepository;
    private final PaymentRepository paymentRepository;
    private final PointRepository pointRepository;

    // Job: the unit of batch execution - the settlement job.
    @Bean
    public Job calculationJob(Step calculationStep) {
        return new JobBuilder("CalculationJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(new JobLoggerListener())
                .start(calculationStep)
                .build();
    }

    // Step: the unit of work inside the job; settles last month's payments chunk by chunk.
    // The processor/writer are typed (not raw) so the compiler checks that the chunk's output
    // type - the processor's output and the writer's input - is the DTO, not Payment.
    @Bean
    @JobScope
    public Step calculationStep(
            RepositoryItemReader<Payment> paymentReader,
            ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor,
            ItemWriter<CalculationProcessResponseDto> paymentWriter
    ) {
        return new StepBuilder("calculationStep", jobRepository)
                .<Payment, CalculationProcessResponseDto>chunk(20000, transactionManager)
                .reader(paymentReader)
                .processor(paymentProcessor)
                .writer(paymentWriter)
                .faultTolerant()
                .retryPolicy(retryPolicy())
                .build();
    }

    // Reader: pages through APPROVED payments, ordered by id, and hands them to the processor.
    // The window is [1st of last month 00:00, 1st of this month 00:00).
    // NOTE(review): inclusive/exclusive handling is defined by findPaymentsForCalculation1 - confirm.
    @Bean
    @StepScope
    public RepositoryItemReader<Payment> paymentReader() {
        LocalDateTime startOfThisMonth = LocalDateTime.now()
                .withDayOfMonth(1)
                .withHour(0)
                .withMinute(0)
                .withSecond(0)
                .withNano(0);

        return new RepositoryItemReaderBuilder<Payment>()
                .name("paymentReader")
                .repository(paymentRepository)
                .methodName("findPaymentsForCalculation1")
                .arguments(APPROVED, startOfThisMonth.minusMonths(1), startOfThisMonth)
                .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
                .pageSize(10000)
                .build();
    }

    // Processor: pairs each payment with its host user's Point (one query per payment).
    @Bean
    @StepScope
    public ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor() {
        return new ItemProcessor<Payment, CalculationProcessResponseDto>() {
            @Override
            public CalculationProcessResponseDto process(Payment payment) throws Exception {
                Event event = payment.getEvent();
                User user = event.getUser();

                Point point = pointRepository.findByUserId(user.getId());
                if (point == null) {
                    // Fail here with context rather than with an NPE later in the writer.
                    throw new IllegalStateException("No point found for user " + user.getId());
                }

                return CalculationProcessResponseDto.of(user, point, event, payment.getOriginalAmount());
            }
        };
    }

    // Writer: for each processed payment, credits the point, calls Event#calculate()
    // and saves one Calculation row.
    @Bean
    @StepScope
    public ItemWriter<CalculationProcessResponseDto> paymentWriter() {
        return new ItemWriter<CalculationProcessResponseDto>() {
            @Override
            public void write(Chunk<? extends CalculationProcessResponseDto> chunk) throws Exception {
                for (CalculationProcessResponseDto result : chunk.getItems()) {
                    Point point = result.getPoint();
                    Event event = result.getEvent();
                    point.addPoint(result.getOriginAmount());

                    event.calculate();

                    Calculation calculation = Calculation.create(result.getUser(), result.getOriginAmount());
                    calculationRepository.save(calculation);

                    pointRepository.save(point);
                }
            }
        };
    }

    // Retry policy: up to 5 attempts per chunk, only for transient data-access failures.
    @Bean
    public RetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
        retryableExceptions.put(TransientDataAccessException.class, true);

        return new SimpleRetryPolicy(5, retryableExceptions);
    }
}

 

 

기본 배치를 구현하였다.

 

위 배치의 장점은

예외(exception)가 발생하면 재시도하거나 재시작할 수 있다는 것이다.

하지만 성능 면에서 속도가 느리다는 단점이 있었고,

 

이를 개선해 보고자 하였다.

 
package com.sparta.spotlightspacescheduler.core.calculation.service;

import com.sparta.spotlightspacescheduler.core.calculation.dto.CalculationProcessResponseDto;
import com.sparta.spotlightspacescheduler.core.event.domain.Event;
import com.sparta.spotlightspacescheduler.core.payment.domain.Payment;
import com.sparta.spotlightspacescheduler.core.payment.domain.PaymentStatus;
import com.sparta.spotlightspacescheduler.core.point.point.domain.Point;
import com.sparta.spotlightspacescheduler.core.point.point.repository.PointRepository;
import com.sparta.spotlightspacescheduler.core.user.domain.User;
import jakarta.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Optimized monthly settlement job.
 *
 * <p>Reads last month's APPROVED payments with a fetch-join {@link JpaPagingItemReader}.
 * The processor resolves each host's {@link Point} from a cache filled by one query. The
 * writer inserts the settlement rows and increments point balances with JDBC batch statements.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class CalculationBatch {

    /** Items per chunk; also the reader's page size, so one page feeds exactly one chunk. */
    private static final int CHUNK_SIZE = 30;

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final PointRepository pointRepository;
    private final EntityManagerFactory entityManagerFactory;
    private final JdbcTemplate jdbcTemplate;

    // Job: the unit of batch execution - the settlement job.
    @Bean
    public Job calculationJob(Step calculationStep) {
        return new JobBuilder("CalculationJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(new JobLoggerListener())
                .start(calculationStep)
                .build();
    }

    // Step: settles payments of events that ended from the 1st of last month (inclusive)
    // up to the 1st of this month (exclusive). Failed chunks are retried per retryPolicy().
    @Bean
    @JobScope
    public Step calculationStep(JpaPagingItemReader<Payment> paymentJpaPagingItemReader,
            ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor,
            ItemWriter<CalculationProcessResponseDto> paymentWriter) {
        return new StepBuilder("calculationStep", jobRepository)
                .<Payment, CalculationProcessResponseDto>chunk(CHUNK_SIZE, transactionManager)
                .reader(paymentJpaPagingItemReader)
                .processor(paymentProcessor)
                .writer(paymentWriter)
                .faultTolerant()
                .retryPolicy(retryPolicy())
                .build();
    }

    // Reader: pages through the payments. Event and user are fetch-joined so the
    // processor/writer do not trigger per-item lazy loads.
    @Bean
    @StepScope
    public JpaPagingItemReader<Payment> paymentJpaPagingItemReader() {
        JpaPagingItemReader<Payment> jpaPagingItemReader = new JpaPagingItemReader<>();
        // ORDER BY is required: offset paging over an unordered result can skip or repeat
        // rows between pages, i.e. miss a payment or settle it twice.
        jpaPagingItemReader.setQueryString(
                "select p " +
                        "from Payment p " +
                        "join fetch p.event e " +
                        "join fetch e.user " +
                        "where p.status = :status " +
                        "and :startInclusive <= e.endAt " +
                        "and e.endAt < :endExclusive " +
                        "and e.isDeleted = false " +
                        "order by p.id"
        );
        jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
        jpaPagingItemReader.setPageSize(CHUNK_SIZE);

        LocalDateTime startOfThisMonth = LocalDateTime.now()
                .withDayOfMonth(1)
                .withHour(0)
                .withMinute(0)
                .withSecond(0)
                .withNano(0);
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", PaymentStatus.APPROVED);
        parameterValues.put("startInclusive", startOfThisMonth.minusMonths(1));
        parameterValues.put("endExclusive", startOfThisMonth);
        jpaPagingItemReader.setParameterValues(parameterValues);

        return jpaPagingItemReader;
    }

    // Processor: pairs each payment with its host user's Point, served from a cache.
    @Bean
    @StepScope
    public ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor() {
        return new ItemProcessor<Payment, CalculationProcessResponseDto>() {

            // user id -> Point. Filled by a single query on the first item to avoid one query
            // per payment (N+1). Note: this loads every Point row into memory.
            private final Map<Long, Point> pointsCache = new HashMap<>();
            // Separate flag so an empty points table does not trigger findAll() for every item.
            private boolean cacheLoaded;

            @Override
            public CalculationProcessResponseDto process(Payment payment) throws Exception {
                if (!cacheLoaded) {
                    for (Point point : pointRepository.findAll()) {
                        pointsCache.put(point.getUser().getId(), point);
                    }
                    cacheLoaded = true;
                }

                Event event = payment.getEvent();
                User user = event.getUser();
                Point point = pointsCache.get(user.getId());
                if (point == null) {
                    throw new IllegalStateException("No point found for user " + user.getId());
                }
                return CalculationProcessResponseDto.of(user, point, event, payment.getOriginalAmount());
            }
        };
    }

    // Writer: bulk-inserts one calculation row per payment and bulk-increments point balances.
    @Bean
    @StepScope
    public ItemWriter<CalculationProcessResponseDto> paymentWriter() {
        return new ItemWriter<CalculationProcessResponseDto>() {

            @Override
            public void write(Chunk<? extends CalculationProcessResponseDto> chunk) throws Exception {
                String calculationSql = "INSERT INTO calculations (user_id, calculation_amount, create_at, update_at) VALUES (?, ?, ?, ?)";
                String pointUpdateSql = "UPDATE points SET amount = amount + ? WHERE user_id = ?";

                // One timestamp per chunk, so create_at and update_at always agree.
                LocalDateTime writtenAt = LocalDateTime.now();
                List<Object[]> calculationBatchArgs = new ArrayList<>();
                List<Object[]> pointBatchArgs = new ArrayList<>();

                for (CalculationProcessResponseDto result : chunk.getItems()) {
                    var userId = result.getUser().getId();

                    calculationBatchArgs.add(new Object[]{
                            userId,
                            result.getOriginAmount(),
                            writtenAt,
                            writtenAt
                    });

                    // The atomic SQL increment is the only balance update. The cached Point is
                    // deliberately NOT mutated as well: points loaded inside the current
                    // transaction are still managed, and dirty checking would write back their
                    // absolute in-memory amount at commit, overriding the atomic increment.
                    pointBatchArgs.add(new Object[]{
                            result.getOriginAmount(),
                            userId
                    });

                    // NOTE(review): Event is loaded by the reader's own EntityManager, not this
                    // chunk's transaction - confirm this change is flushed and commits or rolls
                    // back together with the JDBC writes below.
                    result.getEvent().calculate();
                }

                jdbcTemplate.batchUpdate(calculationSql, calculationBatchArgs);
                int[] pointUpdateCounts = jdbcTemplate.batchUpdate(pointUpdateSql, pointBatchArgs);
                for (int i = 0; i < pointUpdateCounts.length; i++) {
                    // 0 rows means no points row matched: the settled amount would vanish silently.
                    // Drivers may report SUCCESS_NO_INFO (-2) for batched statements; that is fine.
                    if (pointUpdateCounts[i] == 0) {
                        throw new IllegalStateException(
                                "No points row updated for user " + pointBatchArgs.get(i)[1]);
                    }
                }
            }
        };
    }

    // Retry policy: up to 5 attempts per chunk for any Exception. This is deliberately broad,
    // so deterministic failures (e.g. a missing point) are also retried before the step fails.
    @Bean
    public RetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionClass = new HashMap<>();
        exceptionClass.put(Exception.class, true);
        return new SimpleRetryPolicy(5, exceptionClass);
    }
}



JpaPagingItemReader를 사용하여 개선하였다.

 

38분 4초였던 처리 시간이 JpaPagingItemReader를 사용했을 때 1분 31초로 줄어든 것을 볼 수 있다.

 

다만 위 사진을 보면 point 조회 시 N+1 문제가 발생하고,

insert 시 건마다 개별 쿼리가 나가는 것을 볼 수 있다.

 

쿼리를 개선하기 위해 findAll로 포인트 데이터를 한 번에 가져오고,

JdbcTemplate의 batchUpdate로 일괄 삽입(bulk insert)을 하였다.

 

고민 사항

예외(exception)를 어디까지 잡아야 할까?

- 정산 로직은 하나라도 실수가 나면 안 된다. 따라서 Exception으로 포괄적으로 잡는 방식을 사용한다.

그럼에도 실패하면 어떻게 해야 할까?

- 현업에서는 (폐업 등으로 인해) API 쪽에서 의외로 오류가 많이 생긴다고 들었다.

   이런 경우 skip으로 넘기되, 넘긴 건은 수기로 처리할 수밖에 없다고 들었다.