본문 바로가기
개발 관련/java

자바 비동기 (2)

by lazysnack 2022. 7. 14.

지난 포스팅에서 비동기에 대해 살짝 알아봤습니다.

기본적인 Thread, Runnable 를 사용하거나, 자바5 부터 생긴 Future, FutureTask, 그리고 web 3.0 부터 지원된 비동기 서블릿이 있었습니다.

그리고 자바 8로 넘어가면서 비동기를 지원하기 위해 CompletableFuture 가 새로 생겼는데요.

이번에는 이 CompleteableFuture 에 대해 알아보도록 하겠습니다.

1. CompletableFuture?

앞에서 언급했듯이 자바는 비동기를 지원하기 위해 Future 이라는 인터페이스가 있었습니다. 그런데, 왜 CompletableFuture 가 추가되었을까요?

여기서 Future 의 인터페이스에 있는 메소드들을 보면,

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

이 있습니다. 상태값을 확인하거나 값을 가져오거나 하는 메소드들이 있습니다.

그런데, get() 을 호출하면 작업이 완료될 때까지 블록킹이 된다는 (?) 점이 있습니다.

그러다보니 결과적으로 해당 결과를 통해 이후 계산을 하는데에 제약이 있었고, 에러를 핸들링하는데에 어려움도 있었죠.

그래서 그런 문제점을 보완하고자 자바8에서 추가되었습니다.

CompletableFuture 을 보면,

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

로 클래스 인 것을 확인할 수 있습니다. Future 인터페이스와 CompletionStage 인터페이스를 상속했네요.

뭔가 감이 오지 않나요?

CompletionStage 또한 1.8에서 추가되었으며, 해당 인터페이스에 있는 몇몇 메소드를 보면 thenApply() 처럼 then 혹은 accept 등으로 시작하는 메소드가 있는 것을 확인할 수 있습니다. (어쩌면 중요한 놈은 이놈이었을지도 모르겠네요.)

그럼 어느 정도 소개는 한 것 같으니, 사용법을 알아보도록 하겠습니다.

2. Future 로 사용하는 방법

Future 인터페이스를 구현하기 때문에 일반적인 Future 처럼 사용할 수 있습니다.

public Future<String> test1() throws InterruptedException{
    CompletableFuture<String> future = new CompletableFuture<>();
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(2000);
        future.complete("Finish1");
        return null;
    });
    return future;
}


assertEquals("Finish1", test1().get()); // true

newCachedThreadPool 을 통해 쓰레드를 만들었지만, 이미 값을 알고 있는 경우 다음과 같이 사용할 수도 있습니다.

public String test2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.completedFuture("Finish2");
    return future.get();
}

그리고, 자바8에서 지원하는 람다 표현식으로도 사용이 가능합니다.

public Future<String> test3(){
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Finish2");
    return future;
}

위와 같은 방법으로 사용할 수 있습니다.

그러면 이제 결과를 가지고 후행 작업(?) 후처리(?) 하는 방법에 대해 알아보도록 하겠습니다.

3. 비동기 작업 결과 프로세싱

작업 결과에 대해 추가 작업을 해주는 메소드는 CompletionStage 인터페이스에 있었는데요.

이 부분에 대해 알아보도록 하겠습니다.

  1. thenApply()
    • 작업을 처리하면 그 결과를 가지고, 다른 작업을 수행하도록 할 수 있습니다.
public Future<String> test4() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Finish4");
    CompletableFuture<String> thenApply = future.thenApply(s -> s + " apply");

    // 한 줄 로도 가능
    //CompletableFuture.supplyAsync(() -> "Finish4").thenApply(s -> s + " apply");
    return thenApply;
}

assertEquals("Finish4 apply", test4().get());
  1. thenAccept()
    • thenApply() 와 비슷하지만, 리턴값이 없어 Void 형태를 반환(?) 합니다.
public Future<Void> test5() {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Finish5");
    CompletableFuture<Void> voidCompletableFuture = future.thenAccept(s -> System.out.println(s + " Accept"));
    return voidCompletableFuture;
}

// System.out.println(s + " Accept") == Finish Accept
// test5().get() == null
  1. thenRun()
    • 계산이 필요하지 않거나, 체인의 끝에 값을 반환하고 싶지 않을 경우 사용합니다.
public Future<Void> test6() {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Finish6");
    CompletableFuture<Void> thenRun = future.thenRun(() -> System.out.println("Run Finished"));
    return thenRun;
}
  1. thenCompose()
    • 여러 작업을 순차적으로 수행하기 위해서 사용합니다.
    • 동일한 유형의 다른 객체를 반환하는 함수를 받습니다.
public Future<String> test7() {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Finish7")
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Compose"));
    return future;
}

assertEquals("Finish7 Compose" , test7().get());
  1. thenCombine()
    • thenCompose() 가 순차로 실행했다면, thenCombine() 은 병렬로 처리합니다.
public Future<String> test8() {
    CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> "Finish8.1");

    CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> "Finish8.2");

    return future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2);
}

