1. pandarallel (pip install pandarallel)
For a simple use case with a pandas DataFrame df and a function func to apply, just replace the classic apply with parallel_apply.
    from pandarallel import pandarallel

    # Initialization
    pandarallel.initialize()

    # Standard pandas apply
    df.apply(func)

    # Parallel apply
    df.parallel_apply(func)
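Note that the classic apply method is still available if you do not want to parallelize the computation.
You can also display one progress bar per worker CPU by passing progress_bar=True to the initialize function.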
2. joblib (pip install joblib)
https://pypi.python.org/pypi/joblib
    # Embarrassingly parallel helper: to make it easy to write readable
    # parallel code and debug it quickly
    import time
    from math import sqrt

    from joblib import Parallel, delayed


    def test():
        start = time.time()
        # Baseline: a single job
        result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
        end = time.time()
        print(end - start)
        # The same work spread over 8 jobs
        result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
        end2 = time.time()
        print(end2 - end)

-------Output-------
0.4434356689453125
0.6346755027770996
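In the output above, the n_jobs=8 run is slower than the n_jobs=1 run. A likely reason is that sqrt is so cheap that the cost of dispatching tasks to workers dominates. When measuring effects this small, it helps to repeat the measurement and keep the fastest run. The following is a minimal sketch using only the standard library; the helper name best_time and its repeat parameter are illustrative and not part of joblib.

```python
import time
from math import sqrt


def best_time(fn, repeat=3):
    """Call fn() `repeat` times; return (last result, fastest wall-clock seconds)."""
    best = float("inf")
    result = None
    for _ in range(repeat):
        start = time.perf_counter()  # monotonic, high-resolution clock
        result = fn()
        best = min(best, time.perf_counter() - start)
    return result, best


if __name__ == "__main__":
    values, seconds = best_time(lambda: [sqrt(i ** 2) for i in range(10000)])
    print(len(values), seconds)
```

The same helper could wrap a Parallel(...) call in a lambda to compare different n_jobs values under identical conditions.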
3. multiprocessing
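    import multiprocessing as mp

    with mp.Pool(mp.cpu_count()) as pool:
        df['newcol'] = pool.map(f, df['col'])

multiprocessing.cpu_count()
Returns the number of CPUs in the system.
This number is not the same as the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)).
May raise NotImplementedError.
See also os.cpu_count().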
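The difference between the system CPU count and the usable CPU count matters when sizing a pool, for example inside a container with a restricted CPU set. The sketch below picks the worker count from the usable CPUs and passes an explicit chunksize to Pool.map so that items are sent to workers in batches. The helper names (usable_cpu_count, pick_chunksize, parallel_map) and the "4 batches per worker" heuristic are assumptions made for illustration, and a plain list stands in for the DataFrame column.

```python
import multiprocessing as mp
import os


def usable_cpu_count():
    """CPUs the current process may run on, falling back to the system total."""
    if hasattr(os, "sched_getaffinity"):  # not available on every platform
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1  # os.cpu_count() can return None


def pick_chunksize(n_items, workers, batches_per_worker=4):
    """Heuristic batch size: aim for a few batches per worker, at least 1 item."""
    return max(1, n_items // (workers * batches_per_worker))


def parallel_map(func, items, workers=None):
    """Apply func to each item in a process pool, preserving input order."""
    items = list(items)
    if not items:
        return []
    workers = max(1, min(workers or usable_cpu_count(), len(items)))
    with mp.Pool(workers) as pool:
        return pool.map(func, items, chunksize=pick_chunksize(len(items), workers))


if __name__ == "__main__":
    print(usable_cpu_count())
    print(parallel_map(len, ["a.b.c", "example.com"]))
```

The function passed to the pool must be picklable, which in practice means a built-in or a function defined at module level rather than a lambda or nested function.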
4. Performance comparison of the methods
(1) Code
    import time
    import multiprocessing as mp

    import pandas as pd
    from joblib import Parallel, delayed
    from pandarallel import pandarallel
    from tqdm import tqdm


    def get_url_len(url):
        url_list = url.split(".")
        time.sleep(0.01)  # sleep for 0.01 s
        return len(url_list)


    def test1(data):
        """No optimization: plain apply."""
        start = time.time()
        data['len'] = data['url'].apply(get_url_len)
        end = time.time()
        cost_time = end - start
        res = sum(data['len'])
        print("res:{}, cost time:{}".format(res, cost_time))


    def test_mp(data):
        """Optimized with multiprocessing."""
        start = time.time()
        with mp.Pool(mp.cpu_count()) as pool:
            data['len'] = pool.map(get_url_len, data['url'])
        end = time.time()
        cost_time = end - start
        res = sum(data['len'])
        print("test_mp \t res:{}, cost time:{}".format(res, cost_time))


    def test_pandarallel(data):
        """Optimized with pandarallel."""
        start = time.time()
        pandarallel.initialize()
        data['len'] = data['url'].parallel_apply(get_url_len)
        end = time.time()
        cost_time = end - start
        res = sum(data['len'])
        print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))


    def test_delayed(data):
        """Optimized with joblib delayed."""
        def key_func(subset):
            subset["len"] = subset["url"].apply(get_url_len)
            return subset

        start = time.time()
        data_grouped = data.groupby(data.index)
        # data_grouped is iterable, so tqdm can be used to show a progress bar
        results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
        data = pd.concat(results)
        end = time.time()
        cost_time = end - start
        res = sum(data['len'])
        print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))


    if __name__ == '__main__':
        columns = ['title', 'url', 'pub_old', 'pub_new']
        temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
        data = temp
        # Uncomment to scale the data up 100x.
        # (DataFrame.append was removed in pandas 2.0, so pd.concat is used here.)
        # for i in range(99):
        #     data = pd.concat([data, temp])
        print(len(data))
        # test1(data)
        # test_mp(data)
        # test_pandarallel(data)
        test_delayed(data)
(2) Output (measured without the time.sleep call in get_url_len)
1k
res:4338, cost time:0.0018074512481689453
test_mp res:4338, cost time:0.2626469135284424
test_pandarallel res:4338, cost time:0.3467681407928467
10k
res:42936, cost time:0.008773326873779297
test_mp res:42936, cost time:0.26111721992492676
test_pandarallel res:42936, cost time:0.33237743377685547
100k
res:426742, cost time:0.07944369316101074
test_mp res:426742, cost time:0.294996976852417
test_pandarallel res:426742, cost time:0.39208269119262695
1M
res:4267420, cost time:0.8074917793273926
test_mp res:4267420, cost time:0.9741342067718506
test_pandarallel res:4267420, cost time:0.6779992580413818
10M
res:42674200, cost time:8.027287006378174
test_mp res:42674200, cost time:7.751036882400513
test_pandarallel res:42674200, cost time:4.404983282089233
With a sleep statement added to get_url_len (to simulate complex logic) and 1k rows of data, the results are as follows:
1k
res:4338, cost time:10.054503679275513
test_mp res:4338, cost time:0.35697126388549805
test_pandarallel res:4338, cost time:0.43415403366088867
test_delayed res:4338, cost time:2.294757843017578
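In test_delayed, data.groupby(data.index) yields one group per row when the index is unique, so joblib dispatches one task per row. Together with the fixed n_jobs=8, this may partly explain why test_delayed trails the other two parallel variants here. A common alternative is to hand each worker one contiguous chunk of rows. Below is a minimal, library-free sketch of the chunking step only; the helper name split_into_chunks is hypothetical, and a list of strings stands in for the url column.

```python
def split_into_chunks(items, n_chunks):
    """Split a sequence into at most n_chunks contiguous, near-equal parts."""
    items = list(items)
    n_chunks = max(1, min(n_chunks, len(items)))
    base, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        size = base + (1 if i < extra else 0)  # first `extra` chunks get one more item
        chunks.append(items[start:start + size])
        start += size
    return chunks


if __name__ == "__main__":
    urls = ["a.com", "b.org", "c.d.net", "e.io", "f.g.h.cn"]
    for chunk in split_into_chunks(urls, 2):
        print(chunk)
```

Each chunk could then be passed to a single delayed(...) call, so the number of tasks is close to the number of workers rather than the number of rows.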
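5. Summary
(1) With a small amount of data, parallel processing is slower than serial execution, because the fixed cost of starting workers and moving data outweighs the time saved;
(2) If the function passed to apply is simple, parallel processing is also slower than serial execution until the dataset is large (in the runs above, pandarallel only pulled ahead at about 1M rows).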
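Both conclusions follow from a simple cost model: a serial run costs roughly rows × time-per-row, while a parallel run pays a fixed overhead and then divides the work among workers. The measurements above suggest about 0.26 s of fixed cost for test_mp (its time at 1k rows) and about 0.8 µs per row for the plain apply without sleep (0.807 s for 1M rows). The sketch below is an idealized model, not a fit to the data; the function names are hypothetical, and the worker count and overhead in the demo are assumed values, since the source does not state how many cores were used.

```python
def parallel_time(n_rows, sec_per_row, workers, overhead_sec):
    """Idealized parallel cost: fixed startup overhead plus evenly divided work."""
    return overhead_sec + n_rows * sec_per_row / workers


def break_even_rows(sec_per_row, workers, overhead_sec):
    """Row count above which the idealized parallel run beats the serial one.

    Solves n * t = overhead + n * t / workers for n.
    """
    if workers <= 1:
        return float("inf")  # a single worker never recovers the overhead
    return overhead_sec / (sec_per_row * (1 - 1 / workers))


if __name__ == "__main__":
    # Illustrative inputs only: 0.3 s overhead and 4 workers are assumptions.
    for sec_per_row in (1e-6, 1e-2):
        rows = break_even_rows(sec_per_row, workers=4, overhead_sec=0.3)
        print(sec_per_row, round(rows))
```

Under these assumed inputs, a cheap function needs hundreds of thousands of rows before parallelism pays off, while a function that takes 10 ms per row breaks even after a few dozen rows, which matches the direction of the results in section 4.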
6. Problems and solutions
(1)ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.
https://www.jianshu.com/p/0be1b4b27bde
(2) Checking the number of physical CPUs, cores, and logical CPUs on Linux
https://lover.blog.csdn.net/article/details/113951192
(3) Using a progress bar
https://www.jb51.net/article/206219.htm
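For problem (1), the ImportError is raised when Python's multiprocessing.synchronize module is imported on a platform without a working sem_open. A program can probe for this up front and fall back to a serial apply instead of crashing. A minimal sketch, assuming a serial fallback is acceptable; the function name has_working_semaphores is hypothetical:

```python
def has_working_semaphores():
    """Return True if multiprocessing's synchronization primitives can be imported."""
    try:
        # Raises ImportError on platforms that lack a functioning sem_open.
        import multiprocessing.synchronize  # noqa: F401
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    if has_working_semaphores():
        print("multiprocessing pools and locks should be usable")
    else:
        print("no working sem_open: fall back to the serial apply")
```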