lidongok

用 Python 实现的线程池
lidongok | Dec 27, 2007 4:33:47 PM
原文出处 http://blog.daviesliu.net/2006/10/09/234822/

用 Python 实现的线程池

davies 发表于 2006 年 10 月 9 日

为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。

下面是用Python实现的通用的线程池代码:
view plainprint?

   1. import Queue, threading, sys  
   2. from threading import Thread  
   3. import time,urllib  
   4.   
   5. # working thread  
   6. class Worker(Thread):  
   7.     worker_count = 0  
   8.     def __init__( self, workQueue, resultQueue, timeout = 0, **kwds):  
   9.         Thread.__init__( self, **kwds )  
  10.         self.id = Worker.worker_count  
  11.         Worker.worker_count += 1  
  12.         self.setDaemon( True )  
  13.         self.workQueue = workQueue  
  14.         self.resultQueue = resultQueue  
  15.         self.timeout = timeout  
  16.   
  17.     def run( self ):  
  18.         ''' the get-some-work, do-some-work main loop of worker threads '''  
  19.         while True:  
  20.             try:  
  21.                 callable, args, kwds = self.workQueue.get(timeout=self.timeout)  
  22.                 res = callable(*args, **kwds)  
  23.                 print "worker[%2d]: %s" % (self.id, str(res) )  
  24.                 self.resultQueue.put( res )  
  25.             except Queue.Empty:  
  26.                 break  
  27.             except :  
  28.                 print 'worker[%2d]' % self.id, sys.exc_info()[:2]  
  29.                   
  30. class WorkerManager:  
  31.     def __init__( self, num_of_workers=10, timeout = 1):  
  32.         self.workQueue = Queue.Queue()  
  33.         self.resultQueue = Queue.Queue()  
  34.         self.workers = []  
  35.         self.timeout = timeout  
  36.         self._recruitThreads( num_of_workers )  
  37.   
  38.     def _recruitThreads( self, num_of_workers ):  
  39.         for i in range( num_of_workers ):  
  40.             worker = Worker( self.workQueue, self.resultQueue, self.timeout )  
  41.             self.workers.append(worker)  
  42.   
  43.     def start(self):  
  44.         for w in self.workers:  
  45.             w.start()  
  46.   
  47.     def wait_for_complete( self):  
  48.         # ...then, wait for each of them to terminate:  
  49.         while len(self.workers):  
  50.             worker = self.workers.pop()  
  51.             worker.join( )  
  52.             if worker.isAlive() and not self.workQueue.empty():  
  53.                 self.workers.append( worker )  
  54.         print "All jobs are are completed."  
  55.   
  56.     def add_job( self, callable, *args, **kwds ):  
  57.         self.workQueue.put( (callable, args, kwds) )  
  58.   
  59.     def get_result( self, *args, **kwds ):  
  60.         return self.resultQueue.get( *args, **kwds )  

import Queue, threading, sys from threading import Thread import time,urllib # working thread class Worker(Thread): worker_count = 0 def __init__( self, workQueue, resultQueue, timeout = 0, **kwds): Thread.__init__( self, **kwds ) self.id = Worker.worker_count Worker.worker_count += 1 self.setDaemon( True ) self.workQueue = workQueue self.resultQueue = resultQueue self.timeout = timeout def run( self ): ''' the get-some-work, do-some-work main loop of worker threads ''' while True: try: callable, args, kwds = self.workQueue.get(timeout=self.timeout) res = callable(*args, **kwds) print "worker[%2d]: %s" % (self.id, str(res) ) self.resultQueue.put( res ) except Queue.Empty: break except : print 'worker[%2d]' % self.id, sys.exc_info()[:2] class WorkerManager: def __init__( self, num_of_workers=10, timeout = 1): self.workQueue = Queue.Queue() self.resultQueue = Queue.Queue() self.workers = [] self.timeout = timeout self._recruitThreads( num_of_workers ) def _recruitThreads( self, num_of_workers ): for i in range( num_of_workers ): worker = Worker( self.workQueue, self.resultQueue, self.timeout ) self.workers.append(worker) def start(self): for w in self.workers: w.start() def wait_for_complete( self): # ...then, wait for each of them to terminate: while len(self.workers): worker = self.workers.pop() worker.join( ) if worker.isAlive() and not self.workQueue.empty(): self.workers.append( worker ) print "All jobs are are completed." def add_job( self, callable, *args, **kwds ): self.workQueue.put( (callable, args, kwds) ) def get_result( self, *args, **kwds ): return self.resultQueue.get( *args, **kwds )

Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到 resultQueue中,这里的workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue 中获取任务超时,则线程结束。

WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。

一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。
view plainprint?

   1. def test_job(id, sleep = 0.001 ):  
   2.     try:  
   3.         urllib.urlopen('https://www.gmail.com/').read()  
   4.     except:  
   5.         print '[%4d]' % id, sys.exc_info()[:2]  
   6.     return  id  
   7.   
   8. def test():  
   9.     import socket  
  10.     socket.setdefaulttimeout(10)  
  11.     print 'start testing'  
  12.     wm = WorkerManager(10)  
  13.     for i in range(500):  
  14.         wm.add_job( test_job, i, i*0.001 )  
  15.     wm.start()  
  16.     wm.wait_for_complete()  
  17.     print 'end testing'  

def test_job(id, sleep = 0.001 ): try: urllib.urlopen('https://www.gmail.com/').read() except: print '[%4d]' % id, sys.exc_info()[:2] return id def test(): import socket socket.setdefaulttimeout(10) print 'start testing' wm = WorkerManager(10) for i in range(500): wm.add_job( test_job, i, i*0.001 ) wm.start() wm.wait_for_complete() print 'end testing'

