11

是否可以在状态单子中以恒定的堆栈和堆空间执行折叠?还是一种不同的功能技术更适合我的问题?

接下来的部分描述了这个问题和一个激励用例。我正在使用 Scala,但也欢迎使用 Haskell 中的解决方案。


折叠在State单子中填满了堆

假设 Scalaz 7。考虑状态单子中的单子折叠。为了避免堆栈溢出,我们将蹦床折叠。

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

对于大型集合col,这将填满堆。

我相信在折叠过程中,会为集合(x: R参数)中的每个值创建一个闭包(一个 State mobit),填充堆。在执行之前,这些都不能被评估run 0,提供初始状态。

可以避免这种 O(n) 堆使用吗?

更具体地说,是否可以在折叠之前提供初始状态,以便 State monad 可以在每次绑定期间执行,而不是嵌套闭包以供以后评估?

或者可以构造折叠以便在 State monad 之后延迟执行run?这样,在前一个x: R闭包被评估并适合垃圾收集之前,不会创建下一个闭包。

或者这种工作有更好的功能范式吗?


示例应用程序

但也许我使用了错误的工具来完成这项工作。示例用例的演变如下。我在这里走错路了吗?

考虑水库采样,即k从一个太大而无法放入内存的集合中一次性挑选一个统一的随机项目。在 Scala 中,这样的函数可能是

def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

如果拉皮条到TraversableOnce类型可以这样使用

val tenRandomInts = (Int.Min to Int.Max) sample 10

所做的工作sample本质上是fold

def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

但是,update是有状态的;这取决于n, 已经看到的项目数量。(它也依赖于一个 RNG,但为了简单起见,我假设它是全局的和有状态的。用于处理的技术将很容易n扩展。)。那么如何处理这种状态呢?

不纯的解决方案很简单,并且以恒定的堆栈和堆运行。

/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

但是纯粹的功能解决方案呢? update必须n作为附加参数并返回新值以及更新的样本。我们可以n在隐式状态中包含折叠累加器,例如,

(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

但这掩盖了意图;我们只是真的打算累积样本向量。这个问题似乎已经为 State monad 和 monadic left fold 做好了准备。让我们再试一次。

我们将使用带有这些导入的 Scalaz 7

import scalaz._
import Scalaz._
import scalaz.std.iterable_

并在 a 上操作Iterable[A],因为 Scalaz 不支持 a 的单子折叠Traversable

sample现在定义

// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

更新在哪里

// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

不幸的是,这会破坏大型集合的堆栈。

所以让我们蹦床吧。sample就是现在

// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

更新在哪里

// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

这修复了堆栈溢出,但仍然会破坏非常大的集合(或非常小的堆)的堆。在折叠期间为集合中的每个值创建一个匿名函数(我相信关闭每个x: A参数),甚至在蹦床运行之前消耗堆。(FWIW,State 版本也有这个问题;堆栈溢出首先出现在较小的集合中。)

4

2 回答 2

7

我们真正的问题是未执行的 State mobits 使用的堆。

不它不是。真正的问题是集合不适合内存,foldLeftM并且foldRightM强制整个集合。不纯解决方案的一个副作用是您正在释放内存。在“纯功能”解决方案中,您不会在任何地方这样做。

你的使用Iterable忽略了一个关键细节:col实际上是什么类型的集合,它的元素是如何创建的,以及它们应该如何被丢弃。所以,必然地,确实如此foldLeftMIterable它可能过于严格,并且您将整个集合强制放入内存。例如,如果它是 a Stream,那么只要你坚持col到目前为止强制的所有元素都会在内存中。如果是其他类型的懒惰Iterable,它不会记住它的元素,那么折叠仍然太严格了。

我尝试了您的第一个示例,EphemeralStream但没有看到任何显着的堆压力,即使它显然具有相同的“未执行状态 mobits”。不同之处在于 anEphemeralStream的元素被弱引用,并且它foldRight不会强制整个流。

我怀疑如果您使用过Foldable.foldr,那么您将看不到有问题的行为,因为它与第二个参数中惰性的函数折叠在一起。当你调用 fold 时,你希望它立即返回一个看起来像这样的暂停:

Suspend(() => head |+| tail.foldRightM(...))

当蹦床恢复第一次暂停并运行到下一次暂停时,暂停之间的所有分配都将可供垃圾收集器释放。

尝试以下操作:

def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
  if (bs.isEmpty) Monad[M].point(a)
  else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))

val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._