이외에도 더 있습니다만,

다 비슷비슷해 보이는게 가장 큰 문제점이라고 할 수 있겠네요. (하하..)

비슷해보여도, 동작, 파라미터 등이 다르니 API 문서를 참조하면 좋을 것 같네요.

제가 사용해본 건 비동기 로직을 태운 후에 실행했다는 로그를 남기기 위해 thenAccept() 를 써봤네요.

또한 메소드 체이닝 관점(?)에서 thenApply().thenApply() 가 가능한데, thenCompose() 랑 뭐가 다른가 라고 할 수 있을 것 같은데요. thenCompose() 의 경우 중첩이 아닌 flatMap 처럼 평면화해서 사용하기 때문에 더 유리하다고 합니다.

(참고 : Map vs flatMap)

4. 병렬처리에서 하나 혹은 전부

thenCombine() 을 통해서 병렬 처리할 수 있다고 했었는데요.

여러 개의 CompletableFuture 중에서 빨리 처리되는 1개의 결과 혹은 전부를 가져오는 메소드도 존재합니다.

  1. allOf()
    • 모든 Future 의 결과를 받아서 처리할 수 있습니다.
    • 모든 Future 의 결합된 결과를 반환하지는 않기에, 이 결과를 얻기 위해 Stream API 를 사용합니다.
public String test9() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> "Future1");

    CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> "Future2");

    CompletableFuture<String> future3 = CompletableFuture
            .supplyAsync(() -> "Future3");

    CompletableFuture<Void> combinedFuture
            = CompletableFuture.allOf(future1, future2, future3);

    String combined = Stream.of(future1, future2, future3)
            .map(CompletableFuture::join)
            .collect(Collectors.joining(" + "));

    return combined;
}
  1. anyOf()
    • 여러 개의 CompletableFuture 중에서 가장 빨리 처리 되는 1개의 결과만 가져옵니다.
public Future<Object> test10() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> "Future1");

    CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> "Future2");

    CompletableFuture<String> future3 = CompletableFuture
            .supplyAsync(() -> "Future3");

    return CompletableFuture.anyOf(future1, future2, future3);
}

5. 에러 핸들링

위에서 Future 가 결과 처리 및 에러 핸들링에 대해 어려움이 있었다고 했었는데요.

이 부분을 컨트롤하는 방법을 알아보겠습니다.

public Future<String> test11() {
    String name = null;

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (name == null) {
            throw new RuntimeException("name is null");
        }
        return "hi, " + name;
    }).handle((s, t) -> s != null ? s : "hi unknown!");

    return future;
}

handle 메소드를 통해 에러를 컨트롤 할 수 있습니다.

하지만 익셉션을 집적 던져주고 싶은 경우가 있을텐데요.

public Future<String> test12() {
    String name = null;

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hi, " + name);

    future.completeExceptionally(new RuntimeException("fail"));
    return future;
}

위와 같이 하는 방법이 있을 수 있습니다.

물론 try/catch 블록을 사용할 수도 있습니다,

6. Async 메소드

비동기를 하는데, 당연히 비동기와 관련된 메소드겠지 했는데 갑자기 async 가 붙은 메소드들이 등장했습니다.

간단히 말해 비동기 쓰레드를 사용하지만, 거기서 사용한 쓰레드를 사용하지 않고, 다른 쓰레드를 이용하여 처리하는 방법입니다.

(뭔가 말이 꼬인듯한..?)

주로 3번에서 본 메소드들에 대부분 async 가 들어간 메소드가 추가로 있는 경우 입니다.

 thenApply() -> thenApplyAsnyc()
 thneAccept() -> thenAcceptAsync()
public Future<Void> test13() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Finish13");
    log(future.get());
    CompletableFuture<Void> finish13 = future.thenAcceptAsync(s -> log("Finish13 async"));
    return finish13;
}

public void log(String msg) {
    System.out.println(Thread.currentThread().getName() + "  ::  " + msg);
}

//main  ::  Finish13
//ForkJoinPool.commonPool-worker-1  ::  Finish13 async

위와 같이 서로 다른 쓰레드에서 실행한 것을 알 수 있습니다.

7. 정리

자바 8이 되면서 생긴 CompletableFuture 는 비동기를 처리할 수 있는 많은 방법을 제공합니다.

대부분 써보진 않고, 앞으로도 쓸 일이 얼마나 있을까 싶긴 하지만, 한 번 짚고 넘어가면 (기억엔 남으니까) 괜찮을 것 같습니다.

그리고 자바9로 메소드나 인스턴스등이 추가되었습니다.

사용한 블로그 코드는 (별거 없지만) github 에 있습니다.

'개발 관련 > java' 카테고리의 다른 글

HashTable vs HashMap  (0) 2022.07.14
클래스 vs 인스턴스  (0) 2022.07.14
자바 비동기 (1)  (0) 2022.07.14
캡슐화(encapsulation)  (0) 2022.07.14
Transactional 정리  (0) 2022.07.14