DearMiku

RXSwift源码浅析(一)

字数统计: 2.2k阅读时长: 9 min
2017/12/17 Share

RXSwift源码浅析(一)

简述

最近老大给了个新项目,我打算用Swift写.原来OC用的RAC,换到Swift自然框架也想试试新的,就用了RXSwift,对于这两个框架,我都是会用,但不解其中的原理,正好最近需求没下来,就研究了研究RXSwif,把自己的收获分享一下,文中要有不准确的地方还望大家多多指正~

关于RXSwift是什么和怎么用我就不废话了,网上资源很多,本文先从Observable实现原理入手,旨在以小见大,后面的Single什么的自然举一反三~

使用Demo

下面是一段简单使用Observable的代码

1
2
3
4
5
6
7
8
9
10
11
12
let numbers: Observable<Int> = Observable.create { observer -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onCompleted()

return Disposables.create {
}
}

numbers.subscribe{
print($0)
}

demo实现的效果其实就是 将上一段闭包中输入的 产生的事件(0,1,Completed),在下一段闭包中提取出来.
这样就将 事件的产生 和 事件的处理 分开. 本文也就是分析这个效果怎么实现的

主要类

AnonymousObservable

匿名观察者,存储产生事件的闭包 和激活处理事件闭包的入口

AnyObserver

任意观察者,用于存储事件 和 输出事件

AnonyObserver

匿名观察者,用于存储 处理事件的闭包

AnonymousObservableSink

将可观察者 和 观察者 链接,实现事件的传递

ObserverType

协议,将上面所有内容都包裹起来,将它们加以限制,便于有效的沟通~

Event

事件本身,是枚举,有 Error,Complete,Element(元素)

实现过程

存储

首先要说的是 ObserverType 定义的一些内容

1
2
3
associatedtype E

func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E

E:为本次事件流中定义一个确定的类型,保证 产生的和处理的元素类型相同,否则无法传递

create方法

Observable<Int>.create { observer -> Disposable in ....}
对于Observable,它是一个抽象类,我们在实际使用中并不能使用它,在协议中有默认的实现

1
2
3
4
5
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}

所以此处创建的是 AnonymousObservable 对象,我先称其为A1,A1将事件产生的闭包持有, 闭包中产生的事件 输入到AnyObserver结构体中.闭包我们成为A2 这样 存储部分就好了~~

激活

激活 我们通过调用A1的订阅方法subscribe(也是协议中限定的方法),接下来看方法中的实现~
因为Observable是抽象类,所以这里也是协议默认的实现

1
2
3
4
5
6
7
8
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}

return self.asObservable().subscribe(observer)
}

在这里就分两步了,一是观察者的实现,而是事件的传递

观察者

在这里很简单,也就是创建AnonymousObserver匿名观察者对象B1,B1将事件处理闭包持有,闭包我们成为B2

传递

首先是asObservable()方法,因为 B1间接继承自Observable,所以也就是return self,应该是在处理其他类型的可观察物用到,在后续 如果碰到我会补充~

然后就是对A1的 另一个订阅方法(重载),将B1作为参数传入
细枝末节先不说,先把握主干~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {

if !CurrentThreadScheduler.isScheduleRequired {
//第一步
let disposer = SinkDisposer()
//第二步
let sinkAndSubscription = run(observer, cancel: disposer)
//第三步
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
//else先不说~
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

return disposer
}
}
}

第一步

SinkDisposer对象是关于 传递结束后,处理资源回收的对象,叫它C1,用来处理 A1create闭包返回的disposer闭包的~

第二步

调用了run方法,将B1对象传入

1
2
3
4
5
6
7
8
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
//2.1
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
//2.2
let subscription = sink.run(self)
//2.3
return (sink: sink, subscription: subscription)
}

2.1步

创建AnonymousObservableSink对象,我称它D1,它也是将B1对象和C1对象持有

2.2步

调用D1对象的run方法,将A1自身传入

1
2
3
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}

在该方法中,就是将A1对象的A2闭包 调用,将D1对象化为AnyObserver结构体作为A2参数传入~

然后我们看 D1对象 若何转换的

1
2
3
4
//结构体方法
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}

在这里结构体 将 D1持有的B1对象的on方法 作为属性持有~,将结构体成为E1

再来看E1onNext....方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
extension ObserverType {
//YSD
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
on(.next(element))
}

/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
on(.completed)
}

/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
on(.error(error))
}
}

对应的其实是调用 B1on方法~~

