原文出处 http://blog.daviesliu.net/2006/10/09/234822/
用 Python 实现的线程池
davies 发表于 2006 年 10 月 9 日
为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。
下面是用Python实现的通用的线程池代码:
view plainprint?
1. import Queue, threading, sys
2. from threading import Thread
3. import time,urllib
4.
5. # working thread
6. class Worker(Thread):
7. worker_count = 0
8. def __init__( self, workQueue, resultQueue, timeout = 0, **kwds):
9. Thread.__init__( self, **kwds )
10. self.id = Worker.worker_count
11. Worker.worker_count += 1
12. self.setDaemon( True )
13. self.workQueue = workQueue
14. self.resultQueue = resultQueue
15. self.timeout = timeout
16.
17. def run( self ):
18. ''' the get-some-work, do-some-work main loop of worker threads '''
19. while True:
20. try:
21. callable, args, kwds = self.workQueue.get(timeout=self.timeout)
22. res = callable(*args, **kwds)
23. print "worker[%2d]: %s" % (self.id, str(res) )
24. self.resultQueue.put( res )
25. except Queue.Empty:
26. break
27. except :
28. print 'worker[%2d]' % self.id, sys.exc_info()[:2]
29.
30. class WorkerManager:
31. def __init__( self, num_of_workers=10, timeout = 1):
32. self.workQueue = Queue.Queue()
33. self.resultQueue = Queue.Queue()
34. self.workers = []
35. self.timeout = timeout
36. self._recruitThreads( num_of_workers )
37.
38. def _recruitThreads( self, num_of_workers ):
39. for i in range( num_of_workers ):
40. worker = Worker( self.workQueue, self.resultQueue, self.timeout )
41. self.workers.append(worker)
42.
43. def start(self):
44. for w in self.workers:
45. w.start()
46.
47. def wait_for_complete( self):
48. # ...then, wait for each of them to terminate:
49. while len(self.workers):
50. worker = self.workers.pop()
51. worker.join( )
52. if worker.isAlive() and not self.workQueue.empty():
53. self.workers.append( worker )
54. print "All jobs are are completed."
55.
56. def add_job( self, callable, *args, **kwds ):
57. self.workQueue.put( (callable, args, kwds) )
58.
59. def get_result( self, *args, **kwds ):
60. return self.resultQueue.get( *args, **kwds )
import Queue, threading, sys from threading import Thread import time,urllib # working thread class Worker(Thread): worker_count = 0 def __init__( self, workQueue, resultQueue, timeout = 0, **kwds): Thread.__init__( self, **kwds ) self.id = Worker.worker_count Worker.worker_count += 1 self.setDaemon( True ) self.workQueue = workQueue self.resultQueue = resultQueue self.timeout = timeout def run( self ): ''' the get-some-work, do-some-work main loop of worker threads ''' while True: try: callable, args, kwds = self.workQueue.get(timeout=self.timeout) res = callable(*args, **kwds) print "worker[%2d]: %s" % (self.id, str(res) ) self.resultQueue.put( res ) except Queue.Empty: break except : print 'worker[%2d]' % self.id, sys.exc_info()[:2] class WorkerManager: def __init__( self, num_of_workers=10, timeout = 1): self.workQueue = Queue.Queue() self.resultQueue = Queue.Queue() self.workers = [] self.timeout = timeout self._recruitThreads( num_of_workers ) def _recruitThreads( self, num_of_workers ): for i in range( num_of_workers ): worker = Worker( self.workQueue, self.resultQueue, self.timeout ) self.workers.append(worker) def start(self): for w in self.workers: w.start() def wait_for_complete( self): # ...then, wait for each of them to terminate: while len(self.workers): worker = self.workers.pop() worker.join( ) if worker.isAlive() and not self.workQueue.empty(): self.workers.append( worker ) print "All jobs are are completed." def add_job( self, callable, *args, **kwds ): self.workQueue.put( (callable, args, kwds) ) def get_result( self, *args, **kwds ): return self.resultQueue.get( *args, **kwds )
Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到 resultQueue中,这里的workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue 中获取任务超时,则线程结束。
WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。
一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。
view plainprint?
1. def test_job(id, sleep = 0.001 ):
2. try:
3. urllib.urlopen('https://www.gmail.com/').read()
4. except:
5. print '[%4d]' % id, sys.exc_info()[:2]
6. return id
7.
8. def test():
9. import socket
10. socket.setdefaulttimeout(10)
11. print 'start testing'
12. wm = WorkerManager(10)
13. for i in range(500):
14. wm.add_job( test_job, i, i*0.001 )
15. wm.start()
16. wm.wait_for_complete()
17. print 'end testing'
def test_job(id, sleep = 0.001 ): try: urllib.urlopen('https://www.gmail.com/').read() except: print '[%4d]' % id, sys.exc_info()[:2] return id def test(): import socket socket.setdefaulttimeout(10) print 'start testing' wm = WorkerManager(10) for i in range(500): wm.add_job( test_job, i, i*0.001 ) wm.start() wm.wait_for_complete() print 'end testing'
完成的程序可以在这里下载。
网友留言:
Re: 用 Python 实现的线程池1. fandatou 发表于 2006 年 12 月 11 日 3:25 p.m.
楼主,请问你的线程持timeout怎么设置是socket.setdefaulttimeout(10) 还是worker.Worker.timeout = 20
另外请问下,我开5个线程cpu就是100%,似乎google能跑300个线程。
Re: 用 Python 实现的线程池2. davies 发表于 2006 年 12 月 11 日 7:40 p.m.
你看一下代码吧,还是比较易读的。WorkerManager有timeout参数,socket.setdefaulttimeout()是与网络相关的,与线程没有关系。
开多少线程得看是做什么事情,如果是CPU密集型的,一个线程都会导致CPU 100%,如果是IO密集型,比如用urllib获取网页,则可以开多一些,我用过100多个,甚至可以更多。
Re: 用 Python 实现的线程池3. fandatou 发表于 2006 年 12 月 12 日 6:21 p.m.
我就是用来抓网页。用perl写开30个线程要用2G内存,用python写5线程只用40M内存但是cpu是100%。另外关于timeout能明确说明下吗?刚从perl转python还不熟悉基本是拿人家代码修改。正好google到你这里来。
class WorkerManager:
def __init__( self, num_of_workers=10, timeout = 2):
Worker.py中的这两行中的timeout就是线程timeout的时间吗?
Re: 用 Python 实现的线程池4. davies 发表于 2006 年 12 月 12 日 7:07 p.m.
对,这个 timeout 是线程等待任务的时间,如果在 timeout 时间内任务队列中没有要执行的任务,则会结束线程。这就要求在线程开始后,timeout 时间内添加足够的任务,让所有线程都在运行。
现在这样设计是不好的,只适合与事先知道要做什么这样的场合,要改成其它方式也容易
Re: 用 Python 实现的线程池5. fandatou 发表于 2006 年 12 月 14 日 1:03 a.m.
原来是任务的timeout啊,我以为是每个线程的timeout时间呢。我用你的worker,往往会出现意外,最后只剩下一个线程再跑,只好在线程任务中每个关键点都加上try
Re: 用 Python 实现的线程池6. davies 发表于 2006 年 12 月 14 日 8:25 p.m.
看看第28-29行,在执行任务出现意外时,会打印出错信息后继续执行下一个任务,只有当队列中没有其它任务时才会结束。有可能是你的timeout设得太短,而添加任务又相对较慢,导致任务还没加进去,就有线程结束了。
可以给WorkerManager加一个start函数,手动让所有线程开始工作。
Re: 用 Python 实现的线程池7. 老熊 发表于 2007 年 01 月 23 日 12:12 a.m.
环境:简体XP+python 2.4+mysql
初学python,使用了你这个链接池,遇到个奇怪的问题
你这里的test_job,我又调用了自己写外部的一个类,test_job是将URL地址,给外部类,然后由外部类对URL处理,包括read()等,返回值,test_job再将返回值插入数据库。
现在问题是,这个外部类,在非多线程情况下运行正常,能正常处理URL,包括read,parse等。但是在多线程运行时候就抛出exception:
class exceptions.UnicodeDecodeError at 0x00B64F90
百思不得其解~~~
Re: 用 Python 实现的线程池8. davies 发表于 2007 年 01 月 23 日 8:43 a.m.
看看是哪一句抛出的异常,可能你的外部类里有部分代码不是线程安全的,比如用到的第三方库之类
Re: 用 Python 实现的线程池9. 老熊 发表于 2007 年 01 月 23 日 9:33 a.m.
呵呵,你真早。
外部类中没有第三方的库,仅仅是string的方法,发现抛出异常的代码是String.find()方法,后来将
String.decode('utf-8','ignore'),然后再执行find,就通过了。
但让我真正疑问的是,这样的decode与多线程有关?因为段代码在非多线程情况下测试是正确的
方便的话,加gtalk交流:-)1380127(#)gmail.com
Re: 用 Python 实现的线程池10. 嘛呢 发表于 2007 年 09 月 11 日 1:58 a.m.
由于你这个只能完成设定好的工作,想把他改成能自动添加工作任务的。
增加class WorkPump,能够往WorkManager.workQueue添加工作任务。
那class WorkManager修改为:
# class WorkerManager:
# def __init__( self, num_of_workers=10, timeout = 1):
# self.workQueue = Queue.Queue()
# self.resultQueue = Queue.Queue()
# self.workers = []
# self.pump = WorkPump() 新增
# self.timeout = timeout
# self._recruitThreads( num_of_workers )
新增的这个我也想把它做成一个线程,我应该怎么调度增加工作任务的线程和其他下载任务的线程?
Re: 用 Python 实现的线程池11. davies 发表于 2007 年 09 月 11 日 8:46 a.m.
现在已经是可以一边下载一遍添加任务,调用add_job方法就可以了,新添加的任务会自动执行,当所有任务添加完成后,再调用wait_for_complete。
Re: 用 Python 实现的线程池12. 嘛呢 发表于 2007 年 09 月 11 日 10:05 a.m.
因为任务也不确定,只能说是一般添加一边下载
今天我试试用Condition
Re: 用 Python 实现的线程池13. davies 发表于 2007 年 09 月 11 日 4:37 p.m.
只要构造一个函数,加参数就行了,支持任意的任务,但得是线程安全的。
可以把线程的等待时间改长一些,否则没任务了它就退出了。