Post

Spring Batch 오픈소스에 기여하기

Spring Batch 오픈소스에 기여하기

Issue : StepBuilder - issue when setting the taskExecutor before faultTolerant()
PR : Add AbstractTaskletStepBuilder copy constructor

들어가며

오픈소스 기여 스터디를 통해 spring-batch 기여에 참여하게 되었습니다. 이슈 선정의 목표는 당시에 처음 오픈소스를 참여하는 것이라서 욕심내지 않고, 쉬우면서 중요한 오픈소스에 기여해보자는 단순한 목표를 가지고 시작했습니다. GDG 오픈소스 스터디에 참여해서 스터디 운영자 분과 다른 스터디 참여하시는 분들의 PR 참여를 노션으로 보면서 기한 내에 PR를 할 수 있었습니다. (아직 merge 반영되지 않았지만 메인테이너가 확인한 상태입니다.)

Spring batch에 대한 간략 설명

Spring은 워낙 유명한 오픈소스이기 때문에 다들 알겠지만 spring-batch의 배치 잡을 사용할 프로젝트가 아닌 이상 사용해보지 않은 사람도 많은 것 같습니다. 저도 알고는 있었지만 오픈소스로 내부 오픈소스를 본 것은 처음이었습니다.

Spring batch는 대량의 데이터를 처리하는 데 사용되는 경량, 포괄적인 배치 프레임워크입니다.
먼저 stepItemReader, ItemProcessor, ItemWriter와 같은 구성요소를 포함하며, 특정 작업을 수행하는 역할을 합니다. 데이터 청크 단위로 처리할 수 있도록 하며 하나 이상의 step으로 job을 구성합니다.
job은 배치 처리의 실행 단위로 JobInstance로 간주되며, JobParameter에 의해 구분됩니다.JobExecution으로 실행에 대한 기록을 담습니다.

stepbuilder1.png

이슈 내용

이슈가 간단하고 테스트코드를 잘 작성해주셔서 한 번에 이해하기 편했습니다.
step을 만들 때, taskExecutor()의 위치에 따라 필드 값이 잘 상속되지 않음을 알 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TaskletStep step1 = new StepBuilder("step-name", jobRepository)
                .chunk(10, transactionManager)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .faultTolerant()
                .taskExecutor(taskExecutor)
                .build();
TaskletStep step2 = new StepBuilder("step-name", jobRepository)
                .chunk(10, transactionManager)
                .taskExecutor(taskExecutor)// The task executor is set before faultTolerant()
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .faultTolerant()
                .build();

오픈소스 분석

TaskletStep가 만들어지는 과정은 다음과 같은 구조도로 나타낼 수 있습니다.

1
2
3
4
5
6
7
8
9
StepBuilderHelper
|
|-- AbstractTaskletStepBuilder
    |
    |-- SimpleStepBuilder
        |
        |-- FaultTolerantStepBuilder
            |
            |-- TaskletStep

StepBuilderTaskletStep을 생성할 때, .faultTolerant()FaultTolerantStepBuilder의 영향을 받고, .taskExecutor(taskExecutor)AbstractTaskletStepBuilder의 메소드가 적용됩니다.
faultTolerant()를 적용하므로서 TaskletStep이 생성될 때 청크 지향의 시스템 구현시 실패한 아이템의 재시도(retry)와 스킵(skip) 기능을 포함하게 됩니다.

1
2
3
4
// FaultTolerantStepBuilder faultTolerant()
public FaultTolerantStepBuilder<I, O> faultTolerant() {
  return new FaultTolerantStepBuilder<>(this);
}

taskExecutor(taskExecutor)AbstractTaskletStepBuilder 속성 중 taskExecutor를 넣어준 매개변수로 변경하고 SimpleStepBuilder 인스턴스를 반환할 수 있게 해줍니다. 여기서 BAbstractTaskletStepBuilder 또는 그 하위 클래스의 타입을 나타냅니다.

1
2
3
4
5
// AbstractTaskletStepBuilder taskExecutor(taskExecutor)
public B taskExecutor(TaskExecutor taskExecutor) {
  this.taskExecutor = taskExecutor;
  return self();
}

