admin管理员组

文章数量:1587996

文章目录

  • 一、 操作系统的发展史
  • 二、 进程基础
  • 三、 并发和并行
  • 四、 如何开启多进程
  • 五、 进程调度算法(了解)
  • 六、 同步异步,阻塞非阻塞(了解)
    • 1、 状态介绍
    • 2、同步和异步、阻塞和非阻塞
  • 七、 开启多进程(Process)
    • 1、multiprocess模块
    • 2、 multiprocess.process模块
    • 3、 process模块介绍
      • 3.1 Process类的参数(重点)
      • 3.2 Process类的方法,属性(重点)
      • 3.3 Process类的属性
      • 3.4 在windows中使用process模块的注意事项
    • 4、使用process模块创建进程
      • 4.1 查看主进程和子进程的进程号
      • 4.2 同时开启多个进程(join)
      • 4.3 通过继承Process类开启进程
      • 4.4 进程之间数据隔离
    • 5、 守护进程
      • 5.1 守护进程的启动
      • 5.2 主进程代码执行结束守护进程立即结束
    • 6、 高并发的tcp服务端
  • 八、 进程同步(进程锁)(Lock)(次重点)
  • 九、 进程间通信(Queue)
    • 1、进程Queue介绍
    • 2、 方法介绍
    • 3、 通过Queue实现进程间通信
    • 4、 批量生产数据放入Queue再批量取出
    • 5、 生产者消费者模型(重点)
    • 6、 多个生产者多个消费者的生产者消费者模型
  • 十、 JoinableQueue([maxsize])
    • 1、 方法介绍
    • 2、 JoinableQueue队列实现消费之生产者模型
  • 十一、 进程间数据共享(Manager)(了解)
  • 十二、线程概念
    • 1、有了进程为什么要有线程
    • 2、线程的出现
  • 十三、 GIL全局解释器锁
  • 十四、 开启线程的两种方式
  • 十五、 多线程与多进程比较
    • 1、 pid的比较
    • 2、 开启速度比较
    • 3、 内存数据的共享问题
  • 十六、 Thread类的其他方法
    • 1、 代码示例
    • 2、 join方法
    • 3、守护线程
      • 3.1 详细解释
  • 十七、 多线程实现socket
    • 1、 服务端
    • 2、 客户端
  • 十八、 锁-信号量-Event事件
    • 一、同步锁(互斥锁)
    • 二、 死锁与递归锁
    • 三、 典型问题:科学家吃面
    • 四、 信号量
    • 五、 Event事件
  • 十九、线程队列
  • 二十、 进程池,线程池(concurrent.futures)
  • 二十一、 协程介绍
  • 二十二、 协程之greenlet模块(初级模块,实现了保存状态加切换
  • 二十三、 协程之gevent模块
  • 二十四、 asyncio


一、 操作系统的发展史

1 手工操作——穿孔卡片
	-用户独占全机
    -CPU的利用不充分
2 批处理
	-把一个操作整个写到磁带中,以后要进行这个操作,直接拿着磁带,读入即可
    -脱机批处理
    -联机批处理
    
3 多道程序系统
	-当一道程序因I/O请求而暂停运行时,CPU便立即转去运行另一道程序
    -各道程序轮流地用CPU,并交替运行
4 分时系统
	-多个程序在运行,时间片的概念,cpu执行完固定的时间,就会转去另一个程序
5 通用操作系统
	多道批处理系统,分时


io操作:(统统不占用cpu)
	键盘输入,从硬盘拿数据,从网络加载数据---》都叫输入
    显示在显示器,写到硬盘,从网络发送数据--->都叫输出

二、 进程基础

狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。

广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元


# 进程是资源分配的最小单位,线程是cpu执行的最小单位
#一个程序运行,最少有一个进程
#一个进程里最少有一条线程


# 进程和程序的区分
程序可以作为一种软件资料长期存在,而进程是有一定生命期的。
程序是永久的,进程是暂时的。


# 进程的状态()
	-就绪态:可以被cpu调度执行了,还没有执行,排着队
    -运行态:在cpu中运行,正在运行(如果到了时间片,也会被调度出去,调度出去的程序是就绪态)
    -阻塞态:io操作,把数据加载到内存中

三、 并发和并行

并发:你在跑步,鞋带开了,停下跑步,系鞋带,系完以后,继续跑步,在一个时间段内来看,你干了多个事

  • 并发是指资源有限的情况下,两者交替轮流使用资源
  • 单核下的并发

并行:你在跑步,你用随身听在听着,同一时刻,在干多个事

  • 并行是指两者同时执行
  • 只有多核才涉及到并行

四、 如何开启多进程

from multiprocessing import Process


#如果在win下开多进程,必须写main,否则报错
import time
def wirte_file(s):

    time.sleep(5)
    with open('a.txt','a') as f:
        f.write(s)
        f.write('\n')

if __name__ == '__main__':
    time.sleep(5)
    # wirte_file()
    # 开启多进程的第一个方式
    p=Process(target=wirte_file,args=['lqz is nb'])
    # 执行该进程
    p.start()


    # 又开了一个进程
    p1 = Process(target=wirte_file,args=['egon is dsb'])
    # 执行该进程
    p1.start()

五、 进程调度算法(了解)

-先来先服务调度算法
-短作业优先调度算法
-时间片轮转法
-多级反馈队列

六、 同步异步,阻塞非阻塞(了解)

1、 状态介绍


在了解其他概念之前,我们首先要了解进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。

  1. 就绪(Ready)状态:当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。
  2. 执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
  3. 阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

2、同步和异步、阻塞和非阻塞

1 同步调用:提交了以后,一直等待结果返回
2 异步调用:提交了以后,返回一个标志,等执行完成后,有消息通知

3 同步,异步:指的是消息通知机制

4  阻塞,非阻塞:程序在等待调用结果的状态

5 同步阻塞(效率最低):打电话要买书,如果电话没挂,我也一直在等待,
6 同步非阻塞:打电话买书,电话没挂,我一边干别的事,一边听一下电话
7 异步阻塞:打电话买书,电话先挂掉,过一会老板会回回来(回调),老板给回电话的过程一直在等待
8 异步非阻塞:打电话买书,电话先挂掉,过一会老板会回回来(回调),老板给回电话的过程中,在干别的事

七、 开启多进程(Process)

1、multiprocess模块

仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

2、 multiprocess.process模块

process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

3、 process模块介绍

3.1 Process类的参数(重点)

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:

  1. 需要使用关键字的方式来指定参数
  2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:

  • group参数未使用,值始终为None
  • target表示调用对象,即子进程要执行的任务
  • args表示调用对象的位置参数元组,args=(1,2,‘egon’,)
  • kwargs表示调用对象的字典,kwargs={‘name’:‘egon’,‘age’:18}
  • name为子进程的名称
from multiprocessing import Process


def task(name,age):
    print(name)
    print(age)


if __name__ == '__main__':
    # p=Process(target=task,args=['lqz',18])
    # p=Process(target=task,kwargs={'age':19,'name':'lqz'},name='process01')
    p=Process(target=task,kwargs={'age':19,'name':'lqz'})
    p2=Process(target=task,kwargs={'age':19,'name':'lqz'})
    p.start()
    p2.start()

    print(p.name)
    print(p2.name)
    # target=None, 你要执行的任务,函数
    # name=None,  进程名
    # args=(),    以位置参数给任务(函数)传递参数
    # kwargs={}   以关键字的形式给任务(函数)传递参数

3.2 Process类的方法,属性(重点)

  • p.start():启动进程,并调用该子进程中的p.run()
  • p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
  • p.is_alive():如果p仍然运行,返回True
  • p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

from multiprocessing import Process
import time

def task(name,age):
    time.sleep(10)
    print(name)
    print(age)


if __name__ == '__main__':
    p=Process(target=task,kwargs={'age':19,'name':'lqz'})


    p.start()  #启动进程,必须调用start
    # p.run()    # 实际进程在执行的时候,执行的是run方法,但是调用run不会开进程,后面我们另一种开启进程的方案使用到它

    # p.join()    # 等待子进程执行完成


    print(p.is_alive())   #True

    p.terminate() # 杀死p这个进程,通知操作系统去杀死该进程
    time.sleep(0.1)

    print(p.is_alive())  #可能还是True
    print('ssss')

    print(p.is_alive())  #就是False

    print(p)

    
    
'''
p.start()  #启动进程,必须调用start
p.run()    # 实际进程在执行的时候,执行的是run方法,但是调用run不会开进程,后面我们另一种开启进程的方案使用到它
p.join()    # 等待子进程执行完成
p.terminate() # 杀死p这个进程,通知操作系统去杀死该进程,并不是立即结束
p.is_alive() #进程是否还存活
'''

3.3 Process类的属性

  • p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
  • p.name:进程的名称
  • p.pid:进程的pid
  • p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
  • p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
from multiprocessing import Process
import time

def task(name,age):
    time.sleep(10)
    print(name)
    print(age)


if __name__ == '__main__':
    p=Process(target=task,kwargs={'age':19,'name':'lqz'})
    # p.start()
    # print(p.name) # 进程名字
    # p.daemon=True  #主进程结束,子进程也结束,必须在start之前调用
    p.start()
    print(p.pid)  # 进程id号
    time.sleep(10)
    
'''
print(p.name) # 进程名字
print(p.pid)  # 进程id号
p.daemon=True  #主进程结束,子进程也结束,必须在start之前调用
'''

3.4 在windows中使用process模块的注意事项

在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ =='__main__'判断保护起来,import 的时候,就不会递归运行了。

4、使用process模块创建进程

4.1 查看主进程和子进程的进程号

from multiprocessing import Process
import time
import os
def task(name,age):
    # 如果在任务中取出进程id号,需要使用os模块
    print('当前进程(子进程)id号是:',os.getpid()) #当前进程id号
    print('当前进程父进程的id号是:',os.getppid()) # 当前进程父进程的id号
    time.sleep(10)

    print(name)
    print(age)


if __name__ == '__main__':
    p=Process(target=task,kwargs={'age':19,'name':'lqz'})
    p.start()
    print('p这个进程的id号是:',p.pid)  # 进程id号
    print('当前进程id(主进程)号是:', os.getpid())  # 当前进程id号
    print('当前进程父进程(pycharm)的id号是:', os.getppid())  # 当前进程父进程的id号
    time.sleep(10)
    
'''

如果有p对象,就是用p.pid获取进程id号
如果没有p对象,就是用os模块的
os.getpid() #当前进程id号
os.getppid() #父进程id号
'''

4.2 同时开启多个进程(join)


from multiprocessing import Process
import time
import os
# def task(name,age):
#     time.sleep(10)
#     print(name)
#     print(age)
# def task1():
#     time.sleep(2)
#     print("我是task1")
#
#
# if __name__ == '__main__':
#     p=Process(target=task,kwargs={'age':19,'name':'lqz'})
#     p.start()
#
#     p1=Process(target=task1)
#     p1.start()
#
#     print('主进程')

import time

def task1(i):
    time.sleep(2)
    print("我是task1",i)


if __name__ == '__main__':
    ctime=time.time()
    ll=[]
    for i in range(5):
        p1=Process(target=task1,args=[i,])
        p1.start()
        p1.join()  #等待子进程执行完成
        # ll.append(p1)

    # for p in ll:
    #     p.join()
    [p.join()for p in ll]
    print('主进程')
    print(time.time()-ctime)
    
    
'''
开启多个进程
如果想等待多个进程同时执行完,先把进程开启完成,再统一join

'''

开启进程的另一种方案

4.3 通过继承Process类开启进程

### 通过继承Process类的方式来实现,重写run方法,run方法就是你要执行的任务,实例化得到对象,调用start方法开启进程

class MyProcess(Process):
    def __init__(self,name1,age):
        self.name1=name1
        self.age=age
        # 这个必须写
        super().__init__()

    def run(self) :
        time.sleep(2)
        print(self.name)
        print(self.name1)
        print(self.age)

if __name__ == '__main__':
    p=MyProcess('lqz',19)
    p.start() #调用p.start(),不要调用run
    print('主进程')

4.4 进程之间数据隔离

from multiprocessing import Process
import time
import os


def task():
    print('我是task')
    global n
    n=100
    print('子进程的:',n)



if __name__ == '__main__':
    # 在主进程中定义了一个n=10
    n=10

    ###如果这样写,n会被改掉
    # task()
    # print(n)

    ##如果这样写,n不会被改掉
    p=Process(target=task)
    p.start()
    print('主进程的:',n)

5、 守护进程

会随着主进程的结束而结束。

主进程创建守护进程

  • 其一:守护进程会在主进程代码执行结束后就终止
  • 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止。

5.1 守护进程的启动

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)


