본문 바로가기
iOS/개념

[RxSwift] Operator 연습 3탄 : Combining Operator

by 나리._. 2021. 12. 13.

이전의 Operator들에서는 Observable 시퀀스를 어떻게 필터링(Filtering Operator)하고 변형하는지(Transforming Operator)를 정리했다면 !

 

이번에는 Combining Operator를 이용하여 Observable 시퀀스들을 모으고 시퀀스 내의 데이터들을 병합하는 방법에 대해 정리할 것이다 🤔

 

1. startWith

Observable에서 작업할 때 초기값을 받는지 여부가 중요하다.

현재 위치, 네트워크 상태 등의 초기값이 필요한 경우에 쓰임!

import RxSwift


let disposeBag = DisposeBag()

let yellowClass = Observable<String>.of("student1","student2","student3")

yellowClass
    .enumerated() // enumerated 이용 Observable 시퀀스의 index값과 element 분리
    .map { index, element in
        return element + " \(index)" + "번째 어린이"
    }
    //  teacher 먼저, startWith는 코드 위치 상관 x -> 구독 전에만 붙이면 됨
    .startWith("teacher") // element와 동일한 형태의 값 (여기서는 String) 넣어야함
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

 

 

2. concat

startWith와 비슷하게 생각하면 됨 ( startWith = cocat(더 일반적)인 연산자의 변형)

startWith는 하나의 Element를 앞에 추가했지만, concat을 이용하면 앞 뒤로 여러개의 Element를 추가할 수 있다.

let yellowClassStudent = Observable<String>.of("student1","student2","student3")
let teacher = Observable<String>.of("teacher")

let walk = Observable
    .concat([teacher, yellowClassStudent]) // 컬렉션과 컬렉션 , 시퀀스와 시퀀스 조합으로 만들수 있음
    
// concat에 넣은 순서대로 출력
walk
    .subscribe(onNext: {
        print($0)// teacher, student1, student2, student3 순서로 줄바꿈하며 출력됨
    })
    .disposed(by: disposeBag)

 

아래와 같이 concat을 이용할 수도 있다.

teacher
    .concat(yellowClassStudent)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

 

3. concatMap

flatMap과 비슷하지만 concatMap은 flatMap와 다르게 순서가 보장된다.

이전에 flatMap을 정리할 때,

flatMap은 방출된 element별로 서로 다른 Observable을 생성하기 때문에 별도의 stream과 lifecycle을 가지고 있다고 했다.

 

하지만 concatMap은 순서를 유지하기 때문에, 이전에 방출된 element가 아직 처리 중이라면 끝날 때까지 계속해서 기다리게 된다.

(방출된 여러 데이터가 비동기적으로 동시에 처리되어야 한다면 concatMap을 사용 x)

let 어린이집: [String : Observable<String>] = [
    "노랑반" : Observable.of("y1","y2","y3"),
    "파랑반" : Observable.of("b1","b2")
]

Observable.of("노랑반","파랑반")
    .concatMap { 반 in
        // 위에서 받은 반에 대응하는 observable 값 나옴, 없다면 empty observable
        어린이집[반] ?? .empty()
    }
    .subscribe(onNext: {
        print($0) //y1, y2, y3, b1, b2 순서대로 출력
    })
    .disposed(by: disposeBag)

 

4. merge

merge는 말 그대로 여러개의 시퀀스를 합치는 operator이다.

let 강북 = Observable.from(["성북구","동대문구","종로구"])
let 강남 = Observable.from(["강남구","강동구","영등포구"])

Observable.of(강북, 강남)
    .merge()
    .subscribe(onNext: {
        print($0) // 두개의 옵저버블을 합쳐서 구독 -> 순서 없이 방출
    })
    .disposed(by: disposeBag)

다음과 같이 순서 보장 없이 도착하는 순서대로 섞여서 방출되는 것을 볼 수 있다!

merge로 묶어진 시퀀스는 소스가 되는 (여기서는 강남, 강북)이 만료되었을 때 종료된다.

또 이 Observable들 중 하나라도 에러를 방출하게 되면 이 전체 Observable이 에러를 방출하고 종료한다.

 

* merge - maxConcurrent

한 번에 받아낼 Observable 수 지정.

maxConcurrent를 1로 지정을 하면 첫 번째로 구독을 시작하게 된 Observable이 전부 element를 방출하기 전에는 다른 Observable을 받지 않음.

