我正在尝试编写一个将在树莓派上运行的 python 应用程序,它将同时具有套接字连接(带有 uvicorn 的 socketio)和物理输入侦听器。我打算同时监听套接字连接和 gpio 事件,而不会相互阻塞。这是我到目前为止所拥有的:
api.py
import uvicorn
import asyncio
from interaction.volume import VolumeControl
from system.platform_info import PlatformInfo
from connection.api_socket import app
class Api:
def __init__(self):
pass
def initialize_volume_listener(self):
volume_controller = VolumeControl()
volume_controller.start_listener()
def start(self):
PlatformInfo().print_info()
self.initialize_volume_listener()
uvicorn.run(app, host='127.0.0.1', port=5000, loop="asyncio")
volume_control.py
import asyncio
from gpiozero import Button
from connection.api_socket import volume_up
class VolumeControl:
def __init__(self):
self.volume_up_button = Button(4)
def volume_up(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = asyncio.ensure_future(volume_up(None, None))
loop.run_until_complete(future)
loop.close()
def start_listener(self):
self.volume_up_button.when_pressed = self.volume_up
api_socket.py
import socketio
from system.platform_info import PlatformInfo
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
app = socketio.ASGIApp(sio)
@sio.on('connect')
async def test_connect(sid, environ):
system_info = PlatformInfo().get_info()
current_volume = 35
initial_data = {"system_info": system_info,
"settings": {"volume": current_volume}
}
await sio.emit('initial_data', initial_data, room=sid)
@sio.on('disconnect request')
async def disconnect_request(sid):
await sio.disconnect(sid)
@sio.on('disconnect')
async def test_disconnect(sid):
print('Client disconnected')
await sio.emit('disconnect', {'data': 'Connected', 'count': 0}, room=sid)
@sio.on('volume_up')
async def volume_up(sid, volume=None):
increased_volume = 25
await sio.emit('volume_up', {'volume': increased_volume})
@sio.on('volume_down')
async def volume_down(sid, volume=None):
decreased_volume = 25
await sio.emit('volume_down', {'volume': decreased_volume})
我尝试过使用 asyncio,但我对 python 的异步功能有点陌生。问题是,我无法连续运行按钮侦听器,因此当套接字功能正在进行时,我将能够同时侦听按钮交互,而不会相互阻塞。按钮监听器根本不起作用。相反,只要 uvicorn 应用程序启动,我就需要按钮侦听器运行。
任何帮助将不胜感激。谢谢。