在上一个篇章中,我们一起探索了Rxswift的核心逻辑,对Rxswift有了更近一步的理解
正所谓光说不练,假把式。那么接下来我们来看看在我们平常开发中用到的一些序列创建方式
1:empty---空序列
使用empty
函数创建一个空序列,事件是Int
类型的,由于是空序列,也就是没有序列,所以只能complete
let emptyOb = Observable<Int>.empty()
emptyOb.subscribe(onNext: { number in
print("订阅:", number)
}, onError: { error in
print("error:", error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}
//打印结果:完成回调
释放回调
为什么直接完成回调了呢,我们点击源码进行分析
extension ObservableType {
public static func empty() -> Observable<Element> {
EmptyProducer<Element>()
}
}
final private class EmptyProducer<Element>: Producer<Element> {
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
observer.on(.completed)
return Disposables.create()
}
}
- 1.
empty
和上一章讲的create
类似,创建并返回Observable
对象,区别在于empty
创建EmptyProducer
,create
创建AnonymousObservable
,create
传入了一个闭包并保存,empty
没有参数;根据继承关系,都是继承自Observable
- 2.
observer.on(.completed)
看到这里就明白为什么直接完成回调了,它直接发送了完成信号
2:just---单个信号序列
该方法通过传入一个默认值来初始化,构建一个只有一个元素的Observable
队列,订阅完信息自动complete
。
// 单个信号序列创建
print("******** just ********")
let justOb = Observable<String>.just("逸华爱Moto")
justOb.subscribe(onNext: { number in
print("订阅:", number)
}, onError: { error in
print("error:", error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}
//打印结果
******** just ********
订阅: 逸华爱Moto
完成回调
释放回调
简单来说,就是传入什么输出什么,看看源码
extension ObservableType {
public static func just(_ element: Element) -> Observable<Element> {
Just(element: element)
}
//这里省略了一些代码...
}
final private class Just<Element>: Producer<Element> {
private let element: Element
init(element: Element) {
self.element = element
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
observer.on(.next(self.element))
observer.on(.completed)
return Disposables.create()
}
}
- 1.
just
和empty
类似,只是保存了传入的参数 - 2.
observer.on(.next(self._element))
常规订阅之后就会发送.next
事件
之后就会自动发送完成事件,跟我们效果完全吻合
3:of
- 上面的
just
针对单元素,那么of
则是多个元素 针对序列处理 - 该方法可以接受可变数量的参数(必需要是同类型的)
print("******** of ********")
//多个元素
Observable.of(1,2,3,4)
.subscribe { element in
print("订阅:", element)
}
.disposed(by: disposeBag)
//数组
Observable.of([1,2,3,4])
.subscribe { element in
print("订阅:", element)
}
.disposed(by: disposeBag)
//字典
Observable.of(["name":"逸华","hobby":"骑摩托"])
.subscribe { element in
print("订阅:", element)
}
.disposed(by: disposeBag)
//打印结果
******** of ********
订阅: next(1)
订阅: next(2)
订阅: next(3)
订阅: next(4)
订阅: completed
订阅: next([1, 2, 3, 4])
订阅: completed
订阅: next(["name": "逸华", "hobby": "骑摩托"])
订阅: completed
话不多说来看下具体实现
extension ObservableType {
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
ObservableSequence(elements: elements, scheduler: scheduler)
}
}
final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
fileprivate let elements: Sequence
fileprivate let scheduler: ImmediateSchedulerType
init(elements: Sequence, scheduler: ImmediateSchedulerType) {
self.elements = elements
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
typealias Parent = ObservableSequence<Sequence>
private let parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self.parent.scheduler.scheduleRecursive(self.parent.elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
- 创建是一个中规中矩的流程,和上面几个几乎一样
- 同样保存了传入的元素,并且保存了调度环境
- 订阅流程也是利用
sink
,然后通过mutableIterator
迭代器处理发送
4.from
- 将可选序列转换为可观察序列。
- 从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全
print("******** from ********")
//MARK: from
// 从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全
Observable<[String]>.from(optional: nil)
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
Observable<[String]>.from(optional: ["逸华", "爱摩托"])
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
//打印结果:
******** from ********
completed
next(["逸华", "爱摩托"])
completed
看下源码
extension ObservableType {
public static func from(optional: Element?) -> Observable<Element> {
ObservableOptional(optional: optional)
}
}
final private class ObservableOptional<Element>: Producer<Element> {
private let optional: Element?
init(optional: Element?) {
self.optional = optional
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let element = self.optional {
observer.on(.next(element))
}
observer.on(.completed)
return Disposables.create()
}
}
-
self.optional = optional
保存可选项 - 订阅流程判断是否匹配传入并保存的可选项
- 发送
observer.on(.next(element))
序列 - 最后自动
observer.on(.completed)
完成序列发送
5.defer
- 延时初始化序列
print("******** defer ********")
//MARK: defer
// 这里有一个需求:动态序列 - 根据外界的标识 - 动态输出
// 使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
var isOdd = false
_ = Observable<Int>.deferred { () -> Observable<Int> in
// 这里设计我们的序列
isOdd = !isOdd
if isOdd {
return Observable.of(1)
}
return Observable.of(0)
}
.subscribe { event in
print(event)
}
-
self.observableFactory = observableFactory
初始化保存了外部传入的闭包
通过sink.run
进入都下面的代码
func run() -> Disposable {
do {
let result = try self.observableFactory()
return result.subscribe(self)
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
return Disposables.create()
}
}
- 在订阅流程,这段工厂闭包被执行
6:rang
- 生成指定范围内的可观察整数序列。
print("******** range ********")
Observable.range(start: 0, count: 5)
.subscribe { number in
print(number)
}
.disposed(by: disposeBag)
//打印结果:
******** range ********
next(0)
next(1)
next(2)
next(3)
next(4)
completed
源码
extension ObservableType where Element: RxAbstractInteger {
public static func range(start: Element, count: Element, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
RangeProducer<Element>(start: start, count: count, scheduler: scheduler)
}
}
final private class RangeProducer<Element: RxAbstractInteger>: Producer<Element> {
fileprivate let start: Element
fileprivate let count: Element
fileprivate let scheduler: ImmediateSchedulerType
init(start: Element, count: Element, scheduler: ImmediateSchedulerType) {
guard count >= 0 else {
rxFatalError("count can't be negative")
}
guard start &+ (count - 1) >= start || count == 0 else {
rxFatalError("overflow of count")
}
self.start = start
self.count = count
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final private class RangeSink<Observer: ObserverType>: Sink<Observer> where Observer.Element: RxAbstractInteger {
typealias Parent = RangeProducer<Observer.Element>
private let parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self.parent.scheduler.scheduleRecursive(0 as Observer.Element) { i, recurse in
if i < self.parent.count {
self.forwardOn(.next(self.parent.start + i))
recurse(i + 1)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
- 从外部传入
start
和count
已经调度环境,并保存起来 -
i < self.parent.count
通过这个进行是否发送.completed
,从而递归的发送信号
7: generate
- 该方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
- 初始值给定 然后判断条件1 再判断条件2 会一直递归下去,直到条件1或者条件2不满足
- 类似数组便利
print("******** generate ********")
Observable.generate(initialState: 0,// 初始值
condition: { $0 < 10}, // 条件1
iterate: { $0 + 2 }) // 条件2 +2
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
// 数组遍历
let arr = ["e_1","e_2","e_3","e_4","e_5"]
Observable.generate(initialState: 0,// 初始值
condition: { $0 < arr.count}, // 条件1
iterate: { $0 + 1 }) // 条件2 +2
.subscribe(onNext: {
print("遍历arr:", arr[$0])
}).disposed(by: disposeBag)
- 参数一initialState: 初始状态。
- 参数二 condition:终止生成的条件(返回“false”时)。
- 参数三 iterate:迭代步骤函数。
- 参数四 调度器:用来运行生成器循环的调度器,默认
CurrentThreadScheduler.instance
。 - 返回:生成的序列。
8:timer
- 返回一个可观察序列,该序列使用指定的调度程序运行计时器,在指定的初始相对到期时间过后定期生成一个值
print("******** timer ********")
Observable<Int>.timer(.seconds(5), period: .seconds(2), scheduler: MainScheduler.instance)
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)
// 因为没有指定期限period,故认定为一次性
Observable<Int>.timer(.seconds(1), scheduler: MainScheduler.instance)
.subscribe { event in
print("111111111 \(event)")
}
.disposed(by: disposeBag)
- 参数1:第一次响应距离现在的时间
- 参数2:时间间隔
- 参数3:线程
9:repeatElement
- 该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)
print("******** repeatElement ********")
//MARK: repeatElement
// 该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)
Observable<Int>.repeatElement(5)
.subscribe { event in
print("订阅:", event)
}
.disposed(by: disposeBag)
10:error
- 对消费者发出一个错误信号
print("******** error ********")
Observable<String>.error(NSError.init(domain: "发送错误", code: 10086, userInfo: ["reason": "unknow"]))
.subscribe { event in
print("订阅:", event)
}
.disposed(by: disposeBag)
11:never
- 该方法创建一个永远不会发出
Event
(也不会终止)的Observable
序列。 - 这种类型的响应源 在测试或者在组合操作符中禁用确切的源非常有用
print("******** never ********")
Observable<String>.never()
.subscribe { event in
print("走你", event)
}
.disposed(by: disposeBag)
序列的创建也是学习
RxSwift
的根基,当然RxSwift还有一些创建序列的方式,大家可以玩一玩以上就是
RxSwift
常用的序列创建方式,如果对你有帮助,请不要吝啬自己手里的点赞👍哦~~~