p=Myprocess('哪吒')
p.daemon=True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.start()
time.sleep(10)  # 在sleep时查看进程id对应的进程ps -ef|grep id
print('主')

5.2 主进程代码执行结束守护进程立即结束

from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")  # 打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止.

6、 高并发的tcp服务端

import socket
from multiprocessing import Process




def talk(sock,addr):
    print('客户端连接成功',addr)
    while True:
        try:
            data = sock.recv(1024)
            if len(data) == 0:
                break
            print('从客户端收到了:', data)
            sock.send(data.upper())
        except Exception as e:
            print(e)
            break
    sock.close()
if __name__ == '__main__':
    server = socket.socket()
    server.bind(('127.0.0.1', 81))
    server.listen(5)
    while True:
        print('等待客户端连接')
        sock, addr = server.accept()
        p=Process(target=talk,args=[sock,addr])
        p.start()
    server.close()

八、 进程同步(进程锁)(Lock)(次重点)

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

-多个进程操作同一个数据(文件中的数据,而不是内存中的数据:)
-lock=Lock()
-加锁:lock.acquire()
-解锁:lock.release()

import time
import json
from multiprocessing import Process,Lock


## 查询票余额
def check(i):
    with open('ticket', 'rt', encoding='utf-8') as f:
        res = json.load(f)
    print('%s:在查询票,票还剩%s张' % (i, res['count']))
    if res['count'] > 0:
        return True


