/**
Connectable Observable 如同普通的Observable,除了当他被订阅的时候不会以 发送item开始外。
在这个connectionable Observable 开始发送item之前 调用connect() ,你可以等待内部所有的订阅者去订阅 Observable
*/
func sampleWithoutConnectableOperators() {
// 生成两个序列
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
_ = int1
.subscribe {
print("first subscription \($0)")
}
delay(5) {
_ = int1
.subscribe {
print("second subscription \($0)")
}
}
/**
first subscription Next(0)
first subscription Next(1)
first subscription Next(2)
first subscription Next(3)
first subscription Next(4)
first subscription Next(5)------------5
second subscription Next(0)
first subscription Next(6)
second subscription Next(1)
first subscription Next(7)
second subscription Next(2)
first subscription Next(8)
second subscription Next(3)
first subscription Next(9)
second subscription Next(4)
first subscription Next(10)
second subscription Next(5)
*/
}
multicast 广播
A.multicast(B),则 将A 发送的item 广播给 B
func sampleWithMulticast() {
let subject1 = PublishSubject<Int64>()
_ = subject1
.subscribe {
print("Subject \($0)")
}
let int1 = Observable<Int64>.interval(1, scheduler: MainScheduler.instance)
.multicast(subject1)// 将输出广播给subject
_ = int1
.subscribe {
print("first subscription \($0)")
}
delay(2) {
print("connection. begin ...")
int1.connect() //------> 该方法是关键,无此方法不会输出
print("connection..end ..")
}
delay(4) {
_ = int1
.subscribe {
print("second subscription \($0)")
}
}
delay(6) {
_ = int1
.subscribe {
print("third subscription \($0)")
}
}
/**
connection. begin ...
connection..end ..
Subject Next(0)
first subscription Next(0)
Subject Next(1)----------------4
first subscription Next(1)
second subscription Next(1)
Subject Next(2)
first subscription Next(2)
second subscription Next(2)
Subject Next(3) -------------------> 6s
first subscription Next(3)
second subscription Next(3)
third subscription Next(3)
Subject Next(4)
first subscription Next(4)
second subscription Next(4)
third subscription Next(4)
Subject Next(5)
first subscription Next(5)
second subscription Next(5)
third subscription Next(5)
*/
}
replay(0)
每次subscribe 后,先重播之前的值,0 不重播
// publish = multicast + publicSubject
func sampleWithReplayBuffer0() {
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.replay(0)
_ = int1
.subscribe {
print("first subscription \($0)")
}
delay(2) {
print("connnection....")
int1.connect()// connection 设置为10 也是等待connecton执行完才会开始打印输出 没有此方法不会执行打印
}
delay(4) {
print("44444......")
_ = int1
.subscribe {
print("second subscription \($0)")
}
}
delay(6) {
print("66666......")
_ = int1
.subscribe {
print("third subscription \($0)")
}
}
/**
connnection....
first subscription Next(0)
44444......
first subscription Next(1)
second subscription Next(1)
first subscription Next(2)
second subscription Next(2)
66666......
first subscription Next(3)
second subscription Next(3)
third subscription Next(3)
first subscription Next(4)
second subscription Next(4)
third subscription Next(4)
*/
}
replay(2)
每次subscribe ,重播之前发送的两个item
func sampleWithReplayBuffer2() {
print("--- sampleWithReplayBuffer2 ---\n")
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.replay(2)
_ = int1
.subscribe {
print("first subscription \($0)")
}
delay(2) {
int1.connect()
print("22222.....")
}
delay(6) {
print("44444....")
_ = int1
.subscribe {
print("second subscription \($0)")
}
}
delay(10) {
print("66666666666666.....")
_ = int1
.subscribe {
print("third subscription \($0)")
}
}
/**
--- sampleWithReplayBuffer2 ---
22222.....
first subscription Next(0)
first subscription Next(1)
first subscription Next(2)
44444....
second subscription Next(1)----
second subscription Next(2)---- replay buffer = 2
first subscription Next(3)
second subscription Next(3)
first subscription Next(4)
second subscription Next(4)
first subscription Next(5)
second subscription Next(5)
first subscription Next(6)
second subscription Next(6)
66666666666666.....
third subscription Next(5)----
third subscription Next(6)---- replay 的输出
first subscription Next(7)
second subscription Next(7)
third subscription Next(7)
first subscription Next(8)
second subscription Next(8)
third subscription Next(8)
*/
}
shareReplay(n)
与replay 不同之处在于 shareReplay 不用connection就可以触发输出,replay 需要connection 。
另外replay的输出是从0 到n 的过程,中间有subscribe 就获取当前值输出(此时item为 10 ,就输出10)
如果Replay 的observable 在connection之前已添加多个subscribe,则所有的序列都会从头一起开始
func sampleWitShareReplayBuffer3() {
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.shareReplay(2)// shareReplay 与replay 可以触发subscribe
_ = int1
.subscribe {
print("first subscription \($0)")
}
delay(6) {
print("44444....")
_ = int1
.subscribe {
print("second subscription \($0)")
}
}
delay(10) {
print("66666666666666.....")
_ = int1
.subscribe {
print("third subscription \($0)")
}
}
/**
first subscription Next(0)
first subscription Next(1)
first subscription Next(2)
first subscription Next(3)
first subscription Next(4)
first subscription Next(5)
44444....
second subscription Next(4)-------buffer
second subscription Next(5)------- buffer
first subscription Next(6)
second subscription Next(6)
first subscription Next(7)
second subscription Next(7)
first subscription Next(8)
second subscription Next(8)
first subscription Next(9)
second subscription Next(9)
66666666666666.....
third subscription Next(8)-----buffer
third subscription Next(9)-----buffer
first subscription Next(10)
second subscription Next(10)
third subscription Next(10)
*/
}
shareReplay replay
无需触发输出 需要connenction触发
buffer 输出 buffer输出
如果不需要replay ,则不需要connection
replay ,publish , multicast 需要connection 才能触发