我们今天要讲多进程和协程,先说说多进程和协程的好处,它可以最大的利用你cpu的资源和网络带宽,这样可以充分的节省程序消费的时间。在生产消费者模式中引入多进程和协程之前我们先简单入手一个实例(供初学者可以看懂之后的代码):
from multiprocessing import Process
import time
def test(i):
while True:
print("我是子进程"+str(i))
time.sleep(2)
if i==1:
time.sleep(10)
print("我是进程" + str(i))
# print[(x,y) for x in range(10) if x%2 if x>3 for y in range(10) if y > 7 if y != 8]
if __name__ == '__main__':
processes=[]
for i in range(3):
p = Process(target=test,args=(i,))
p.start()
processes.append(p)
print(processes)
for p in processes:
p.join()
以上的代码是一段简单的多进程,我们可以看到运行后三个进程都在运行,由于我这里让进程1睡眠10s,进程1和进程0,2是相对独立的,这里1的睡眠不会影响到进程0和2的运行,我们可以看到他们依旧在控制台不断的输出,这样,我们的多进程+消费者模式的雏形就出来了,现在只需要把我们消费者的业务逻辑替换进去就可以了。这里多说一句,有时候我们会不清楚多进程和多线程的概念,这个可以这么理解,进程消费的资源是一个大类,而线程消费的资源是从进程里面来的,一个线程死了会影响到其他线程,但多进程之间是相对独立的,一个进程死了不会影响其他进程运行,这就是为什么我会在这里使用多进程而不是多线程,当然,对于多线程还有一定的优化,那就是协程,这个点我们会在生产消费者模式与python+redis实例运用(次高级篇)中介绍到。
好了,有了多进程的概念我们就可以开始生消模式的实现了:
这里我们的生产者与上一篇的不变:from DBUtil import *import timefrom pandasql import sqldfimport redisdef product(i): length=r.llen("goods2") print(length) if length>5000: print("长度过大睡一会") time.sleep(1) product(i) else: #生产者 r.lpush("goods2", "good1"+str(i)) print("加入一个值睡一会") # time.sleep(5)if __name__ == '__main__': # 此处表示循环10000次,往redis里面放10000次数据 for i in range(10000): product(i)
消费者开启多进程,在消费者开启多进程的时候我们会遇到一个问题,就是多个进程同时去抢同一个资源的情况,这个时候我们可以选择加锁到资源,也就是redis会话队列上,当某个进程拿资源的时候redis会话队列加上锁,保证其他进程拿不到这个资源,当这个进程拿完资源后,释放锁,让其他进程去抢占资源:import timeimport redisfrom multiprocessing import Process,Lockpool=redis.ConnectionPool(host='localhost', port=6379,db=1,decode_responses=True)r=redis.Redis(connection_pool=pool)def users(lock): length = r.llen("goods2") print(length) while True: # print(1) # 对资源进行加锁 lock.acquire() if length > 0: goods = r.lpop("goods2") # 获得资源后释放锁 lock.release() #以下可以写自己的业务逻辑操作 try: data = goods print(data) if str(goods) == "None": print("无值多等等") time.sleep(2) except: print("无值等等") time.sleep(2) users() else: print("无值等等") time.sleep(10) users(lock)if __name__ == '__main__': lock = Lock() processes = [] for i in range(20): p = Process(target=users, args=(lock,)) p.start() processes.append(p) for p in processes: p.join() print('处理完成')
留言与评论(共有 0 条评论) |