## 抢票,票的余额减一

def buy(i):
    with open('ticket', 'rt', encoding='utf-8') as f:
        res = json.load(f)
    time.sleep(1)  # 模拟查票延迟
    if res['count'] > 0:
        print('%s现在要买了,票数还是大于0'%i)
        res['count'] -= 1
        time.sleep(2)  # 模拟买票延迟
        with open('ticket', 'wt', encoding='utf-8') as f1:
            json.dump(res, f1)
        print('%s这个人购票成功' % i)
    else:
        print('%s这个人购票失败,票不够了,买不了了' % i)


def task(i,lock):
    res = check(i)
    if res:
        lock.acquire()
        buy(i)
        lock.release()


##模拟10个人买票

if __name__ == '__main__':
    lock=Lock()
    for i in range(10):
        p = Process(target=task, args=[i,lock ])
        p.start()

九、 进程间通信(Queue)

1、进程Queue介绍

1 进程间数据隔离,两个进程进行通信,借助于Queue

2 进程间通信:IPC(Inter-Process Communication)
	-借助于Queue实现进程间通信
    -借助于文件
    
    -借助于数据库
    -借助于消息队列:rabbitmq,kafka....

3 Queue([maxsize])创建共享的进程队列。
  参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。

4 底层队列使用管道和锁定实现

2、 方法介绍



from multiprocessing import Process,Queue


if __name__ == '__main__':
    # maxsize表示Queue的大小是多少,能放多少东西
    queue=Queue(3)
    ## 放数据
    queue.put('zhangsan')
    queue.put('liss')
    queue.put('wwwww')

    queue.put('wwwww',timeout=0.1)

    # queue.put_nowait('sdafsd')
    #
    # res=queue.get()
    # print(res)
    # res=queue.get()
    # print(res)
    res=queue.get()
    # print(res)
    # # 卡住
    # # res=queue.get()
    # res=queue.get_nowait()
    # print(res)
    
    
    
  '''
  # 实例化得到一个对象,数字表示queue的大小
  queue=Queue(3)
  
  # 放值  queue.put() 
  q.put(item [, block [,timeout ] ] )
  # block:是否阻塞(默认为True,如果设置为False,将引发Queue.Empty异常(定义在Queue模块中))
  #timeout:等待的时间(如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常)

  
  #取值  queue.get() 
  q.get( [ block [ ,timeout ] ] )
  # block:是否阻塞
  #timeout:等待的时间

  
  # 不等待,如果满了,就报错
  queue.put_nowait()
  
  # 去取值,如果没有值,直接报错
  res=queue.get_nowait()
  
  #查看这个queue是否满(如果q已满,返回为True)
  queue.full()
  
  #查看queue是否是空的(如果调用此方法时 q为空,返回True)
  queue.empty()
  
  # 查看queue中有几个值
  queue.qsize()
  '''

其他方法(了解)

q.close() :关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() :不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread()
:连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

3、 通过Queue实现进程间通信

from multiprocessing import Process,Queue


