multiprocessing.Pool – PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

multiprocessing.Pool is driving me crazy…
I want to upgrade many packages, and for each one I have to check whether a newer version exists. The check_one function does this.
The main code is in the Updater.update method: there I create the Pool object and call the map() method.

Here is the code:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('r[{0:.1%} - {1}, {2} / {3}]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) 
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is available for {0}: {1!s} (old {2}), update'.format(package,
                                                                                       new_version,
                                                                                       version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('{0} has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

When I run it I get this weird error:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Answers:


Method 1

multiprocessing passes tasks (which include check_one and data) to the worker processes through an mp.SimpleQueue. A Queue.Queue can hold any object, but everything put in an mp.SimpleQueue must be picklable. Queue.Queue objects themselves are not picklable:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

yields this exception:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

Your data includes packages, which is a Queue.Queue. That might be the source of the problem.
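A quick way to confirm which argument is the culprit is to try pickling each one in the parent process before handing it to the pool. The sketch below is not part of the original answer: is_picklable is a hypothetical helper name, and the try/except import is only there so the snippet runs on both Python 2 and Python 3.

```python
import pickle

try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise.

    Hypothetical helper for debugging; Pool needs every task argument
    to survive this same serialization step.
    """
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


# Mirrors the tuple built in Updater.update: the queue is the first item.
task_with_queue = (queue.Queue(), 10, 'project-0', '1.0')
task_without_queue = (10, 'project-0', '1.0')
```

With this helper, is_picklable(task_with_queue) returns False because the queue holds a lock, while is_picklable(task_without_queue) returns True.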


Here is a possible workaround: The Queue is being used for two purposes:

  1. to find out the approximate size (by calling qsize)
  2. to store results for later retrieval.

Instead of calling qsize, we could use an mp.Value to share a counter between the processes.

Instead of storing results in a queue, we can (and should) just return values from calls to check_one. pool.map collects the results in a queue of its own and hands them back as its return value.

For example:

import multiprocessing as mp
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


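# Shared counter. Workers inherit this global only with the fork start method.
qsize = mp.Value('i', 1)
def check_one(args):
    total, package, version = args
    # += on a shared Value is a read-modify-write, so hold its lock.
    with qsize.get_lock():
        i = qsize.value
        qsize.value += 1
    logger.info('r[{0:.1%} - {1}, {2} / {3}]'.format(
        i / float(total), package, i, total))
    # Stand-in for the real PyPI lookup.
    new_version = random.randrange(0,100)
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None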

def update():    
    logger.info('Searching for updates')
    set_len=10
    data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
             for i in range(set_len) )
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None: continue
        package, version, new_version, json = result
        txt = 'A new release is available for {0}: {1!s} (old {2}), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()
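If the shared counter exists only to report progress, another option is to count in the parent process as results arrive, so that nothing has to be shared at all. The sketch below is an assumption-laden variation on the example above, not the original author's code: it swaps pool.map for pool.imap_unordered, and collect_with_progress, print_progress and find_updates are hypothetical names. The random number still stands in for the real version lookup.

```python
import multiprocessing as mp
import random


def check_one(args):
    """Return (package, version, new_version) if an update exists, else None."""
    package, version = args
    new_version = random.randrange(0, 100)  # placeholder for the real lookup
    if new_version > version:
        return (package, version, new_version)
    return None


def collect_with_progress(results, total, report):
    """Consume results as they arrive, calling report(done, total) each time."""
    updates = []
    for done, result in enumerate(results, 1):
        report(done, total)
        if result is not None:
            updates.append(result)
    return updates


def print_progress(done, total):
    print('[{0:.1%}] {1} / {2}'.format(done / float(total), done, total))


def find_updates(installed):
    """installed is a list of (package, version) tuples."""
    pool = mp.Pool()
    try:
        # imap_unordered yields each result as soon as a worker finishes it.
        results = pool.imap_unordered(check_one, installed)
        return collect_with_progress(results, len(installed), print_progress)
    finally:
        pool.close()
        pool.join()
```

Call find_updates([('project-0', 10), ('project-1', 42)]) from under an if __name__ == '__main__': guard, as in the example above. Because only the parent touches the counter, no lock or mp.Value is needed.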

Method 2

After a lot of digging on a similar issue…

It turns out that any object containing a threading.Condition() fails the same way with multiprocessing.Pool, because the Condition holds a lock that cannot be pickled.

Here is an example:

import multiprocessing as mp
import threading

class MyClass(object):
   def __init__(self):
      self.cond = threading.Condition()

def foo(mc):
   pass

pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))

I ran this with Python 2.7.5 and hit the same error:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 764, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

But when I ran it on Python 3.4.1, the issue was fixed.

I haven't come across any useful workarounds yet for those of us still on 2.7.x, though.
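One possible workaround, offered here as a sketch and not as something the answer above tested, is to leave the Condition out of the pickled state and build a fresh one when the object is unpickled. This relies on the standard __getstate__ and __setstate__ pickle hooks. The name attribute is a hypothetical addition for illustration, and the recreated Condition is a new, independent object in each worker, so this only helps when the workers do not need to synchronize with the parent through it.

```python
import pickle
import threading


class MyClass(object):
    def __init__(self, name):
        self.name = name                     # hypothetical picklable payload
        self.cond = threading.Condition()    # wraps a lock; not picklable

    def __getstate__(self):
        # Copy the instance dict and drop the unpicklable Condition.
        state = self.__dict__.copy()
        del state['cond']
        return state

    def __setstate__(self, state):
        # Restore the picklable attributes, then create a fresh Condition.
        self.__dict__.update(state)
        self.cond = threading.Condition()


original = MyClass('demo')
clone = pickle.loads(pickle.dumps(original))
```

Here clone.name matches original.name, while clone.cond is a separate Condition from original.cond. An instance of this class should then be acceptable as an argument to pool.map, since Pool uses the same pickling step.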

Method 3

I experienced this issue with Python 3.6 on Docker. Changing the version to 3.7.3 solved it.


All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.
