카테고리 없음
[기술 구현]스프링 배치를 활용한 정산 로직 최적화
juno0432
2024. 11. 21. 16:59
두괄식 요약
스케줄러로 실행하던 정산 로직을
스프링 배치로 전환하여 안정성과 속도를 개선하였습니다.
8만 건 기준 38분 4초에서
JpaPagingItemReader를 사용하였을 때 1분 11초까지 개선하였습니다.
내용
기존 정산 로직은 스케줄러를 사용하고 있었습니다.
/**
 * Monthly settlement driven by a {@code @Scheduled} cron trigger.
 *
 * <p>Loads the approved payments of the previous calendar month, credits each
 * payment's original amount to the point balance of the event owner, marks the
 * event as calculated, and stores one {@link Calculation} per user holding the
 * summed amount.
 */
@Service
@RequiredArgsConstructor
public class CalculationService {

    private static final long ONE_MONTH = 1L;
    private static final String MONTHLY_AT_3_AM_ON_1ST = "0 0 3 1 * ?";

    private final CalculationRepository calculationRepository;
    private final PaymentRepository paymentRepository;
    private final PointRepository pointRepository;

    /**
     * Runs the settlement for the window {@code [first day of last month, first day of this month)}.
     *
     * @throws IllegalStateException if a user that must be settled has no {@link Point}
     */
    @Scheduled(cron = MONTHLY_AT_3_AM_ON_1ST)
    public void calculate() {
        // Midnight on the 1st of the current month: exclusive upper bound of the window.
        LocalDateTime startOfThisMonth = LocalDateTime.now()
                .withDayOfMonth(1)
                .withHour(0)
                .withMinute(0)
                .withSecond(0)
                .withNano(0);

        List<Payment> payments = paymentRepository.findPaymentsForCalculation(
                APPROVED,
                startOfThisMonth.minusMonths(ONE_MONTH),
                startOfThisMonth
        );
        if (payments.isEmpty()) {
            // Nothing to settle: skip the IN query with an empty list and the empty saveAll.
            return;
        }

        // One user can own many payments; de-duplicate before building the IN query.
        List<User> users = payments.stream()
                .map(Payment::getEvent)
                .map(Event::getUser)
                .distinct()
                .toList();

        Map<User, Point> pointMap = pointRepository.findPointsByUserIn(users).stream()
                .collect(Collectors.toMap(Point::getUser, point -> point));

        Map<User, Long> calculationMap = new HashMap<>();
        for (Payment payment : payments) {
            Event event = payment.getEvent();
            User user = event.getUser();

            Point point = pointMap.get(user);
            if (point == null) {
                // Fail with a descriptive error instead of a bare NullPointerException.
                throw new IllegalStateException(
                        "No Point found for user " + user.getId() + "; cannot settle payment");
            }

            point.addPoint(payment.getOriginalAmount());
            event.calculate();

            // Accumulate the per-user total.
            long amount = payment.getOriginalAmount();
            calculationMap.merge(user, amount, Long::sum);
        }

        List<Calculation> calculations = calculationMap.entrySet().stream()
                .map(entry -> Calculation.create(entry.getKey(), entry.getValue()))
                .toList();
        calculationRepository.saveAll(calculations);
    }
}
위 방식의 아쉬운 점은
실패 시 재시도 처리 로직이 없다는 것이다.
package com.sparta.spotlightspacescheduler.core.calculation.service;
import static com.sparta.spotlightspacescheduler.core.payment.domain.PaymentStatus.APPROVED;
import com.sparta.spotlightspacescheduler.core.calculation.domain.Calculation;
import com.sparta.spotlightspacescheduler.core.calculation.dto.CalculationProcessResponseDto;
import com.sparta.spotlightspacescheduler.core.calculation.repository.CalculationRepository;
import com.sparta.spotlightspacescheduler.core.event.domain.Event;
import com.sparta.spotlightspacescheduler.core.payment.domain.Payment;
import com.sparta.spotlightspacescheduler.core.payment.repository.PaymentRepository;
import com.sparta.spotlightspacescheduler.core.point.point.domain.Point;
import com.sparta.spotlightspacescheduler.core.point.point.repository.PointRepository;
import com.sparta.spotlightspacescheduler.core.user.domain.User;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.data.domain.Sort;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Spring Batch configuration for the monthly settlement job (repository-reader version).
 *
 * <p>The single step reads approved payments through {@link PaymentRepository},
 * turns each one into a {@link CalculationProcessResponseDto}, and persists the
 * resulting {@link Calculation} and updated {@link Point} item by item.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class CalculationBatch {

    private static final int CHUNK_SIZE = 20000;
    private static final int PAGE_SIZE = 10000;
    private static final int MAX_RETRY_ATTEMPTS = 5;

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final CalculationRepository calculationRepository;
    private final PaymentRepository paymentRepository;
    private final PointRepository pointRepository;

    /**
     * The settlement job: the top-level unit of batch execution.
     *
     * @param calculationStep the only step of the job
     * @return the configured job
     */
    @Bean
    public Job calculationJob(Step calculationStep) {
        JobBuilder jobBuilder = new JobBuilder("CalculationJob", jobRepository);
        return jobBuilder
                .incrementer(new RunIdIncrementer())
                .listener(new JobLoggerListener())
                .start(calculationStep)
                .build();
    }

    /**
     * The step of the job: reads the payments of the previous month and settles them.
     *
     * <p>The processor and writer are declared with their generic types so that the
     * chunk's output type ({@link CalculationProcessResponseDto}) is checked by the
     * compiler instead of being hidden behind raw types.
     *
     * @param paymentReader    reader supplying the payments
     * @param paymentProcessor processor converting a payment into a settlement DTO
     * @param paymentWriter    writer persisting the settlement DTOs
     * @return the configured fault-tolerant step
     */
    @Bean
    @JobScope
    public Step calculationStep(
            RepositoryItemReader<Payment> paymentReader,
            ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor,
            ItemWriter<CalculationProcessResponseDto> paymentWriter
    ) {
        return new StepBuilder("calculationStep", jobRepository)
                .<Payment, CalculationProcessResponseDto>chunk(CHUNK_SIZE, transactionManager)
                .reader(paymentReader)
                .processor(paymentProcessor)
                .writer(paymentWriter)
                .faultTolerant()
                .retryPolicy(retryPolicy())
                .build();
    }

    /**
     * Reads payments from the database and hands them to the processor.
     *
     * <p>The query window is {@code [first day of last month, first day of this month)},
     * computed from the current time when the bean is created.
     *
     * @return a paging reader sorted by {@code id} ascending
     */
    @Bean
    @StepScope
    public RepositoryItemReader<Payment> paymentReader() {
        LocalDateTime startOfThisMonth = LocalDateTime.now()
                .withDayOfMonth(1)
                .withHour(0)
                .withMinute(0)
                .withSecond(0)
                .withNano(0);

        return new RepositoryItemReaderBuilder<Payment>()
                .name("paymentReader")
                .repository(paymentRepository)
                .methodName("findPaymentsForCalculation1")
                .arguments(APPROVED, startOfThisMonth.minusMonths(1), startOfThisMonth)
                .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
                .pageSize(PAGE_SIZE)
                .build();
    }

    /**
     * Transforms a payment into the data the writer needs.
     *
     * <p>Looks up the {@link Point} of the event owner with one repository call per payment.
     *
     * @return the processor bean
     */
    @Bean
    @StepScope
    public ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor() {
        return new ItemProcessor<Payment, CalculationProcessResponseDto>() {
            @Override
            public CalculationProcessResponseDto process(Payment payment) throws Exception {
                Event event = payment.getEvent();
                User user = event.getUser();
                Point point = pointRepository.findByUserId(user.getId());
                if (point == null) {
                    // Fail here with context rather than with an NPE later in the writer.
                    throw new IllegalStateException(
                            "No Point found for user " + user.getId() + "; cannot settle payment");
                }
                return CalculationProcessResponseDto.of(user, point, event, payment.getOriginalAmount());
            }
        };
    }

    /**
     * Persists the processed data.
     *
     * <p>For every item: adds the amount to the point, marks the event as calculated,
     * then saves a new {@link Calculation} and the point through their repositories.
     *
     * @return the writer bean
     */
    @Bean
    @StepScope
    public ItemWriter<CalculationProcessResponseDto> paymentWriter() {
        return new ItemWriter<CalculationProcessResponseDto>() {
            @Override
            public void write(Chunk<? extends CalculationProcessResponseDto> chunk) throws Exception {
                for (CalculationProcessResponseDto result : chunk.getItems()) {
                    Point point = result.getPoint();
                    Event event = result.getEvent();

                    point.addPoint(result.getOriginAmount());
                    event.calculate();

                    Calculation calculation = Calculation.create(result.getUser(), result.getOriginAmount());
                    calculationRepository.save(calculation);
                    pointRepository.save(point);
                }
            }
        };
    }

    /**
     * Retry policy of the step: up to {@value #MAX_RETRY_ATTEMPTS} attempts, and only for
     * {@link TransientDataAccessException}.
     *
     * @return the retry policy bean
     */
    @Bean
    public RetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
        retryableExceptions.put(TransientDataAccessException.class, true);
        return new SimpleRetryPolicy(MAX_RETRY_ATTEMPTS, retryableExceptions);
    }
}
스프링 배치로 기본 배치를 구현하였다.
위 배치의 장점은
exception이 발생하면 재시작을 할 수 있다는 것이다.
하지만 성능 면에서 속도가 느리다는 단점이 있었고,
이를 개선해 보고자 하였다.
package com.sparta.spotlightspacescheduler.core.calculation.service;
import com.sparta.spotlightspacescheduler.core.calculation.dto.CalculationProcessResponseDto;
import com.sparta.spotlightspacescheduler.core.event.domain.Event;
import com.sparta.spotlightspacescheduler.core.payment.domain.Payment;
import com.sparta.spotlightspacescheduler.core.payment.domain.PaymentStatus;
import com.sparta.spotlightspacescheduler.core.point.point.domain.Point;
import com.sparta.spotlightspacescheduler.core.point.point.repository.PointRepository;
import com.sparta.spotlightspacescheduler.core.user.domain.User;
import jakarta.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@RequiredArgsConstructor
@Slf4j
public class CalculationBatch {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final PointRepository pointRepository;
private final EntityManagerFactory entityManagerFactory;
private final JdbcTemplate jdbcTemplate;
// 배치의 실행 단위 - 정산 배치
@Bean
public Job calculationJob(Step calculationStep) {
JobBuilder jobBuilder = new JobBuilder("CalculationJob", jobRepository);
return jobBuilder.incrementer(new RunIdIncrementer())
.listener(new JobLoggerListener())
.start(calculationStep)
.build();
}
// Job의 세부 실행 단위 - 한 달 전부터 1일까지 결제된 내역을 가져와서 계산
@Bean
@JobScope
public Step calculationStep(JpaPagingItemReader<Payment> paymentJpaPagingItemReader,
ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor,
ItemWriter<CalculationProcessResponseDto> paymentWriter) {
return new StepBuilder("calculationStep", jobRepository)
.<Payment, CalculationProcessResponseDto>chunk(30, transactionManager)
.reader(paymentJpaPagingItemReader)
.processor(paymentProcessor)
.writer(paymentWriter)
.faultTolerant()
.retryPolicy(retryPolicy())
.build();
}
// DB에서 데이터를 읽어오며 processor로 전달
@Bean
@StepScope
public JpaPagingItemReader<Payment> paymentJpaPagingItemReader() {
JpaPagingItemReader<Payment> jpaPagingItemReader = new JpaPagingItemReader<>();
jpaPagingItemReader.setQueryString(
"select p " +
"from Payment p " +
"join fetch p.event e " +
"join fetch e.user " +
"where p.status = :status " +
"and :startInclusive <= e.endAt " +
"and e.endAt < :endExclusive " +
"and e.isDeleted = false"
);
jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
jpaPagingItemReader.setPageSize(30);
LocalDateTime now = LocalDateTime.now().withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", PaymentStatus.APPROVED);
parameterValues.put("startInclusive", now.minusMonths(1));
parameterValues.put("endExclusive", now);
jpaPagingItemReader.setParameterValues(parameterValues);
return jpaPagingItemReader;
}
// 프로세서는 데이터를 가공하는 역할을 수행
@Bean
@StepScope
public ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor() {
Map<Long, Point> pointsCache = new HashMap<>(); // 캐시
return new ItemProcessor<Payment, CalculationProcessResponseDto>() {
@Override
public CalculationProcessResponseDto process(Payment payment) throws Exception {
if (pointsCache.isEmpty()) {
// 포인트 n+1문제 개선
List<Point> points = pointRepository.findAll();
for (Point point : points) {
pointsCache.put(point.getUser().getId(), point);
}
}
Event event = payment.getEvent();
User user = event.getUser();
Point point = pointsCache.get(user.getId());
return CalculationProcessResponseDto.of(user, point, event, payment.getOriginalAmount());
}
};
}
// 가공된 데이터를 저장하는 역할을 수행
@Bean
@StepScope
public ItemWriter<CalculationProcessResponseDto> paymentWriter() {
return new ItemWriter<CalculationProcessResponseDto>() {
@Override
public void write(Chunk<? extends CalculationProcessResponseDto> chunk) throws Exception {
String calculationSql = "INSERT INTO calculations (user_id, calculation_amount, create_at, update_at) VALUES (?, ?, ?, ?)";
List<Object[]> calculationBatchArgs = new ArrayList<>();
String pointUpdateSql = "UPDATE points SET amount = amount + ? WHERE user_id = ?";
List<Object[]> pointBatchArgs = new ArrayList<>();
//벌크 연산 로직
for (CalculationProcessResponseDto result : chunk.getItems()) {
calculationBatchArgs.add(new Object[]{
result.getUser().getId(),
result.getOriginAmount(),
LocalDateTime.now(),
LocalDateTime.now()
});
Point point = result.getPoint();
point.addPoint(result.getOriginAmount());
pointBatchArgs.add(new Object[]{
result.getOriginAmount(),
point.getUser().getId()
});
Event event = result.getEvent();
event.calculate();
}
jdbcTemplate.batchUpdate(calculationSql, calculationBatchArgs);
jdbcTemplate.batchUpdate(pointUpdateSql, pointBatchArgs);
}
};
}
// 재시도 정책 설정
@Bean
public RetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionClass = new HashMap<>();
exceptionClass.put(Exception.class, true);
return new SimpleRetryPolicy(5, exceptionClass);
}
}
JpaPagingItemReader를 사용하여 개선하였다.
38분 4초에서 JpaPagingItemReader를 사용하였을 때의 시간이 1분 31초로 줄어든 것을 볼 수 있다.
다만 위의 사진을 보면 point 조회 시 N+1 문제가 발생하는 것을 볼 수 있고,
insert 시 쿼리가 건별로 나가는 것을 볼 수 있다.
쿼리를 개선하기 위하여 findAll로 포인트 데이터를 한 번에 가져오고,
bulk insert로 일괄 삽입을 하였다.
고민사항
exception을 어디까지 잡아야 할까?
- 정산 로직은 하나라도 실수가 나면 안 된다. 따라서 Exception으로 포괄적으로 잡는 방식을 사용한다.
그럼에도 실패하면 어떻게 해야 할까?
- 현업에서는 API 쪽에서 (폐업 등으로 인해) 은근히 오류가 많이 생긴다고 들었다.
skip을 사용해서 넘기되, 해당 건은 수기로 처리하는 수밖에 없다고 들었다.