import os
import time

def task(queue):
    print('我这个进程%s开始放数据了'%os.getpid())
    time.sleep(10)
    queue.put('lqz is handsome')
    print('%s我放完了' % os.getpid())


if __name__ == '__main__':
    #不写数字,表示可以任意长度
    queue=Queue()
    p=Process(target=task,args=[queue,])
    p.start()

    res=queue.get()  #会卡在这
    print(res)

4、 批量生产数据放入Queue再批量取出

from multiprocessing import Process,Queue
import os

def get_task(queue):
    res=queue.get()
    print('%s这个进程取了数据:%s'%(os.getpid(),res))


def put_task(queue):
    queue.put('%s:放了数据'%os.getpid())

if __name__ == '__main__':
    queue=Queue(1)
    p1=Process(target=put_task,args=[queue])
    p2=Process(target=put_task,args=[queue])
    p1.start()
    p2.start()


    p3=Process(target=get_task,args=[queue])
    p4=Process(target=get_task,args=[queue])
    p3.start()
    p4.start()

5、 生产者消费者模型(重点)

'''
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。


1 为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

2 什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
'''
from multiprocessing import Process, Queue
# import os
#
# import time
# import random
# def producer(queue):
#     # 生产的东西,放到Queue中
#     for i in range(10):
#         data = '%s这个厨师,整了第%s个包子' % (os.getpid(), i)
#         print(data)
#         # 模拟一下延迟
#         time.sleep(random.randint(1,3))
#         queue.put('第%s个包子'%i)
#
#
# def consumer(queue):
#     # 消费者从queue中取数据,消费(吃包子)
#     while True:
#
#         res=queue.get()
#         # 模拟一下延迟
#         time.sleep(random.randint(1, 3))
#         print('%s这个消费者,吃了%s'%(os.getpid(),res))
#
#
#
# if __name__ == '__main__':
#     queue=Queue(3)
#     p=Process(target=producer,args=[queue,])
#     p.start()
#
#     p1=Process(target=consumer,args=[queue,])
#     p1.start()


###### 改良(生产者以及不生产东西了,但是消费者还在等着拿)
# import os
#
# import time
# import random
# def producer(queue):
#     # 生产的东西,放到Queue中
#     for i in range(10):
#         data = '%s这个厨师,整了第%s个包子' % (os.getpid(), i)
#         print(data)
#         # 模拟一下延迟
#         time.sleep(random.randint(1,3))
#         queue.put('第%s个包子'%i)
#     # 生产完了,在queue中放一个None
#     queue.put(None)
#
#
# def consumer(queue):
#     # 消费者从queue中取数据,消费(吃包子)
#     while True:
#
#         res=queue.get()
#         if not res:break # 如果去到空,说明打烊了(生产者不生产了),退出
#         # 模拟一下延迟
#         time.sleep(random.randint(1, 3))
#         print('%s这个消费者,吃了%s'%(os.getpid(),res))
#
#
#
# if __name__ == '__main__':
#     queue=Queue(3)
#     p=Process(target=producer,args=[queue,])
#     p.start()
#
#     p1=Process(target=consumer,args=[queue,])
#     p1.start()


#### 把put none 放在主进程中执行
import os

# import time
# import random
# def producer(queue):
#     # 生产的东西,放到Queue中
#     for i in range(10):
#         data = '%s这个厨师,整了第%s个包子' % (os.getpid(), i)
#         print(data)
#         # 模拟一下延迟
#         time.sleep(random.randint(1,3))
#         queue.put('第%s个包子'%i)
#
#
#
# def consumer(queue):
#     # 消费者从queue中取数据,消费(吃包子)
#     while True:
#
#         res=queue.get()
#         if not res:break # 如果去到空,说明打烊了(生产者不生产了),退出
#         # 模拟一下延迟
#         time.sleep(random.randint(1, 3))
#         print('%s这个消费者,吃了%s'%(os.getpid(),res))
#
#
#
# if __name__ == '__main__':
#     queue=Queue(3)
#     p=Process(target=producer,args=[queue,])
#     p.start()
#
#     p1=Process(target=consumer,args=[queue,])
#     p1.start()
#
#     # 如果把put None放在这,会有问题
#     # 主进程会先执行这句话,消费进程读到None,直接结束,生产者进程没有结束,于是生产一直在生产,消费已经不消费了
#     # 直到Queue满了,就一直卡在这了
#     # queue.put(None)
#
#     ### 现在就要放在这,你把问题解决
#     p.join()
#     queue.put(None)

6、 多个生产者多个消费者的生产者消费者模型

# 多个生产者在生产,多个消费者在消费
# import time
# import random
# def producer(queue,food):
#     # 生产的东西,放到Queue中
#     for i in range(10):
#         data = '%s这个厨师,做了第%s个%s' % (os.getpid(), i,food)
#         print(data)
#         # 模拟一下延迟
#         time.sleep(random.randint(1,3))
#         queue.put('第%s个%s'%(i,food))
#
#
# def consumer(queue):
#     # 消费者从queue中取数据,消费(吃包子)
#     while True:
#         res=queue.get()
#         if not res:break # 如果去到空,说明打烊了(生产者不生产了),退出
#         # 模拟一下延迟
#         time.sleep(random.randint(1, 3))
#         print('%s这个消费者,吃了%s'%(os.getpid(),res))
#
#
#
# if __name__ == '__main__':
#     queue=Queue(3)
#     ##起了三个生产者
#     p1=Process(target=producer,args=[queue,'包子'])
#     p2=Process(target=producer,args=[queue,'骨头'])
#     p3=Process(target=producer,args=[queue,'泔水'])
#     p1.start()
#     p2.start()
#     p3.start()
#
#
#
#     # 起了两个消费者
#     c1=Process(target=consumer,args=[queue,])
#     c2=Process(target=consumer,args=[queue,])
#     c1.start()
#     c2.start()
#
#     ##等三个生产者都生产完,放三个None
#     p1.join()
#     p2.join()
#     p3.join()
#     queue.put(None)
#     queue.put(None)
#     queue.put(None)

