我最近使用带有 aiohttp 的 python-socketio 编写了一个客户端/服务器应用程序,我的应用程序基于异步命名空间(服务器端),另外我的 on_message 事件中有许多等待调用,因此我必须使用异步锁来确保我保持我想要的流量。为了实现这种行为,我编写了一个装饰器并用它包装了每个临界区类型的函数。
@async_synchronized('_async_mutex')
async def on_connect(self, sid, environ):
self._logger.info("client with sid: {} connected to namespace: {}!".format(
sid, __class__.__name__))
important_member = 1
await other_class.cool_coroutine()
important_member = 2
在我的构造函数中,我已经初始化_async_mutex = asyncio.Lock()
装饰师:
def async_synchronized(tlockname):
"""A decorator to place an instance based lock around a method """
def _synched(func):
@wraps(func)
async def _synchronizer(self, *args, **kwargs):
tlock = self.__getattribute__(tlockname)
try:
async with tlock:
return await func(self, *args, **kwargs)
finally:
pass
return _synchronizer
return _synched
现在,在任何正常使用情况下,一切都可以正常工作(关闭/打开客户端会正确触发功能并且锁按预期执行)。重要的是要注意我的 on_disconnect 函数是用完全相同的装饰器和锁包装的。我遇到的问题是当客户端的网络适配器物理断开连接时(正常的客户端关闭工作正常),我看到我的 on_disconnect 事件确实被调用了,但另一个协程当前正在持有锁。由于某种原因,该事件被多次触发并最终陷入僵局。
我已经用描述锁的状态/调用函数的打印包装了我的装饰器,并且还在每个异步调用周围添加了一个 try/catch。似乎我所有的协同程序都捕获了一个取消的异常(我推测是 aiohttp),因此一个“持有”锁的方法被取消并且锁永远不会被释放。我尝试用 asyncio.shield() 包装每个异步调用,但行为没有改变。
我应该在这里采用不同的异步锁方法吗?(移除锁完全可以解决问题,但可能会导致应用程序的计算部分出现未定义的行为)
更多代码示例:实际的 on_connect 和 on_disconnect 事件:
@async_synchronized('_async_mutex')
async def on_connect(self, sid, environ):
self._logger.info("very good log message")
self._connected_clients_count += 1
@async_synchronized('_async_mutex')
async def on_disconnect(self, sid):
self._logger.info("very good disconnect message")
self._connected_clients_count -= 1
await self._another_namespace_class.inform_client_disconnect(sid) # this method is wrapped with the same decorator but with a different lock
注意:另一个没有相同的客户端连接到它。此外,当发生网络断开连接时,我也看不到日志消息(我已将日志级别设置为调试)