在运维工作中,随着业务和系统的发展,集群规模会随之增长,就会面临一个大规模, 批量运维的难题,这也就要求了,日常的操作,变更,查询,维护任务要能够系统化。 并尽可能做到,并发或者并行。运维工具脚本语言中 python 就成了一个很有用利器。 python的多线程在实际工作中,还是能够提高很大的工作效率的。
python多线程的概述和一些不足
python是支持多线程的,并且是native的线程。主要是通过thread和threading这两个 模块来实现的。thread模块是比较底层的,threading模块是对thread做了一些包装 threading 模块按照OOP的思想对thread做了封装,主要线程的操作对象化了,创建了 叫Thread的class 。
####1).多线程
一般来说,使用线程有3种模式: a.创建任务函数,讲函数传递给thread对象执行 ; b.传入一个可调用的对象,python 对象都是我们所说的可调用的,类的对象也是可以调用的, 当被调用时会自动调用对象的内建方法__call__(),因此这种新建线程的方法就是给线程指 定一个__call__方法被重载了的对象 ; c.创建一个类,从Thread继承,把线程执行的代码放到这个新的class里面的run
如下 :
1 : thread.start_new_thread ( function, args[, kwargs] ) function - 线程函数。 args - 传递给线程函数的参数,他必须是个tuple类型 like thie (1,2) kwargs - 可选参数。
样例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import sys, os , thread # 为线程定义 task 函数 def prinft( threadName, delay): count = 0 while count < 5: time.sleep(delay) count += 1 print "%s: %s" % ( threadName, time.ctime(time.time()) ) # 创建两个线程 try: thread.start_new_thread( prinft, ("Thread-1", 2, ) ) thread.start_new_thread( prinft, ("Thread-2", 4, ) ) except: print "Error: unable to start thread"
2 : 传入可调用对象 样例如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import threading #可调用的类 class Callable(object): def __init__(self, func, args): self.func = func; self.args = args; def __call__(self): apply(self.func, self.args); #用于线程执行的函数 def prinft( threadName, delay): count = 0 while count < 5: time.sleep(delay) count += 1 print "%s: %s" % ( threadName, time.ctime(time.time()) ) if __name__ == '__main__': #初始化一个线程对象,传入可调用的Callable对象,并用函数counter及其参数1000初始化这个对象 th = threading.Thread(target=Callable(prinft, ('th1',5))); #启动线程 th.start(); #主线程阻塞等待子线程结束 th.join();
3 : 继承thread类 创建类 prinft 继承自 threading.Thread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #线程类创建 class PRINFT(threading.Thread): def __init__(self,threadname, delay): threading.Thread.__init__(self) self.name = threadname self.delay = delay def run(self): threadName = self.name count = 0 while count < 5: time.sleep(self.delay) count += 1 print "%s: %s" % ( threadName, time.ctime(time.time()) ) #创建两个线程 consumers = [ PRINFT("thread-"+str(i) ,i+1 , ) for i in xrange(2) ] #开启线程 for w in consumers: w.start() #线程同步 for w in consumers: w.join() 注意:目前使用多线程的情况,就是处理很多同质的任务的时候;为了提高性能 大多是使用的生产者和消费者模型来组织代码结构。但是Python的thread有天生的缺陷(GIL) 在计算密集型任务中性能会很差,而在io密集型的任务中,IO期间线程会释放解释器 性能还是有比较大的提升的,特别是磁盘io密集操作,网络或者数据库相关的任务
####2). multi-process 多进程
普通的任务或者运维脚本按照上述的两种结构基本上可以完成任务,当然性能上由于GIL的局限肯定有很大 的影响。Python 3.X 中有所改善。低版本的Python 引入的多线程的替代品,multiprocessing 多进程实现真正意义的并发
多进程编程模型
多进程编程模型和多线程模型基本上一致的,只是应用的lib和调用的方法不一样 。 进程的创建方式跟线程完全一致,只不过要将threading.Thread换成multiprocessing.Process。 multiprocessing模块尽力保持了与threading模块在方法名上的一致性.
创建进程池
该模块还允许一次创建一组进程,然后再给他们分配任务,另外包中有很多方法,具体见文档手册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import multiprocessing import time def prinft( threadName, delay): count = 0 while count < 5: time.sleep(delay) count += 1 return "%s: %s" % ( threadName, time.ctime(time.time()) ) if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) result = []#获得结果 #如无需返回结果 可以直接 pool.apply_async(func, (msg, xx )) for i in xrange(10): msg = "hello %d" %(i) result.append(pool.apply_async(prinft, (msg, ))) pool.close() pool.join() for res in result: print res.get()
使用进程好处
完全并行,无GIL的限制,可充分利用多cpu多核的环境;可以接受linux信号,后面将看到, 这个功能非常好用
####3). multiprocessing.dummy 简化
最近看到一个老外的 blog 讲解 multiprocessing.dummy ,感觉很方便,.按照我的理解来记录。 dummy 是多进程的 multiprocessing 的一个子包,其中其实引用了map的方式来进行简化多线程 Map能够处理集合按顺序遍历,最终将调用产生的结果保存在一个简单的集合当中,因为在引入需要的包 文件后,Map能大大简化并发的复杂度!
multiprocessing & multiprocessing.dummy 都是支持 Map 的。Digression这是啥东西?没听说过线程引用叫dummy的多进程包文件。我也是直到最近才知道。它在多进程的说明文档中也只被提到了一句。它的效果也只是让大家直到有这么个东西而已。这可真是营销的失误! Dummy是一个多进程包的完整拷贝。唯一不同的是,多进程包使用进程,而dummy使用线程(自然也有Python本身的一些限制)。所以一个有的另一个也有。这样在两种模式间切换就十分简单,并且在判断框架调用时使用的是IO还是CPU模式非常有帮助
#####准备开始
准备使用带有并发的map功能首先要导入相关包文件:
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
然后初始化:
pool = ThreadPool()
具体来讲,它首先创建一些有效的worker启动它并将其保存在一些变量中以便随时访问。 pool对象需要一些参数,但现在最紧要的就是:进程。它可以限定线程池中worker的数量。 如果不填,它将采用系统的内核数作为初值.
一般情况下,如果你进行的是计算密集型多进程任务,内核越多意味着速度越快(当然这是有前提的)。但如果是涉及到网络计算方面,影响的因素就千差万别。所以最好还是能给出合适的线程池大小数。
pool = ThreadPool(4) # Sets the pool size to 4
如果运行的线程很多,频繁的切换线程会十分影响工作效率。所以最好还是能通过调试找出任务调度的时间平衡点。好的,既然已经建好了线程池对象还有那些简单的并发内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import urllib2 from multiprocessing.dummy import Pool as ThreadPool urls = [ 'http://www.python.org', 'http://www.python.org/about/', 'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html', 'http://www.python.org/doc/', 'http://www.python.org/download/', 'http://www.python.org/getit/', 'http://www.python.org/community/', 'https://wiki.python.org/moin/', 'http://planet.python.org/', 'https://wiki.python.org/moin/LocalUserGroups', 'http://www.python.org/psf/', 'http://docs.python.org/devguide/', 'http://www.python.org/community/awards/' # etc.. ] # Make the Pool of workers pool = ThreadPool(4) # Open the urls in their own threads # and return the results results = pool.map(urllib2.urlopen, urls) #close the pool and wait for the work to finish pool.close() pool.join()
注 : 这是一个完整的例子,pool的size 要经过调试,然后再进行设置。不过编程模型就是这一的。 关于多线程和多进程就到这里 。 还有实际开发工具和脚本的时候,面向对象的方式来进行开发和设计 是一个很好的体验。讲任务对象化 。 本篇blog 是以Python特性维度来进行的,后面会再根据任务类型来 深入总结。