##如果消费者多,比生产者多出来的消费者不会停

import time
import random


def producer(queue, food,name):
    # 生产的东西,放到Queue中
    for i in range(10):
        data = '%s:这个厨师,做了第%s个%s' % (name, i, food)
        print(data)
        # 模拟一下延迟
        time.sleep(random.randint(1, 3))
        queue.put('第%s个%s' % (i, food))


def consumer(queue,name):
    # 消费者从queue中取数据,消费(吃包子)
    while True:
        try:
            res = queue.get(timeout=20)
            # 模拟一下延迟
            time.sleep(random.randint(1, 3))
            print('%s这个消费者,吃了%s' % (name, res))
        except Exception as e:
            print(e)
            break


if __name__ == '__main__':
    queue = Queue(3)
    ##起了三个生产者
    p1 = Process(target=producer, args=[queue, '包子','egon'])
    p2 = Process(target=producer, args=[queue, '骨头','lqz'])
    p3 = Process(target=producer, args=[queue, '泔水','jsason'])
    p1.start()
    p2.start()
    p3.start()

    # 起了两个消费者
    c1 = Process(target=consumer, args=[queue, '一号'])
    c2 = Process(target=consumer, args=[queue,'二号' ])
    c3 = Process(target=consumer, args=[queue,'三号' ])
    c4 = Process(target=consumer, args=[queue,'四号' ])
    c1.start()
    c2.start()
    c3.start()
    c4.start()

十、 JoinableQueue([maxsize])

创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

1、 方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

  • q.task_done():使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
  • q.join():生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
    下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

2、 JoinableQueue队列实现消费之生产者模型

from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃 %s' %(os.getpid(),res))
        q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('生产了 %s' %(os.getpid(),res))
    q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。


if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #开始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主') 
    
    #主进程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

十一、 进程间数据共享(Manager)(了解)

'''
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的。

虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
'''


from multiprocessing import Process,Manager,Lock


# def task(dic,lock):
#     # lock.acquire()
#     # dic['count']-=1
#     # lock.release()
#     with lock:
#         dic['count'] -= 1
#
# if __name__ == '__main__':
#     lock = Lock()
#     with Manager() as m:
#         # 如果直接定义dict,这个dict在多个进程中其实是多份,进程如果改,只改了自己的
#         #如果定义的是m.dict({'count': 100}),多个进程之间就可以共享这个数据
#         dic = m.dict({'count': 100})
#
#         p_l = []
#         for i in range(100):
#             p = Process(target=task, args=(dic, lock))
#             p_l.append(p)
#             p.start()
#         for p in p_l:
#             p.join()





def task(dic,lock):
    with lock:    # 不加锁而操作共享的数据,肯定会出现数据错乱
        dic['count'] -= 1

