mapオペレータを自作してみる

SwiftUIもだんだんと成熟してきてるらしいのでそれに伴ってCombineも今後より採用されるようになると思います。今までiOSでリアクティブプログラミングというとRx系のライブラリを使用することがデファクトだったと思うのですが(自分は本番環境で使ったことないけど)、SwiftUIとの相性やAppleから公式に提供されているということで今後必ず必要になる技術になります。

今回はCombineフレームワークで提供されているmapオペレータを再実装(正常系だけ)することで、Combineの仕組みを少しばかり見ていきたいと思います。

mapオペレーター

まずmapオペレーターの挙動を見ていきましょう。mapオペレータはいくつか種類があるのですが、今回はmap(_ transform:)を対象にします。

// func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> {
let publisher = [1,2,3,4,5].publisher
publisher
  .map { value in
    value * 2
  }
  .sink(receiveValue: {print($0)})
// output: 2,4,6,8,10

mapオペレータはSwiftで提供されているmapとほぼ同じ挙動をとります。closureを渡してあげることで、それぞれの値に対して適用させることができます。

ここで少しだけmapオペレータについて見ておきましょう。 引数である (Self.Output) -> T のOutputというのはPublisherプロトコルが定義している型で、Publisherによって出力される値の型です。以下のように定義されています。

public protocol Publisher {

    /// The kind of values published by this publisher.
    associatedtype Output

    /// The kind of errors this publisher might publish.
    ///
    /// Use `Never` if this `Publisher` does not publish errors.
    associatedtype Failure : Error

    /// Attaches the specified subscriber to this publisher.
    ///
    /// Implementations of ``Publisher`` must implement this method.
    ///
    /// The provided implementation of ``Publisher/subscribe(_:)-4u8kn``calls this method.
    ///
    /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values.
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

また返り値であるPublishers.Mapは以下のように定義されています。値の変換するtransformだけでなく、upstreamというmapオペレータを呼び出すPublisherも初期化時に受け取るようになっています。

public struct Map<Upstream, Output> : Publisher where Upstream : Publisher {

        /// The kind of errors this publisher might publish.
        ///
        /// Use `Never` if this `Publisher` does not publish errors.
        public typealias Failure = Upstream.Failure

        /// The publisher from which this publisher receives elements.
        public let upstream: Upstream

        /// The closure that transforms elements from the upstream publisher.
        public let transform: (Upstream.Output) -> Output

        public init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output)

        /// Attaches the specified subscriber to this publisher.
        ///
        /// Implementations of ``Publisher`` must implement this method.
        ///
        /// The provided implementation of ``Publisher/subscribe(_:)-4u8kn``calls this method.
        ///
        /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values.
        public func receive<S>(subscriber: S) where Output == S.Input, S : Subscriber, Upstream.Failure == S.Failure
    }

これを参考にしてmapオペレータと同じように値の変換を行う独自のオペレータmyMapを定義してみましょう。

myMapオペレータの定義

まず実装はおいておいて、myMapオペレータを既存のmapオペレータの実装を用いて定義してみます。 myMap関数をPublisherのextensionとして用意してあげることで、他のオペレータと同じように上流のPublisherからmyMapを呼び出すことが可能になります。 今のところ具体的な実装はmapオペレータに代替わりしてもらいましょう。

extension Publisher {
  func myMap<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> {
    return map { transform($0) }
  }
}

publisher
  .myMap { value in
    value * 2
  }
  .sink(receiveValue: {print($0)})

このようにmyMapオペレータを作ることに成功しました。 それでは次に、mapオペレータを使用しない方法で、myMapの処理を自分で作ってみます。

myMapオペレータの実装

PublishersネームスペースにextensionとしてMyMapを実装します。実装は特別なことは何もしてなくMapに倣って実装してみました。

ポイントとしては、receive(subscriber:)の実装で、AnySubscriberというSubscriberを作ってあげてUpstreamに対してSubscribeを行うことで、Upstreamからの値を手に入れることができる、という点です。

今回はここで手に入れた値に対して、初期化時に定義されたtransformを適用するのみ、という実装になっています。completionはUpstreamからの通知をそのままsubscriberに流しているだけです。

public class MyMap<Upstream, Output> : Publisher where Upstream: Publisher {
    public typealias Failure = Upstream.Failure
    
    private let transform: (Upstream.Output) -> Output
    private let upstream: Upstream

    private var subscriber: AnySubscriber<Output,Failure>? = nil

