组合操作符
startWith
在开始从可观察源发出元素之前,发出指定的元素序列。可以理解为“+”号,并且后加的先发送。
Observable.of("1", "2", "3", "4")
.startWith("A")
.startWith("B")
.startWith("C", "a", "b")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
C
a
b
B
A
1
2
3
4
*/
merge
将源可观察序列中的元素组合成一个新的可观察序列,并像源可观察序列发出元素一样按顺序发出每个元素(可以看做是两个数组相互穿插,组成一个新的数组)
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
// merge subject1和subject2
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("C")
subject1.onNext("o")
subject2.onNext("o")
subject2.onNext("o")
subject1.onNext("c")
subject2.onNext("i")
/*
输出结果:
C
o
o
o
c
i
*/
zip
将偶数个源可观测序列组合成一个新的可观测序列,并将从组合的可观测序列中发射出对应索引处每个源可观测序列的元素
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("C")
stringSubject.onNext("o") // 到这里存储了 C o 但是不会响应,除非另一个响应
intSubject.onNext(1) // 勾出一个
intSubject.onNext(2) // 勾出另一个
stringSubject.onNext("i") // 存一个
intSubject.onNext(3) // 勾出一个
// 说白了: 只有两个序列同时有值的时候才会响应,否则存值
/*
输出结果:
C 1
o 2
i 3
*/
combineLatest
将两个子可观察序列组合成一个新的可观察序列,并用到了两个子序列响应的结果
当两个子序列都发出元素的时候,序列才会响应,并且新发出的元素,会覆盖原来的旧元素
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSub.onNext("L") // 存一个 L
stringSub.onNext("G") // 存了一个覆盖 - 和zip不一样
intSub.onNext(1) // 发现strOB也有G 响应 G 1
intSub.onNext(2) // 覆盖1 -> 2 发现strOB 有值G 响应 G 2
stringSub.onNext("Cooci") // 覆盖G -> Cooci 发现intOB 有值2 响应 Cooci 2
// combineLatest 比较zip 会覆盖
// 应用非常频繁: 比如账户和密码同时满足->才能登陆. 不关心账户密码怎么变化的只要查看最后有值就可以 loginEnable
/*
输出结果:
G 1
G 2
Cooci 2
*/
switchLatest
switch开关,切换的意思。所以这个函数根据官方注释可以理解为:只是发出最近接受的可观察序列的元素。即每当一个新的可观察序列被接受的时候,就只是订阅这个序列的,不再订阅之前序列的。
let switchLatestSub1 = BehaviorSubject(value: "L")
let switchLatestSub2 = BehaviorSubject(value: "1")
let switchLatestSub = BehaviorSubject(value: switchLatestSub1)// 选择了 switchLatestSub1 就不会监听 switchLatestSub2
switchLatestSub.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
switchLatestSub1.onNext("G")
switchLatestSub1.onNext("_")
switchLatestSub2.onNext("2")
switchLatestSub2.onNext("3") // 2-3都会不会监听,但是默认保存由 2覆盖1 3覆盖2
switchLatestSub.onNext(switchLatestSub2) // 切换到 switchLatestSub2
switchLatestSub1.onNext("*")
switchLatestSub.onNext(switchLatestSub1)
switchLatestSub1.onNext("Cooci") // 原理同上面 下面如果再次切换到 switchLatestSub1会打印出 Cooci
switchLatestSub2.onNext("4")
/*
输出结果:
L
G
_
3
*
Cooci
*/
映射操作符
map
转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。可以理解为函数的映射。
let ob = Observable.of(1,2,3,4)
ob.map { (number) -> Int in
return number+2
}
.subscribe{
print("\($0)")
}
.disposed(by: disposeBag)
/*
输出结果:
next(3)
next(4)
next(5)
next(6)
completed
*/
flatMap 和 flatMapLatest
将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列 。
这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列中还有模型序列)。
flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观测序列发射元素。
flatMapLatest实际上是map和switchLatest操作符的组合。
struct LGPlayer {
init(score: Int) {
self.score = BehaviorSubject(value: score)
}
let score: BehaviorSubject<Int>
}
let boy = LGPlayer(score: 100)
let girl = LGPlayer(score: 90)
let player = BehaviorSubject(value: boy)
player.asObservable()
.flatMap { $0.score.asObservable() } // 本身score就是序列 模型就是序列中的序列
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
boy.score.onNext(60)
player.onNext(girl)
boy.score.onNext(50)//如果切换到 flatMapLatest 就不会打印
boy.score.onNext(40)//如果切换到 flatMapLatest 就不会打印
girl.score.onNext(10)
girl.score.onNext(0)
//flatMapLatest实际上是map和switchLatest操作符的组合。
/*
输出结果:
100 // player中默认的boy的分数
60
90
50
40
10
0
*/
scan
从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 10 + 2
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 这里主要强调序列值之间的关系
/*
输出结果:
12 aggregateValue:2 + newValue:10
112 aggregateValue:12 + newValue:100
1112 aggregateValue:112 + newValue:1000
*/
过滤条件操作符
filter
仅从满足指定条件的可观察序列中发出那些元素
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
2
4
6
8
0
*/
distinctUntilChanged
抑制可观察序列发出的顺序重复元素
Observable.of("1", "2", "2", "2", "3", "3", "4")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
1
2
3
4
*/
elementAt
仅在可观察序列发出的所有元素的指定索引处发出元素
Observable.of("C", "o", "o", "c", "i")
.elementAt(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
c
*/
single
只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误(但是当是满足条件的第一个元素时,发出多个元素,也不会抛出错误)。
Observable.of("Cooci", "Kody")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
print("-----------------------")
Observable.of("Cooci", "Kody")
.single { $0 == "Kody" }
.subscribe { print($0) }
.disposed(by: disposeBag)
/*
输出结果:
Cooci
Unhandled error happened: Sequence contains more than one element.
subscription called from:
-----------------------
next(Kody)
completed
*/
take
从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个
Observable.of("Hank", "Kody","Cooci", "CC")
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
Hank
Kody
*/
takeLast
仅从可观察序列的末尾发出指定数量的元素
Observable.of("Hank", "Kody","Cooci", "CC")
.takeLast(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
Kody
Cooci
CC
*/
takeWhile
只要指定条件的值为true,就从可观察序列的开始发出元素
Observable.of(1, 2, 3, 4, 5, 6)
.takeWhile { $0 < 3 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
1
2
*/
takeUntil
从源可观察序列发出元素,直到参考可观察序列发出元素
这个要重点,应用非常频繁 比如我页面销毁了,就不能获取值了(cell重用运用)
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("Cooci")
sourceSequence.onNext("Kody")
sourceSequence.onNext("CC")
referenceSequence.onNext("Hank") // 条件一出来(参考可观察序列发出元素),下面就走不了
sourceSequence.onNext("Lina")
sourceSequence.onNext("小雁子")
sourceSequence.onNext("婷婷")
/*
输出结果:
next(Cooci)
next(Kody)
next(CC)
completed
*/
skip
`skip跳过的意思。从源可观察序列发出元素都不响应,直到跳过指定的元素个数,后面发出的元素才会响应。
应用非常频繁 比如用在 textfiled 都有默认序列产生的时候 skip(1) 一次
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
3
4
5
6
*/
skipWhile
上面skip满足某个条件的情况
Observable.of(1, 2, 3, 4, 5, 6)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
4
5
6
*/
skipUntil
抑制从源可观察序列发出元素,直到参考可观察序列发出元素。和上面的takeUntil
正好相反。
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq
.skipUntil(referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 没有条件命令 下面走不了
sourceSeq.onNext("Cooci")
sourceSeq.onNext("Kody")
sourceSeq.onNext("CC")
referenceSeq.onNext("Hank") // 条件一出来(参考可观察序列发出元素),下面就可以走了
sourceSeq.onNext("Lina")
sourceSeq.onNext("小雁子")
sourceSeq.onNext("婷婷")
/*
输出结果:
Lina
小雁子
婷婷
*/
集合控制操作符
toArray
将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.disposed(by: disposeBag)
/*
输出结果:
success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
*/
reduce
从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素累计运算,并以单个元素可观察序列的形式返回聚合结果 - 类似scan
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
1111
*/
concat
以顺序方式连接来自一个可观察序列的内部可观察序列的元素。在从下一个序列发出元素之前响应,等待前面序列成功终止过后,才能响应下一个序列。可用来控制顺序
let subject1 = BehaviorSubject(value: "Hank")
let subject2 = BehaviorSubject(value: "1")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("Cooci")
subject1.onNext("Kody")
subjectsSubject.onNext(subject2)
subject2.onNext("打印不出来")
subject2.onNext("2")
subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
subject2.onNext("3")
/*
输出结果:
next(Hank) // subjectsSubject 的默认值
next(Cooci)
next(Kody)
next(2)
next(3)
*/
可观察对象的错误通知中返回的操作符
catchErrorJustReturn
从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
let lgError = NSError.init(domain: "com.lgerror.cn", code: 10090, userInfo: nil)
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("w错误w")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("Hank")
sequenceThatFails.onNext("Kody") // 正常序列发送成功的
sequenceThatFails.onError(self.lgError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
/*
输出结果:
next(Hank)
next(Kody)
next(w错误w) // 返回了错误信息,并且终止
completed
*/
catchError
通过切换到提供的恢复可观察序列,从错误事件中恢复,而不抛出错误
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence // 获取到了错误序列-我们在中间的闭包操作处理完毕,返回给用户需要的序列(showAlert)
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("Hank")
sequenceThatFails.onNext("Kody") // 正常序列发送成功的
sequenceThatFails.onError(lgError) // 发送失败的序列
recoverySequence.onNext("CC")
/*
输出结果:
Error: Error Domain=com.lgerror.cn Code=10090 "(null)"
next(CC)
*/
retry
通过无限地重新订阅可观察序列来恢复重复的错误事件
var count = 1 // 外界变量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("Hank")
observer.onNext("Kody")
observer.onNext("CC")
if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
observer.onError(self.lgError) // 接收到了错误序列,重试序列发生
print("错误序列来了")
count += 1 // count += 1, 下次就不会走到错误序列来了
}
observer.onNext("Lina")
observer.onNext("小雁子")
observer.onNext("婷婷")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
Hank
Kody
CC
错误序列来了 // 发生错误,这次序列结束,重试序列开始发生
Hank // 重试序列,这次count=2,没有发生错误序列,也就不会再重新订阅了
Kody
CC
Lina
小雁子
婷婷
*/
retry(_:)
和上面一样,只不过规定了重试次数,达到次数过后还没有恢复,就抛出错误
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("Hank")
observer.onNext("Kody")
observer.onNext("CC")
if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
observer.onError(self.lgError)
print("错误序列来了")
count += 1
}
observer.onNext("Lina")
observer.onNext("小雁子")
observer.onNext("婷婷")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
输出结果:
Hank
Kody
CC
错误序列来了
Hank
Kody
CC
错误序列来了
Hank
Kody
CC
错误序列来了
Unhandled error happened: Error Domain=com.lgerror.cn Code=10090 "(null)"
subscription called from:
*/
debug
打印所有订阅、事件和处理。
sequenceThatErrors
.retry(3)
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)