if __name__ == '__main__':
    lock = Lock()
    dic={'count':100}
    p_l = []
    for i in range(100):
        p = Process(target=task, args=(dic, lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()



    print(dic)

十二、线程概念

1、有了进程为什么要有线程

进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

  • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
  • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

如果这两个缺点理解比较困难的话,举个现实的例子也许你就清楚了:如果把我们上课的过程看成一个进程的话,那么我们要做的是耳朵听老师讲课,手上还要记笔记,脑子还要思考问题,这样才能高效的完成听课的任务。而如果只提供进程这个机制的话,上面这三件事将不能同时执行,同一时间只能做一件事,听的时候就不能记笔记,也不能用脑子思考,这是其一;如果老师在黑板上写演算过程,我们开始记笔记,而老师突然有一步推不下去了,阻塞住了,他在那边思考着,而我们呢,也不能干其他事,即使你想趁此时思考一下刚才没听懂的一个问题都不行,这是其二。

2、线程的出现

60年代,在OS中能拥有资源和独立运行的基本单位是进程,然而随着计算机技术的发展,进程出现了很多弊端,一是由于进程是资源拥有者,创建、撤消与切换存在较大的时空开销,因此需要引入轻型进程;二是由于对称多处理机(SMP)出现,可以满足多个运行单位,而多个进程并行开销过大。
因此在80年代,出现了能独立运行的基本单位——线程(Threads)。

注意:进程是资源分配的最小单位,线程是CPU调度的最小单位。每一个进程中至少有一个线程。

from threading import Thread
from queue import Queue
import os
import time


def task():
    time.sleep(3)
    print('我是子线程执行的')
    print(os.getpid())


if __name__ == '__main__':
    # 启动线程

    ctime = time.time()
    t = Thread(target=task)
    t.start()
    # task()
    time.sleep(3)
    print(os.getpid())
    print(time.time() - ctime)

十三、 GIL全局解释器锁

0 pypy(没有全局解释器锁) cpython(99.999999%-pypy python好多模块用不了,
1 全局解释器锁,GIL锁(cpython解释器的问题)
	-当年python设计的时候,还是单核,没有多核的概念
    -python需要做垃圾回收(gc)
    -垃圾回收线程,进行垃圾回收
    -设计了一个大锁(GIL锁),只有拿到这把锁的线程,才能执行
    -同一时刻,在一个进程中,可以开多个线程,但是只能有一条线程在执行
    -不能利用多核优势
  

### 只针对于cpython解释器(其他解释器,包括其他语言不这样)
2 如果是计算密集型:要开进程
3 如果是io密集型:要开线程

十四、 开启线程的两种方式

from threading import Thread
import time


###第一种方式
#
# def task():
#     time.sleep(1)
#     print('我是子线程')
#
#
# if __name__ == '__main__':
#     t=Thread(target=task)
#     t.start()
#     print('我是主线程')
#
#


###第二种方式

class MyThread(Thread):
    def __init__(self,a):
        self.a=a
        super().__init__()
    def run(self):
        time.sleep(1)
        print('我是子线程',self.a)

if __name__ == '__main__':
    t=MyThread('aaaaa')
    t.start()
    print('我是主线程')

十五、 多线程与多进程比较

1、 pid的比较

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    # part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    # part2:开多个进程,每个进程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())

2、 开启速度比较

#开线程消耗的资源,耗费的时间远远小于开进程
from threading import Thread
import time
import os
from multiprocessing import Process

def task():
    time.sleep(0.1)
    print('我是子线程')


if __name__ == '__main__':
    ####线程
    # ctime=time.time()
    # t=Thread(target=task)
    # t.start()
    # t.join() # 等待子线程执行完成主线程再执行
    # print('我是主线程')
    # print(time.time()-ctime)

    ##进程
    ctime=time.time()
    t=Process(target=task)
    t.start()
    t.join() # 等待子线程执行完成主线程再执行
    print('我是主线程')
    print(time.time()-ctime)

3、 内存数据的共享问题

##线程间数据共享
from threading import Thread
import time
import os
from multiprocessing import Process

def task():
    global n
    n=10
    print(n)


if __name__ == '__main__':
    ####线程
    n=100
    t=Thread(target=task)
    t.start()
    t.join() # 等待子线程执行完成主线程再执行
    print('我是主线程')
    print(n)

十六、 Thread类的其他方法

Thread实例对象的方法:

  • isAlive():返回线程是否活动的。
  • getName():返回线程名。
  • setName():设置线程名。

threading模块提供的一些方法:

  • threading.currentThread():返回当前的线程变量。
  • threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

1、 代码示例

from threading import Thread
import threading
import time
def task():
    # time.sleep(0.01)
    #在子线程中执行
    # res = threading.currentThread()
    # print(res)

    res=threading.get_ident()
    print('子线程:',res)
    print('我是子线程')


if __name__ == '__main__':
    t=Thread(target=task)
    t1=Thread(target=task)

    t.start()
    t1.start()
    # print(t.is_alive())  #看线程是否存活
    #
    # print(t.getName() )  # 获取线程的名字
    # t.setName('lqz')   # 设置线程民资
    # print(t.getName() )
    #
    #
    # print('主线程')
    # time.sleep(0.02)
    # print(t.is_alive())


    # 主线程中执行,返回当前线程对象
    # res=threading.currentThread()
    # print(res)


    # 返回当前进程中正在运行的子线程对象列表
    # res=threading.enumerate()
    # print(res)


    # 返回当前正在运行的线程个数
    # res=threading.activeCount()
    # print(res)

    # 线程id号
    res=threading.get_ident()
    print('主线程:',res)
	
	'''
    t.is_alive()
    t.getName()
    t.setName('lqz') 
   
    threading:模块下的一些方法
    res=threading.currentThread()
    res=threading.enumerate()
    res=threading.activeCount()
    res=threading.get_ident()
    '''

2、 join方法

from threading import Thread
import time

def task():
    time.sleep(2)
    print('我是子线程')


if __name__ == '__main__':
    ll=[]
    for i in range(1000):
        t=Thread(target=task)

        t.start()
        ll.append(t)

    for i in ll:
        i.join()



    # 主线程等待子线程执行完再执行
    print('我是主线程,子线程全都执行完了')

3、守护线程

3.1 详细解释

  1. 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。
  2. 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread
import time

def task():
    time.sleep(2)
    print('我是子线程')


if __name__ == '__main__':
    t=Thread(target=task)
    t.setDaemon(True) # 必须在t.start()之前设置
            		  # 如果主线程执行结束,子线程也结束(不执行了)
    t.start()

    #只要主线程执行结束,子线程也结束
    print('主线程执行结束')

十七、 多线程实现socket

1、 服务端

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()

2、 客户端

import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

十八、 锁-信号量-Event事件

一、同步锁(互斥锁)

'''
互斥锁和join的区别
如果使用互斥锁:只锁临界区,只有临界区是串行,其他地方还是并发的
如果使用join,整个过程全变成串行执行
'''
## 多个线程操作同一个数据(变量),会出现并发安全的问题
# from threading import Thread,Lock
# import time
# import random
# def task():
#     global n
#
#
#     ### 临界区(加锁)
#     time.sleep(random.random())
#     temp=n
#     time.sleep(random.random())
#     temp=temp-1
#     n=temp
#
#     ##模拟不出来,因为太快了,没有cup的切换(io,时间片到了),模拟io,让cpu切换
#
#     # n-=1
#
#
# if __name__ == '__main__':
#     n=10
#     ll=[]
#     for i in range(10):
#         t=Thread(target=task)
#         t.start()
#         ll.append(t)
#
#     for i in ll:
#         i.join()
#
#
#     print(n)



###出现了并发安全的问题,加锁解决

from threading import Thread,Lock
import time
import random
def task_lock(lock):
    global n


    ### 临界区(加锁)
    with lock:
        time.sleep(random.random())
        temp=n
        time.sleep(random.random())
        temp=temp-1
        n=temp

    ##模拟不出来,因为太快了,没有cup的切换(io,时间片到了),模拟io,让cpu切换

    # n-=1

def task_no_lock():

    global n
    time.sleep(random.random())
    temp=n
    time.sleep(random.random())
    temp=temp-1
    n=temp



if __name__ == '__main__':
    n=10
    lock=Lock()
    ll=[]
    for i in range(10):
        # t=Thread(target=task_lock,args=[lock,])
        t=Thread(target=task_no_lock,args=[lock,])
        t.start()
        ll.append(t)
        t.join()

    # for i in ll:
    #     i.join()


    print(n)

二、 死锁与递归锁

所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
死锁

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。
递归锁(RLock)

from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

三、 典型问题:科学家吃面

# from threading import Thread, Lock
# import time
# import random
#
#
# def eat1(lock_1, lock_2, name):
#     lock_1.acquire()
#     print('%s:拿到了筷子' % name)
#     time.sleep(random.random())
#     lock_2.acquire()
#     print('%s:拿到了面条' % name)
#     print('开始吃面')
#     time.sleep(random.random())
#     lock_2.release()
#     print('%s放下了面条' % name)
#     lock_1.release()
#     print('%s放下了筷子' % name)
#
#
# def eat2(lock_1, lock_2, name):
#     lock_2.acquire()
#     print('%s:拿到了面条' % name)
#     time.sleep(random.random())
#     lock_1.acquire()
#     print('%s:拿到了筷子' % name)
#     print('开始吃面')
#     time.sleep(random.random())
#     lock_1.release()
#     print('%s放下了筷子' % name)
#     lock_2.release()
#     print('%s放下了面条' % name)
#
#
# if __name__ == '__main__':
#     lock_1 = Lock()
#     lock_2 = Lock()
#     for i in ['张三', '李四', '王五']:
#         t = Thread(target=eat1, args=[lock_1, lock_2, i])
#         t.start()
#     for i in ['赵6', '往7', '傻逼']:
#         t = Thread(target=eat2, args=[lock_1, lock_2, i])
#         t.start()



### 解决死锁问题  RLock:可重入,可以重复acquire,获得几次,就要释放几次
from threading import Thread, Lock,RLock
import time
import random


def eat1(lock_1, lock_2, name):
    lock_1.acquire()
    print('%s:拿到了筷子' % name)
    time.sleep(random.random())
    lock_2.acquire()
    print('%s:拿到了面条' % name)
    print('开始吃面')
    time.sleep(random.random())
    lock_2.release()
    print('%s放下了面条' % name)
    lock_1.release()
    print('%s放下了筷子' % name)


def eat2(lock_1, lock_2, name):
    lock_2.acquire()
    print('%s:拿到了面条' % name)
    time.sleep(random.random())
    lock_1.acquire()
    print('%s:拿到了筷子' % name)
    print('开始吃面')
    time.sleep(random.random())
    lock_1.release()
    print('%s放下了筷子' % name)
    lock_2.release()
    print('%s放下了面条' % name)


if __name__ == '__main__':
    lock_1 = RLock()
    lock_2 = lock_1

    # lock_1 = Lock()
    # lock_2 = lock_1
    for i in ['张三', '李四', '王五']:
        t = Thread(target=eat1, args=[lock_1, lock_2, i])
        t.start()
    for i in ['赵6', '往7', '傻逼']:
        t = Thread(target=eat2, args=[lock_1, lock_2, i])
        t.start()

四、 信号量

### 信号量可以理解为多把锁,同时允许多个线程来更改数据

from  threading import Thread,Semaphore

import time
import random
def task(sm,i):
    sm.acquire()
    print('%s:这个人在上厕所'%i)
    time.sleep(random.random())
    print('%s:这个人拉完了'%i)
    sm.release()



sm=Semaphore(5)
for i in range(40):
    t=Thread(target=task,args=[sm,i])
    t.start()

五、 Event事件

Event事件:
一些线程需要等到其他线程执行完成之后才能执行,类似于发射信号
比如一个线程等待另一个线程执行结束再继续执行


# 一些线程需要等到其他线程执行完成之后才能执行,类似于发射信号
# 比如一个线程等待另一个线程执行结束再继续执行

from threading import Thread, Event

import time
import random


def girl(event):
    print('赵丽颖现在在结婚状态')
    time.sleep(1)
    # 离婚了,发送信号
    print('赵丽颖离婚了')
    event.set()  # 发送一个信号


def boy(i, event):
    print('屌丝%s:在等赵丽颖的离婚信号'%i)
    event.wait()  # 收不到信号之前,一直卡在这
    print('屌丝%s号,收到了离婚信号,开始追' % i)


event = Event()
t = Thread(target=girl, args=[event, ])
t.start()

for i in range(10):
    t1 = Thread(target=boy, args=[i, event])
    t1.start()

十九、线程队列

queue队列:使用import queue,用法与进程Queue一样
1 线程Queue,解决线程间数据共享的问题
2 线程间数据共享可以使用共享变量(可能会存在并发安全的问题)

同一个进程下多个线程数据是共享的
为什么先同一个进程下还会去使用队列呢
因为队列是
    管道 + 锁
所以用队列还是为了保证数据的安全
Queue:先进先出
LifoQueue:后进先出
PriorityQueue:优先级队列
from threading import Thread
from queue import Queue,LifoQueue,PriorityQueue # 线程queue


# import time
# def task(queue):
#     time.sleep(3)
#     queue.put('lqz')
#
#
#
# if __name__ == '__main__':
#     queue=Queue()
#     t=Thread(target=task,args=[queue,])
#     t.start()
#
#
#     res=queue.get()
#     print(res)



# Queue:先进先出
# LifoQueue:后进先出
# PriorityQueue:优先级队列


if __name__ == '__main__':
    # quque1=Queue()
    #
    # quque1.put(1)
    # quque1.put(2)
    # print(quque1.get())



    # quque2=LifoQueue()
    # quque2.put(1)
    # quque2.put(2)
    # print(quque2.get())


    quque3=PriorityQueue()
    quque3.put((1,'lqz'))
    quque3.put((100,'egon'))
    # put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较)
    # 数字越小,优先级越高
    print(quque3.get())

二十、 进程池,线程池(concurrent.futures)

'''
submit(fn, *args, **kwargs):异步提交任务
map(func, *iterables, timeout=None, chunksize=1):取代for循环submit的操作
shutdown(wait=True):相当于进程池的pool.close()+pool.join()操作
    -wait=True,等待池内所有任务执行完毕回收完资源后才继续
    -wait=False,立即返回,并不会等待池内的任务执行完毕
    -但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
result(timeout=None):取得结果
add_done_callback(fn):回调函数
done():判断某一个线程是否完成
cancle():取消某个任务
'''

# 线程池,进程池都在这个模块下concurrent.futures


import time
import os

import random
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

# def task(n):
#     print(os.getpid(), '开始执行了')
#     time.sleep(random.random())
#     return n * n
#
#
#
# def callback(result):
#     print(result)
# if __name__ == '__main__':
#     # 开进程池去执行
#     # ProcessPoolExecutor实例化得到一个对象
#     pool_p=ProcessPoolExecutor(3)
#     # ll=[]
#     # for i in range(10):
#     #     # 把任务提交到进程池执行
#     #     f=pool_p.submit(task,n=i)
#     #     ll.append(f)
#     #
#     # # 等待所有子进程执行完成,主进程在执行
#     # pool_p.shutdown()
#     #
#     # for l in ll:
#     #     res=l.result()  # 取到当前进程执行任务的返回值
#     #     print(res)
#     #
#     # print('我是主进程')


'''
# map取代for循环的,第一个参数是要执行的任务,第二个参数,是一个可迭代对象,迭代一次的结果,会传给任务


for i in range(10):
  f=pool_p.submit(task,n=i)
 # 等同于上面
pool_p.map(task,range(10))
pool_p.shutdown()
print('主进程')

'''

'''
# 回调

for i in range(10):
    pool_p.submit(task,n=i).add_done_callback()
'''

def task(n):
    print(os.getpid(), '开始执行了')
    time.sleep(1)
    return n * n


def callback(result):
    print(result.result())


# if __name__ == '__main__':
#     pool_p = ProcessPoolExecutor(3)
#     for i in range(10):
#         pool_p.submit(task, n=i).add_done_callback(callback)
#

if __name__ == '__main__':
    pool_p = ThreadPoolExecutor(3)
    for i in range(10):
        pool_p.submit(task, n=i).add_done_callback(callback)

二十一、 协程介绍

协程是:程序级别的切换,单线程下实现并发

python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)

单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

二十二、 协程之greenlet模块(初级模块,实现了保存状态加切换

# 安装第三方模块:在命令行下
# pip3 install greenlet
# pip3 uninstall greenlet  卸载第三方模块
# pip3 list   # 列出当前解释器环境下安装的第三方模块
from greenlet import greenlet



def eat(name):
    print(name,'在吃了一口')
    g2.switch(name)

    print(name,'在吃了第二口')
    g2.switch()


def play(name):
    print(name, '玩了一下')
    g1.switch()

    print(name, '玩了第二下')


g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')

二十三、 协程之gevent模块

用法

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束

g2.join() #等待g2结束

#或者上述两步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值
## gevent模块,协程模块,遇到io可以自动切换


# pip3 install gevent

import gevent
import time

# def eat(name):
#     print(name, '在吃了一口')
#     # 遇到了io
#     gevent.sleep(2)
#
#     print(name, '在吃了第二口')
#
#
# def play(name):
#     print(name, '玩了一下')
#     # 遇到了io,是gevent的io
#     gevent.sleep(3)
#     print(name, '玩了第二下')
#
#
# res1 = gevent.spawn(eat, 'egon')
# res2 = gevent.spawn(play, 'egon')
#
#
# ctime=time.time()
# # res1.join()
# # res2.join()  # 等地任务执行完成再执行下面那句
#
# gevent.joinall([res1,res2])   # 相当于上面那两句
# print('主线程')
# print(time.time()-ctime)




###使用原来的time的io,不会切,并且变成了串行
# def eat(name):
#     print(name, '在吃了一口')
#     time.sleep(2)
#
#     print(name, '在吃了第二口')
#
#
# def play(name):
#     print(name, '玩了一下')
#     time.sleep(3)
#     print(name, '玩了第二下')
#
#
# res1 = gevent.spawn(eat, 'egon')
# res2 = gevent.spawn(play, 'egon')
#
#
# ctime=time.time()
#
# gevent.joinall([res1,res2])   # 相当于上面那两句
# print('主线程')
# print(time.time()-ctime)


### time的io也要切换

# 猴子补丁:把原来的io全都替换成gevent的io
from gevent import monkey;monkey.patch_all()



def eat(name):
    print(name, '在吃了一口')
    time.sleep(2)

    print(name, '在吃了第二口')


def play(name):
    print(name, '玩了一下')
    time.sleep(3)
    print(name, '玩了第二下')


res1 = gevent.spawn(eat, 'egon')
res2 = gevent.spawn(play, 'egon')


ctime=time.time()

gevent.joinall([res1,res2])   # 相当于上面那两句
print('主线程')
print(time.time()-ctime)

二十四、 asyncio

# 内置模块   python 3.4 推出这个模块,python作者主导的
import asyncio
import time
import threading
# 这个函数是协程函数
async def task():
    res=threading.current_thread().getName()
    print(res)
    print('xxx')
    await asyncio.sleep(2)
    print('协程执行完成')

async def task2():
    res=threading.current_thread().getName()
    print(res)
    print('2222')
    await asyncio.sleep(3)
    print('222协程执行完成')



ctime=time.time()
loop=asyncio.get_event_loop()



tasks=[task(),task2()]



loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print(time.time()-ctime)

本文标签: 进程信号量线程全局事件