private func demo1(){
print("********** just ***************")
let disposeBag = DisposeBag()
Observable.just("🔴")
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)
/** 输出:
next(🔴)
completed
*/
print("********** just ***************")
print("********** of ***************")
Observable.of("🐶", "🐱", "🐭", "🐹")
.subscribe(onNext: { element in
print(element)
})
.disposed(by: disposeBag)
/** 输出:
🐶
🐱
🐭
🐹
*/
print("********** of ***************")
//MARK: - from
print("********** from ***************")
Observable.from(["🐶", "🐱", "🐭", "🐹"])
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/** 输出:
🐶
🐱
🐭
🐹
*/
print("********** from ***************")
//MARK: - create
print("********** create ***************")
let myJust = { (element: String) -> Observable<String> in
return Observable.create { observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}
myJust("🔴")
.subscribe { print($0) }
.disposed(by: disposeBag)
/** 输出:
next(🔴)
completed
*/
print("********** create ***************")
//MARK: - range
print("********** range ***************")
Observable.range(start: 1, count: 10)
.subscribe {
print($0)
}.disposed(by: disposeBag)
/** 输出:
next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
next(7)
next(8)
next(9)
next(10)
completed
*/
print("********** range ***************")
//MARK: - repeatElement
print("********** repeatElement ***************")
Observable.repeatElement("🔴")
.take(3)
.subscribe {
if case .next(let value) = $0 {
print(value)
}
}.disposed(by: disposeBag)
/** 输出:
🔴
🔴
🔴
*/
print("********** repeatElement ***************")
//MARK: - generate
print("********** generate ***************")
Observable.generate(initialState: 0, condition: { $0 < 3 }, iterate: { $0 + 1 })
.subscribe {
if case .next(let value) = $0 {
print(value)
}
}.disposed(by: disposeBag)
/** 输出:
0
1
2
*/
print("********** generate ***************")
//MARK: - deferred
print("********** deferred ***************")
var count = 1
let deferredSequence = Observable<String>.deferred {
print("Creating \(count)")
count += 1
return Observable.create { observer in
print("Emitting....")
observer.onNext("🐶")
observer.onNext("🐱")
observer.onNext("🐵")
return Disposables.create()
}
}
deferredSequence.subscribe {
if case .next(let value) = $0 {
print(value)
}
}.disposed(by: disposeBag)
deferredSequence.subscribe{
if case .next(let value) = $0 {
print(value)
}
}.disposed(by: disposeBag)
/** 输出:
Creating 1
Emitting....
🐶
🐱
🐵
Creating 2
Emitting....
🐶
🐱
🐵
*/
print("********** deferred ***************")
//MARK: - error
print("********** error ***************")
Observable<Int>.error(RxError.unknown)
.subscribe {
print($0)
}.disposed(by: disposeBag)
/** 输出:
error(Unknown error occurred.)
*/
print("********** error ***************")
//MARK: - doOn
print("********** doOn ***************")
Observable.of("🍎", "🍐", "🍊", "🍋")
.do(onNext: { print("Intercepted:", $0)}, afterNext: { print("Intercepted after: ", $0) }, onError: {
print("Intercepted error: ", $0)
}, afterError: { print("Intercepted after error: ", $0)}, onCompleted: { print("Completed") }, afterCompleted: {
print("After completed")
}).subscribe{ print($0)}.disposed(by: disposeBag)
/** 输出:
Intercepted: 🍎
next(🍎)
Intercepted after: 🍎
Intercepted: 🍐
next(🍐)
Intercepted after: 🍐
Intercepted: 🍊
next(🍊)
Intercepted after: 🍊
Intercepted: 🍋
next(🍋)
Intercepted after: 🍋
Completed
completed
After completed
*/
print("********** doOn ***************")
//MARK: - PublishSubject
/*!
从订阅时间开始向所有观察者广播新事件。
*/
print("********** PublishSubject ***************")
print("从订阅时间开始向所有观察者广播新事件。")
let subject = PublishSubject<String>()
subject.subscribe{
print("订阅者1: \($0)")
}.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject.subscribe{
print("订阅者2: \($0)")
}.disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")
/** 输出:
订阅者1: next(🐶)
订阅者1: next(🐱)
订阅者1: next(🅰️)
订阅者2: next(🅰️)
订阅者1: next(🅱️)
订阅者2: next(🅱️)
*/
print("********** PublishSubject ***************")
//MARK: - ReplaySubject
/**
向所有订阅者广播新事件,并将先前事件的指定bufferSize数量广播给新订阅者。
*/
print("********** ReplaySubject ***************")
print("向所有订阅者广播新事件,并将先前事件的指定bufferSize数量广播给新订阅者。")
let subject1 = ReplaySubject<String>.create(bufferSize: 1)
subject1.subscribe{
print("订阅者1: \($0)")
}.disposed(by: disposeBag)
subject1.onNext("🐶")
subject1.onNext("🐱")
subject1.subscribe{
print("订阅者2: \($0)")
}.disposed(by: disposeBag)
subject1.onNext("🅰️")
subject1.onNext("🅱️")
/** 输出:
订阅者1: next(🐶)
订阅者1: next(🐱)
订阅者2: next(🐱)
订阅者1: next(🅰️)
订阅者2: next(🅰️)
订阅者1: next(🅱️)
订阅者2: next(🅱️)
*/
print("********** ReplaySubject ***************")
//MARK: - BehaviorSubject
/**
向所有订阅者广播新事件,并向新订阅者广播最近的(或初始)值。
*/
print("********** BehaviorSubject ***************")
print("向所有订阅者广播新事件,并向新订阅者广播最近的(或初始)值。")
let subject2 = BehaviorSubject(value: "🔴")
subject2.subscribe{
print("订阅者1: \($0)")
}.disposed(by: disposeBag)
subject2.onNext("🐶")
subject2.onNext("🐱")
subject2.subscribe{
print("订阅者2: \($0)")
}.disposed(by: disposeBag)
subject2.onNext("🅰️")
subject2.onNext("🅱️")
/** 输出:
订阅者1: next(🔴)
订阅者1: next(🐶)
订阅者1: next(🐱)
订阅者2: next(🐱)
订阅者1: next(🅰️)
订阅者2: next(🅰️)
订阅者1: next(🅱️)
订阅者2: next(🅱️)
*/
print("********** BehaviorSubject ***************")
}
//MARK: - 基础使用
private func demo2(){
let disposeBag = DisposeBag()
//MARK: - startWith
print("********** startWith ***************")
print("在开始从源Observable中发射元素之前,先发射指定的元素序列。")
Observable.of("🐶", "🐱", "🐭", "🐹")
.startWith("1️⃣")
.startWith("2️⃣")
.startWith("3️⃣", "🅰️", "🅱️")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/** 输出:
3️⃣
🅰️
🅱️
2️⃣
1️⃣
🐶
🐱
🐭
🐹
*/
print("********** startWith ***************")
//MARK: - merge
print("********** merge ***************")
print("将源Observable序列中的元素组合成一个新的Observable序列,并像每个源Observable序列发出的那样发出每个元素。")
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe{
print($0)
}.disposed(by: disposeBag)
subject1.onNext("🅰️")
subject1.onNext("🅱️")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("🆎")
subject2.onNext("③")
/** 输出:
next(🅰️)
next(🅱️)
next(①)
next(②)
next(🆎)
next(③)
*/
print("********** merge ***************")
//MARK: - zip
print("********** zip ***************")
print("最多可将8个源可观察序列合并为一个新的可观察序列,并从合并后的可观察序列中发出对应索引处每个源可观察序列中的元素。")
print("特点: 必定是成双成对的, 不会叉位")
/**
next(("🅰️", 1))
next(("🅱️", 2))
next(("🆎", 3))
*/
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject)
.subscribe{
print($0)
}.disposed(by: disposeBag)
stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("🆎")
intSubject.onNext(3)
/** 输出:
next(("🅰️", 1))
next(("🅱️", 2))
next(("🆎", 3))
*/
print("********** zip ***************")
//MARK: - combineLatest
print("********** combineLatest ***************")
print("将最多8个源可观察序列合并为一个新的可观察序列,并在所有源可观察序列发射至少一个元素后,以及当任何源可观察序列发射一个新元素时,开始从合并后的可观察序列中发射每个源可观察序列的最新元素。")
let stringSubject1 = PublishSubject<String>()
let intSubject1 = PublishSubject<Int>()
Observable.combineLatest(stringSubject1, intSubject1) { stringElement, intElement in
"\(stringElement) ---- \(intElement)"
}.subscribe{
print($0)
}.disposed(by: disposeBag)
stringSubject1.onNext("🅰️")
stringSubject1.onNext("🅱️")
intSubject1.onNext(1)
intSubject1.onNext(2)
stringSubject1.onNext("🆎")
/** 输出:
next(🅱️ ---- 1)
next(🅱️ ---- 2)
next(🆎 ---- 2)
*/
print("********** combineLatest ***************")
//MARK: - combineLatest --- Array
print("********** combineLatest --- Array ***************")
let stringSubject02 = Observable.just("❤️")
let fruitObservable = Observable.from(["🍎", "🍐", "🍊"])
let animalObservable = Observable.of("🐶", "🐱", "🐭", "🐹")
Observable.combineLatest([stringSubject02, fruitObservable, animalObservable]) {
"\($0[0]) --- \($0[1]) --- \($0[2])"
}.subscribe{
print($0)
}.disposed(by: disposeBag)
/** 输出:
next(❤️ --- 🍎 --- 🐶)
next(❤️ --- 🍐 --- 🐶)
next(❤️ --- 🍐 --- 🐱)
next(❤️ --- 🍊 --- 🐱)
next(❤️ --- 🍊 --- 🐭)
next(❤️ --- 🍊 --- 🐹)
completed
*/
print("********** combineLatest --- Array ***************")
//MARK: - switchLatest
print("********** switchLatest ***************")
print("将Observable序列发出的元素转换为Observable序列,并从最近的内部Observable序列发出元素。")
let subject11 = BehaviorSubject(value: "⚽️")
let subject22 = BehaviorSubject(value: "🍎")
let subjectsSubject = BehaviorSubject<Observable>(value: subject1)
subjectsSubject.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject11.onNext("🏈")
subject11.onNext("🏀")
subjectsSubject.onNext(subject22)
subject11.onNext("⚾️")
subject22.onNext("🍐")
/** 输出:
🍎
🍐
*/
print("********** switchLatest ***************")
//MARK: - withLatestFrom
print("********** withLatestFrom ***************")
print("通过将第一个源中的每个元素与第二个源中的最新元素(如果有的话)结合,将两个可观察序列合并为一个可观察序列。")
let foodSubject = PublishSubject<String>()
let drinksSubject = PublishSubject<String>()
foodSubject.asObservable()
.withLatestFrom(drinksSubject) {
"\($0) + \($1)"
}.subscribe{
print($0)
}.disposed(by: disposeBag)
foodSubject.onNext("🥗")
drinksSubject.onNext("☕️")
foodSubject.onNext("🥐")
drinksSubject.onNext("🍷")
foodSubject.onNext("🍔")
foodSubject.onNext("🍟")
drinksSubject.onNext("🍾")
/** 输出:
next(🥐 + ☕️)
next(🍔 + 🍷)
next(🍟 + 🍷)
*/
print("********** withLatestFrom ***************")
}
//MARK: - 组合操作符
private func demo3(){
let disposeBag = DisposeBag()
//MARK: - map
print("********** map ***************")
Observable.of(1, 2, 3)
.map { $0 * $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Observable.from([2, 3, 4])
.map { $0 * $0 }
.subscribe{
print($0)
}.disposed(by: disposeBag)
print("********** map ***************")
//MARK: - flatMap and flatMapLatest
print("********** flatMap and flatMapLatest ***************")
print("将可观察序列发出的元素转换为可观察序列,并将两个可观察序列发出的元素合并为一个可观察序列。这也很有用,例如,当你有一个可观察序列本身发射可观察序列,你希望能够对来自任何一个可观察序列的新发射作出反应。flatMap和flatMapLatest的区别在于,flatMapLatest只会从最近的内部Observable序列中发出元素。")
struct Player{
init(score: Int) {
self.score = BehaviorSubject(value: score)
}
let score: BehaviorSubject<Int>
}
let 👦🏻 = Player(score: 80)
let 👧🏼 = Player(score: 90)
let player = BehaviorSubject(value: 👦🏻)
player.asObservable()
.flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
👦🏻.score.onNext(85)
player.onNext(👧🏼)
👦🏻.score.onNext(95) // Will be printed when using flatMap, but will not be printed when using flatMapLatest
👧🏼.score.onNext(100)
/** 输出:
80
85
90
95
100
*/
print("********** flatMap and flatMapLatest ***************")
//MARK: - scan
print("********** scan ***************")
print("从初始种子值开始,然后对Observable序列发出的每个元素应用累加器闭包,并将每个中间结果作为单元素Observable序列返回。")
Observable.of(10, 100, 1000)
.scan(0) { aggregateValue, newValue in
print("\(aggregateValue) ------ \(newValue)")
return aggregateValue + newValue
}
.subscribe{
print($0)
}.disposed(by: disposeBag)
/** 输出:
0 ------ 10
next(10)
10 ------ 100
next(110)
110 ------ 1000
next(1110)
completed
*/
print("********** scan ***************")
}
private func demo4(){
let disposeBag = DisposeBag()
//MARK: - filter
print("********** filter ***************")
print("只发出Observable序列中满足指定条件的元素。")
Observable.of( "🐱", "🐰", "🐶",
"🐸", "🐱", "🐰",
"🐹", "🐸", "🐱")
.filter {
$0 == "🐱"
}.subscribe {
print($0)
}.disposed(by: disposeBag)
/** 输出:
next(🐱)
next(🐱)
next(🐱)
completed
*/
print("********** filter ***************")
//MARK: - distinctUntilChanged
print("********** distinctUntilChanged ***************")
print("抑制由Observable序列发出的连续重复元素。")
Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
.distinctUntilChanged()
.subscribe{
print($0)
}.disposed(by: disposeBag)
/** 输出:
next(🐱)
next(🐷)
next(🐱)
next(🐵)
next(🐱)
completed
*/
print("********** distinctUntilChanged ***************")
//MARK: - elementAt
print("********** elementAt ***************")
print("只发出可观察序列发出的所有元素中指定索引处的元素。")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.element(at: 3)
.subscribe{ print($0) }
.disposed(by: disposeBag)
/** 输出:
next(🐸)
completed
*/
print("********** elementAt ***************")
//MARK: - single
print("********** single ***************")
print("只发出Observable序列发出的第一个元素(或第一个满足条件的元素)。如果Observable序列没有发出一个元素,将抛出错误。")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.single()
.subscribe{ print($0) }
.disposed(by: disposeBag)
/** 输出:
next(🐱)
error(Sequence contains more than one element.)
*/
print("********** single ***************")
//MARK: - single with condaitions
print("********** single with condaitions ***************")
print("有且只有一个满足条件的")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.single { $0 == "🐸"}
.subscribe { print($0) }
.disposed(by: disposeBag)
/** 输出:
next(🐸)
completed
*/
print("有两个满足条件的数据, 且发出第二个的时候会抛出错误")
Observable.of("🐱", "🐰", "🐶", "🐱", "🐰", "🐶")
.single { $0 == "🐰"}
.subscribe { print($0) }
.disposed(by: disposeBag)
/** 输出:
next(🐰)
error(Sequence contains more than one element.)
*/
print("没有满足提交的条件, 直接抛出错误")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.single { $0 == "🔵" }
.subscribe { print($0) }
.disposed(by: disposeBag)
print("********** single with condaitions ***************")
//MARK: - take
print("********** take ***************")
print("只从Observable序列的开头发出指定数量的元素。")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.take(3)
.subscribe{ print($0) }
.disposed(by: disposeBag)
/** 输出:
next(🐱)
next(🐰)
next(🐶)
completed
*/
print("********** take ***************")
//MARK: - takeLast
print("********** takeLast ***************")
print("只从Observable序列的末尾发出指定数量的元素。")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.takeLast(3)
.subscribe{ print($0) }
.disposed(by: disposeBag)
/** 输出:
next(🐸)
next(🐷)
next(🐵)
completed
*/
print("********** takeLast ***************")
//MARK: - takeWhile
print("********** takeWhile ***************")
print("只要指定的条件求值为true,就从Observable序列的开头发出元素。")
Observable.of(1, 2, 3, 4, 5, 6)
.take(while: { $0 < 4 })
.subscribe{ print($0) }
.disposed(by: disposeBag)
/** 输出:
next(1)
next(2)
next(3)
completed
*/
print("********** takeWhile ***************")
//MARK: - takeUntil
print("********** takeUntil ***************")
print("从源Observable序列发出元素,直到引用Observable序列发出一个元素。")
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.take(until: referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")
referenceSequence.onNext("🔴")
sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")
/** 输出:
next(🐱)
next(🐰)
next(🐶)
completed
*/
print("********** takeUntil ***************")
//MARK: - skip
print("********** skip ***************")
print("禁止从Observable序列的开头发出指定数量的元素。")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/** 输出:
🐶
🐸
🐷
🐵
*/
print("********** skip ***************")
//MARK: - skipWhile
print("********** skipWhile ***************")
print("禁止从Observable序列的开头发出满足指定条件的元素。")
Observable.of(1, 2, 3, 4, 5, 6)
.skip(while: { $0 < 4 })
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/** 输出:
4
5
6
*/
print("********** skipWhile ***************")
//MARK: - skipWhileWithIndex
print("********** skipWhileWithIndex ***************")
print("抑制从Observable序列的开头发出满足指定条件的元素,并发出剩余的元素。闭包还传递每个元素的索引。")
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.enumerated()
.skip(while: { (index, element) in
index < 3
})
.map({ (index, element) in
element
})
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/** 输出:
🐸
🐷
🐵
*/
print("********** skipWhileWithIndex ***************")
//MARK: - skipUntil
print("********** skipUntil ***************")
print("抑制从源Observable序列发出元素,直到引用Observable序列发出元素。")
let sourceSequence1 = PublishSubject<String>()
let referenceSequence1 = PublishSubject<String>()
sourceSequence1
.skip(until: referenceSequence1)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
sourceSequence1.onNext("🐱")
sourceSequence1.onNext("🐰")
sourceSequence1.onNext("🐶")
referenceSequence1.onNext("🔴")
sourceSequence1.onNext("🐸")
sourceSequence1.onNext("🐷")
sourceSequence1.onNext("🐵")
/** 输出:
🐸
🐷
🐵
*/
print("********** skipUntil ***************")
}
private func demo5(){
let disposeBag = DisposeBag()
//MARK: - toArray
print("********** toArray ***************")
Observable.range(start: 1, count: 10)
.toArray()
.subscribe {
if case .success(let value) = $0 {
print(value)
}
}
.disposed(by: disposeBag)
print("********** toArray ***************")
//MARK: - reduce
print("********** reduce ***************")
print("从初始种子值开始,然后对Observable序列发出的所有元素应用accumulator闭包,并将聚合结果作为单元素Observable序列返回。")
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +)
.subscribe {
print($0)
}.disposed(by: disposeBag)
/** 输出:
next(1111)
completed
*/
print("********** reduce ***************")
//MARK: - concat
print("********** concat ***************")
print("以顺序的方式连接Observable序列的内部Observable序列中的元素,等待每个序列成功终止,然后再释放下一个序列中的元素.")
let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe{
print($0)
}.disposed(by: disposeBag)
subject1.onNext("🍐")
subject1.onNext("🍊")
subjectsSubject.onNext(subject2)
subject2.onNext("I would be ignored") /// 这个不会打印出结果, 因为subject1还没有发送completed信号
subject2.onNext("🐱") /// 这里也不会直接输出, 但是当subject1发送了completed后, 会输出一次subject2最后一次保存的数据,
subject1.onCompleted()
subject2.onNext("🐭")
/** 输出:
next(🍎)
next(🍐)
next(🍊)
next(🐱) 这里会输出是因为 BehaviorSubject 会保存最后一次信号
next(🐭)
*/
print("********** concat ***************")
}
private func demo6() {
//MARK: - interval 定时器
print("********** interval ***************")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
_ = interval.subscribe(onNext: {
print("Subscription: 1, Event: \($0)")
})
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + .seconds(5)) {
_ = interval.subscribe(onNext: {
print("Subscription: 2, Event: \($0)")
})
}
print("********** interval ***************")
}
private func demo7() {
//MARK: - publish
print("********** publish ***************")
print("将源Observable序列转换为可连接序列。 publish() 和 connect() 方法是一起使用的, 否则信号不会开始")
let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
_ = intSequence.subscribe(onNext: { print("Subscription 1: Event: \($0)") })
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(2)) {
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(4)) {
_ = intSequence.subscribe(onNext: {
print("Subscriotion 2, Event: \($0)")
})
}
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(6)) {
_ = intSequence.subscribe(onNext: {
print("Subscriotion 3, Event: \($0)")
})
}
/** 输出:
Subscription 1: Event: 0
Subscription 1: Event: 1
Subscriotion 2, Event: 1
Subscription 1: Event: 2
Subscriotion 2, Event: 2
Subscription 1: Event: 3
Subscriotion 2, Event: 3
Subscriotion 3, Event: 3
Subscription 1: Event: 4
Subscriotion 2, Event: 4
Subscriotion 3, Event: 4
Subscription 1: Event: 5
Subscriotion 2, Event: 5
Subscriotion 3, Event: 5
Subscription 1: Event: 6
Subscriotion 2, Event: 6
Subscriotion 3, Event: 6
Subscription 1: Event: 7
Subscriotion 2, Event: 7
Subscriotion 3, Event: 7
Subscription 1: Event: 8
Subscriotion 2, Event: 8
Subscriotion 3, Event: 8
Subscription 1: Event: 9
Subscriotion 2, Event: 9
Subscriotion 3, Event: 9
....
*/
print("********** publish ***************")
}
private func demo8 () {
let disposeBag = DisposeBag()
//MARK: - replay
print("********** replay ***************")
print("将源Observable序列转换为可连接序列,并向每个新订阅者重播先前的排放量bufferSize数。")
let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.replay(2)
_ = intSequence.subscribe(onNext: {
print("Subscription 1: Event: \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(2)) {
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(4)) {
_ = intSequence.subscribe{
print("Subscription 2, Event: \($0)")
}
}
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(8)) {
_ = intSequence.subscribe{
print("Subscription 3, Event: \($0)")
}
}
/** 输出:
********** replay ***************
将源Observable序列转换为可连接序列,并向每个新订阅者重播先前的排放量bufferSize数。
********** replay ***************
Subscription 1: Event: 0
Subscription 2, Event: next(0)
Subscription 1: Event: 1
Subscription 2, Event: next(1)
Subscription 1: Event: 2
Subscription 2, Event: next(2)
Subscription 1: Event: 3
Subscription 2, Event: next(3)
Subscription 1: Event: 4
Subscription 2, Event: next(4)
Subscription 3, Event: next(3)
Subscription 3, Event: next(4)
Subscription 1: Event: 5
Subscription 2, Event: next(5)
Subscription 3, Event: next(5)
Subscription 1: Event: 6
Subscription 2, Event: next(6)
Subscription 3, Event: next(6)
Subscription 1: Event: 7
Subscription 2, Event: next(7)
Subscription 3, Event: next(7)
Subscription 1: Event: 8
Subscription 2, Event: next(8)
Subscription 3, Event: next(8)
Subscription 1: Event: 9
Subscription 2, Event: next(9)
Subscription 3, Event: next(9)
Subscription 1: Event: 10
Subscription 2, Event: next(10)
Subscription 3, Event: next(10)
Subscription 1: Event: 11
Subscription 2, Event: next(11)
Subscription 3, Event: next(11)
Subscription 1: Event: 12
Subscription 2, Event: next(12)
Subscription 3, Event: next(12)
*/
print("********** replay ***************")
}
private func demo9 () {
//MARK: - multicast
print("********** multicast ***************")
print("将源可观察序列转换为可连接序列,并通过指定对象广播其发射。")
let subject = PublishSubject<Int>()
_ = subject
.subscribe(onNext: { print("Subject: \($0)") })
let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.multicast(subject)
_ = intSequence
.subscribe(onNext: { print("\tSubscription 1:, Event: \($0)") })
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(2)) {
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(4)) {
_ = intSequence
.subscribe(onNext: { print("\tSubscription 2:, Event: \($0)") })
}
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(6)) {
_ = intSequence
.subscribe(onNext: { print("\tSubscription 3:, Event: \($0)") })
}
/** 输出:
********** multicast ***************
将源可观察序列转换为可连接序列,并通过指定对象广播其发射。
********** multicast ***************
Subject: 0
Subscription 1:, Event: 0
Subject: 1
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subject: 2
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subject: 3
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subject: 4
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subject: 5
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subject: 6
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
Subject: 7
Subscription 1:, Event: 7
Subscription 2:, Event: 7
Subscription 3:, Event: 7
Subject: 8
Subscription 1:, Event: 8
Subscription 2:, Event: 8
Subscription 3:, Event: 8
Subject: 9
Subscription 1:, Event: 9
Subscription 2:, Event: 9
Subscription 3:, Event: 9
Subject: 10
Subscription 1:, Event: 10
Subscription 2:, Event: 10
Subscription 3:, Event: 10
*/
print("********** multicast ***************")
}
private func demo10() {
let disposeBag = DisposeBag()
//MARK: - catchAndReturn
print("********** catchAndReturn ***************")
print("通过返回一个发出单个元素然后终止的Observable序列,从Error事件中恢复。")
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchAndReturn("😊")
.subscribe {
print($0)
}.disposed(by: disposeBag)
print("********** catchAndReturn ***************")
}
RxSwift的使用
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 一、UICollectionView 的基本用法 1,单个分区的集合视图 (1)首先自定义一个单元格类: (2)主...
- 三、将结果转为 JSON 对象 1,实现方法 (1)如果服务器返回的数据是 json 格式的话,直接通过 Moya...
- 三、一个使用 Observable 的 MVVM 样例 1,效果图 (1)当我们在表格上方的搜索框中输入文字时,会...
- 很多情况下,表格里的数据不是一开始就准备好的、或者固定不变的。可能我们需要先向服务器请求数据,再将获取到的内容显示...
- Subject介绍 从之前的文章RxSwift(三)-- RxSwift使用介绍Observable的创建中,我们...