Python 多线程
分布式和并行是完全不同的概念,分布式只负责将一个测试脚本可调用不同的远程环境来执行;并行强调“同时”的概念,它可以借助多线程或多进程技术并行来执行脚本技术。
10.1 单进程的时代
在单线程的时代,当处理器要处理多个任务时,必须要对这些任务排一下执行顺序并按照这个顺序来执行任务。假如我们创建了两个任务,听音乐(music)和看电影(move),在单线程中我们只能按先后顺序来执行这两个任务。
Onethread.py
#coding=utf-8
from time import sleep, ctime
def music():
print 'I was listening to music! %s' %ctime()
sleep(2)
def move():
print 'I was at the movies! %s' %ctime()
sleep(5)
if __name__ == '__main__':
music()
move()
print 'allend:', ctime()
别创建了两个任务music 和move,执行music 需要2 秒,执行move 需要5 秒,通过sleep()
方法设置休眠时间来模拟任务的运行时间。
给music() 和move()两个方法设置参数,用于接收要播放的歌曲和视频,通过for 循环控制播放的次数,分别让歌曲和电影播放了2 次:onethread_with_pri.py:
#coding=utf-8
from time import sleep, ctime
def music(func):
for i in range(2):
print 'I was listening to music %s! %s'%(func,ctime())
sleep(2)
def move(func):
for i in range(2):
print 'I was at the movies %s! %s'%(func,ctime())
sleep(5)
if __name__ == '__main__':
music(u'爱情买卖')
move(u'阿凡达')
print 'all end:', ctime()
10.2 多线程技术
Python 通过两个标准库thread和threading 提供对线程的支持。thread 提供了低级别的、原始的线程以及一个简单的锁。threading 基于Java 的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在Python 中则是独立的对象。
10.2.1 threading 模块
避免使用thread 模块,因为是它不支持守护线程。当主线程退出时,所有的子线程不论它
们是否还在工作,都会被强行退出。threading模块则支持守护线程。Threads.py:
#coding=utf-8
from time import sleep, ctime
import threading
def music(func):
for i in range(2):
print 'I was listening to music %s! %s'%(func,ctime())
sleep(2)
def move(func):
for i in range(2):
print 'I was at the movies %s! %s'%(func,ctime())
sleep(5)
threads=[]
t1=threading.Thread(target=music,args=(u'爱情买卖',))
threads.append(t1)
t2=threading.Thread(target=move,args=(u'阿凡达',))
threads.append(t2)
if __name__ == '__main__':
for i in threads:
i.start()
#keep thread
for i in threads:
i.join()
print 'all end:', ctime()
import threading 引入线程模块。
threads = [] 创建线程数组,用于装载线程。
threading.Thread()通过调用threading模块的Thread()方法来创建线程。
start() 开始线程活动。
join() 等待线程终止。
通过for 循环遍历thread 数组中所装载的线程;然后通过start()函数启动每一个线程。
join()会等到线程结束,或者在给了timeout 参数的时候,等到超时为止。join()的另一个比较重要的方面是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。
10.2.2 优化线程的创建
从上面例子中发现线程的创建是颇为麻烦的,每创建一个线程都需要创建一个t(t1、t2、...),如果创建的线程较多时这样极其不方便。Player.py
#coding=utf-8
from time import sleep, ctime
import threading
def music(func):
for i in range(2):
print 'I was listening to music %s! %s'%(func,ctime())
sleep(2)
def move(func):
for i in range(2):
print 'I was at the movies %s! %s'%(func,ctime())
sleep(5)
def player(name):
r=name.split(".")[1]
if r=="mp3" orr=="MP3":
music(name)
elif r=="mp4" or r=="MP4":
move(name)
else:
print "Error:the format is notrecognized!"
list=["the sale oflove.mp3","love.MP3","a fan da.mp4","the sale oflove.MP4"]
threads=[]
files=range(len(list))
for i in files:
t=threading.Thread(target=player,args=(list[i],))
threads.append(t)
if __name__ == '__main__':
for i in files:
threads[i].start()
#keep thread
for i in files:
threads[i].join()
print 'all end:', ctime()
创建了一个player()函数,这个函数用于判断播放文件的类型。如果是mp3 格式的,
我们将调用music()函数,如果是mp4 格式的我们调用move()函数。哪果两种格式都不是那么只能告诉用户你所提供有文件我播放不了。
然后创建了一个list 的文件列表,注意为文件加上后缀名。然后我们用len(list) 来计算list列表有多少个文件,确定循环次数。
接着通过一个for 循环,把list 中的文件添加到线程中数组threads[]中。接着启动threads[]线程组,最后打印结束时间。
现在向list 数组中添加一个文件,程序运行时会自动为其创建一个线程。
通过上面的程序,我们发现player()用于判断文件扩展名,然后调用music()和move() ,其实,music()和move()完整工作是相同的:Super_player.py
#coding=utf-8
from time import sleep, ctime
import threading
def super_player(name,time):
for i in range(2):
print 'Start playing...%s!%s' %(file,ctime())
sleep(time)
dic={"love.mp3":3,"love.rmvb":103,"love.mp4":65}
threads=[]
files=range(len(dic))
for file,time in dic.items():
t=threading.Thread(target=super_player,args=(file,time))
threads.append(t)
if __name__ == '__main__':
for i in files:
threads[i].start()
#keep thread
for i in files:
threads[i].join()
print 'all end:', ctime()
首先创建字典list ,用于定义要播放的文件及时长(秒),通过字典的items()方法来循环的取file和time,取到的这两个值用于创建线程。接着创建super_player()函数,用于接收file 和time,用于确定要播放的文件及时长。最后是线程启动运行。
10.2.3 创建线程类
创建自己的线程类:mythread.py
#coding=utf-8
import threading
from time import sleep,ctime
class MyThread(threading.Thread):
def __init__(self,func,args,name=" "):
threading.Thread.__init__(self)
self.name=name
self.func=func
self.args=args
def run(self):
apply(self.func,self.args)
def super_player(file,time):
for i in range(2):
print 'Starting playing:%s!\t%s\n' %(file,ctime())
sleep(time)
dic={"the sale oflove.mp3":3,"a fan da.mp4":6}
threads=[]
files=range(len(dic))
for file,time in dic.items():
t=MyThread(super_player,(file,time),super_player.__name__)
threads.append(t)
if __name__ == '__main__':
for i in files:
threads[i].start()
for i in files:
threads[i].join()
print 'end:%s' %ctime()
MyThread(threading.Thread)
创建MyThread 类,用于继承threading.Thread 类。
__init__() 类的初始化方法对func、args、name 等参数进行初始化。
apply() 当函数参数已经存在于一个元组或字典中时,间接地调用函数。args 是一个包含将要提供给函数的按位置传递的参数的元组。如果省略了args,任何参数都不会被传递,kwargs 是一个包含关键字参数的字典。
10.3 多进程技术
10.3.1 mutiprocessing 模块
multiprocessing 提供了本地和远程的并发性,有效的通过全局解释锁(Global Interceptor Lock, GIL)来使用进程(而不是线程)。。由于GIL 的存在,在CPU 密集型的程序当中,使用多线程并不能有效地利用多核CPU 的优势,因为一个解释器在同一时刻只会有一个线程在执行。
multiprocessing 模块的Process实现了多进程。process.py:
#coding=utf-8
from timeimport sleep, ctime
import multiprocessing
def super_player(f,time):
for i in range(2):
print 'Start playing...%s! %s'%(f,ctime())
sleep(time)
dic={"love.mp3":3,"love.rmvb":4,"love.mp4":5}
process=[]
files=range(len(dic))
for f,time indic.items():
t=multiprocessing.Process(target=super_player,args=(f,time))
process.append(t)
if __name__ =='__main__':
for i in files:
process[i].start()
#keep thread
for i in files:
process[i].join()
print 'all end:%s'%ctime()
multiprocessing.Process 对象来创建一个进程。Process对象与Thread 对象的用法相同,也有start(),run(), join()的方法。multiprocessing.Process(group=None,target=None, name=None, args=(), kwargs={})target 表示调用对象,args 表示调用对象的位置参数元组。kwargs 表示调用对象的字典。Name 为别名。Group 实质上不使用。多程的结果显示是以进程
组为顺序进行显示的。在*nix 上面创建的新的进程使用的是fork
10.3.2 Pipe 和 Queue
multiprocessing 包中有Pipe 类和Queue 类来分别支持这两种IPC 机制。Pipe 和Queue 可以用来传送常见的对象。
(1)Pipe可以是单向(half-duplex),也可以是双向(duplex),通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。一个进程从PIPE 一端输入对象,然后被PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。pipe.py: 注:本程序只能在linux/Unix上运行
#coding=utf-8
import multiprocessing
def proc1(pipe):
pipe.send('hello')
print('proc1 rec:',pipe.recv())
def proc2(pipe):
print('proc2 rec:',pipe.recv())
pipe.send('hello, too')
pipe = multiprocessing.Pipe()
p1 =multiprocessing.Process(target=proc1, args=(pipe[0],))
p2 =multiprocessing.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
这里的Pipe 是双向的。Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe 的一端(Connection 对象)。我们对Pipe 的某一端调用send()方法来传送对象,在另一端使用recv()来接收。
(2) Queue 与Pipe 相类似,都是先进先出的结构。但Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。queue.py: 注:本程序只能在linux/Unix上运行:
#coding=utf-8
import multiprocessing
import time,os
#input worker
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.time())
queue.put(info)
#output worker
def outputQ(queue,lock):
info = queue.get()
lock.acquire()
print (str(os.getpid()) + '(get):' + info)
lock.release()
#Main
record1 = [] # store input processes
record2 = [] # store outputprocesses
lock = multiprocessing.Lock()#addlock ,avoid to san luan daying
queue = multiprocessing.Queue(3)
#input processes
for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
#output processes
for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,lock))
process.start()
record2.append(process)
for p in record1:
p.join()
queue.close()
for p in record2:
p.join()
10.4 应用于自动化测试
10.4.1 应用于自动化测试项目
为实现多进程运行测试用例,我需要对文件结构进行调整:Thread\TestProject
/test_project/thread1/baidu_sta.py -----测试用例
/thread1/__init__.py
/thread2/youdao_sta.py -----测试用例
/thread2/__init__.py
/report/ ----测试报告目录
/all_tests_pro.py
创建了thread1 和thread2 两个文件夹,分别放入了两个测试用例; 下面我们编写
all_tests_process.py 文件来通过多进程来执行测试用例。test_all_pro.py:
#coding=utf-8
import unittest, time, os,multiprocessing
import HTMLTestRunner
def EEEcreatsuit():
casedir=[]
listaa=os.listdir('\\Users\\ewang\\Desktop\\Python_Selenium2\\Thread\\TestProject')
print listaa
for xx in listaa:
if "thread" in xx:
casedir.append(xx)
suite=[]
for n in casedir:
testunit=unittest.TestSuite()
discover=unittest.defaultTestLoader.discover(n,pattern ='test*.py',top_level_dir=n)
print discover
for test_suite in discover:
for test_case in test_suite:
testunit.addTests(test_case)
suite.append(testunit)
print "===casedir:%r====" %casedir
print "+++++++++++++++++++++++++++++++++++++++++++++++"
print "!!!suite:%r!!!" %suite
return suite,casedir
def EEEEEmultiRunCase(suite,casedir):
now =time.strftime("%Y-%m-%d-%H_%M_%S",time.localtime(time.time()))
filename = '.\\report/'+now+'result.html'
fp = file(filename, 'wb')
proclist=[]
s=0
for i in suite:
runner =HTMLTestRunner.HTMLTestRunner(stream=fp,title=u'测试报告',description=u'用例执行情况:' )
proc =multiprocessing.Process(target=runner.run(i),args=(i,))
proclist.append(proc)
s=s+1
for proc in proclist: proc.start()
for proc in proclist: proc.join()
fp.close()
if __name__ == "__main__":
runtmp=EEEcreatsuit()
EEEEEmultiRunCase(runtmp[0],runtmp[1])
定义casedir 数组,读取test_project 目录下的文件夹,找到文件夹的名包含“thread”的文件夹添加到casedir 数组中.
定位suite 数组,for 循环读取casedir数组中的数据(即thread1 和thread2 两个文件夹)。通过discover 分别读取文件夹下匹配test*.py 规则的用例文件,将所有用例文件添加到testunit 测试条件中,再将测试套件追加到定义的suite 数组中。
在整个EEEcreatsuite1()函数中返回suite 和casedir 两个数组的值。
定义proclist()函数,for 循环把suite数组中的用例执行结果写入HTMLTestRunner 测试报告。multiprocessing.Process 创建用例执行的多进程,把创建的多进程追加到proclist 数组中,for 循环proc.start()开始进程活动,proc.join()等待线程终止。
10.4.2 应用于Grid2分布式测试
Selenium Grid 只是提供多系统、多浏览器的执行环境,Selenium Grid 本身并不提供并行的执行策略。也就是说我们不可能简单地丢给SeleniumGrid 一个test case 它就能并行地在不同的平台及浏览器下运行。如果您希望利用Selenium Grid 分布式并行的执行测试脚本,那么需要结束Python 多线程技术。
启动Selenium Server
在本机打开两个命令提示符窗口分别:
启动一个hub (默认端口4444),ip 地址为:172.16.10.66
>java -jar selenium-server-standalone-2.39.0.jar -role hubtandalone-2.39.0.jar-role hub
启动一个本地node(节点)
>java -jar selenium-server-standalone-2.39.0.jar -role node -port 5555 -jarselenium-server-stan启动一个远程node(节点),ip 为:172.16.10.34:
>java -jar selenium-server-standalone-2.39.0ar -role node -port 5555 -hubhttp://172.16.10.66:4444/grid/register
fnngj@fnngj-VirtualBox:~/selenium$java -jar selenium-server-standalone-2.39.0ar -role node -po
grid_thread.py
</pre><pre name="code" class="python"><pre name="code" class="python">#coding=utf-8
from threading import Thread
from selenium import webdriver
import time
list ={'http://127.0.0.1:4444/wd/hub':'chrome',
'http://127.0.0.1:5555/wd/hub':'internet explorer',
'http://172.16.10.34:5555/wd/hub':'firefox'
}
def test_baidu(host,browser):
print 'start:%s' %time.ctime()
print host,browser
driver = webdriver.Remote( command_executor=host,
desired_capabilities={'platform': 'ANY',
'browserName':browser,
'version': '',
'javascriptEnabled': True
})
driver.get('http://www.baidu.com')
driver.find_element_by_id("kw").send_keys(browser)
driver.find_element_by_id("su").click()
sleep(2)
driver.close()
threads = []
files = range(len(list))
for host,browser in list.items():
t = Thread(target=test_baidu,args=(host,browser))
threads.append(t)
if __name__ == '__main__':
for i in files:
threads[i].start()
for i in files:
threads[i].join()
print 'end:%s' %time.ctime()
</pre><pre>
到此,我们才真正解决了同一个用例不同的主机与浏览器下并行执行的目的。Grid主要实现在本机与远程主机启动多个节点的作用,要想实现并行的效果还需要借助Python的多线程技术。