- RxSwift 使用需要导入
import RxSwift
和import RxCocoa
,RxSwift
是基础库,RxCocoa
对我们Foundation
中的常用组件的监听帮我们做了扩展,如果不用RxCocoa
的话,我们想要监听这些控件的属性时,需要自己做很多的东西。 - RxBlocking 和 RxTest 分别负责集成测试和单元测试
常用方式简单体验
- 常用方式一: 监听按钮的点击事件
testBtn.rx.tap.subscribe{ (event : Event<()>) in
print("按钮发生了点击");
}
对于订阅事件需要返回值,而我们没有写返回值的警告
,我们可以定义一个 DisposeBag
的变量来,并添加到订阅后来解决, 比如上面的代码我们只做了打印,并没有返回值,这是会有个警告提示,那么我们就可以通过这种方式解决
fileprivate lazy var bag: DisposeBag = DisposeBag()
testBtn.rx.tap.subscribe{ (event : Event<()>) in
print("按钮 1 发生了点击");
}.disposed(by: bag)
对于使用 RxSwift 中没提示的问题
,可以先定义一个局部变量,使用局部变量将方法点出来之后再复制,比如上面的代码testBtn1.rx.tap.
的时候点不出来了,就可以定义一个局部变量let btn = UIButton()
,使用 btn.rx.tap.
出来要用的方法,然后复制粘贴
- 常用方式二: 订阅 TextField 文字的改变
textField.rx.text.subscribe{ (event : Event<String ?>) in
print(event.element!!) // 这里 event 是一个 Next 值
}.disposed(by: bag)
// 或者
textField.rx.text.subscribe( onNext: {(str: String? ) in
print(str!)
}).disposed(by: bag)
- 常用方式三: 将 textField 的text值绑定到一个 Lable 上
textField.rx.text.bindTo(textLable1.rx.text).disposed(by: bag)
- 常用方式四: 监听Lable 的text的变化
testLabel.rx.observe(String.self, "text").subscribe(onNext:{(str: String?) in
print(str!)
}).disposed(by: bag)
- 常用方式五: 监听 ScrollView 的滚动
scrollView.contentSize = CGSize(width: 10000, height: 200)
scrollView.rx.contentOffset.subscribe(onNext:{(point: CGPoint) in
print(point)
}).disposed(by: bag)
Event 的类型
@frozen public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
- next: 发送数据
- error: 发送错误
- completed: 发送完成,发送 completed 之后,相应的资源就会被回收掉,再发送 next 或者 error 订阅者都不会收到
Observable
- never: 创建一个从来都不执行的 Observable
let never = Observable<String>.never()
never.subscribe{(event : Event<String>) in
print(event)
}.disposed(by: bag)
不理解为啥既然从来都不执行了,还要创建这个东西干嘛?
其实never的用途挺多的,比如当一个函数要求我们返回一个Observable,而函数的实现部分有可能出现异常或者某些重要参数解包失败的情况,这个时候我们就可以返回一个never, 比如:
public func changeInChannelMute(channelId: Int?) -> Observable<Bool> {
guard let id = channelId else { return .never() }
return Observable.create(.....)
}
再比如我们可以用never来帮助实现协议某些可选接口的默认实现,
extension DeviceErrorDispatching {
public func deviceErrorStream() -> Observable<Error> {
return Observable.never()
}
}
- empty: 创建一个 empty Observable, empty 就是一个空的 sequence,只能发出一个 completed 的事件
let empty = Observable<String>.empty()
empty.subscribe{(event : Event<String>) in
print(event) // 这里会打印一次 completed 事件
}.disposed(by: bag)
- just: 创建一个只能发出一种特定的事件和一个 completed 事件.
let just = Observable.just("张三")
just.subscribe{(event : Event<String>) in
print(event) // 这里会打印两次,一次 Next 事件(Next(张三)),一次 completed 事件
}.disposed(by: bag)
- of : 创建一个sequence 能发出很多种事件信号,发送完事件信号后,发送 completed 事件
let oftest = Observable.of("a","b","c","d")
oftest.subscribe{(event : Event<String>) in
print(event) // 这里会打印五次,四次 Next 事件一次输出a,b,c,d 和 一次 completed 事件
}.disposed(by: bag)
- error: 创建一个 event 值只能是 error 的序列,发送完 error 事件后,不会再发送 completed 事件
let err = NSError(domain: "test", code: -1,userInfo: nil)
_ = Observable<Int>.error(err).subscribe{
(event: RxSwift.Event<Int>) -> Void in
print(event)// 这里会打印一次 error 事件
}
- from: 从数组中创建 sequence
let fromtest = Observable.from([1,2,3,4])
fromtest.subscribe{(event : Event<Int>) in
print(event) // 这里会打印五次,四次 Next 事件一次输出1,2,3,4 和 一次 completed 事件
}.disposed(by: bag)
- create: create 可以自定义可观察的 sequence, 需要传入一个观察者 observer,然后调用 observer 的 onNext, onCompleted 和 onError 方法,返回一个可观察的 obserable 序列
let create = Observable.create({(observer : AnyObserver<Any>) -> Disposable in
observer.onNext("zhangsan")
observer.onNext(18)
observer.onNext(1.8)
observer.onCompleted()
return Disposable.create()
})
create.subscribe {
print($0) // 会打印三次 Next 事件和一次 Completed 事件
}
- generate: 可以生成一连串事件,并且允许我们根据上一次事件的结果设定下一次事件,并设置 completed 的条件。initialState: 初始值, condition: 返回 true 时就生成 next,否则就生成completed, iterate: 用于设置每次事件发送之后,对事件值进行的迭代操作
let generate = Observable.generate(initialState: 0, condition: { $0 < 5 }, iterate: { $0 + 1 })
generate.subscribe{
print($0) // 会输出 next(0),next(1),next(2),next(3),next(4)和一次 completed
}
- deferred: 对创建的事件队列的一种修饰,被 deferred 修饰之后,事件序列只有在被订阅时才生成,起到延时创建的作用,并且发送事件, 并且每订阅一次就新生成一个事件序列
let deferred = Observable<Int>.deferred{
print("Generating")
return Observable.generate(initialState: 0, condition: {$0 < 3}, iterate: {$0 + 1})
}
_ = deferred.subscribe{
print($0) //会输出 next(0),next(1),next(2) 和一次 completed
}
- range
let range = Observable.range(start:1, count:10)
range.subscribe{(event : Event<Int>) in
print(event) // 这里会打印十一次,十次 Next 事件,一次 completed 事件
}.disposed(by: bag)
- repeatElement
let repeat = Observable.repeatElement("hello world")
repeat.subscribe{(event : Event<String>) in
print(event) // 这里会一直重复打印next 事件 next(hello world)
}.disposed(by: bag)
// 如果需要执行次数,需要先使用 repeat.take(count: Int) 方法指定次数
repeat.take(4).subscribe{(event : Event<String>) in
print(event) // 这里会一直重复打印next 事件 next(hello world)
}.disposed(by: bag)
Subjects
- subjects 是 Observable 和Observer 之间的桥梁,一个 Subject 既是一个 Observable 也是一个 Observer,它既可以发出事件,也可以监听事件
- PublishSubject: 当你订阅 PublishSubject 的时候,你只能收到订阅它之后发生的事件。subject.onNext()发出 onNext 事件,对应的 onError() 和 onCompleted()事件。调用 dispose()方法之后subject就会被释放, 当 subscribe 调用 dispose 方法后,subject 再发出的事件 subscribe 就收不到了
let subject = PublishSubject<String>()
subject.onNext("18"); //发送的这个 onNext 事件,下面的订阅收不到,因为发生在订阅之前
let sub1 = subject.subscribe {
print("\($0)") //输出 next(张三) ,但收不到 next(李四) 和 next(王五) ,因为在发出李四和王五事件之前 sub1 已经调用了 dispose 方法被释放掉了
}
subject.onNext("张三")
sub1.dispose()
let sub2 = subject.subscribe {
print("\($0)") //输出 next(李四) 和 next(王五); 收不next(张三), 因为发送张三事件发生在 sub2 订阅之前
}
subject.onNext("李四")
subject.onNext("王五")
PublishSubject 相当于是会员制,只有先加入会员才能订阅
- BehaviorSubject, 订阅者可以接受订阅之前的最后一个事件。相当于是试用制,在加入会员之前可以先试用。
let subject = BehaviorSubject<String>(value: "默认值")
let sub1 = subject.subscribe {
print("====\($0)") // 输出 next(默认值) 和 next(张三),
}
subject.onNext("张三")
sub1.dispose() // 执行完之后,之前的订阅就收不到事件了
let sub2 = subject.subscribe {
print("----\($0)") // 输出next(张三),next(李四),next(王五)
}
subject.onNext("李四")
subject.onNext("王五")
控制台输出:
====next(默认值)
====next(张三)
----next(张三)
----next(李四)
----next(王五)
- ReplaySubject: 当你订阅 ReplaySubject 的时候,你可以接收到订阅它之后的事件,也可以接收到订阅它之前发出的事件,接收几个事件取决与 bufferSize 的大小
let subject = ReplaySubject<String>.create(bufferSize: 2)
subject.onNext("张三")
subject.onNext("李四")
subject.onNext("王五")
let sub1 = subject.subscribe {
print("====\($0)") // 由于 bufferSize 是 2,所以能收到李四和王五两次事件
}
sub1.dispose()
subject.onNext("赵六")
let sub2 = subject.subscribe {
print("----\($0)") // 订阅之前 subject 发出的最后两次事件是王五和赵六,所以收到王五,赵六和订阅之后的所有时间
}
subject.onNext("孙七")
subject.onNext("周八")
subject.onNext("吴九")
控制台输出:
====next(张三)
====next(李四)
----next(王五)
----next(赵六)
----next(孙七)
----next(周八)
----next(吴九)
- AsyncSubject : 如果发出error 事件, 订阅会收到 error事件; 如果只有 next 事件,那么什么也收不到, 如果有 next 和 completed 事件,会收到 completed 之前的最后一次 next 事件和 completed 事件; 如果只有 completed 事件,那么只会收到completed 事件
let subject = AsyncSubject<String>()
let sub = subject.subscribe {
print("====\($0)")
}
subject.onNext("这是第一条消息,不会被打印出来")
subject.onNext("这是第二条消息")
// subject.onError(Testerror.errorA) // 执行到这里控制台会输出 "====error(errorA)"
subject.onCompleted() //当把上面一行发送 error 事件的代码注释掉后,控制台会打印"====next(这是第二条消息)" 和 "====completed"
let subject1 = AsyncSubject<String>()
let sub1 = subject1.subscribe {
print("====\($0)")
}
subject1.onError(Testerror.errorB) // 控制台输出 "====error(errorB)"
let subject2 = AsyncSubject<String>()
let sub2 = subject2.subscribe {
print("====\($0)")
}
subject2.onCompleted() // 控制台输出 "====completed"
let subject3 = AsyncSubject<String>()
subject3.onNext("subject3")
subject3.onCompleted() // 控制台输出 "====next(subject3)",和 ====completed
let sub3 = subject3.subscribe {
print("====\($0)")
}
let subject4 = AsyncSubject<String>()
subject4.onNext("subject4")
subject4.onError(Testerror.errorB) // 控制台输出"====error(errorB)",
let sub4 = subject4.subscribe {
print("====\($0)")
}
let subject5 = AsyncSubject<String>()
let sub5 = subject5.subscribe {
print("====\($0)")
}
subject5.onNext("subject5") // 控制台什么也不输出,因为只有发送了 completed 事件才会将最后一次事件输出
控制台输出:
====next(这是第二条消息)
====completed
====error(errorB)
====completed
====next(subject3)
====completed
====error(errorB)
总结下来就是:
1.无Error 无Completed 只有onNext , 订阅者什么都收不到
2.无Error 有onNext 有Completed, 订阅者收到最后一条 onNext 和 completed
3.只有Completed, 订阅者收到 completed
4.有Error,订阅者收到 error
- ControlProperty: 专门用于描述 UI 控件属性的, RxCocoa 给很多 Foundation 中控件添加的属性,比如 UILable或 UIField 的 text, UIButton 的 tap. 通过这些属性我们可以很方便的做很多事情,比如 可以通过类似
textField.rx.text.bind(to: label.rx.text)
调用将 textField 中输入的字符串实时显示到 label 上, 它具有以下特征:
- 不会产生 error 事件
- 一定在 MainScheduler 订阅(主线程订阅)
- 一定在 MainScheduler 监听(主线程监听)
- Variable(已被弃用): Variable 是 BehaviorSubject 的一个包装箱,使用的时候需要调用 asObservable()拆箱,里面的 value 是一个 BehaviorSubject, 如果要发出事件,直接修改对象的 value 即可,他不会发出 error 事件,但是会自动发出 completed 事件
let variable = Variable("a")
variable.asObserver().subscribe{(event : Event<String>) in
print(event) // 会接收到 a,b,c
}.disposed(by: bag)
variable.value = 'b'
variable.value = 'c'
资源回收
- 当监听一个事件序列的时候,有消息事件来了,我们做某些事情。但是这个事件序列不再发出消息了,我们的监听也就没有什么存在价值了,所以我们需要释放我们这些监听资源,其实也就是每种编程语言中的内存资源释放。
- 当一个 Observable 向订阅者发送 completed 或者 error 事件之后observable 的使命就结束了,属于这个 observable 的所有资源都会被自动回收,但有时事件队列在某些情况下是无线的,比如一个定时器,他在固定的时间间隔里不断地发送事件,对于这样的无线序列,要回收它的资源我们应该怎么做呢?
- 当我们对一个 subject 订阅的时候,订阅的内部会强引用这个subject,导致这个 subject 释放不掉。这是我们可以通过两种方式将 subject 释放掉, 通过调用
dispose()
或disposed(by: )
函数将其释放掉 - dispose 相当于 MRC 中手动调用 release 操作, 当调用完 dispose 后,因为观察者已经销毁,所以后面发送的所有事件都不会收到
let variable = Variable("a")
variable.asObserver().subscribe{(event : Event<String>) in
print(event) // 会接收到 a,
}.dispose()
variable.value = 'b'
variable.value = 'c'
- Dispose Bags: 除了上面的手动释放方法,还有这种自动释放的方式,类似 ARC, 通过调用
disposed(by: )
方法,将我们的观察者添加到 bag 中,当 bag 销毁的时候,会将 bag 中的所有观察者销毁,类似自动释放池
var bag = DisposeBag()
letlet variable = Variable("a")
variable.asObserver().subscribe{(event : Event<String>) in
print(event) // 会接收到 a,
}.disposed(by: bag)
UIBindingObserver
- UIBindingObserver 可以帮助我们创建自己的监听者,有时候RxCocoa(RxSwift中对UIKit的一个扩展库)给的扩展不够使用。这时我们就可以使用UIBindingObserver自己扩展。比如UITextField有个isEnable属性,我想把这个isEnable变成一个observer,就可以这样做:
extension Reactive where Base: UITextField{
var inputEnable: UIBindingObserver<Base, Result>{
return UIBindingObserver(UIElement: base) { textField, result in
textField.isEnabled = result.isValid
}
}
}
map 和 flatMap 的区别
- 以下面一个 textField 输入值的事件来举例:
- 公用代码
private func getStringObservable(text: String) -> Observable<String> {
return Observable.create {
(observer: AnyObserver<String>) -> Disposable in
observer.on(.next(text))
observer.onCompleted()
return Disposables.create { print("=========") }
}
}
- 假设都是在 textField 中输入字符 3
- 使用 map
textField.rx.value.filter{
guard let temp = $0 else {
return false
}
return temp.count > 0
}
.map{
self.getStringObservable(text: $0!)
}
.subscribe(onNext: {
// 在这里断点执行 po $0.self 控制台输出 <AnonymousObservable<String>: 0x600002b1e7e0>
print("====\($0)") // print 输出 ====RxSwift.(unknown context at $109b1cfb8).AnonymousObservable<Swift.String>
})
- 使用flatMap
textField.rx.value.filter{
guard let temp = $0 else {
return false
}
return temp.count > 0
}
.flatMap{
self.getStringObservable(text: $0!)
}
.subscribe(onNext: {
// 在这里断点执行po $0.self 控制台输出 "3"
print("====\($0)") // print 输出 ====3
})
- 通过以上对比发下使用maps时,向下传的是getStringObservable的返回值类型也就是Observable<String>,它依旧是一个 Observable 序列; 而是用 flatMap 时向下传的是 String 类型也就是getStringObservable的返回值类型中的泛型类,帮我们去掉了 Obervable 的包装。
再举个例子说明 map 和 flatMap 的区别
struct Player {
var score: BehaviorRelay<Int>
}
let John = Player (score: BehaviorRelay (value: 70))
let Jole = Player (score: BehaviorRelay (value: 90))
let players = PublishSubject<Player>()
players.asObserver()
.map{
$0
}
.subscribe(onNext: { (num) in
print("map \(num)") // 控制台输出: map Player(score: RxRelay.BehaviorRelay<Swift.Int>) ,map Player(score: RxRelay.BehaviorRelay<Swift.Int>)
})
players.asObserver()
.flatMap { (player:Player) -> Observable<Int> in
player.score.asObservable()
}
.subscribe(onNext: { (num) in
print("flatMap \(num)") // 控制台输出: flatMap 70, flatMap 75,flatMap 90,flatMap 85,flatMap 95
})
players.onNext(John)
John.score.accept(75)
players.onNext(Jole)
John.score.accept(85)
Jole.score.accept(95)
- 另外一点是flatMap 闭包中要求我们返回ObservableConvertibleType类型,而 map 中则无要求,可以返回任何类型的值
Operators
- ignoreElements: 操作符将阻止 Observable 发出 next 事件,但是允许他发出 error 或 completed 事件。
如果你并不关心 Observable 的任何元素,你只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符 - skip: 跳过 Observable 中头 n 个元素
- skipWhile: 跳过 Observable 中头几个元素,直到元素的判定为否:
比如:Observable.of(1, 2, 3, 4, 3, 2, 1) .skipWhile { $0 < 4 } .subscribe(onNext: { print($0) }) // 输出 4,3,2,1
- skipUntil: 跳过 Observable 中头几个元素,直到另一个 Observable 发出一个元素
let disposeBag = DisposeBag()
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.skipUntil(referenceSequence)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")
referenceSequence.onNext("🔴")
sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")
// 输出:
🐸
🐷
🐵
- distinctUntilChanged: 阻止 Observable 发出相邻相同的元素
Observable.of("1", "2", "2", "2", "3", "3", "2")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 1,2,3,2
- elementAt :只发出 Observable 中的第 n 个元素
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.elementAt(3)
.subscribe(onNext: { print($0) })
// 输出 🐸
- filter: 仅仅发出 Observable 中通过判定的元素
Observable.of(2, 30, 22, 5, 60, 1)
.filter { $0 > 10 }
.subscribe(onNext: { print($0) })
//输出: 30, 22, 60
- take: 仅仅从 Observable 中发出头 n 个元素
Observable.of("1", "2", "3", "4", "5", "6")
.take(3)
.subscribe(onNext: { print($0) })
// 输出 1,2,3
- takeWhile: 直到某个元素判定为 false终止
Observable.of(1, 2, 3, 4, 5, 2, 1)
.takeWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
// 输出: 1,2,3
- takeUntil: 忽略掉在第二个 Observable 产生事件后发出的那部分元素
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
referenceSequence.onNext("0")
sourceSequence.onNext("4")
sourceSequence.onNext("5")
sourceSequence.onNext("6")
// 输出: 1,2,3
- takeLast: 只发出最后n 个元素和 completed 事件,必须要等 completed 发出后才订阅得到; 如果是 error 则只发出 error 事件
createObservable()
.takeLast(2)
.subscribe { sender in
print("\(sender)") // 输出next(3), next(4), completed
}
private func createObservable() -> Observable<String> {
return Observable.create { observer in
DispatchQueue.main.asyncAfter(deadline: .now() + DispatchTimeInterval.seconds(1) , execute: {
observer.onNext("1")
})
DispatchQueue.main.asyncAfter(deadline: .now() + DispatchTimeInterval.seconds(2) , execute: {
observer.onNext("2")
})
DispatchQueue.main.asyncAfter(deadline: .now() + DispatchTimeInterval.seconds(3) , execute: {
observer.onNext("3")
})
DispatchQueue.main.asyncAfter(deadline: .now() + DispatchTimeInterval.seconds(4) , execute: {
observer.onNext("4")
observer.onCompleted()
})
return Disposables.create()
}
}
tansfer operators
- scan: 操作符将对第一个元素应用一个函数,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的应用函数中,创建第二个元素。以此类推,直到遍历完全部的元素。
Observable.of(10, 100, 1000)
.scan(1) { aggregateValue, newValue in
aggregateValue + newValue
}
.subscribe(onNext: { print($0) })
// 输出: 11,111,1111
- map: 通过一个转换函数,将 Observable 的每个元素转换一遍
Observable.of(1, 2, 3)
.map { $0 * 10 }
.subscribe(onNext: { print($0) })
// 输出: 10,20,30
flatMap: flatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。 参见 map 和 flatMap 的区别
flatMapLatest: flatMapLatest 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。一旦转换出一个新的 Observable,就只发出它的元素,旧的 Observables 的元素将被忽略掉。
let first = BehaviorSubject(value: "first")
let second = BehaviorSubject(value: "second")
let subject = BehaviorSubject(value: first)
subject.asObservable()
.flatMapLatest { $0 }
.subscribe(onNext: { print($0) })
first.onNext("first-1")
subject.onNext(second)
second.onNext("second-1")
first.onNext("first-2")
second.onNext("second-2")
// 控制台输出:
first
first-1
second
second-1
second-2
Combine Opertaors
- startWith: 会在 Observable 头部插入一些元素。离 subscribe 越近,越先发出. startWith 事件类型必须和后续的事件类型相同
Observable.of("1", "2", "3", "4")
.startWith("startWith-1")
.startWith("startWith-2")
.startWith("startWith-3", "startWith-3-1", "startWith-3-2")
.subscribe(onNext: { print($0) })
// 输出:
startWith-3
startWith-3-1
startWith-3-2
startWith-2
startWith-1
1
2
3
4
- concat: 将多个 Observables 按顺序串联起来,当前一个 Observable 元素发送完毕后(发送了 completed 事件),后一个 Observable 才可以开始发出元素。当串联起来的Observables元素都发送完毕后,调用subscribe的onCompleted回调
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
Observable
.concat([queueA.asObserver(), queueB.asObserver()])
.subscribe(onNext: {(event: String) in
print(event)
},onCompleted: {
print("completed")
})
queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueA.onCompleted()
queueB.onNext("B2")
queueB.onCompleted()
// 控制台输出: A1,A2,A3,B2,completed
使用 queueA.concat(queueB) 的结果与上面是一样的
- merge: 通过使用 merge 操作符你可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出。如果,某一个 Observable 发出一个 onError 事件,那么被合并的 Observable 也会将它发出,并且立即终止序列。
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
Observable
.merge([queueA.asObserver(), queueB.asObserver()])
.subscribe(onNext: {(event: String) in
print(event)
},onCompleted: {
print("completed")
})
queueA.onNext("A1")
queueB.onNext("B1")
queueB.onNext("B2")
queueA.onNext("A2")
// 控制台输出: A1,B1,B2,A2,completed
还能通过 maxConcurrent 参数指定 merge 的最大数量,比如Observable.of(queueA.asObserver(), queueB.asObserver()).merge(maxConcurrent: 1)
,这样只能订阅到queueA的事件
- combineLatest 是将多个Observable 组合起来,必须这些 Observable 都有值后才会将他们组合成最新的值发出,此后任意一个 Observable 值有更新,就使用最新的值重新组合再发出。如果有一个Observable还没有值是不会将所有的 Observable 的值组合发出的。
下例中,由于first发送了一个最新值"1" 但是 second 都没有值,所以不会有输出
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
Observable.combineLatest(first, second) { $0 + $1 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("1")
下例中在second发送完"2",后first 和 second 中都有值,所以会将它们组合发出,此后每次 first 或second 的值有更新都会重新将最新的值组合发出
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
Observable.combineLatest(first, second) { $0 + $1 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
// 控制台输出
// 1A
// 2A
// 2B
下例中由于 first和 second 都有默认值,所以控制台会输出
let disposeBag = DisposeBag()
let first = BehaviorSubject(value:"+")
let second = BehaviorSubject(value:"-")
Observable.combineLatest(first, second) { $0 + $1 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 控制台输出
// +-
下例中由于 first 有默认值,second 没有默认值,所以在 first 发出"1"的时候,由于 second 还没有值,所以也不会组合发出;当 second 发出"A"的时候,由于 first 已经是 "1"了,所以将 "1"和"A"组合发出
let first = BehaviorSubject(value:"+")
let second = PublishSubject<String>()
Observable.combineLatest(first, second) { $0 + $1 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("1")
second.onNext("A")
// 控制台输出
// 1A
combineLatest 也可以直接传递数组,比如Observable .combineLatest([queueA, queueB)
- zip: zip将多个(最多不超过8个) Observables 的元素通过一个函数组合起来,然后将这个组合的结果发出来, 发出来的是最新的没使用过的事件. 任何一个observable 发生了 completed 事件,真个zip 的 observable 就完成了
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
Observable
.zip(queueA, queueB)
.subscribe(onNext: {
print($0 + $1)
},onCompleted: {
print("completed")
})
queueA.onNext("A1")
queueB.onNext("B1")
queueB.onNext("B2")
queueA.onNext("A2")
queueA.onCompleted()
queueA.onNext("A3")
queueB.onNext("B3")
queueB.onCompleted()
// 控制台输出: A1B1, A2B2, completed
- withLatestFrom: 当第一个 Observable 发出一个元素时,就立即取出第二个 Observable 中最新的元素,然后把第二个 Observable 中最新的元素发送出去。
let textField = BehaviorSubject<String>(value: "default")
let submitBtn = PublishSubject<Void>()
submitBtn.withLatestFrom(textField)
.subscribe(onNext: {
print($0)
},onCompleted: {
print("completed")
})
submitBtn.onNext(())
textField.onNext("1")
submitBtn.onNext(())
// 控制台输出: default, 1
- switchLatest: 将订阅切换到最新的 observable 状态
let coding = PublishSubject<String>()
let testing = PublishSubject<String>()
let working = PublishSubject<Observable<String>>()
working.switchLatest().subscribe(onNext: {
print($0)
})
working.onNext(coding)
coding.onNext("version1")
working.onNext(testing)
testing.onNext("failed")
coding.onNext("version2")
working.onNext(coding)
coding.onNext("version2")
working.onNext(testing)
testing.onNext("pass")
// 控制台输出: version1,failed,version2, pass, 可以看到中间工作在 testing 状态时,虽然 coding 发出了 version2 事件,但是并没有订阅到
- interval: 操作符将创建一个 Observable,它每隔一段设定的时间,发出一个索引数的元素。它将发出无数个元素。
public func delay(_ delay: Double, closure: @escaping() -> Void) {
DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
closure()
}
}
let interval = Observable<Int>.interval(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance)
_ = interval.subscribe(onNext: {
print("Subscriber 1: \($0)")
})
delay(4) {
_ = interval.subscribe(onNext: {
print("Subscribe 2: \($0)")
})
}
// 控制台输出
Subscriber 1: 0
Subscriber 1: 1
Subscriber 1: 2
Subscribe 2: 0
Subscriber 1: 3
Subscribe 2: 1
Subscriber 1: 4
Subscribe 2: 2
- publish: 会将
Observable
转换为可被连接的Observable
。可被连接的Observable
和普通的Observable
十分相似,不过在被订阅后不会发出元素,直到connect
操作符被应用为止。这样一来你可以控制Observable
在什么时候开始发出元素。
public func delay(_ delay: Double, closure: @escaping() -> Void) {
DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
closure()
}
}
let interval = Observable<Int>.interval(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance).publish()
_ = interval.subscribe(onNext: {
print("Subscriber 1: \($0)")
})
_ = interval.connect()
delay(4) {
_ = interval.subscribe(onNext: {
print("Subscribe 2: \($0)")
})
}
// 控制台输出:
Subscriber 1: 0
Subscriber 1: 1
Subscriber 1: 2
Subscribe 2: 2
Subscriber 1: 3
Subscribe 2: 3
Subscriber 1: 4
Subscribe 2: 4
- connect: 通知 ConnectableObservable 可以开始发出元素了,ConnectableObservable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以等所有观察者全部订阅完成后,才发出元素。
- replay : 操作符将 Observable 转换为可被连接的 Observable,并且这个可被连接的 Observable 将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。
- replayAll: 同 replay 一样,不过是将所有元素缓存下来发送给观察者
-
buffer: 操作符将缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。
let interval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
.buffer(timeSpan: DispatchTimeInterval.seconds(4), count: 2, scheduler: MainScheduler.instance)
print("START -" + stamp())
_ = interval.subscribe(onNext: { [weak self] in
print("Subscriber 1: \($0) at \(self?.stamp())")
})
// 控制台输出
START -22:53:12
Subscriber 1: [0, 1] at Optional("22:53:14")
Subscriber 1: [2, 3] at Optional("22:53:16")
Subscriber 1: [4, 5] at Optional("22:53:18")
Subscriber 1: [6, 7] at Optional("22:53:20")
Subscriber 1: [8, 9] at Optional("22:53:22")
- debounce: 过滤掉高频产生的元素,操作符将发出这种元素,在 Observable 产生这种元素后,一段时间内没有新元素产生
- debug: 打印所有的订阅,事件以及销毁信息
let disposeBag = DisposeBag()
let sequence = Observable<String>.create { observer in
observer.onNext("🍎")
observer.onNext("🍐")
observer.onCompleted()
return Disposables.create()
}
sequence
.debug("Fruit")
.subscribe()
.disposed(by: disposeBag)
// 控制台输出
2017-11-06 20:49:43.187: Fruit -> subscribed
2017-11-06 20:49:43.188: Fruit -> Event next(🍎)
2017-11-06 20:49:43.188: Fruit -> Event next(🍐)
2017-11-06 20:49:43.188: Fruit -> Event completed
2017-11-06 20:49:43.189: Fruit -> isDisposed