python与多线程

多线程
  • 让许多独立的任务同时运行
  • 使用多线程编程时,一个编程任务可以规划成几个执行特定函数的线程
    UserRequestThread:负责读取客户端输入,该输入可能来自I/O通道。程序将创建多个线程,每个客户端各一个,客户端的请求将会被放入队列中。
    RequestProcessor:该程序负责从队列中获取请求并进行处理,为第3个线程提供输出。
    ReplyThread:负责向用户输出,将结果传回给用户(如果是网络应用),或者把数据写到本地文件系统或数据库中。
  • 多线程编程对于具有如下特点的编程任务而言是非常理想的:
    本质上是异步的;
    需要多个并发活动;
    每个活动的处理顺序可能是不确定的(或者说是随机的、不可预测的)
进程与线程
进程(有时称为重量级进程)
  • 计算机程序只是存储在磁盘上的可执行二进制(或其他类型)文件。而进程就是一个正在执行的程序,并且一个CPU同一时间只能运行一个进程
  • 进程也可以通过派生(fork或spawn)新的进程来执行其他任务,不过因为每个新进程也都拥有自己的内存和数据栈等,所以只能采用进程间通信(IPC)的方式共享信息
线程(有时称为轻量级进程)
  • 与进程类似,不过它们是在同一个进程下的
  • 包括开始、执行顺序和结束三部分
  • 有一个指令指针,用于记录当前运行的上下文
  • 当其他线程运行时,它可以被抢占(中断)和临时挂起(也称sleep)—— 这种方法叫做让步
进程与线程
  • 线程是最小的执行单元,而进程由至少一个线程组成
  • 多线程的执行方式与多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样(当然,真正地同时执行多线程需要多核CPU才能实现)
  • 一个进程至少由一个线程构成,其各个线程与主线程共享同一片数据空间,相比于独立的进程而言,线程间的信息共享和通信更加容易
守护线程
  • 当主线程退出时,所有子线程都将停止,不管它们是否仍在工作
  • 守护线程一般是一个等待客户端请求服务的服务器,如果没有客户端请求,守护线程就是空闲的,如果把一个线程设置为守护线程,就表示这个线程是不重要的,进程退出时不需要等待这个线程执行完成
Python与多线程
全局解释器锁(GIL)
  • Python代码的执行是由Python虚拟机(又名解释器主循环)进行控制的,Python在设计时是这样考虑的,在主循环中同时只能有一个控制线程在执行,就像单核CPU系统中的多进程一样。内存中可以有许多程序,但是在任意给定时刻只能有一个程序在运行。同理,尽管Python解释器中可以运行多个线程,但是在任意给定时刻只有一个线程会被解释器执行
  • 《python核心编程》里这样写道

    对Python虚拟机的访问是由GIL控制的。这个锁就是用来保证同时只能有一个线程运行的。在多线程环境中,Python虚拟机将按照下面所述的方式执行
    1.设置GIL
    2.切换进一个线程去运行
    3.执行下面操作之一
        a. 指定数量的字节码指令
        b. 线程主动让出控制权(可以调用time.sleep(0)来完成)
    4.把线程设置回睡眠状态(切换出线程)
    5.解锁GIL
    6.重复上述步骤

  • 由于Python的GIL的限制,多线程更适合于I/O密集型应用(I/O释放了GIL,可以允许更多的并发),而不是计算密集型应用。
几个模块
  • threading(相比较而言,thread模块有诸多缺点)
    使用Thread类,可以有很多方法来创建线程

1.创建Thread的实例,给它一个函数
2.创建Thread的实例,给它一个可调用的类实例
3.派生Thread的字类,并创建子类的实例(class MyThread(threading.Thread))
import threading

class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self):
            self.name = name
            self.func = func
            self.args = args

    def yourFunc():
        pass
        ...

    def run(self):
        your code
        ...
  • subprocess
    这是派生进程的主要替代方案,可以单纯地执行任务,或者通过标准文件(stdin、stdout、stderr)进行进程间通信。
  • multiprocessing
    该模块允许为多核或多CPU派生进程,其接口与threading模块非常相似。该模块同样也包括在共享任务的进程间传输数据的多种方式
    from multiprocessing import Pool引用进程池
  • threadpool
    python库中的线程池,基本用法为:
1
2
3
4
>>> pool = threadpool.ThreadPool(poolsize)    
>>> requests = threadpool.makeRequests(some_callable, list_of_args, callback)
>>> [pool.putRequest(req) for req in requests]
>>> pool.wait()

将《python核心编程》(第3版)示例4-12的生产者-消费者改成线程池

  • concurrent.futures
    一个新的高级库,它只在“任务”级别进行操作,也就是说,你不再需要过分关注同步和线程/进程的管理了。你只需要指定一个给定了“worker”数量的线程/进程池,提交任务,然后整理结果。
  • Queue
    供多线程使用的同步先入先出队列
同步原语
  • 在多线程代码中,当一些特定的函数或代码快不希望(或不应该)被多个线程同时执行,通常包括修改数据库、更新文件或其他会产生竞态条件的类似情况发生,就需要使用同步。
  • 锁示例
    多个线程对同一临界区进行访问与操作
    I/O与访问相同的数据结构都属于临界区,因此需要用锁来防止多个线程同时进入临界区(临界区指的是一个访问共用资源的程序片段,而这些共用资源又无法同时被多个线程访问的特性)
1
2
3
4
5
import threading
lock = threading.Lock()
lock.acquire()
...
lock.release()
  • 信号量
    一个计数器,当资源消耗时递减,当资源释放时递增,可以认为信号量代表它们的资源可用或不可用。
    下面是《python核心编程》(第3版)示例4-11的代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    from threading import BoundedSemaphore, Lock, Thread  
    from atexit import register
    from random import randrange
    from time import ctime

    lock = Lock()
    MAX = 5
    candytray = BoundedSemaphore(MAX)

    def refill():
    lock.acquire()
    print 'Refilling candy...',
    try:
    candytray.release()
    except valueError:
    print 'full, skipping'
    else:
    print 'OK'
    lock.release()
    def buy():
    lock.acquire()
    print 'Buying candy...',
    if candytray.acquire(False):
    print 'OK'
    else:
    print 'empty, skipping'
    lock.release()

    def producer(loops):
    for i in xrange(loops):
    refill()
    sleep(randrange(3))
    def consumer(loops):
    for i in xrange(loops):
    buy()
    sleep(randrange(3))

    def _main():
    print 'starting at:', ctime()
    nloops = randrange(2, 6)
    print 'THE CANDY MACHINE (full with %d bars)!' % MAX
    Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start() # buyer
    Thread(target=producer, args=(nloops,)).start() # vndr

    @register:
    def _atexit():
    print 'all DONE at:', ctime()

    if __name__ == '__main__':
    _main()