-2

使用 Pathos 并行化以下代码,我得到一个错误。这是我使用的代码:

import numpy as np
from PyQt5 import QtWidgets
from PyQt5.QtCore import pyqtSignal, pyqtSlot
import pyqtgraph as pg
import pyqtgraph.exporters
from PyQt5.QtWidgets import QMessageBox, QApplication
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph import GraphicsLayoutWidget
from threading import Thread, Event
import time

import sys

# import multiprocessing
# print("multiprocessing.__version__:", multiprocessing.__version__)
# '0.70a1'
import multiprocess
print("multiprocess.__version__:", multiprocess.__version__)

import dill
# import multiprocess
import pathos.multiprocessing as mp
# from pathos.multiprocessing import ProcessPool
# from multiprocessing import Pool, freeze_support
# from multiprocessing import Pool
# from multiprocessing import cpu_count

app = QtWidgets.QApplication(sys.argv)


# def sigint_handler(*args):
#     """Handler for the SIGINT signal."""
#     sys.stderr.write('\r')
#     if QMessageBox.question(None, '', "Are you sure you want to quit?",
#                             QMessageBox.Yes | QMessageBox.No,
#                             QMessageBox.No) == QMessageBox.Yes:
#         QApplication.quit()


# Routine to acquire and serve data
# This might be a camera driver, notifying when a new frame is available
def generate_data(callback, threadkill, myVar):
    # def generate_data(callback):
    # while not threadkill.is_set():
    #     width = 1600
    #     data = np.zeros(width)
    #     runner = int(np.random.random()*5)
    #     data.fill(runner)
    #     callback(data, runner)
    #     time.sleep(0.01)
    print("generate_data-myVar:", myVar)
    for runner in range(5):
        width = 1600
        data = np.zeros(width)
        data.fill(runner)
        my_var = callback(data, runner)
        time.sleep(0.01)
        print("callback called")

    # QApplication.quit()
    # app.quit()
    print("finfin-generate_data")

    # threadkill.set()
    # # sys.exit(1)




class PyQtGraphTest(GraphicsLayoutWidget):
    def workerFunction(self, myVar):
        generate_data(self.data_acquired.emit, self.threadkill, myVar)

    # Signal to indicate new data acquisition
    # Note: signals need to be defined inside a QObject class/subclass
    data_acquired = pyqtSignal(np.ndarray, int)

    def __init__(self, myVar):
        super().__init__()
        self.exporter = None

        self.setWindowTitle('Test pyqtgraph paint signals')
        self.resize(640, 400)
        self.plot = self.addPlot()
        self.spectrum = self.plot.plot()
        self.plot.enableAutoRange(pg.ViewBox.XYAxes)

        # Connect the signal
        self.data_acquired.connect(self.update_data)

        # Make and start the background thread to acquire data
        # Pass it the signal.emit as the callback function
        self.threadkill = Event()

        if True:

            dill.copy(self.workerFunction)

            # processes = cpu_count()
            processes = 4
            # pool = ProcessPool(processes)
            pool = mp.Pool(processes)
            # pool.map(generate_data, (self.data_acquired.emit, self.threadkill))
            pool.map(self.workerFunction,range(myVar))
            # pool.map(generate_data, self.data_acquired.emit)
        else:
            # myVar = 15
            self.thread = Thread(target=generate_data, args=(self.data_acquired.emit, self.threadkill, myVar))
            # self.thread = Thread(target=generate_data, args=(self.data_acquired.emit, self.threadkill))
            self.thread.start()
            self.thread.join()

        # QApplication.quit()
        # app.quit()

        print("finfin-init")

        # my_var = self.thread.isAlive()
        # print("my_var:", my_var)
        # self.threadkill.set()
        # print("fin")

    # Kill our data acquisition thread when shutting down
    def closeEvent(self, close_event):
        self.threadkill.set()

    # Slot to receive acquired data and update plot
    @pyqtSlot(np.ndarray, int)
    def update_data(self, data, runner):
        print("in update_data")
        if not self.exporter:
            # Here we are passing the exporter the GraphicsLayout object that is
            # the central item (ci) inside this GraphicsLayoutWidget. That in
            # turn contains the two PlotItem objects.
            self.exporter = pg.exporters.ImageExporter(self.ci)
            self.exporter.parameters()['width'] = 640

        runner = 0
        while runner < 100_000_000:
            myVar * myVar
            runner += 1

        self.spectrum.setData(data)
        self.plot.autoRange()
        self.exporter.export(str(runner) + 'exported_image.png')
        print("exported image " + str(runner))

        # QApplication.quit()
        app.quit()
        print("finfin-update")


print("hallo")
# if True:
if __name__ == '__main__':
    # def startApp(myVar):
    myVar = 1
    print("myVar:", myVar)
    window = PyQtGraphTest(myVar)
    window.show()
    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
        sys.exit(app.exec_())

    print("fin2")

错误:

/home/user/anaconda3/bin/python3 /snap/pycharm-professional/89/helpers/pydev/pydevd.py --multiproc --qt-support=auto --client 127.0.0.1 --port 38059 --file /home/user/PycharmProjects/0480_all_integrated/0030_wrk/a_0020_working_advanced.py
pydev debugger: process 19371 is connecting

Connected to pydev debugger (build 182.4505.26)
/home/user/anaconda3/lib/python3.6/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
  from ._conv import register_converters as _register_converters
multiprocess.__version__: 0.70.6.1
hallo
myVar: 1
Traceback (most recent call last):
  File "/snap/pycharm-professional/89/helpers/pydev/pydevd.py", line 1664, in <module>
    main()
  File "/snap/pycharm-professional/89/helpers/pydev/pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "/snap/pycharm-professional/89/helpers/pydev/pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/snap/pycharm-professional/89/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/home/user/PycharmProjects/0480_all_integrated/0030_wrk/a_0020_working_advanced.py", line 160, in <module>
    window = PyQtGraphTest(myVar)
  File "/home/user/PycharmProjects/0480_all_integrated/0030_wrk/a_0020_working_advanced.py", line 98, in __init__
    dill.copy(self.workerFunction)
  File "/home/user/anaconda3/lib/python3.6/site-packages/dill/_dill.py", line 239, in copy
    return loads(dumps(obj, *args, **kwds))
  File "/home/user/anaconda3/lib/python3.6/site-packages/dill/_dill.py", line 293, in dumps
    dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
  File "/home/user/anaconda3/lib/python3.6/site-packages/dill/_dill.py", line 286, in dump
    pik.dump(obj)
  File "/home/user/anaconda3/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/user/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/anaconda3/lib/python3.6/site-packages/dill/_dill.py", line 1069, in save_instancemethod0
    pickler.save_reduce(MethodType, (obj.__func__, obj.__self__), obj=obj)
  File "/home/user/anaconda3/lib/python3.6/pickle.py", line 610, in save_reduce
    save(args)
  File "/home/user/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/anaconda3/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/home/user/anaconda3/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle PyQtGraphTest objects
4

1 回答 1

2

我是pathosdill作者。您的回溯说PyQtGraphTest对象不可序列化。有些对象dill无法序列化——通常具有 C 源代码的对象不容易序列化。如果您不关心PyQtGraphTest对象,那么您也许可以重新组织您的代码,因此当您的函数指向时globals(即从顶级名称空间中删除不必要的对象),您的函数会引用最少数量的对象。或者,ProcessPool您可以尝试 a ,而不是使用 a ThreadPool,因为它不那么强烈地依赖序列化。

于 2018-09-24T11:41:26.847 回答