1
2
3
4
5
6
7
8
9
10
11
12
13
func on(_ event: Event<E>) {
switch event {
case .next:
if _isStopped == 0 {
onCore(event)
}
case .error, .completed:

if AtomicCompareAndSwap(0, 1, &_isStopped) {
onCore(event)
}
}
}

对应的B1onCore方法

1
2
3
override func onCore(_ event: Event<Element>) {
return _eventHandler(event)
}

也就是将 E1A2接收的事件 传入B2中,最终实现内容的传递~~ 然后再将A1中释放资源的闭包返回~

2.3

D1和disposable闭包 作为元组返回~

第三步

C1接收元组参数,调用setSinkAndSubscription方法~,然后将SinkDisposer对象返回,让用户选择是否释放~

图示

文字太抽象,画个图吧~ 画的有点丑(๑•ᴗ•๑)~

screenshot

可以看到 A1 在这个过程中只持有了A2, 不会导致内存泄露~ 当然如果你dispose 使用不当 肯定有泄漏的~ 亲测(๑•ᴗ•๑)~

细枝末节

1

订阅2中的if !CurrentThreadScheduler.isScheduleRequired

内容是这样的~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static fileprivate(set) var isScheduleRequired: Bool {
get {
//获取该指示值
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {

//设置
//http://www.jianshu.com/p/d52c1ebf808a
// 成功返回0 true设置no no设置为 true
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}

private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
//YSD
//https://onevcat.com/2015/01/swift-pointer/
//可变指针 pthread_key_t类型 分配空间
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer {
key.deallocate(capacity: 1)
}

//创建线程安全的变量
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}

return key.pointee
}()

这里应该是为了保护,RXSwift在多线程操作下的数据安全~ 在本次事件流中只使用了get方法,并没使用set~,所以具体效果我不清楚~,以后碰到了 我在补充上吧~

SinkDisposer

就是释放资源部分~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
   fileprivate enum DisposeState: UInt32 {     
case disposed = 1
case sinkAndSubscriptionSet = 2
}

// Jeej, swift API consistency rules
fileprivate enum DisposeStateInt32: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}

private var _state: AtomicInt = 0
private var _sink: Disposable? = nil
private var _subscription: Disposable? = nil


func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
_sink = sink
_subscription = subscription

let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}

if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}

func dispose() {
let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}

if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set")
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set")
}

sink.dispose()
subscription.dispose()

_sink = nil
_subscription = nil
}
}

从输出崩溃提示哪里就可以得知~ 这里是为了防止dispose的多次调用~ 因为在整个事件流中,dipose闭包 可能是 产生Complete,Error或者用户手动调用的~

AtomicOr方法其实调用的是OSAtomicOr32OrigBarrier(A,&B) 该函数会将两个变量 线程安全的 按位或运算返回结果, 并为后者赋值=前者~ B=A

未调用dipose时 逻辑与运算 state = 2 previousState = 0 两个条件都不成立~ 所以此时是用户要手动dispose

之前调用过 也就是发生complete 或 Error(在上面的代码中也有保证,两者只发生一起~),则 state = 1当调用setSinkAndSubscription方法时 逻辑与运算 state = 2 previousState = 1 则第一个条件不成立 第二个成立~ 释放资源

当多次Complete时,则只会dipose一次~

当在外界多次调用时 则state = 2 previousState = 1 则第一个条件成立 崩溃~

当然这里实现这种效果的方案有很多种~ RSSwift的方案比较有逼格吧~

总结

看完这些源码,我的感觉是RXSwift对 设计模式 贯彻的很彻底~ 在时间富裕的情况下自己写的项目要向他靠拢,增强项目的延展性,这样项目经理让加啥也不会太头疼了~~

CATALOG
  1. 1. RXSwift源码浅析(一)
  2. 2. 简述
  3. 3. 使用Demo
  4. 4. 主要类
    1. 4.1. AnonymousObservable
    2. 4.2. AnyObserver
    3. 4.3. AnonyObserver
    4. 4.4. AnonymousObservableSink
    5. 4.5. ObserverType
    6. 4.6. Event
  5. 5. 实现过程
    1. 5.1. 存储
      1. 5.1.1. create方法
  6. 6. 激活
    1. 6.1. 观察者
    2. 6.2. 传递
      1. 6.2.1. 第一步
      2. 6.2.2. 第二步
        1. 6.2.2.1. 2.1步
        2. 6.2.2.2. 2.2步
        3. 6.2.2.3. 2.3
      3. 6.2.3. 第三步
  7. 7. 图示
  8. 8. 细枝末节
    1. 8.1. 1
    2. 8.2. SinkDisposer
  9. 9. 总结