11

(我在这个例子中使用了pyprocessing模块,但是如果你运行python 2.6或者使用multiprocessing backport,用 multiprocessing 替换 processing 应该可以工作)

我目前有一个程序可以侦听 unix 套接字(使用 processing.connection.Listener),接受连接并生成一个处理请求的线程。在某个时刻,我想优雅地退出该过程,但由于 accept() 调用被阻塞,我看不出有什么办法可以很好地取消它。我至少有一种方法可以在这里工作(OS X),设置一个信号处理程序并从另一个线程发出进程信号,如下所示:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

我想到的唯一另一种方法是深入连接(listener._listener._socket)并设置非阻塞选项......但这可能有一些副作用并且通常非常可怕。

有没有人有更优雅(甚至可能是正确的!)的方式来完成这个?它需要可移植到 OS X、Linux 和 BSD,但 Windows 可移植性等不是必需的。

澄清:谢谢大家!像往常一样,我原来的问题中的歧义被揭示了:)

  • 我需要在取消监听后执行清理,并且我并不总是想真正退出该过程。
  • 我需要能够从不是从同一个父进程产生的其他进程访问这个进程,这使得队列变得笨拙
  • 线程的原因是:
    • 他们访问共享状态。实际上或多或少是一个常见的内存数据库,所以我想它可以做得不同。
    • 我必须能够同时接受多个连接,但实际线程大部分时间都在阻塞。每个接受的连接都会产生一个新线程;这是为了不阻止 I/O 操作上的所有客户端。

关于线程与进程,我使用线程来使我的阻塞操作非阻塞,并使用进程来启用多处理。

4

5 回答 5

3

这不是选择的目的吗?

如果选择表明它不会阻塞,则仅在套接字上调用接受...

选择有超时,因此您可以偶尔中断以检查是否该关闭...。

于 2009-01-10T19:03:29.077 回答
3

我以为我可以避免它,但似乎我必须做这样的事情:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

我知道这违反了 demeter 法则并引入了一些邪恶的猴子补丁,但似乎这将是实现这一点的最容易移植的方式。如果有人有更优雅的解决方案,我会很高兴听到它:)

于 2009-01-21T13:46:04.070 回答
1

我是多处理模块的新手,但在我看来,混合处理模块和线程模块是违反直觉的,他们的目标不是解决同样的问题吗?

无论如何,将你的监听函数包装到一个进程本身怎么样?我不清楚这如何影响您的其余代码,但这可能是一个更干净的选择。

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'
于 2008-12-11T03:46:03.413 回答
0

可能并不理想,但您可以通过从信号处理程序或终止进程的线程向套接字发送一些数据来释放块。

编辑:实现这一点的另一种方法可能是使用Connection Queues,因为它们似乎支持超时(抱歉,我在第一次阅读时误读了您的代码)。

于 2008-12-10T22:14:28.683 回答
0

我遇到了同样的问题。我通过向侦听器发送“停止”命令来解决它。在侦听器的主线程(处理传入消息的线程)中,每次收到新消息时,我只是检查它是否是“停止”命令并退出主线程。

这是我正在使用的代码:

def start(self):
    """
    Start listening
    """
    # set the command being executed
    self.command = self.COMMAND_RUN

    # startup the 'listener_main' method as a daemon thread
    self.listener = Listener(address=self.address, authkey=self.authkey)
    self._thread = threading.Thread(target=self.listener_main, daemon=True)
    self._thread.start()

def listener_main(self):
    """
    The main application loop
    """

    while self.command == self.COMMAND_RUN:
        # block until a client connection is recieved
        with self.listener.accept() as conn:

            # receive the subscription request from the client
            message = conn.recv()

            # if it's a shut down command, return to stop this thread
            if isinstance(message, str) and message == self.COMMAND_STOP:
                return

            # process the message

def stop(self):
    """
    Stops the listening thread
    """
    self.command = self.COMMAND_STOP
    client = Client(self.address, authkey=self.authkey)
    client.send(self.COMMAND_STOP)
    client.close()

    self._thread.join()

我正在使用身份验证密钥来防止黑客通过从任意客户端发送停止命令来关闭我的服务。

我的不是一个完美的解决方案。似乎更好的解决方案可能是修改 中的代码multiprocessing.connection.Listener,并添加一个stop()方法。但是,这需要通过流程发送它以供 Python 团队批准。

于 2018-06-02T09:06:14.740 回答