完成的程序可以在这里下载。
网友留言:
Re: 用 Python 实现的线程池1. fandatou 发表于 2006 年 12 月 11 日 3:25 p.m.

楼主,请问你的线程持timeout怎么设置是socket.setdefaulttimeout(10) 还是worker.Worker.timeout = 20

另外请问下,我开5个线程cpu就是100%,似乎google能跑300个线程。
Re: 用 Python 实现的线程池2. davies 发表于 2006 年 12 月 11 日 7:40 p.m.

你看一下代码吧,还是比较易读的。WorkerManager有timeout参数,socket.setdefaulttimeout()是与网络相关的,与线程没有关系。

开多少线程得看是做什么事情,如果是CPU密集型的,一个线程都会导致CPU 100%,如果是IO密集型,比如用urllib获取网页,则可以开多一些,我用过100多个,甚至可以更多。
Re: 用 Python 实现的线程池3. fandatou 发表于 2006 年 12 月 12 日 6:21 p.m.

我就是用来抓网页。用perl写开30个线程要用2G内存,用python写5线程只用40M内存但是cpu是100%。另外关于timeout能明确说明下吗?刚从perl转python还不熟悉基本是拿人家代码修改。正好google到你这里来。

class WorkerManager:

def __init__( self, num_of_workers=10, timeout = 2):

Worker.py中的这两行中的timeout就是线程timeout的时间吗?
Re: 用 Python 实现的线程池4. davies 发表于 2006 年 12 月 12 日 7:07 p.m.

对,这个 timeout 是线程等待任务的时间,如果在 timeout 时间内任务队列中没有要执行的任务,则会结束线程。这就要求在线程开始后,timeout 时间内添加足够的任务,让所有线程都在运行。

现在这样设计是不好的,只适合与事先知道要做什么这样的场合,要改成其它方式也容易
Re: 用 Python 实现的线程池5. fandatou 发表于 2006 年 12 月 14 日 1:03 a.m.

原来是任务的timeout啊,我以为是每个线程的timeout时间呢。我用你的worker,往往会出现意外,最后只剩下一个线程再跑,只好在线程任务中每个关键点都加上try
Re: 用 Python 实现的线程池6. davies 发表于 2006 年 12 月 14 日 8:25 p.m.

看看第28-29行,在执行任务出现意外时,会打印出错信息后继续执行下一个任务,只有当队列中没有其它任务时才会结束。有可能是你的timeout设得太短,而添加任务又相对较慢,导致任务还没加进去,就有线程结束了。

可以给WorkerManager加一个start函数,手动让所有线程开始工作。
Re: 用 Python 实现的线程池7. 老熊 发表于 2007 年 01 月 23 日 12:12 a.m.

环境:简体XP+python 2.4+mysql

初学python,使用了你这个链接池,遇到个奇怪的问题

你这里的test_job,我又调用了自己写外部的一个类,test_job是将URL地址,给外部类,然后由外部类对URL处理,包括read()等,返回值,test_job再将返回值插入数据库。

现在问题是,这个外部类,在非多线程情况下运行正常,能正常处理URL,包括read,parse等。但是在多线程运行时候就抛出exception:

class exceptions.UnicodeDecodeError at 0x00B64F90

百思不得其解~~~
Re: 用 Python 实现的线程池8. davies 发表于 2007 年 01 月 23 日 8:43 a.m.

看看是哪一句抛出的异常,可能你的外部类里有部分代码不是线程安全的,比如用到的第三方库之类
Re: 用 Python 实现的线程池9. 老熊 发表于 2007 年 01 月 23 日 9:33 a.m.

呵呵,你真早。

外部类中没有第三方的库,仅仅是string的方法,发现抛出异常的代码是String.find()方法,后来将

String.decode('utf-8','ignore'),然后再执行find,就通过了。

但让我真正疑问的是,这样的decode与多线程有关?因为段代码在非多线程情况下测试是正确的

方便的话,加gtalk交流:-)1380127(#)gmail.com
Re: 用 Python 实现的线程池10. 嘛呢 发表于 2007 年 09 月 11 日 1:58 a.m.

由于你这个只能完成设定好的工作,想把他改成能自动添加工作任务的。

增加class WorkPump,能够往WorkManager.workQueue添加工作任务。

那class WorkManager修改为:

# class WorkerManager:

# def __init__( self, num_of_workers=10, timeout = 1):

# self.workQueue = Queue.Queue()

# self.resultQueue = Queue.Queue()

# self.workers = []

# self.pump = WorkPump() 新增

# self.timeout = timeout

# self._recruitThreads( num_of_workers )

新增的这个我也想把它做成一个线程,我应该怎么调度增加工作任务的线程和其他下载任务的线程?
Re: 用 Python 实现的线程池11. davies 发表于 2007 年 09 月 11 日 8:46 a.m.

现在已经是可以一边下载一遍添加任务,调用add_job方法就可以了,新添加的任务会自动执行,当所有任务添加完成后,再调用wait_for_complete。
Re: 用 Python 实现的线程池12. 嘛呢 发表于 2007 年 09 月 11 日 10:05 a.m.

因为任务也不确定,只能说是一般添加一边下载

今天我试试用Condition
Re: 用 Python 实现的线程池13. davies 发表于 2007 年 09 月 11 日 4:37 p.m.

只要构造一个函数,加参数就行了,支持任意的任务,但得是线程安全的。

可以把线程的等待时间改长一些,否则没任务了它就退出了。

Comment: (no reply)
To post your comment, Please login first.