是否可以在状态单子中以恒定的堆栈和堆空间执行折叠?还是一种不同的功能技术更适合我的问题?
接下来的部分描述了这个问题和一个激励用例。我正在使用 Scala,但也欢迎使用 Haskell 中的解决方案。
折叠在State
单子中填满了堆
假设 Scalaz 7。考虑状态单子中的单子折叠。为了避免堆栈溢出,我们将蹦床折叠。
import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor
type S = Int // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad
type R = Int // or some other monoid
val col: Iterable[R] = largeIterableofRs() // defined elsewhere
val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){
(acc: R, x: R) => StateT[Trampoline, S, R] {
s: S => Trampoline.done {
(s + 1, Monoid[R].append(acc, x))
}
}
} run 0 run
// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap. Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.
对于大型集合col
,这将填满堆。
我相信在折叠过程中,会为集合(x: R
参数)中的每个值创建一个闭包(一个 State mobit),填充堆。在执行之前,这些都不能被评估run 0
,提供初始状态。
可以避免这种 O(n) 堆使用吗?
更具体地说,是否可以在折叠之前提供初始状态,以便 State monad 可以在每次绑定期间执行,而不是嵌套闭包以供以后评估?
或者可以构造折叠以便在 State monad 之后延迟执行run
?这样,在前一个x: R
闭包被评估并适合垃圾收集之前,不会创建下一个闭包。
或者这种工作有更好的功能范式吗?
示例应用程序
但也许我使用了错误的工具来完成这项工作。示例用例的演变如下。我在这里走错路了吗?
考虑水库采样,即k
从一个太大而无法放入内存的集合中一次性挑选一个统一的随机项目。在 Scala 中,这样的函数可能是
def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]
如果拉皮条到TraversableOnce
类型可以这样使用
val tenRandomInts = (Int.Min to Int.Max) sample 10
所做的工作sample
本质上是fold
:
def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}
但是,update
是有状态的;这取决于n
, 已经看到的项目数量。(它也依赖于一个 RNG,但为了简单起见,我假设它是全局的和有状态的。用于处理的技术将很容易n
扩展。)。那么如何处理这种状态呢?
不纯的解决方案很简单,并且以恒定的堆栈和堆运行。
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
var n = 0
def apply(sample: Vector[A], x: A): Vector[A] = {
n += 1
algorithmR(k, n, acc, x)
}
}
def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
但是纯粹的功能解决方案呢? update
必须n
作为附加参数并返回新值以及更新的样本。我们可以n
在隐式状态中包含折叠累加器,例如,
(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2
但这掩盖了意图;我们只是真的打算累积样本向量。这个问题似乎已经为 State monad 和 monadic left fold 做好了准备。让我们再试一次。
我们将使用带有这些导入的 Scalaz 7
import scalaz._
import Scalaz._
import scalaz.std.iterable_
并在 a 上操作Iterable[A]
,因为 Scalaz 不支持 a 的单子折叠Traversable
。
sample
现在定义
// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
type M[B] = State[Int, B]
// foldLeftM is implemented using foldRight, which must reverse `col`, blowing
// the heap for large `col`. Ignore this issue for now.
// foldLeftM could be implemented differently or we could switch to
// foldRightM, implemented using foldLeft.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}
更新在哪里
// update using State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
不幸的是,这会破坏大型集合的堆栈。
所以让我们蹦床吧。sample
就是现在
// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B]
type M[B] = TrampolinedState[Int, B]
// Same caveat about foldLeftM using foldRight and blowing the heap
// applies here. Ignore for now. This solution blows the heap anyway;
// let's fix that issue first.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}
更新在哪里
// update using trampolined State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
}
}
这修复了堆栈溢出,但仍然会破坏非常大的集合(或非常小的堆)的堆。在折叠期间为集合中的每个值创建一个匿名函数(我相信关闭每个x: A
参数),甚至在蹦床运行之前消耗堆。(FWIW,State 版本也有这个问题;堆栈溢出首先出现在较小的集合中。)