    public init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
      self.upstream = upstream
      self.transform = transform
    }
    
    public func receive<S>(subscriber: S) where Output == S.Input, S : Subscriber, Upstream.Failure == S.Failure {
      self.subscriber = AnySubscriber(subscriber)

      let sink = AnySubscriber(receiveSubscription: { subscription in
        subscription.request(.unlimited)
      },
      receiveValue: { [weak self] (value: Upstream.Output) -> Subscribers.Demand in
        self?.relay(value)
        return .none
      },
      receiveCompletion: { [weak self] in
        self?.complete($0)
      })

      upstream.subscribe(sink)
    }
    
    private func relay(_ value: Upstream.Output) {      
      guard let subscriber = subscriber else { return }
      subscriber.receive(transform(value))
    }
    
    private func complete(_ completion: Subscribers.Completion<Failure>) {      
      guard let subscriber = subscriber else { return }
      subscriber.receive(completion: .finished)
      self.subscriber = nil
    }
  }

カスタムPublisherを作成したい多くの場合は、カスタムSubscriptionも作ってあげる必要があると思うのすが、今回は作る必要がありませんでした。一応パッと作ってみたのを載せておくと以下の感じになりました。

extension Publishers {
 
  private final class MyMapSubscription<Output, Failure: Error>: Subscription {
    
    var subscriber: AnySubscriber<Output,Failure>? = nil
    var value: Output?
    var completion: Subscribers.Completion<Failure>? = nil
    
    init<S>(subscriber: S,
            completion: Subscribers.Completion<Failure>?) where S: Subscriber, Failure == S.Failure, Output == S.Input {
      self.subscriber = AnySubscriber(subscriber)
      self.completion = completion
    }
    
    func request(_ demand: Subscribers.Demand)  {
      if demand != .none {
        return
      }
      
      emit()
    }
    
    // Publisherから値を受け取る
    func receive(_ input: Output) {
      guard subscriber != nil else { return }
      value = input
      emit()
    }
    
    // Publisherからcompletionを受け取る
    func receive(completion: Subscribers.Completion<Failure>) {
      guard let subscriber = subscriber else { return }
      self.subscriber = nil
      self.value = nil
      subscriber.receive(completion: completion)
    }

    func cancel() {
      complete(with: .finished)
    }
    
    private func complete(with completion: Subscribers.Completion<Failure>) {
      guard let subscriber = subscriber else { return }
      self.subscriber = nil
      self.completion = nil
      self.value = nil
      
      subscriber.receive(completion: completion)
    }

    private func emit() {
      guard let subscriber = subscriber, let value = value else { return }
      subscriber.receive(value)
      self.value = nil
      
      if let completion = completion {
        complete(with: completion)
      }
    }
  }
  
  public class MyMap<Upstream, Output> : Publisher where Upstream: Publisher {
    public typealias Failure = Upstream.Failure
    
    private let transform: (Upstream.Output) -> Output
    private let upstream: Upstream
    private var subscription: MyMapSubscription<Output, Failure>?
    private var completion: Subscribers.Completion<Failure>? = nil

    public init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
      self.upstream = upstream
      self.transform = transform
    }
    
    public func receive<S>(subscriber: S) where Output == S.Input, S : Subscriber, Upstream.Failure == S.Failure {      
      let subscription = MyMapSubscription(subscriber: subscriber, completion: completion)
      self.subscription = subscription
      subscriber.receive(subscription: subscription)

      let sink = AnySubscriber(receiveSubscription: { subscription in
        subscription.request(.unlimited)
      },
      receiveValue: { [weak self] (value: Upstream.Output) -> Subscribers.Demand in
        self?.relay(value)
        return .none
      },
      receiveCompletion: { [weak self] in
        self?.complete($0)
      })

      upstream.subscribe(sink)
    }
    
    private func relay(_ value: Upstream.Output) {
      guard completion == nil, let subscription = subscription else { return }
      subscription.receive(transform(value))
    }
    
    private func complete(_ completion: Subscribers.Completion<Failure>) {
      self.completion = completion
      guard let subscription = subscription else { return }
      subscription.receive(completion: completion)
    }
  }
}

Demandの調整や処理の制御、エラーハンドリングは行っていないのでちゃんとした実装ではないのですが、大まかにはこういった流れの実装になるのではないかなと思います。

今回Combineで独自のオペレータやPublisherを作成したい場合に、どういったが作業必要になるかの一例を見てみました。 ただ多くの場合は独自に作成する必要はなく、既存のオペレータの組み合わせで事足りると思いますのでその扱い方を今後見てみたいと思います。