博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
进程池
阅读量:5239 次
发布时间:2019-06-14

本文共 6497 字,大约阅读时间需要 21 分钟。

进程池:

1.进程池初识,2.效率比较,3.同步和异步,4.进程池的返回值和回调函数,5.socket并发的服务端


为什么要有进程池?进程池的概念。

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

 

multiprocess.Pool模块:

Pool([numprocess  [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None3 initargs:是要传给initializer的参数组
参数介绍
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''   p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用主要方法
主要方法
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。obj.ready():如果调用完成,返回Trueobj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常obj.wait([timeout]):等待结果变为可用。obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
其他方法(了解)

 

进程池初识:

#进程池#开辟一个空间,固定放着固定数目的进程,比如5个。现在有50个任务,每次任务一执行,就去池子里找这5个进程,一次执行五个任务,#执行完的任务,把进程放回进程池,供其他任务继续使用。从而大大减少了进程的开启和销毁的内存占用,提高了效率#更高级的进程池  #···python里没有#假设 进程池最少的时候有n个进程,最多是m个#当我们访问网站,用户量多的时候,进程池的进程,就会根据算法,慢慢增加,增加到m个#当访问量,或者任务下降,进程池就会 减减减  减到最少 n个#信号量:是同一段代码,只能同一时间由几个进程执行,但是这些进程数都是要悉数开启和销毁的,如果有200个进程,这些进程都会占用内存#进程池:进程都是固定的,不用每次都开启和销毁

 

进程池和普通多进程的效率比较:

from multiprocessing import Pool,Processimport timedef func(n):    for i in range(10):        print(n+1)if __name__ == '__main__':    pool = Pool(5) #一般进程数是 cpu核数+1    start = time.time()    pool.map(func,range(100)) #开启了一百个任务,map是自带join,close的    #map的功能最多就只能这样了,如果想传更多参数,可以iterable里可以传tuple,list..    t1 = time.time() - start    start2 = time.time()    p_lst = []    for i in range(100):        p = Process(target=func,args=(i,))        p.start()        p_lst.append(p)    [p.join() for p in p_lst]    t2 = time.time() - start2    print(t1,t2) # 0.19 VS  2.61 进程池完胜
进程池和多进程-效率比较

 

同步: apply   异步: apply_async(func, args=(args1,args2,...),callback=func2):

import osimport timefrom multiprocessing import Pool,Processdef func(n):    print('%s 开始了~~~'%n,os.getpid())    time.sleep(1)    print('%s 结束了'%n,os.getpid())if __name__ == '__main__':    pool = Pool(5)    for i in range(10):        # pool.apply(func,args=(i,))  #apply 进程池的同步调用(自然不需要close,join),要等待进程中的任务完结,才会执行下一个任务.基本上不会用        pool.apply_async(func,args=(i,))#apply_async异步,  一但有任务结束了,它所使用的进程(pid)马上会被其他任务使用                                        #需要close和join,不然会导致主代码的结束,从而结束进程中的任务    pool.close() #结束进程池接收任务    pool.join()  #感知所有任务的结束,然后在执行主程序接下来的代码
同步和异步-进程池

 

#对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC 进程间通信:队列(子进程put,主进程get),管道

#但是进程池 是可以直接接收子进程的返回值的
#apply的结果就是func的返回值

#通过get方法,获取 apply_async的返回值

#对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC  进程间通信:队列(子进程put,主进程get),管道#但是进程池 是可以直接接收子进程的返回值的#apply的结果就是func的返回值from multiprocessing import Poolimport timedef func(n):    time.sleep(0.2)    return n*nif __name__ == '__main__':    pool = Pool(5)    ret_lst=[]    for i in range(10):        # ret = pool.apply(func, args=(i,))        ret = pool.apply_async(func,args=(i,))        #print(ret.get()) #get会阻塞着,直到拿到ret的结果。每个ret马上跟着一个get,所以会按顺序一个个print                        #一旦异步提交了一个任务,返回一个对象,这个对象获得一个get方法,获取这个任务的返回值,所以get可以替代close和join        ret_lst.append(ret)    for ret in ret_lst:print(ret.get())    # pool.close()    # pool.join()    # ret = pool.map(func,range(10))    # print(ret)  #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]    #因为map自带close和join,它会把所有的值都计算好了,再打印出来。    #所以虽然map简单,但是apply_async可以实时提交,还是要用后者
进程池的返回值

 

回调函数不能传参数,唯一的参数就是注册函数的返回值

回调函数import osfrom multiprocessing import Pooldef func1(n):    print('in func1',os.getpid())    return n*ndef func2(nn):    print('in func2',os.getpid())    print(nn)if __name__ == '__main__':    print('主进程 :',os.getpid())    p = Pool(5)    for i in range(10):        p.apply_async(func1,args=(10,),callback=func2) #回调函数不能传参数,唯一的参数就是注册函数的返回值    p.close()                                          #回调函数是在主进程里执行的    p.join()
进程池的回调函数
import requestsfrom multiprocessing import Pooldef get(url):  #让并发进程去获取url,把网络延时交给并发,把数据处理返回给主进程    res = requests.get(url)    if res.status_code == 200:        return url,res.content.decode('utf8')def call_back(args):    url,content = args    print(url,len(content))if __name__ == '__main__':    url_lst = [        'https://www.cnblogs.com/',        'http://www.baidu.com',        'https://www.sogou.com/',        'http://www.sohu.com/',    ]    p = Pool(5)    for url in url_lst:        p.apply_async(get,args=(url,),callback=call_back)    p.close()    p.join()
小爬虫-回调函数
url = r'https://movie.douban.com/subject/26281899/?from=subject-page'import requestsres = requests.get(url)print(res) #返回为一个 
print(res.status_code) #状态码:200print(res.content) #二进制的网页源码 (无格式版的)print(res.text) #网页源码# from urllib.request import urlopen# ret = urlopen(url)# print(ret.read().decode('utf8')) #获得一个跟原网页源码一样格式的内容
requests模块
# apply        # 同步的:只有当func执行完之后,才会继续向下执行其他代码        # ret = apply(func,args=())        # 返回值就是func的return    # apply_async        # 异步的:当func被注册进入一个进程之后,程序就继续向下执行        # apply_async(func,args=())        # 返回值 : apply_async返回的对象obj        #          为了用户能从中获取func的返回值obj.get()        # get会阻塞直到对应的func执行完毕拿到结果        # 使用apply_async给进程池分配任务,        # 需要先close后join来保持多进程和主进程代码的同步性
复习

 

使用进程池来实现socket服务端的并发:

import socketfrom multiprocessing import Pooldef server(conn):    conn.send(b'hi')    ret = conn.recv(1024).decode('utf8')    print(ret)    conn.send(b'hello')    conn.close()if __name__ == '__main__':    pool = Pool(5)    sk = socket.socket()    sk.bind(('127.0.0.1',8080))    sk.listen()    while True:        conn,addr = sk.accept()        pool.apply_async(server,args=(conn,))    sk.close()    # pool.close()    # pool.join()
server端
import socketsk = socket.socket()sk.connect(('127.0.0.1',8080))print(sk.recv(1024))msg = input('>>>> ')sk.send(msg.encode('utf8'))msg = sk.recv(1024).decode('utf8')print(msg)# sk.close()
client端

 

转载于:https://www.cnblogs.com/gkx0731/p/9744461.html

你可能感兴趣的文章
Visual FoxPro权威指南pdf
查看>>
HDU 2561 第二小整数
查看>>
这两天遇到iphone使用app store下载免费软件,必须验证付款信息才能购物是怎么回事???...
查看>>
Linux下中间人攻击利用框架bettercap测试
查看>>
ecshop hash登录 + wordpress mysql盲注字段
查看>>
接口和多态
查看>>
初涉三元环
查看>>
Python SMTP发送邮件
查看>>
庄子·内篇·养生主第三
查看>>
【LeetCode】137. Single Number II (3 solutions)
查看>>
物理模型name与comment互相转化
查看>>
markdown语法
查看>>
HDU 4553 约会安排
查看>>
使用其他模型分页$data = $this->paginate('MerchantProductOrder');
查看>>
BZOJ3456城市规划
查看>>
Oracle联机日志损坏解决办法
查看>>
python自学开始
查看>>
tomcat 查看和修改内存
查看>>
iOS:制作一个简易的计算器
查看>>
正则表达式
查看>>