SimpleStepBuilder의 생성자를 보면 부모 객체의 속성들을 받아서 새로운 객체를 생성하거나 복사 생성자를 만드는 두 가지 방법이 사용된 것을 볼 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public SimpleStepBuilder(StepBuilderHelper<?> parent) {
	super(parent);
}
protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) {
    super(parent);
    this.chunkSize = parent.chunkSize;
    this.completionPolicy = parent.completionPolicy;
    this.chunkOperations = parent.chunkOperations;
    this.reader = parent.reader;
    this.writer = parent.writer;
    this.processor = parent.processor;
    this.itemListeners = parent.itemListeners;
    this.readerTransactionalQueue = parent.readerTransactionalQueue;
    this.meterRegistry = parent.meterRegistry;
}

하지만 AbstractTaskletStepBuilder의 복사 생성자가 없고 단순히 부모 객체 속성을 받아오는 super() 처리 생성자만 있는 것을 볼 수 있습니다.

1
2
3
public AbstractTaskletStepBuilder(StepBuilderHelper<?> parent) {
  super(parent);
}

해결방안

AbstractTaskletStepBuilder의 복사 생성자를 추가하므로서 필드 update가 가능하도록 처리했습니다.

1
2
3
4
5
6
7
8
9
10
11
public AbstractTaskletStepBuilder(AbstractTaskletStepBuilder<?> parent) {
    super(parent);
    this.chunkListeners = parent.chunkListeners;
    this.stepOperations = parent.stepOperations;
    this.transactionManager = parent.transactionManager;
    this.transactionAttribute = parent.transactionAttribute;
    this.streams.addAll(parent.streams);
    this.exceptionHandler = parent.exceptionHandler;
    this.throttleLimit = parent.throttleLimit;
    this.taskExecutor = parent.taskExecutor;
}

테스트 코드는 다음과 같이 작성했는데, 오픈소스 기여에서 contribute 파일이나 readme 내용을 보고 설정한 내용대로 테스트 코드를 작성하면 됩니다.
@BeforeEach로 공통으로 테스트에 사용될 simpleStepBuilder 만들어 줍니다. 코드 수정은 AbstractTaskletStepBuilder 자체는 추상클래스이므로 비교를 위한 구현체로 simpleStepBuilder를 이용했습니다.
복사 생성자가 잘 작동하는지 확인하는 테스트 코드와 taskExecutor()를 먼저하고 faultTolerant()를 이후에 했을 때 값을 비교하는 테스트 코드를 작성했습니다.

여기서,SimpleStepBuilder의 속성값은 private로 접근이 어렵기 때문에 자바 API인 리플렉션을 이용했습니다. 리플렉션은 런타임에 클래스의 정보를 조회하고, 객체의 필드나 메서드에 접근하거나, 클래스의 객체를 동적으로 생성하는 등의 작업을 가능하게 해주는 자바 API입니다. field.setAccessible(true)와 같은 방식으로 declare 필드의 접근을 조정할 수 있습니다. (accessPrivateField 메서드)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@SpringBatchTest
@SpringJUnitConfig
public class AbstractTaskletStepBuilderTests {
	private final JobRepository jobRepository =  mock(JobRepository.class);
	private final int chunkSize = 10;
	private final ItemReader itemReader = mock(ItemReader.class);
	private final ItemProcessor itemProcessor = mock(ItemProcessor.class);
	private final ItemWriter itemWriter = mock(ItemWriter.class);
	private final SimpleAsyncTaskExecutor taskExecutor  = new SimpleAsyncTaskExecutor();
	SimpleStepBuilder simpleStepBuilder;

	private <T> T accessPrivateField(Object o, String fieldName) throws ReflectiveOperationException {
		Field field = o.getClass().getDeclaredField(fieldName);
		field.setAccessible(true);
		return (T) field.get(o);
	}

	private <T> T accessSuperClassPrivateField(Object o, String fieldName) throws ReflectiveOperationException {
		Field field = o.getClass().getSuperclass().getDeclaredField(fieldName);
		field.setAccessible(true);
		return (T) field.get(o);
	}

   // 공통 사용될 simpleStepBuilder 만들기
	@BeforeEach
	void set(){
		StepBuilderHelper stepBuilderHelper = new StepBuilderHelper("test", jobRepository) {
			@Override
			protected StepBuilderHelper self() {
				return null;
			}
		};
		simpleStepBuilder = new SimpleStepBuilder(stepBuilderHelper);
		simpleStepBuilder.chunk(chunkSize);
		simpleStepBuilder.reader(itemReader);
		simpleStepBuilder.processor(itemProcessor);
		simpleStepBuilder.writer(itemWriter);
	}

