TypeError: can't pickle _thread.lock objects
Python 3.xPython 3.x Problem Overview
Trying to run two different functions at the same time with shared queue and get an error...how can I run two functions at the same time with a shared queue? This is Python version 3.6 on Windows 7.
from multiprocessing import Process
from queue import Queue
import logging
def main():
x = DataGenerator()
try:
x.run()
except Exception as e:
logging.exception("message")
class DataGenerator:
def __init__(self):
logging.basicConfig(filename='testing.log', level=logging.INFO)
def run(self):
logging.info("Running Generator")
queue = Queue()
Process(target=self.package, args=(queue,)).start()
logging.info("Process started to generate data")
Process(target=self.send, args=(queue,)).start()
logging.info("Process started to send data.")
def package(self, queue):
while True:
for i in range(16):
datagram = bytearray()
datagram.append(i)
queue.put(datagram)
def send(self, queue):
byte_array = bytearray()
while True:
size_of__queue = queue.qsize()
logging.info(" queue size %s", size_of_queue)
if size_of_queue > 7:
for i in range(1, 8):
packet = queue.get()
byte_array.append(packet)
logging.info("Sending datagram ")
print(str(datagram))
byte_array(0)
if __name__ == "__main__":
main()
The logs indicate an error, I tried running console as administrator and I get the same message...
INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
File "test.py", line 8, in main
x.run()
File "test.py", line 20, in run
Process(target=self.package, args=(queue,)).start()
File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
Python 3.x Solutions
Solution 1 - Python 3.x
I had the same problem with Pool()
in Python 3.6.3.
Error received: TypeError: can't pickle _thread.RLock objects
Let's say we want to add some number num_to_add
to each element of some list num_list
in parallel. The code is schematically like this:
class DataGenerator:
def __init__(self, num_list, num_to_add)
self.num_list = num_list # e.g. [4,2,5,7]
self.num_to_add = num_to_add # e.g. 1
self.run()
def run(self):
new_num_list = Manager().list()
pool = Pool(processes=50)
results = [pool.apply_async(run_parallel, (num, new_num_list))
for num in num_list]
roots = [r.get() for r in results]
pool.close()
pool.terminate()
pool.join()
def run_parallel(self, num, shared_new_num_list):
new_num = num + self.num_to_add # uses class parameter
shared_new_num_list.append(new_num)
The problem here is that self
in function run_parallel()
can't be pickled as it is a class instance. Moving this parallelized function run_parallel()
out of the class helped. But it's not the best solution as this function probably needs to use class parameters like self.num_to_add
and then you have to pass it as an argument.
Solution:
def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
new_num = num + to_add
shared_new_num_list.append(new_num)
class DataGenerator:
def __init__(self, num_list, num_to_add)
self.num_list = num_list # e.g. [4,2,5,7]
self.num_to_add = num_to_add # e.g. 1
self.run()
def run(self):
new_num_list = Manager().list()
pool = Pool(processes=50)
results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add)) # num_to_add is passed as an argument
for num in num_list]
roots = [r.get() for r in results]
pool.close()
pool.terminate()
pool.join()
Other suggestions above didn't help me.
Solution 2 - Python 3.x
You need to change from queue import Queue
to from multiprocessing import Queue
.
The root reason is the former Queue is designed for threading module Queue while the latter is for multiprocessing.Process module.
Solution 3 - Python 3.x
Move the queue to self instead of as an argument to your functions package
and send
Solution 4 - Python 3.x
Complementing Marina answer here something to access the whole class. It also fools Pool.map as I needed today.
fakeSelf = None
def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
new_num = num + fakeSelf.num_to_add
shared_new_num_list.append(new_num)
class DataGenerator:
def __init__(self, num_list, num_to_add)
globals()['fakeSelf'] = self
self.num_list = num_list # e.g. [4,2,5,7]
self.num_to_add = num_to_add # e.g. 1
self.run()
def run(self):
new_num_list = Manager().list()