foldM[M,R,Int](Monoid[R].zero, col) {
  (x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run

对于蹦床单子,这将在恒定堆中运行M,但对于非蹦床单子,它将溢出堆栈。

真正的问题是,Iterable对于太大而无法放入内存的数据,这不是一个好的抽象。当然,你可以编写一个命令式的副作用程序,在每次迭代后显式丢弃元素或使用惰性右折叠。在您想将该程序与另一个程序组合之前,这很有效。而且我假设您正在调查在State单子中这样做的全部原因是为了获得组合性。

所以,你可以做什么?以下是一些选项:

  1. 使用Reducer, Monoid, 及其组合,然后在命令式显式释放循环(或蹦床惰性右折叠)中运行作为最后一步,之后组合是不可能的或预期的。
  2. 使用Iterateecomposition 和 monadicEnumerator来喂养它们。
  3. 使用Scalaz-Stream编写组合流转换器。

这些选项中的最后一个是我在一般情况下会使用和推荐的选项。

于 2013-12-25T00:15:56.520 回答
1

使用State或任何类似的单子不是解决问题的好方法。使用State注定会破坏大型集合的堆栈/堆。考虑x: State[A,B]从大型集合构造的值(例如通过折叠它)。然后x可以对初始状态的不同值进行评估A,产生不同的结果。所以x需要保留集合中包含的所有信息。在纯设置中,x不能忘记一些不破坏堆栈/堆的信息,因此计算的任何内容都保留在内存中,直到整个单子值被释放,这仅在评估结果后才会发生。所以内存消耗x与集合的大小成正比。

我相信解决这个问题的合适方法是使用功能iteratees/pipes/conduits。发明这个概念(在这三个名称下)是为了处理具有恒定内存消耗的大量数据集合,并使用简单的组合器来描述此类过程。

我尝试使用 Scalaz' Iteratees,但是这部分似乎还不成熟,它同样会遭受堆栈溢出的困扰State(或者我可能没有正确使用它;如果有人感兴趣,可以在此处获得代码)。

但是,使用我的(仍然有点实验性)scala-conduit库很简单(免责声明:我是作者):

import conduit._
import conduit.Pipe._

object Run extends App {
  // Define a sampling function as a sink: It consumes
  // data of type `A` and produces a vector of samples.
  def sampleI[A](k: Int): Sink[A, Vector[A]] =
    sampleI[A](k, 0, Vector())

  // Create a sampling sink with a given state. It requests
  // a value from the upstream conduit. If there is one,
  // update the state and continue (the first argument to `requestF`).
  // If not, return the current sample (the second argument).
  // The `Finalizer` part isn't important for our problem.
  private def sampleI[A](k: Int, n: Int, sample: Vector[A]):
                  Sink[A, Vector[A]] =
    requestF((x: A) => sampleI(k, n + 1, algorithmR(k, n + 1, sample, x)),
             (_: Any) => sample)(Finalizer.empty)


  // The sampling algorithm copied from the question.
  val rand = new scala.util.Random()

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
      sample :+ x // must keep first k elements
    } else {
      val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
      if (r <= k)
        sample.updated(r - 1, x) // sample is 0-index
      else
        sample
    }
  }

  // Construct an iterable of all `short` values, pipe it into our sampling
  // funcition, and run the combined pipe.
  {
    print(runPipe(Util.fromIterable(Short.MinValue to Short.MaxValue) >->
          sampleI(10)))
  }
}

更新:使用 可以解决问题State,但我们需要专门实现一个自定义折叠,因为State它知道如何做到这一点恒定空间:

import scala.collection._
import scala.language.higherKinds
import scalaz._
import Scalaz._
import scalaz.std.iterable._

object Run extends App {
  // Folds in a state monad over a foldable
  def stateFold[F[_],E,S,A](xs: F[E],
                            f: (A, E) => State[S,A],
                            z: A)(implicit F: Foldable[F]): State[S,A] =
    State[S,A]((s: S) => F.foldLeft[E,(S,A)](xs, (s, z))((p, x) => f(p._2, x)(p._1)))


  // Sample a lazy collection view
  def sampleS[F[_],A](k: Int, xs: F[A])(implicit F: Foldable[F]):
                  State[Int,Vector[A]] =
    stateFold[F,A,Int,Vector[A]](xs, update(k), Vector())

  // update using State monad
  def update[A](k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
  }

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = ...

  {
    print(sampleS(10, (Short.MinValue to Short.MaxValue)).eval(0))
  }
}
于 2013-12-25T21:04:17.430 回答