Observable.of(강북, 강남)
    .merge(maxConcurrent: 1) // 한번에 받아낼 옵저버블 수 -> 순서 보장되는 것처럼 보임
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

 

 

결과를 보면 여기서는 강북이 먼저 들어왔고 강북 시퀀스가 완료된 후 강남 시퀀스를 받은 것을 확인할 수 있다.

 

이러한 maxConcurrent의 적절한 용도는

네트워크 요청이 많아질 때 리소스를 제한하거나 연결 수를 제한하기 위해 사용할 수 있다.

 

 

5. combineLatest

let 성 = PublishSubject<String>()
let 이름 = PublishSubject<String>()

let 성명 = Observable
    .combineLatest(성, 이름){ 성, 이름 in
        성 + 이름
    }

성명
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

성.onNext("이")
이름.onNext("가희")
이름.onNext("나희")
이름.onNext("다희")
성.onNext("박")
성.onNext("김")
성.onNext("최")

CombineLatest는 여러 텍스트 필드를 한 번에 관찰하고 값을 결합하거나

여러 소스들의 상태를 봐야 할 때 자주 쓰인다.

개수를 맞춰서 짝을 이루는 게 아닌, combineLatest의 최신의 값 (여기서는 성 나온 뒤 이름의 최신의 값) 나오면 합쳐져서 출력!

성이 새로운 이벤트를 발생했을 때(박, 김, 최) 여기서 최신의 이름(다희)과 조합돼서 나옴

 

combineLatest의 다른 예 1 - 동일한 날짜를 받아도 다른 형식으로 출력되는 코드

combineLatest는 소스를 여러 개(하나의 combineLatest에 최대 8개)를 받을 수 있다. 

let 날짜형식 = Observable<DateFormatter.Style>.of(.short, .long)
let 현재날짜 = Observable<Date>.of(Date())

let 현재날짜표시 = Observable
    .combineLatest(
        날짜형식,
        현재날짜,
        resultSelector: { 형식, 날짜 -> String in
            let dateFormatter = DateFormatter()
            dateFormatter.dateStyle = 형식
            return dateFormatter.string(from: 날짜)
        }
    )

현재날짜표시
    .subscribe(onNext: {
        print($0) // short type, long type 출력
    })
    .disposed(by: disposeBag)

 

 

combineLatest의 다른 예 2 - Array내에 최종 값을 받는 형태

let lastName = PublishSubject<String>()
let firstName = PublishSubject<String>()

let fullName = Observable
    // collection으로 받기
    .combineLatest([firstName, lastName]) { name in
        name.joined(separator: " ")
    }

fullName
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

lastName.onNext("Kim")
firstName.onNext("a")
firstName.onNext("b")
firstName.onNext("c")
firstName.onNext("d")

 

 

6. zip

CombineLastest와 비슷하지만 조금 다르다.

CombineLastest은 각 observable에서 이벤트가 발생하면 바로 새로운 이벤트를 받았지만,

zip은 각 observable에 같은 횟수의 이벤트가 들어올 때 실행된다.

즉,  둘 중 하나의 observable이 끝나게 되면 zip에 대한 observable도 끝남

 

또 zip은 소스 여러 개(하나의 zip에 최대 8개)를 가질 수 있다.

enum Result {
    case win
    case lose
}

let race = Observable<Result>.of(.win, .win, .lose, .win, .lose)
let player = Observable<String>.of("a1","a2","a3","a4","a5","a6")

let result = Observable
    .zip(race, player) { result, player in
        return player + "선수" + " \(result)"
    }
result
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

여기서는 race 옵저버블이 끝났기 때문에 zip에 대한 observable도 끝남 ->  a6 가 나오지 않음!

 

 

8. withLatestFrom

withLatestFrom의 첫 번째 observable이 trigger(방아쇠) 역할을 한다.

즉, 첫번째 observable이 이벤트를 발생시켜야만 

withLatestFrom 연산자 안에 들어가는 두 번째 observable의 가장 최신의 이벤트가 나타난다.

let trigger = PublishSubject<Void>()
let runner = PublishSubject<String>()

trigger
    .withLatestFrom(runner) // trigger가 이벤트를 발생해야만 runner observable의 가장 최신의 이벤트들이 나타남
    .subscribe(onNext: {
        print($0) // p3 2번 출력
    })
    .disposed(by: disposeBag)

runner.onNext("p1")
runner.onNext("p2")
runner.onNext("p3")
trigger.onNext(Void())
trigger.onNext(Void())

 

 

