예제 및 테스트 코드는 github 에서 확인 가능합니다.
Java ParallelStream
안녕하세요 이번에는 Java Stream 의 병렬처리를 가능하게 하는 ParallelStream 에 대해 알아보겠습니다.
Java 의 ParallelStream 은 별다른 설정없이 Stream 의 parallel(), Collection의 parallelStream() 메소드를 통해
손쉽게 병렬처리를 할 수 있습니다.
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8);
// parallel()
list.stream()
.parallel()
.forEach(System.out::println);
// parallelStream()
list.parallelStream()
.forEach(System.out::println);
ParallelStream 은 내부적으로 ForkJoinFrameWork 방식으로 동작하며
기본적으로는 ForkJoinPool.commonPool 을 사용한다는 특징을 가지고 있습니다.
그리고 멀티코어를 이용해 여러 작업들을 동시에 처리하게 됩니다.
하지만 병럴 스트림을 사용한다고 해서 무조건적으로 순차 스트림보다 빠르지는 않습니다.
오히려 더 느리게 동작할 수 있고 잘못사용하게되면 시스템 장애도 발생할 수 있습니다.
그럼 ParallelStream 의 특징과 주의사항에 대해 한번 알아보겠습니다.
ForkJoinFrameWork 란?
ForkJoinFrameWork 는 큰 작업을 작은 작업들로 쪼개어 작업을 병렬로 처리하고 처리한 작업들을 다시 큰 작업으로 합치는 방식으로 동작합니다.
(마치 분할정복 알고리즘과 같이 동작합니다.)
- Fork: 작업들을 작은 작업들로 분할함.
- Join: 분할된 작업들을 큰 작업들로 병합함.
Fork
Join
그리고 ForkJoinFrameWork 는 Work-Stealing 매커니즘으로 동작합니다.
Work-Stealing 은 어떤 방식일까요?
Work-Stealing
- 각 스레드는 각자의 WorkQueue 를 가지고 있습니다.
- WorkQueue 의 구조는 Dequeue 구조로 되어있어 head, tail 양쪽에서 모두 push/pop 이 가능합니다.
- Inbound Queue 에 Task 가 존재하면 스레드는 자신의 WorkQueue 에 가져와서 작업함
- 다른 스레드가 자신의 WorkQueue 에 Work-Stealing 하는 경우 WorkQueue 의 tail 에서 pop 을 진행.
- 스레드의 개수가 많아진다면 Work-Stealing 과정에서 경함이 발생할 수 있음
Parallel Stream 주의사항 및 예시
commonPool 사용시
ParallelStream 은 기본적으로 ForkJoinPool 의 commonPool 을 사용합니다.
이 commonPool 의 size 는 아래와 같이 설정됩니다.
Runtime.getRuntime().availableProcessors() - 1
Runtime.getRuntime().availableProcessors() 는 CPU Core 의 사이즈이고 -1 은 main 스레드를 제외하기 위함입니다.
그리고 commonPool 은 JVM에서 공유됩니다.
그렇기에 parallelStream 을 사용하는 경우 모두 동일한 commonPool 을 사용하게 됩니다.
만약 어플리케이션에서 다양한 병렬작업이 존재해 commonPool 을 같이 사용하는 경우 혹은
병렬 스트림에서 I/O 작업을 처리하는 경우에는 commonPool 의 리소스가 부족해 시스템 전체에 영향을 줄 수 있습니다.
Custom Thread Pool
병렬 스트림을 처리할때 ThreadPool 을 직접 정의해서 처리할 수 있습니다.
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
customThreadPool.submit(() -> list.stream()
.parallel()
.forEach(System.out::println)).get();
하지만 ThreadPool 을 직접 정의해서 사용하는 경우에는 OutOfMemoryError 에 주의해야 합니다.
commonPool 은 기본적으로 JVM 에서 공유되며, 정적 ThreadPool 인스턴스입니다. 그래서 메모리 누수가 발생하지 않습니다.
그에 반해, Custom Thread Pool 의 경우에는 처리가 완료되어도 참조가 해제되지 않아 GC 에 수집되지 않을 수 있습니다.
그래서 사용 후 꼭 아래와 같이 shutdown() 하여 ForkJoinPool 을 종료해주어야 합니다.
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
customThreadPool.submit(() -> list.stream()
.parallel()
.forEach(System.out::println)).get();
} finally {
customThreadPool.shutdown();
}
https://www.baeldung.com/java-8-parallel-streams-custom-threadpool
I/O 작업과 같은 처리에 commonPool 을 묶어두고 싶지 않다면 별도의 ThreadPool 을 정의해서 처리하는 방법이
효율적일 수 있습니다.
이와 반대로, 간단한 작업이나 경합이 크게 일어나지 않을 환경이라면 commonPool 을 사용하는것이
자원을 더 효율적으로 사용할 수 있지 않을까 싶습니다.
독립된 처리
ParallelStream 사용시 각 작업이 독립되어야 좋은 성능을 낼 수 있습니다.
stream() 연산 중 distinct(), sorted() 와 같은 작업들은 내부적으로 상태에 대한 변수를 공유하고 동기화하기에
병렬처리에 대한 이점을 살릴 수 없습니다.
데이터 타입
ParallelStream 은 ForkJoin 방식으로 동작하기에 작업을 나누는 단위가 균등해야하고, 작업을 나누기 위한 비용이 적어야 합니다.
이러한 비용은 데이터 타입에 따라 차이나게 됩니다.
이 비용에는 크게 두 가지를 들 수 있습니다.
- 연속된 메모리 할당(참조 지역성)
배열의 경우 메모리 상에서 연속된 공간에 저장되므로, 요소가 물리적으로 가까운 위치해 있습니다.
특정 인덱스의 요소에 접근할 때 메모리에서 그 위치를 바로 계산할 수 있어 매우 빠릅니다.
이는 분할 시 데이터에 빠르게 접근할 수 있는 장점이 됩니다.
- 데이터 접근 방식
ArrayList 의 경우 특정 인덱스에 있는 요소에 접근하는 시간 복잡도는 O(1)입니다.
분할 시에도 특정 지점에서 데이터를 빠르게 나눌 수 있기 때문에 성능이 좋습니다.
하지만 LinkedList 의 경우 특정 인덱스에 접근하기 위해 순차적으로 노드를 탐색해야 하므로 시간 복잡도는 O(n)입니다.
즉, 인덱스에 따라 요소를 찾는 데 더 많은 시간이 걸리기 때문에 분할 성능이 떨어집니다.
ArrayList, IntStream.range, LongStream.range, TreeMap, HashSet 과 같은 데이터 타입은 분할하기에 용이합니다.
요소의 개수
병렬처리는 동일한 작업이더라도 요소의 개수에 따라 성능이 좋을수도, 나쁠수도 있습니다.
이는 작업을 분할하고(Fork) 다시 합치는(Join) 비용과 스레드간의 Context Switching 비용이 존재하기 때문인데요.
간단한 예시를 통해 소요시간을 확인해보겠습니다. 시간은 각 수행하는 PC 의 사양마다 다를 수 있습니다.
ArrayParallelStreamExample.java
public class ArrayParallelStreamExample {
static List<Long> arrayNumbers = new ArrayList<>();
static {
LongStream.rangeClosed(1, 1_000_000).forEach(it -> {
arrayNumbers.add(it);
});
}
public static void main(String[] args) {
long startSequential = System.currentTimeMillis();
long sumSequential = arrayNumbers.stream()
.reduce(0L, Long::sum);
long endSequential = System.currentTimeMillis();
System.out.println("Sequential sum: " + sumSequential);
System.out.println("Sequential processing time: " + (endSequential - startSequential) + "ms");
long startParallel = System.currentTimeMillis();
long sumParallel = arrayNumbers.stream()
.parallel()
.reduce(0L, Long::sum);
long endParallel = System.currentTimeMillis();
System.out.println("Parallel sum: " + sumParallel);
System.out.println("Parallel processing time: " + (endParallel - startParallel) + "ms");
}
}
ArrayList 기준으로 1 ~ 100만의 숫자를 더하는 코드를 만들었습니다.
소요시간을 한번 확인해보겠습니다.
기본 스트림이 병렬 스트림보다 더 빠른것을 알 수 있습니다. 이는 100만개 요소를 직렬로 처리하는것보다
100만개 요소에 대한 분할/병합 비용 그리고 스레드간 Context Switching 비용이 더 크다고 볼 수 있습니다.
그렇다면 요소의 수를 100만개가 아니라 1000만개로 실행해보면 어떨까요 ??
다음과 같이 변경해 실행해보겠습니다.
LongStream.rangeClosed(1, 10_000_000).forEach(it -> {
arrayNumbers.add(it);
});
요소의 수가 많아지니 아까와는 다르게 병렬 스트림이 더 빠르게 동작한것을 확인할 수 있습니다.
5000만개도 한번 확인해보겠습니다.
LongStream.rangeClosed(1, 50_000_000).forEach(it -> {
arrayNumbers.add(it);
});
요소의 개수가 많아질수록 소요시간의 차이가 더 확연하게 나타났습니다.
이렇듯, 병렬 스트림은 고려할 변수가 많아서 직접 테스트를 거치며 사용여부를 결정하는것이 좋지 않을까 생각합니다.
감사합니다.
reference
'Java' 카테고리의 다른 글
[Java] 불변 객체(Immutable Object) 에 대해 알아보자 (1) | 2022.09.04 |
---|---|
[Java] N개 이상의 값으로 그룹핑해보기 (0) | 2022.08.20 |
OCP(Open-Close-Principle) 개방 폐쇄 원칙이란? (0) | 2022.08.20 |