为什么子进程总是拿不到数据?聊聊Python多进程里的“隔阂”

为什么子进程总是拿不到数据?聊聊Python多进程里的“隔阂”
免费编程软件「pythonpycharm」链接https://pan.quark.cn/s/48a86be2fdc0一个让我在数据清洗项目上翻车的Bug去年我接了一个数据清洗的活需要处理100万行日志数据。每行数据要做格式校验、字段提取、类型转换纯Python跑起来慢得让人抓狂。我心想Python多线程有GIL那我用多进程总行了吧每个核跑一个进程4核机器至少提速3倍。于是我写了这样的代码import multiprocessing as mp # 全局配置所有子进程都要用 CONFIG {date_format: %Y-%m-%d, max_length: 100} def clean_line(line): # 用CONFIG里的配置清洗单行数据 # ... return cleaned if __name__ __main__: with mp.Pool(4) as pool: results pool.map(clean_line, data)看起来没问题吧跑起来之后诡异的事情发生了配置在子进程里经常读不到有时候读到的还是旧值。我当时完全懵了——全局变量不是所有地方都能访问吗为什么子进程就不行后来我才明白多进程之间是“隔阂”的每个进程有自己独立的记忆空间你主进程里的变量子进程根本看不见。为什么子进程拿不到你的数据要理解这个问题先记住一句话进程是独立的。当你用multiprocessing.Process或者Pool创建一个子进程时Python会做这样几件事启动一个新的Python解释器相当于重新打开了一个Python程序在新的解释器里重新执行你的代码包括导入模块、定义函数、执行全局代码子进程有自己的内存空间和主进程完全隔开所以你在主进程里定义的CONFIG、my_list、global_counter子进程都看不见。不是Python不让它们共享而是操作系统根本不允许一个进程随便读另一个进程的内存。阿里云开发者社区的一篇文章里有一个很直观的例子from multiprocessing import Process num 10 # 主进程的全局变量 def run(): global num num 1 print(f子进程里: {num}) if __name__ __main__: p Process(targetrun) p.start() p.join() print(f主进程里: {num})运行结果子进程里: 11 主进程里: 10看到了吗子进程改了num主进程里的num纹丝不动。子进程拿到的是num的一份拷贝而不是引用。子进程为什么总是“拿不到数据”真实项目里“拿不到数据”通常表现为这几种情况情况1全局变量在子进程里是“初始值”就像前面的例子你在主进程里修改了CONFIG然后启动子进程子进程看到的CONFIG可能是最初的值——因为子进程启动时重新执行了模块代码用的是初始值。情况2Windows下更坑在Windows系统上multiprocessing启动子进程时会重新导入主模块。这意味着模块顶层所有的代码都会在子进程里再执行一遍。如果模块顶层有耗时的初始化操作比如加载大模型、连接数据库每个子进程都会重复执行一遍你的程序会慢到怀疑人生。情况3进程池(map)里修改不了列表你可能会这样写results [] # 想把结果收集到这里 def worker(x): results.append(x * 2) # 试图修改全局列表 with mp.Pool(4) as pool: pool.map(worker, range(10)) print(results) # 还是空的每个子进程修改的是自己那份results的拷贝主进程的results从来没变过。那怎么让子进程“拿到”数据好消息是Python提供了好几种办法来打破这种“隔阂”。方法1通过参数传递最推荐不要依赖全局变量把要用的数据通过参数传进去def clean_line(line, config): # 用传进来的config不用全局的 return cleaned if __name__ __main__: CONFIG {...} with mp.Pool(4) as pool: # 把config作为参数传进去 results pool.map(lambda x: clean_line(x, CONFIG), data)这是最简单、最干净的方式。数据从一个地方流向另一个地方没有隐式的全局依赖。方法2用multiprocessing.Queue传递数据Queue是一个可以在多个进程之间安全传递数据的队列。from multiprocessing import Process, Queue def worker(q): q.put(子进程处理完了) if __name__ __main__: q Queue() p Process(targetworker, args(q,)) p.start() result q.get() # 主进程收到子进程的消息 print(result)Queue的原理是一个进程把数据放进去另一个进程取出来。数据被序列化后在进程间传递。方法3用Manager共享对象如果你想让多个子进程共享一个列表或字典可以用Managerfrom multiprocessing import Process, Manager def worker(shared_list): shared_list.append(来自子进程的数据) if __name__ __main__: manager Manager() shared_list manager.list([主进程的数据]) p Process(targetworker, args(shared_list,)) p.start() p.join() print(shared_list) # [主进程的数据, 来自子进程的数据]Manager的原理是启动一个独立的“管理器进程”所有对共享数据的操作都通过这个进程来协调。缺点是慢因为每次读写都要走进程间通信。方法4用shared_memory共享内存Python 3.8如果数据量大且对性能有要求可以用shared_memory模块from multiprocessing import shared_memory import numpy as np # 主进程创建共享内存 shm shared_memory.SharedMemory(createTrue, size100) # 子进程通过名字连接到这块内存 shm_b shared_memory.SharedMemory(nameshm.name)这是最高效的方式数据在内存中直接共享不需要序列化和复制。一张图理清楚你想要做的事错误做法正确做法子进程读主进程的配置用全局变量通过函数参数传递子进程返回结果给主进程修改全局列表用Queue或返回值多个子进程共享一个计数器用全局变量用Manager或Value多个子进程共享超大数组用Manager列表用shared_memory回到开头的Bug我那个数据清洗项目最后怎么解决的我把CONFIG从全局变量改成了函数参数每个子进程通过参数拿到配置。至于那些需要汇总的结果我用Queue从子进程收集回来。虽然代码多写了几行但再也不会有“子进程拿不到数据”的烦恼了。记住多进程之间是“隔阂”的别指望它们共享记忆。想要传递数据要么递纸条参数/Queue要么用公共黑板Manager/shared_memory。