Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
Tags
- ineer join
- string
- Git사용법
- cicd
- stringbuilder의 reverse()
- isuppercase()
- 유클리드호제법
- toLowerCase()
- 스프링뼈대
- 최대공약수
- Github Actions
- git 컨벤션
- sql 데이터형 변환
- 동일성과 동등성
- while과 two-pointer
- 스프링환경설정
- 자바 스트링
- addDoc
- 래퍼타입
- 프로그래머스 레벨1
- islowercase()
- 자바 최소공배수
- 자바 유클리드
- 최소공배수
- GithubActions
- StringBuilder
- 스프링
- string과 stringbuilder
- 모던자바
- 베주계수
Archives
- Today
- Total
주노 님의 블로그
[기술 구현]스프링 배치를 활용한 정산 로직 최적화 본문
두괄식 요약
스케쥴러를 사용한 정산 로직을
스프링 배치를 활용하여 안정성과 속도를 개선하였습니다.
8만개 기준 38m 4s 에서
jpa item reader를 사용하였을때 1m 11초까지 개선하였습니다.
내용
기존 정산로직으 스케쥴러를 사용하고 있었습니다
@Service
@RequiredArgsConstructor
public class CalculationService {
private static final long ONE_MONTH = 1L;
private static final String MONTHLY_AT_3_AM_ON_1ST = "0 0 3 1 * ?";
private final CalculationRepository calculationRepository;
private final PaymentRepository paymentRepository;
private final PointRepository pointRepository;
@Scheduled(cron = MONTHLY_AT_3_AM_ON_1ST)
public void calculate() {
LocalDateTime now = LocalDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.withNano(0);
List<Payment> payments = paymentRepository.findPaymentsForCalculation(
APPROVED,
now.minusMonths(ONE_MONTH),
now
);
List<User> users = payments.stream()
.map(Payment::getEvent)
.map(Event::getUser)
.toList();
Map<User, Point> pointMap = pointRepository.findPointsByUserIn(users).stream()
.collect(Collectors.toMap(Point::getUser, point -> point));
Map<User, Long> calculationMap = new HashMap<>();
payments.forEach(payment -> {
Event event = payment.getEvent();
User user = event.getUser();
Point point = pointMap.get(user);
point.addPoint(payment.getOriginalAmount());
event.calculate();
calculationMap.put(
user,
calculationMap.getOrDefault(user, 0L) + payment.getOriginalAmount()
);
});
List<Calculation> calculations = calculationMap.entrySet().stream()
.map(entry -> Calculation.create(entry.getKey(), entry.getValue()))
.toList();
calculationRepository.saveAll(calculations);
}
}
위 방식대로 사용하면 아쉬운점은
실패시 재시도 처리 로직이 없다는것이다
package com.sparta.spotlightspacescheduler.core.calculation.service;
import static com.sparta.spotlightspacescheduler.core.payment.domain.PaymentStatus.APPROVED;
import com.sparta.spotlightspacescheduler.core.calculation.domain.Calculation;
import com.sparta.spotlightspacescheduler.core.calculation.dto.CalculationProcessResponseDto;
import com.sparta.spotlightspacescheduler.core.calculation.repository.CalculationRepository;
import com.sparta.spotlightspacescheduler.core.event.domain.Event;
import com.sparta.spotlightspacescheduler.core.payment.domain.Payment;
import com.sparta.spotlightspacescheduler.core.payment.repository.PaymentRepository;
import com.sparta.spotlightspacescheduler.core.point.point.domain.Point;
import com.sparta.spotlightspacescheduler.core.point.point.repository.PointRepository;
import com.sparta.spotlightspacescheduler.core.user.domain.User;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.data.domain.Sort;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@RequiredArgsConstructor
@Slf4j
public class CalculationBatch {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final CalculationRepository calculationRepository;
private final PaymentRepository paymentRepository;
private final PointRepository pointRepository;
//배치의 실행 단위
//정산 배치
@Bean
public Job calculationJob(Step calculationStep) {
JobBuilder jobBuilder = new JobBuilder("CalculationJob", jobRepository);
return jobBuilder
.incrementer(new RunIdIncrementer())
.listener(new JobLoggerListener())
.start(calculationStep)
.build();
}
// job의 세부 실행단위.
// 한달 전부터 1일까지 결제된 내역을 가져와서 계산
@Bean
@JobScope
public Step calculationStep(
RepositoryItemReader<Payment> paymentReader,
ItemProcessor paymentProcessor,
ItemWriter paymentWriter
) {
return new StepBuilder("calculationStep", jobRepository)
.<Payment, Payment>chunk(20000, transactionManager)
.reader(paymentReader)
.processor(paymentProcessor)
.writer(paymentWriter)
.faultTolerant()
.retryPolicy(retryPolicy())
.build();
}
// db에서 데이터를 읽어오며 porccesor로 전달.
@Bean
@StepScope
public RepositoryItemReader<Payment> paymentReader() {
LocalDateTime now = LocalDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.withNano(0);
return new RepositoryItemReaderBuilder<Payment>()
.name("paymentReader")
.repository(paymentRepository)
.methodName("findPaymentsForCalculation1")
.arguments(APPROVED, now.minusMonths(1), now)
.sorts(Collections.singletonMap("id", Sort.Direction.ASC))
.pageSize(10000)
.build();
}
// 프로세서는 가공하는 역할을 수행함
@Bean
@StepScope
public ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor() {
return new ItemProcessor<Payment, CalculationProcessResponseDto>() {
@Override
public CalculationProcessResponseDto process(Payment payment) throws Exception {
Event event = payment.getEvent();
User user = event.getUser();
Point point = pointRepository.findByUserId(user.getId());
return CalculationProcessResponseDto.of(user, point, event, payment.getOriginalAmount());
}
};
}
// 가공된 데이터를 저장하는 역할을 수행함
@Bean
@StepScope
public ItemWriter<CalculationProcessResponseDto> paymentWriter() {
return new ItemWriter<CalculationProcessResponseDto>() {
@Override
public void write(Chunk<? extends CalculationProcessResponseDto> chunk) throws Exception {
for (CalculationProcessResponseDto result : chunk.getItems()) {
Point point = result.getPoint();
Event event = result.getEvent();
point.addPoint(result.getOriginAmount());
event.calculate();
Calculation calculation = Calculation.create(result.getUser(), result.getOriginAmount());
calculationRepository.save(calculation);
pointRepository.save(point);
}
}
};
}
@Bean
public RetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> excepttionClass = new HashMap<>();
excepttionClass.put(TransientDataAccessException.class, true);
return new SimpleRetryPolicy(5, excepttionClass);
}
}
기본 배치를 구현하였다
위 배치의 장점은
exception이 발생하면 재시작을 할 수있다는것이다.
하지만, 성능면에서 속도가 낮게 나온다는 단점이 있었고,
이를 개선해보고자 하였다
package com.sparta.spotlightspacescheduler.core.calculation.service;
import com.sparta.spotlightspacescheduler.core.calculation.dto.CalculationProcessResponseDto;
import com.sparta.spotlightspacescheduler.core.event.domain.Event;
import com.sparta.spotlightspacescheduler.core.payment.domain.Payment;
import com.sparta.spotlightspacescheduler.core.payment.domain.PaymentStatus;
import com.sparta.spotlightspacescheduler.core.point.point.domain.Point;
import com.sparta.spotlightspacescheduler.core.point.point.repository.PointRepository;
import com.sparta.spotlightspacescheduler.core.user.domain.User;
import jakarta.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@RequiredArgsConstructor
@Slf4j
public class CalculationBatch {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final PointRepository pointRepository;
private final EntityManagerFactory entityManagerFactory;
private final JdbcTemplate jdbcTemplate;
// 배치의 실행 단위 - 정산 배치
@Bean
public Job calculationJob(Step calculationStep) {
JobBuilder jobBuilder = new JobBuilder("CalculationJob", jobRepository);
return jobBuilder.incrementer(new RunIdIncrementer())
.listener(new JobLoggerListener())
.start(calculationStep)
.build();
}
// Job의 세부 실행 단위 - 한 달 전부터 1일까지 결제된 내역을 가져와서 계산
@Bean
@JobScope
public Step calculationStep(JpaPagingItemReader<Payment> paymentJpaPagingItemReader,
ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor,
ItemWriter<CalculationProcessResponseDto> paymentWriter) {
return new StepBuilder("calculationStep", jobRepository)
.<Payment, CalculationProcessResponseDto>chunk(30, transactionManager)
.reader(paymentJpaPagingItemReader)
.processor(paymentProcessor)
.writer(paymentWriter)
.faultTolerant()
.retryPolicy(retryPolicy())
.build();
}
// DB에서 데이터를 읽어오며 processor로 전달
@Bean
@StepScope
public JpaPagingItemReader<Payment> paymentJpaPagingItemReader() {
JpaPagingItemReader<Payment> jpaPagingItemReader = new JpaPagingItemReader<>();
jpaPagingItemReader.setQueryString(
"select p " +
"from Payment p " +
"join fetch p.event e " +
"join fetch e.user " +
"where p.status = :status " +
"and :startInclusive <= e.endAt " +
"and e.endAt < :endExclusive " +
"and e.isDeleted = false"
);
jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
jpaPagingItemReader.setPageSize(30);
LocalDateTime now = LocalDateTime.now().withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", PaymentStatus.APPROVED);
parameterValues.put("startInclusive", now.minusMonths(1));
parameterValues.put("endExclusive", now);
jpaPagingItemReader.setParameterValues(parameterValues);
return jpaPagingItemReader;
}
// 프로세서는 데이터를 가공하는 역할을 수행
@Bean
@StepScope
public ItemProcessor<Payment, CalculationProcessResponseDto> paymentProcessor() {
Map<Long, Point> pointsCache = new HashMap<>(); // 캐시
return new ItemProcessor<Payment, CalculationProcessResponseDto>() {
@Override
public CalculationProcessResponseDto process(Payment payment) throws Exception {
if (pointsCache.isEmpty()) {
// 포인트 n+1문제 개선
List<Point> points = pointRepository.findAll();
for (Point point : points) {
pointsCache.put(point.getUser().getId(), point);
}
}
Event event = payment.getEvent();
User user = event.getUser();
Point point = pointsCache.get(user.getId());
return CalculationProcessResponseDto.of(user, point, event, payment.getOriginalAmount());
}
};
}
// 가공된 데이터를 저장하는 역할을 수행
@Bean
@StepScope
public ItemWriter<CalculationProcessResponseDto> paymentWriter() {
return new ItemWriter<CalculationProcessResponseDto>() {
@Override
public void write(Chunk<? extends CalculationProcessResponseDto> chunk) throws Exception {
String calculationSql = "INSERT INTO calculations (user_id, calculation_amount, create_at, update_at) VALUES (?, ?, ?, ?)";
List<Object[]> calculationBatchArgs = new ArrayList<>();
String pointUpdateSql = "UPDATE points SET amount = amount + ? WHERE user_id = ?";
List<Object[]> pointBatchArgs = new ArrayList<>();
//벌크 연산 로직
for (CalculationProcessResponseDto result : chunk.getItems()) {
calculationBatchArgs.add(new Object[]{
result.getUser().getId(),
result.getOriginAmount(),
LocalDateTime.now(),
LocalDateTime.now()
});
Point point = result.getPoint();
point.addPoint(result.getOriginAmount());
pointBatchArgs.add(new Object[]{
result.getOriginAmount(),
point.getUser().getId()
});
Event event = result.getEvent();
event.calculate();
}
jdbcTemplate.batchUpdate(calculationSql, calculationBatchArgs);
jdbcTemplate.batchUpdate(pointUpdateSql, pointBatchArgs);
}
};
}
// 재시도 정책 설정
@Bean
public RetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionClass = new HashMap<>();
exceptionClass.put(Exception.class, true);
return new SimpleRetryPolicy(5, exceptionClass);
}
}
jpaPagingItemReader을 사용하며 개선을 하였다.
38m 4s에서 jpaPagingItemReader를 사용하였을때의 시간이 1분 31초로 줄어든 것을 볼 수 있다.
다만 위의 사진을 보면 point 조회시 n+1의 문제가 발생하는것을 볼 수 있고
insert시 개별의 쿼리가 나가는것을 볼 수 있다.
쿼리를 개선하기 위하여 findAll로 포인트 데이터를 가져오고,
bulkInsert로 일괄 삽입을 하였다
고민사항
exception을 어디까지 받아가야할까?
- 정산 로직은 하나라도 실수가 나면 안된다, 따라서 EXCEPTION으로 포괄적으로 잡는 방식을 사용한다
그럼에도 실패하면 어떻게 해야할까?
- 현업에서 API쪽에서 은근 오류가 많이 생긴다고 들었다 (폐업으로 인한).
SKIP을 사용해서 넘기되, 그 로직은 수기로 처리하는수 밖에 없다고 들었다.