python多进程

@[toc]

多线程和多进程的区分

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响;而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

python中,多线程并没有实际用户,并不能充分利用cpu资源 ,而多进程可以,所有,下面记录一下多进程的使用

多进程的基本使用

  • Process([group [, target [, name [, args [, kwargs]]]]])

  • 传参

    1
    2
    3
    4
    5
    6
    7
    8
    9
    target  <->  函数名a

    ​name <-> 子进程的别名(没有写的必要)

    ​args <-> a函数要传入的参数,为元组类型

    ​kwargs <-> 表示调用对象的字典

    ​group <-> 分组,实际上不会使用
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import multiprocessing
    import time

    def process(num):
    time.sleep(num)
    print('process ', num)

    if __name__ == '__main__':
    for i in range(5):
    p = multiprocessing.Process(target=process, args=(1,))
    p.start()

    print('cpu number:', multiprocessing.cpu_count())
    for p in multiprocessing.active_children():
    print('child process name: {} id: {}'.format(p.name, str(p.pid)))

    print("Process Ended")

daemon,join()

  • p.daemon = True,使父进程结束时,所有子进程也都被关闭

  • p.join(), 设置父进程等待所有子进程结束后再结束

  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    from multiprocessing import Process
    import time


    class MyProcess(Process):
    def __init__(self, loop):
    Process.__init__(self)
    self.loop = loop

    def run(self):
    print(self.loop)
    for count in range(self.loop):
    time.sleep(1)
    print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))


    if __name__ == '__main__':
    for i in range(2, 5):
    p = MyProcess(i)
    # 设置daemon为True,程序将会在主进程完成后终止
    p.daemon = True
    p.start()
    # 设置join则让主(父)进程等待所有的子进程完成后结束
    p.join()

    print('Main process Ended')

Lock

  • Lock()加锁,使每个子进程在执行时,其他子进程处于等待状态

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class MyProcess(Process):
    def __init__(self, loop, lock):
    Process.__init__(self)
    self.loop = loop
    self.lock = lock

    def run(self):
    for count in range(self.loop):
    time.sleep(0.1)
    print('Pid = ', self.pid, 'LoopCount = ', count)

    if __name__ == '__main__':
    lock = Lock()
    for i in range(10, 15):
    p = MyProcess(i, lock)
    p.start()

Semaphore

  • semaphore是一个内置的计数器

    • 每当调用acquire()时,内置计数器-1
    • 每当调用release()时,内置计数器+1
    • 计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
  • 计数器的作用:

    如果在主机执行IO密集型任务的时候再执行这种类型的程序时,计算机就有很大可能会宕机。这时候就可以为这段程序添加一个计数器功能,来限制一个时间点内的线程数量。

    • 示例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      import time
      import threading

      s1=threading.Semaphore(5) #添加一个计数器

      def foo():
      s1.acquire() #计数器获得锁
      time.sleep(2) #程序休眠2秒
      print("ok",time.ctime())
      s1.release() #计数器释放锁


      for i in range(20):
      t1=threading.Thread(target=foo,args=()) #创建线程
      t1.start() #启动线程

进程间得通信

Queue
  • 进程间通信必须通过Queue队列,不然起不到效果, 这里的队列可以使用multiprocessing.Queue

  • 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
Pipe
  • 介绍

    Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    from multiprocessing import Process, Pipe


    class Consumer(Process):
    def __init__(self, pipe):
    Process.__init__(self)
    self.pipe = pipe

    def run(self):
    self.pipe.send('Consumer Words')
    print('Consumer Received:', self.pipe.recv())


    class Producer(Process):
    def __init__(self, pipe):
    Process.__init__(self)
    self.pipe = pipe

    def run(self):
    print('Producer Received:', self.pipe.recv())
    self.pipe.send('Producer Words')


    if __name__ == '__main__':
    pipe = Pipe()
    p = Producer(pipe[0])
    c = Consumer(pipe[1])
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print('Ended!')

Pool

  • 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

    Pool的用法有阻塞和非阻塞两种方式。非阻塞即为添加进程后,不一定非要等到改进程执行完就添加其他进程运行,阻塞则相反。

    个人理解: 开启多个Process,还需要通过Semaphore来控制阻塞或非阻塞,而使用Pool可以方便得解决这个问题,省去手动限制进程数量的工作。

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    from multiprocessing import Lock, Pool
    import time


    def function(index):
    print 'Start process: ', index
    time.sleep(3)
    print 'End process', index


    if __name__ == '__main__':
    pool = Pool(processes=3)
    for i in xrange(4):
    pool.apply_async(function, (i,)) # 非阻塞的用法
    # pool.apply(function, (i,)) # 阻塞的用法

    print "Started processes"
    pool.close()
    pool.join()
    print "Subprocess done."

感谢

本文是自己学习多线程和多进程过程中记录的知识点,内容大多来自下面两个大大的文章

Python爬虫进阶六之多进程的用法

进程与线程