博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
九、python基础(多进程、进程队列Queues、Pipes管道、Managers、进程池、selery 分布式任务队列)...
阅读量:5111 次
发布时间:2019-06-13

本文共 4586 字,大约阅读时间需要 15 分钟。

多进程multiprocessing

 is a package that supports spawning processes using an API similar to the  module. The  package offers both local and remote concurrency, effectively side-stepping the  by using subprocesses instead of threads. Due to this, the  module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

1
2
3
4
5
6
7
8
9
10
from 
multiprocessing 
import 
Process
import 
time
def 
f(name):
    
time.sleep(
2
)
    
print
(
'hello'
, name)
 
if 
__name__ 
=
= 
'__main__'
:
    
= 
Process(target
=
f, args
=
(
'bob'
,))
    
p.start()
    
p.join()

To show the individual process IDs involved, here is an expanded example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from 
multiprocessing 
import 
Process
import 
os
 
def 
info(title):
    
print
(title)
    
print
(
'module name:'
, __name__)
    
print
(
'parent process:'
, os.getppid())
    
print
(
'process id:'
, os.getpid())
    
print
(
"\n\n"
)
 
def 
f(name):
    
info(
'\033[31;1mfunction f\033[0m'
)
    
print
(
'hello'
, name)
 
if 
__name__ 
=
= 
'__main__'
:
    
info(
'\033[32;1mmain process line\033[0m'
)
    
= 
Process(target
=
f, args
=
(
'bob'
,))
    
p.start()
    
p.join()

 例子:

 

进程间通讯  

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues

使用方法跟threading里的queue差不多,但是必须from  multiprocessing import  Queue 。其中Q是大写的,注意,要把生成的队列q作为参数传递给子进程(相当于克隆了一个q,当时其实底层就是两个q之间是再通过pickle序列化和反序列化再通信的,我们编程可以不关注底部通信),不然会报错,无法共享队列数据,另外,不能把一个线程的队列传给另外一个进程,只能在进程和进程间进行队列操作,或者线程和线程之间进行,再就是一个进程和里面的线程之间进行操作

注意:必须要把q作为参数传给另一个进程

1
2
3
4
5
6
7
8
9
10
11
from 
multiprocessing 
import 
Process, Queue
 
def 
f(q):
    
q.put([
42
None
'hello'
])
 
if 
__name__ 
=
= 
'__main__'
:
    
= 
Queue()
    
= 
Process(target
=
f, args
=
(q,))
    
p.start()
    
print
(q.get())    
# prints "[42, None, 'hello']"
    
p.join()

Pipes(管道)

The  function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

1
2
3
4
5
6
7
8
9
10
11
12
from 
multiprocessing 
import 
Process, Pipe
 
def 
f(conn):
    
conn.send([
42
None
'hello'
])
    
conn.close()
 
if 
__name__ 
=
= 
'__main__'
:
    
parent_conn, child_conn 
= 
Pipe()
    
= 
Process(target
=
f, args
=
(child_conn,))
    
p.start()
    
print
(parent_conn.recv())  
   
# prints "[42, None, 'hello']"
    
p.join()
 

The two connection objects returned by  represent the two ends of the pipe. Each connection object has  and  methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

 

管道生成了之后会生成两个数据,例如:parent-conn,child-conn=Pipe(),管道两头一个放父进程,一个放子进程,放随无所谓,没有顺序,都可以,通过send和recv发和收消息
 
Managers
 

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support (可以实现好多方法共享,默认给进程加锁了,实现多进程间共享数据)types , , , , , , , , , , ,  and . For example,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from 
multiprocessing 
import 
Process, Manager
 
def 
f(d, l):
    
d[
1
= 
'1'
    
d[
'2'
= 
2
    
d[
0.25
= 
None
    
l.append(
1
)
    
print
(l)
 
if 
__name__ 
=
= 
'__main__'
:
    
with Manager() as manager:
        
= 
manager.
dict
()
 
        
= 
manager.
list
(
range
(
5
))
        
p_list 
= 
[]
        
for 
in 
range
(
10
):
            
= 
Process(target
=
f, args
=
(d, l))
            
p.start()
            
p_list.append(p)
        
for 
res 
in 
p_list:
            
res.join()
 
        
print
(d)
        
print
(l)

 

进程同步

Without using the lock output from the different processes is liable to get all mixed up.这个锁存在的意义是屏幕共享,导致屏幕打印时不会乱,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from 
multiprocessing 
import 
Process, Lock
 
def 
f(l, i):
    
l.acquire()
    
try
:
        
print
(
'hello world'
, i)
    
finally
:
        
l.release()
 
if 
__name__ 
=
= 
'__main__'
:
    
lock 
= 
Lock()
 
    
for 
num 
in 
range
(
10
):
        
Process(target
=
f, args
=
(lock, num)).start()

 

进程池  

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

在windows系统上启动多进程,要if __name__=='__main__':(主要作用是将其下面的代码只在本模块执行,不能在其他模块调用运行下面的代码,可以不用from multiprocessing import freeze_support)

进程池中有两个方法:

  • apply  串行
  • apply_async  并行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from  
multiprocessing 
import 
Process,Pool
import 
time
 
def 
Foo(i):
    
time.sleep(
2
)
    
return 
i
+
100
 
def 
Bar(arg):
    
print
(
'-->exec done:'
,arg)
 
pool 
= 
Pool(
5
)
 
for 
in 
range
(
10
):
    
pool.apply_async(func
=
Foo, args
=
(i,),callback
=
Bar)
    
#pool.apply(func=Foo, args=(i,))
 
print
(
'end'
)
pool.close()
pool.join()
#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

例子中的callback好处:该函数直接在主进程执行,比如写日志的时候,直接是主进程和数据库连接,不需要每个子进程单独再去连接数据库,

注意:进程池只能先close,再join

 

selery 分布式任务队列

 https://www.cnblogs.com/alex3714/p/6351797.html

转载于:https://www.cnblogs.com/daemon-czk/p/9764200.html

你可能感兴趣的文章
永远的动漫,梦想在,就有远方
查看>>
springboot No Identifier specified for entity的解决办法
查看>>
慵懒中长大的人,只会挨生活留下的耳光
查看>>
"远程桌面连接--“发生身份验证错误。要求的函数不受支持
查看>>
【BZOJ1565】 植物大战僵尸
查看>>
视频:"我是设计师"高清完整版Plus拍摄花絮
查看>>
VALSE2019总结(4)-主题报告
查看>>
浅谈 unix, linux, ios, android 区别和联系
查看>>
51nod 1428 活动安排问题 (贪心+优先队列)
查看>>
中国烧鹅系列:利用烧鹅自动执行SD卡上的自定义程序(含视频)
查看>>
Solaris11修改主机名
查看>>
latex for wordpress(一)
查看>>
如何在maven工程中加载oracle驱动
查看>>
Flask 系列之 SQLAlchemy
查看>>
aboutMe
查看>>
【Debug】IAR在线调试时报错,Warning: Stack pointer is setup to incorrect alignmentStack,芯片使用STM32F103ZET6...
查看>>
一句话说清分布式锁,进程锁,线程锁
查看>>
python常用函数
查看>>
FastDFS使用
查看>>
服务器解析请求的基本原理
查看>>