ClojureでTransducer

前置き

Clojure 1.7で導入されたTransducerについて。

  • mapfiltertakeなどは、reduceで書くことができる
  • つまりreduceは根源的な演算
  • Transducerは、reduceをより汎用的にしたもの

コードはGitHubで。

問題#1

本記事では、以下のデータを使う。

(def d
  [#{1}
   #{\a \b \c \d \e}
   #{"aa" "bb"}
   #{[1] {:k 'v} #{}}])

見ての通り、セットのベクタである。各セットの要素数を文字列化してみよう。

普通の方法

(->> d
     (map count)
     (map str)) ;;=> ("1" "5" "2" "3")

関数合成を使えば、1回のmapでOK:

(def count-str
  (comp str count)) ;; 順番に注意。countしてstrする。

(map count-str d)

Transducerなら

mapを使ってTransducerを作る
(def count-t
  (map count))
(def str-t
  (map str))

Transducerは関数合成可能。

(def count-str-t
  (comp count-t str-t)) ;; 順番に注意。さっきとは逆。

使ってみる。

(reduce (count-str-t conj) [] d)  ;;=> ["1" "5" "2" "3"]

よりTransducer的にいくなら:

(transduce count-str-t conj d)

問題#2

さらに、"5"を除外したのち、最初の2つを取る。

ヘルパ関数
(defn not5
  [s]
  (not= s "5"))

普通の方法

count-strnot5を合成するのは無理なので地道に:

(->> d
     (map count-str)
     (filter not5)
     (take 2))    ;;=> ("1" "2")

いい知らせ

Transducerなら、合成可能。

(def not5-t
  (filter not5))
(def take2-t
  (take 2))
(def all-t
  (comp count-str-t not5-t take2-t))

行けぇー:

(transduce all-t conj d)    ;;=> ["1" "2"]

で、Transducerって何?

用語

用語定義
Reducing function(rf)reduce関数の最初の引数に指定する関数。
Transformercountstrのように、何かを別のものに変える関数。

Transducer

Transducer(xf)は、Reducing function用のTransformerである。

Transducerは関数合成可能。そして合成するとき、以下のことに依存しない。

  • 入力コンテキスト(コレクションなのか、Streamなのか、Channelなのか?)
  • 出力コンテキスト(コレクションが欲しいのか、スカラ値が欲しいのか?)

こういったことは、最後にreduce/transduceするときまで気にする必要がない。

;; コレクションからコレクションへ
(transduce all-t conj d)    ;;=> ["1" "2"]

;; スカラ値へ
(transduce all-t str d)     ;;=> "12"

コンテキストを決めるのは、ここで使うconjstrである。all-tが適切なrfを作ってくれる。このコードは、以下のコードと同じである。

(def rf-v (all-t conj))
(reduce rf-v (rf-v) d)

(def rf-s (all-t str))
(reduce rf-s (rf-s) d)

Reducing functions

通常Reducing functionは、2つの引数を取る関数である。しかしTransducerと共に使うためには、いくつかの条件を満たす必要がある。

Multi-arity

Reducing functionは、3種類のArityを持つ。

(def rf1 (count-str-t conj))

;; arity-0
(rf1)                    ;;=> []

;; arity-2
(rf1 ["1"] #{1 2})       ;;=> ["1" "2"]
(rf1 ["1" "2"] #{1 2 3}) ;;=> ["1" "2" "3"]

;; arity-1
(rf1 ["1"])              ;;=> ["1"]
(rf1 ["1" "2"])          ;;=> ["1" "2"]
  • arity-0(Init)は、Reducingの初期値を得るために、必要に応じて呼ばれる
  • arity-2(Step)は、いつものReducingを担当する
  • arity-1(Completion)は、Reducingの最後に1度だけ呼ばれ、最終的な出力を決める

1つ定義してみる。

(defn rf-csv
  ([] "")
  ([acc s]
   (if (empty? acc)
     s
     (str acc "," s)))
  ([acc] (str "/" acc "/")))

(transduce count-str-t rf-csv d)  ;;=> "/1,5,2,3/"
(transduce all-t rf-csv d)        ;;=> "/1,2/"

Early termination

すべての入力を処理する前にReducingが終わる場合(takeとか)、arity-2関数はreducedを呼ぶ。reducetransduce関数は、内部でreduced?を呼ぶことによりそれを検知して、Reducing処理を切り上げてくれる。

(defn rf-csv-until3
  ([] "")
  ([acc s]
   (if (= s "3")
     (reduced acc)
     (if (empty? acc)
       s
       (str acc "," s))))
  ([acc] (str "/" acc "/")))

(transduce count-str-t rf-csv-until3 d)  ;;=> "/1,5,2/"

ここで(reduced acc)が返すのは、accをラップした値(derefすればaccを取り出せる)。つまりarity-2関数が返す値には2通りの型がある。

  • 通常のacc ...Reducingが継続するとき
  • ラップされたacc ...Reducingが途中終了するとき

もしReducing途中で、arity-2関数の戻り値を参照する場合は、このことを念頭に置く必要がある。

....(let [result (rf acc inp)]
      (if (reduced? result)
        then      ;; it's wrapped
        else))    ;; not wrapped

clojure.core/interposeのソースコードが参考になる。

状態を扱うTransducer

take2-tを思い出して。

   (def take2-t (take 2))

これは、状態を持ったrfを作る。

   (def rf-t2 (take2-t conj))

rf-t2は、「これまで何個の要素をaccへ追加したか(あるいは、あと何個追加できるか)」を覚えておく必要がある(2個追加した時点でreducedするため)。

   ;; count 1
   (let [acc (rf-t2 [] 1)] (reduced? acc))   ;;=> false
   ;; count 2, and done
   (let [acc (rf-t2 [] 1)] (reduced? acc))   ;;=> true

状態はrfを作るときに初期化されるが、それっきりリセットされることはない。例えばarity-1関数を呼んだからといって、(次に備えて)状態をリセットしてくれたりはしない。

   (rf-t2 [])
   (let [acc (rf-t2 [] 1)] (reduced? acc))   ;;=> true(依然、reduced済み)

つまり、rfは再利用できない。reduceするたびに作るべき(あるいはtransduceを使う)。

問題#3

Transducerを作る関数を定義する。このTransducerは、引数n, f, pを取り、述語pを満たす最初のn個の要素にfを適用する。

使用例
(def nfp-t (mk-nfp-t 3 inc even?))
(transduce nfp-t conj [1 2 3 4 5 6 7 8 9])  ;;=> [1 3 3 5 5 7 7 8 9]
;; only 2, 4, and 6 are incremented
解答例
(defn mk-nfp-t
  [n f p]
  (fn [rf]
    (let [cnt (atom n)]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc e]
         (let [match (p e)
               ee (f e)]
           (if match
             (if (pos? @cnt)
               (do (swap! cnt dec)
                   (rf acc ee))
               (rf acc e))
             (rf acc e))))))))

試す。

(def nfp-t (mk-nfp-t 3 inc even?))
(transduce nfp-t conj [1 2 3 4 5 6 7 8 9])  ;;=> [1 3 3 5 5 7 7 8 9]
(transduce
  (comp (map #(* % 2)) nfp-t str-t take2-t)
  rf-csv [1 2 3 4 5 6 7 8 9])               ;;=> "/3,5/"

ChannelとTransducer

Transducerはコレクション(ベクタとか)以外にも使える。clojure.core.asyncは、Channelに対してTransducerを使うための関数を提供する。

(require '[clojure.core.async :as async])

(def c (async/chan 1 count-str-t))  ;; 入力Channel
(def rc (async/reduce conj [] c))   ;; reduceは出力Channelを返す

chanは入力ChannelにTransducerを組み込む。これを使ってreduceすると出力Channelが手に入る。

あとは、普通通りput!, take!, <!, >!する。

(async/take! rc prn)    ;; 出力を待つ

入力Channelに値をput!してみる。

(async/put! c #{1})               ;; 何も起きない
(async/put! c #{\a \b \c \d \e})  ;; 依然、動きなし
(async/put! c #{"aa" "bb"})       ;; 静寂
(async/close! c)                  ;; ["1" "5" "2"]が出力される

all-tを使ってみる。

(let [c (async/chan)
      rc (async/transduce all-t str "" c)]
  (async/take! rc prn)
  (for [s d]
    (async/put! c s)))            ;; "12"が出力される

transduce以外は?

Clojure 1.7で、map, filter, take, mapcatなど、clojure.coreの多くの関数がTransducerを返せるようになった。catは、それ自身がTransducerだ。

(transduce cat conj d)   ;;=> [1 \a \b \c \d \e "aa" "bb" #{} {:k v} [1]]
(transduce cat str d)    ;;=> "1abcdeaabb#{}{:k v}[1]"

transduceによりReducingする以外にも、Transducerは、Iteratorやコレクション、シーケンスなどを作るのに使われる。

;; `eduction`はIteratorを返す
(let [iterable (eduction all-t d)]
  (first (seq iterable))) ;;=> "1"

;; `into`はコレクションを返す
(into [] all-t d)         ;;=> ["1" "2"]

;; `sequence`はシーケンスを返す
(sequence all-t d)        ;;=> ("1" "2")
(sequence (map str) [1 2 3] ["apple" "orange"]) ;;=> ("1apple" "2orange")

型についての考察

問題#1の入力と出力
;; IN                        -> OUT
[#{1} #{\a \b \c \d \e} ...] -> ["1" "5" "2" "3"]

[set] -> [string]
countcount-t
count       :: set -> long
count-t     :: (x, long -> x) -> (x, set -> x)
            ;; (x, set -> x) -> (x, long -> x)じゃないよ!
strstr-t
str         :: long -> string
str-t       :: (x, string -> x) -> (x, long -> x)

通常、(comp g f)するには、gは、fの出力を受け取れる必要がある。cout-tstr-tの出力を受け取れるので、(comp count-t str-t)できるわけだ。合成された関数の型は:

count-str-t :: (x, string -> x) -> (x, set -> x)

これは、(x, string -> x)なrfを、(x, set -> x)な別のrfへ変える。(x, string -> x)なrfといえば、conjstrが該当する。よって、(x, set -> x)なrfが手に入る:

(def rf1 (count-str-t conj))   ;; rf1 :: coll, set -> coll
(def rf2 (count-str-t str))    ;; rf2 :: string, set -> string

というわけで、これらを使えば、dreduceできる(dの要素はセットなので)。

   (reduce rf1 [] d)
   (reduce rf2 "" d)

以上。

Last modified:2016/11/20 11:26:33
Keyword(s):
References:[FP: 関数型プログラミング]
This page is frozen.