9. sample

sample은 withLatestFrom와 비슷하지만, withLatestFrom과 달리 단 한 번만 방출한다!

withLatestFrom을 sample과 똑같이 사용하려면 .distinctUntilChanged() (동일한 이벤트 거르는 함수) 이용!

let startSign = PublishSubject<Void>()
let f1 = PublishSubject<String>()

f1
    .sample(startSign)
    .subscribe(onNext: {
        print($0) // p3 한번 출력
    })
    .disposed(by: disposeBag)

f1.onNext("p1")
f1.onNext("p2")
f1.onNext("p3")
startSign.onNext(Void())
startSign.onNext(Void())
startSign.onNext(Void())

 

 

10. amb

amb는 ambiguous(모호한)의 약자이다.

두 가지 시퀀스를 받을 때 두가지 시퀀스 중 어떤 것을 구독할지 애매할 때 사용한다.

let bus1 = PublishSubject<String>()
let bus2 = PublishSubject<String>()

let busStation = bus1.amb(bus2)

busStation
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

bus1.onNext("버스1 - 승객1")
bus2.onNext("버스2 - 승객1")
bus1.onNext("버스1 - 승객2")
bus1.onNext("버스1 - 승객3")
bus2.onNext("버스1 - 승객2")

결과를 보면 버스1에 대한 것만 방출하고 있는 것을 볼 수 있다. 혹은 버스2에 대한 것만 방출한다.

왜일까?

두 가지 옵저버블을 구독하긴 하지만, 먼저 방출하는 옵저버블이 생기면 나머지 옵저버블은 구독하지 않는다.

즉, amb는 amb가 가지고 있는 옵저버블들을 지켜보다가 먼저 이벤트를 방출하는 옵저버블만 구독한다.

 

amb의 적절한 용도는 여러 서버 연결 시도 중, 가장 먼저 응답을 준 서버와 통신을 계속하는 상황에 적합하다.

 

 

11. switchLatest

switchLatest 연산자는 소스 observable로 들어온 마지막 시퀀스의 아이템만 구독한다.

 

let student1 = PublishSubject<String>()
let student2 = PublishSubject<String>()
let student3 = PublishSubject<String>()

let handup = PublishSubject<Observable<String>>()

let class1 = handup.switchLatest()
// 소스 observable -> 여기서 소스는 handup

class1
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

// 손든 학생만 말하는 교실
handup.onNext(student1) // handup(소스 옵저버블)에 student1
student1.onNext("학생1: 저는 학생1 입니다.") // student1에 대한 이벤트만 구독
student2.onNext("학생2: 저요!!")

handup.onNext(student2) // handup(소스 옵저버블)에 student2
student2.onNext("학생2: 저요 저요!!") // 마지막 시퀀스(가장 최신의) student2에 대한 이벤트만 구독
student1.onNext("학생1: 제가 말하고 있었는데..")

handup.onNext(student3) // handup(소스 옵저버블)에 student3
student2.onNext("학생2: 저요 저요!!")
student1.onNext("학생1: ㅠㅠ")
student3.onNext("학생3: 학생 3입니다. 제가 말할게요.") //마지막 시퀀스(가장 최신의) student2에 대한 이벤트만 구독

handup.onNext(student1) // handup(소스 옵저버블)에 student1
student2.onNext("학생2: 저는요 ??!!")
student1.onNext("학생1: ㅎㅎ") // 가장 최신의 시퀀스는 다시 student1
student3.onNext("학생3: ㅠㅠ")

다음과 같이 출력되는 것을 확인할 수 있다.

 

 

12. reduce

시퀀스 내의 요소들 간의 결합 (swift의 reduce함수와 동일하다고 생각하면 된다.)

Observable.from((1...10))
//    .reduce(0, accumulator: { summary, newValue in
//        return summary + newValue
//    })
//    .reduce(0) { summary, newValue in
//        return summary + newValue
//    }
    .reduce(0, accumulator: +)
    .subscribe(onNext: {
        print($0) // 55 출력
    })
    .disposed(by: disposeBag)

위의 코드는 3가지 형태로 작성해보았다.

 

 

13. scan

reduce는 최종 결과값만 방출하지만 scan은 옵저버블 타입으로 매번 값이 들어올 때마다 변화된 element를 출력한다.

Observable.from((1...10))
    .scan(0, accumulator: +)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

다음과 같이 변화하는 element 값을 출력하는 것을 볼 수 있다.