do运算符功能描述如下:
extension ObservableType {
    /**
    Invokes an action for each event in the observable sequence, and propagates all observer
    messages through the result sequence.

    - parameter eventHandler: Action to invoke for each event in the observable sequence.
    - returns: The source sequence with the side-effecting behavior applied.
    */
    public func doOn(eventHandler: (Event<E>) throws -> Void) -> Observable<E> {
        return Do(source: self.asObservable(), eventHandler: eventHandler)
    }

    /**
    Invokes the matching optional action for each event in the observable sequence, and
    propagates all observer messages through the result sequence.

    Every action defaults to `nil`; an event whose action is `nil` is passed through untouched.

    - parameter onNext: Action to invoke for each element in the observable sequence.
    - parameter onError: Action to invoke when the observable sequence terminates with an error.
    - parameter onCompleted: Action to invoke when the observable sequence completes.
    - returns: The source sequence with the side-effecting behavior applied.
    */
    public func doOn(onNext onNext: (E throws -> Void)? = nil, onError: (ErrorType throws -> Void)? = nil, onCompleted: (() throws -> Void)? = nil) -> Observable<E> {
        return Do(source: self.asObservable()) { event in
            switch event {
            case .Next(let element):
                try onNext?(element)
            case .Error(let error):
                try onError?(error)
            case .Completed:
                try onCompleted?()
            }
        }
    }

    /**
    Invokes an action for each element in the observable sequence, and propagates all observer
    messages through the result sequence.

    - parameter onNext: Action to invoke for each element in the observable sequence.
    - returns: The source sequence with the side-effecting behavior applied.
    */
    public func doOnNext(onNext: (E throws -> Void)) -> Observable<E> {
        // The parameter must be spelled `onNext` so that the forwarded argument resolves.
        return self.doOn(onNext: onNext)
    }

    /**
    Invokes an action when the observable sequence terminates with an error, and propagates all
    observer messages through the result sequence.

    - parameter onError: Action to invoke with the error carried by the `.Error` event.
    - returns: The source sequence with the side-effecting behavior applied.
    */
    public func doOnError(onError: (ErrorType throws -> Void)) -> Observable<E> {
        return self.doOn(onError: onError)
    }

    /**
    Invokes an action when the observable sequence completes, and propagates all observer
    messages through the result sequence.

    - parameter onCompleted: Action to invoke on the `.Completed` event.
    - returns: The source sequence with the side-effecting behavior applied.
    */
    public func doOnCompleted(onCompleted: (() throws -> Void)) -> Observable<E> {
        return self.doOn(onCompleted: onCompleted)
    }
}
/**
Producer behind the `doOn` operators: pairs a source sequence with an event handler that is
invoked for every event before the event reaches the subscribed observer.
*/
class Do<Element>: Producer<Element> {
    /// Side-effecting callback invoked with each event; it may throw.
    typealias EventHandler = Event<Element> throws -> Void

    private let _source: Observable<Element>
    private let _eventHandler: EventHandler

    /**
    - parameter source: Sequence whose events are observed.
    - parameter eventHandler: Action to invoke for each event of `source`.
    */
    init(source: Observable<Element>, eventHandler: EventHandler) {
        _source = source
        _eventHandler = eventHandler
    }

    /**
    Subscribes a `DoSink` wrapping `observer` to the source sequence.

    The method is named `run` to match the subscription hook that RxSwift's `Producer` exposes
    for overriding (NOTE(review): `Producer` is declared outside this file — confirm).

    - parameter observer: Observer that receives the forwarded events.
    - returns: The sink, which also serves as the disposable for the subscription.
    */
    override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
        let sink = DoSink(parent: self, observer: observer)
        // The sink keeps the upstream subscription so it is released together with the sink.
        sink.disposable = _source.subscribe(sink)
        return sink
    }
}
/**
Observer placed between the source sequence and the downstream observer by `Do`. It runs the
parent's event handler for each event and then forwards the event downstream.
*/
class DoSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias Element = O.E
    // Must be spelled `Parent`: the stored property and the initializer refer to it by that name.
    typealias Parent = Do<Element>

    private let _parent: Parent

    /**
    - parameter parent: The `Do` producer that supplies the event handler.
    - parameter observer: Downstream observer that events are forwarded to.
    */
    init(parent: Parent, observer: O) {
        _parent = parent
        super.init(observer: observer)
    }

    /**
    Handles one event from the source sequence.

    The parent's event handler runs first. If it returns normally, the event is forwarded and
    the sink disposes itself when the event is a stop event. If the handler throws, the original
    event is not forwarded; the thrown error is forwarded as an `.Error` event instead and the
    sink disposes itself.

    - parameter event: Event received from the source sequence.
    */
    func on(event: Event<Element>) {
        do {
            try _parent._eventHandler(event)
            forwardOn(event)
            if event.isStopEvent {
                dispose()
            }
        } catch let error {
            forwardOn(.Error(error))
            dispose()
        }
    }
}