    // 복사가 잘되는지 확인
	@Test
	void copyConstractorTest() throws ReflectiveOperationException {
		Constructor<SimpleStepBuilder> constructor = SimpleStepBuilder.class.getDeclaredConstructor(SimpleStepBuilder.class);
		constructor.setAccessible(true);
		SimpleStepBuilder copySimpleStepBuilder = constructor.newInstance(simpleStepBuilder);

		int copyChunkSize = accessPrivateField(copySimpleStepBuilder, "chunkSize");
		ItemReader copyItemReader = accessPrivateField(copySimpleStepBuilder, "reader");
		ItemProcessor copyItemProcessor = accessPrivateField(copySimpleStepBuilder, "processor");
		ItemWriter copyItemWriter = accessPrivateField(copySimpleStepBuilder, "writer");

		assertEquals(chunkSize, copyChunkSize);
		assertEquals(itemReader, copyItemReader);
		assertEquals(itemProcessor, copyItemProcessor);
		assertEquals(itemWriter, copyItemWriter);
	}

   // taskExecutor를 먼저하고 faultTolerant를 이후에 했을 때 값 비교
	@Test
	void faultTolerantMethodTest() throws ReflectiveOperationException {
		simpleStepBuilder.taskExecutor(taskExecutor); // The task executor is set before faultTolerant()
		simpleStepBuilder.faultTolerant();

		int afterChunkSize = accessPrivateField(simpleStepBuilder, "chunkSize");
		ItemReader afterItemReader = accessPrivateField(simpleStepBuilder, "reader");
		ItemProcessor afterItemProcessor = accessPrivateField(simpleStepBuilder, "processor");
		ItemWriter afterItemWriter = accessPrivateField(simpleStepBuilder, "writer");
		TaskExecutor afterTaskExecutor = accessSuperClassPrivateField(simpleStepBuilder, "taskExecutor");

		assertEquals(chunkSize, afterChunkSize);
		assertEquals(itemReader, afterItemReader);
		assertEquals(itemProcessor, afterItemProcessor);
		assertEquals(itemWriter, afterItemWriter);
		assertEquals(taskExecutor, afterTaskExecutor);
	}
}

스터디 회고

실제로 오픈소스에 기여한다는 건 생각만 해보고 시도한 적이 없었는데 이렇게 간단한 오픈소스를 통해 이번을 시작으로 다른 오픈소스에도 참여해보고 싶다는 생각이 들었습니다. 그리고 오픈소스 자체에 대한 심리적 부담감?같은 것이 있었는데 실제로 프로젝트에 참여하듯이 기여를 할 수 있다는 점이 좋았습니다. 스터디가 아니였으면, 다른 사람들과 함께 하지 않았으면 시도가 더 늦어졌을 것 같은데 좋은 기회에 참여할 수 있어서 감사했습니다.

결론으로 spring-batch는 기여할 것이 많은 노다지였다!

스프링 배치 컨트리뷰터!!
그로부터 거의 5개월이 지난 시점의 메인테이너의 피드백을 받게 됐는데 도움을 주신 인제님과 기여부분 수정제안을 해주신 태익님의 도움으로 pr을 다시 올려서 close 시킬 수 있었습니다.
사실 엄청난 코드작성을 한 부분은 아니지만 중간 데이터 소실이라는 문제가 발생하는 부분이기 때문에 core단 버그를 수정했다는 점에서 굉장히 뿌듯했고, 이런 식으로 접근할 수 있구나 이렇게 소통해서 문제를 해결하는구나라는 것을 알게 된 순간이었습니다.

아쉬운 부분이 있다면, 아무리 생각해도 당시에 이슈에서 제안한 한정적 방법 밖에 떠오르지 않아서 테스트를 일단 이슈대로 작성했었습니다. 중간에 test 부분 작성에 대한 방향성을 더 유지보수가 쉽게 하는 방안으로 메인테이너 요청을 만족스럽게 수정하지 못한 부분이었습니다. 기존에는 모든 값들이 잘 복사됐는지 확인하는 방식이었는데, 메인테이너가 아래와 같이 stepOperations 비교를 하는 방식으로 변경해서 테스트를 변경해주신걸 볼 수 있었습니다. (spring-batch에는 private 처리된 것이 많아서 ReflectionTestUtils로 값을 그냥 가지고 와서 비교하는 방식으로 테스트 코드가 실제로 다른 테스트에도 작성된 부분이 많습니다.)

1
2
Object stepOperations = ReflectionTestUtils.getField(step, "stepOperations");
assertInstanceOf(TaskExecutorRepeatTemplate.class, stepOperations);
This post is licensed under CC BY 4.0 by the author.