3

更新:编辑标题以关注主要问题。请参阅我的答案以获取完整更新。

在下面的代码中,a()b()是相同的。它们中的每一个同时从 0 计数到 9,同时每 2 个计数获取并产生一个锁。

import asyncio

lock = asyncio.Lock()

def a ():
 yield from lock.acquire()
 for i in range(10):
  print('a: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

def b ():
 yield from lock.acquire()
 for i in range(10):
  print('b: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

asyncio.get_event_loop().run_until_complete(asyncio.gather(a(), b()))

print('done')

我期望交错输出,但我得到:

b: 0
b: 1
b: 2
b: 3
b: 4
b: 5
b: 6
b: 7
b: 8
b: 9
a: 0
a: 1
a: 2
a: 3
a: 4
a: 5
a: 6
a: 7
a: 8
a: 9
done

似乎第二个yield实际上并没有产生,而是立即重新获取锁并继续。

这对我来说似乎是一个错误。我对吗?还是有其他解释?

下面的代码,用额外的初始 "noop" 进行了修改yield,可以正常工作。这让我相信锁确实是公平的并且可能是正确的。

import asyncio

lock = asyncio.Lock()

def a ():
 yield from lock.acquire()
 yield from asyncio.sleep(0)
 for i in range(10):
  print('a: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

def b ():
 yield from lock.acquire()
 yield from asyncio.sleep(0)
 for i in range(10):
  print('b: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

asyncio.get_event_loop().run_until_complete(asyncio.gather(a(), b()))

print('done')

输出:

a: 0
b: 0
a: 1
a: 2
b: 1
b: 2
a: 3
a: 4
b: 3
b: 4
a: 5
a: 6
b: 5
b: 6
a: 7
a: 8
b: 7
b: 8
a: 9
b: 9
done

请注意,我在一开始只做一次无操作产量,而不是每 2 次。然而,这样做会导致在第一段代码中按预期每 2 个计数进行交错。

调度程序中只有一些优化(我认为是一个错误),yield当获取没有其他人正在等待的锁时,它并没有真正实现。

如何解释第一个输出?

4

2 回答 2

2

更新:根据我对 github 问题的评论(链接),以下内容已过时。该评论指出,您可以使用它Lock.locked()来预测是否Lock.acquire()会屈服。它还观察到许多其他协程在快速情况下不会屈服,因此即使考虑修复所有协程也是一个失败的原因。最后,它与如何解决不同的问题有关,并建议它可以更好地解决。那是对一种方法的请求,该asyncio.nop()方法只会让步给调度程序而不做其他任何事情。他们没有添加该方法,而是决定重载asyncio.sleep(0)和“反优化”它(在本次讨论的上下文中lock.acquire())以在参数为 0 时让步给调度程序。

下面的原始答案,但被上面的段落取代:

asyncio.lock 的实现在其前三行中尝试过于智能并且如果没有等待者则不会将控制权交还给调度程序的根本原因:

if not self._locked and all(w.cancelled() for w in self._waiters):
    self._locked = True
    return True

但是,正如我的第一个示例所示,这会阻止其他协程甚至成为服务员。他们只是没有机会跑到他们试图获得锁的地步。

无效的解决方法是总是在获取锁之前yield from asyncio.sleep(0)立即进行。

这是低效的,因为在常见情况下会有其他等待者,并且获取锁也会将控制权交还给调度程序。因此,在大多数情况下,您将两次将控制权交还给调度程序,这很糟糕。

另请注意,锁的文档含糊地说:“此方法阻塞,直到锁被解锁,然后将其设置为锁定并返回 True。” 当然给人的印象是它会在获取锁之前将控制权交给调度程序。

在我看来,正确的做法是让锁实现始终让步并且不要太聪明。或者,锁实现应该有一个方法告诉您如果获得它是否会屈服,以便您的代码可以在锁获得不会时手动屈服。另一种选择是让acquire()调用返回一个值,告诉您它是否实际产生。这不太可取,但仍比现状好。

有人可能认为更好的解决方法可能是在release(). 但是,如果您查看一个在工作完成后发布和重新获取的紧密循环,那么它就相当于同一件事——在常见的情况下,它仍然会产生两次,一次在发布时,再次在获取时,增加了低效率。

于 2017-01-18T01:07:41.317 回答
1

目前尚不清楚您要实现什么目标,但似乎Lock不是您需要的工具。要交错 python 代码,您可以执行以下简单操作:

def foo(tag, n):
    for i in range(n):
        print("inside", tag, i)
        yield (tag, i)


print('start')

for x in zip(foo('A', 10), foo('B', 10)):
    print(x)

print('done')

不需要asyncioor threading。无论如何,asyncio没有 IO 没有多大意义。

threading. Lock用于同步在独立线程中运行的程序的关键部分。 asyncio.Lock将允许其他协程在一个协程等待时继续使用 IO:

import asyncio
import random

lock = asyncio.Lock()


async def foo(tag):
    print(tag, "Start")
    for i in range(10):
        print(tag, '>', i)
        await asyncio.sleep(random.uniform(0.1, 1))
        print(tag, '<', i)

    async with lock:
        # only one coroutine can execute the critical section at once.
        # other coroutines can still use IO.
        print(tag, "CRITICAL START")
        await asyncio.sleep(1)
        print(tag, "STILL IN CRITICAL")
        await asyncio.sleep(1)
        print(tag, "CRITICAL END")

    for i in range(10, 20):
        print(tag, '>', i)
        await asyncio.sleep(random.uniform(0.1, 1))
        print(tag, '<', i)

    print(tag, "Done")


print('start')

loop = asyncio.get_event_loop()
tasks = asyncio.gather(foo('A'), foo('B'), foo('C'))
loop.run_until_complete(tasks)
loop.close()

print('done')

请记住,关键字yield并不总是遵循yield的英文含义:-) 。

您可以看到async with lock立即获取锁更有意义,而无需等待其他协程做更多工作:第一个到达关键部分的协程应该开始运行它。(即,添加await asyncio.sleep(0)之前async with lock:没有任何意义。)

于 2017-01-18T08:25:13.537 回答