战“疫”期,阿里云云效团队在家高效开发实录
背景在数据科学世界 , Python是一个不可忽视的存在 , 且有愈演愈烈之势 。 而其中主要的使用工具 , 包括Numpy、Pandas和Scikit-learn等 。
NumpyNumpy是数值计算的基础包 , 内部提供了多维数组(ndarray)这样一个数据结构 , 用户可以很方便地在任意维度上进行数值计算 。

文章图片
我们举一个蒙特卡洛方法求解Pi的例子 。 这背后的原理非常简单 , 现在我们有个半径为1的圆和边长为2的正方形 , 他们的中心都在原点 。 现在我们生成大量的均匀分布的点 , 让这些点落在正方形内 , 通过简单的推导 , 我们就可以知道 , Pi的值=落在圆内的点的个数/点的总数*4 。
这里要注意 , 就是随机生成的点的个数越多 , 结果越精确 。
用Numpy实现如下:
importnumpyasnpN=10**7#1千万个点data=https://pcff.toutiao.jxnews.com.cn/p/20200416/np.random.uniform(-1,1,size=(N,2))#生成1千万个x轴和y轴都介于-1和1间的点inside=(np.sqrt((data**2).sum(axis=1))<1).sum()#计算到原点的距离小于1的点的个数pi=4*inside/Nprint('pi:%.5f'%pi)可以看到 , 用Numpy来进行数值计算非常简单 , 只要寥寥数行代码 , 而如果读者习惯了Numpy这种面相数组的思维方式之后 , 无论是代码的可读性还是执行效率都会有巨大提升 。
pandaspandas是一个强大的数据分析和处理的工具 , 它其中包含了海量的API来帮助用户在二维数据(DataFrame)上进行分析和处理 。
pandas中的一个核心数据结构就是DataFrame , 它可以简单理解成表数据 , 但不同的是 , 它在行和列上都包含索引(Index) , 要注意这里不同于数据库的索引的概念 , 它的索引可以这么理解:当从行看DataFrame时 , 我们可以把DataFrame看成行索引到行数据的这么一个字典 , 通过行索引 , 可以很方便地选中一行数据;列也同理 。
我们拿movielens的数据集作为简单的例子 , 来看pandas是如何使用的 。 这里我们用的是Movielens20MDataset.
importpandasaspdratings=pd.read_csv('ml-20m/ratings.csv')ratings.groupby('userId').agg({'rating':['sum','mean','max','min']})通过一行简单的pandas.read_csv就可以读取CSV数据 , 接着按userId做分组聚合 , 求rating这列在每组的总和、平均、最大、最小值 。
“食用“pandas的最佳方式 , 还是在Jupyternotebook里 , 以交互式的方式来分析数据 , 这种体验会让你不由感叹:人生苦短 , 我用xx()
scikit-learnscikit-learn是一个Python机器学习包 , 提供了大量机器学习算法 , 用户不需要知道算法的细节 , 只要通过几个简单的high-level接口就可以完成机器学习任务 。 当然现在很多算法都使用深度学习 , 但scikit-learn依然能作为基础机器学习库来串联整个流程 。
我们以K-最邻近算法为例 , 来看看用scikit-learn如何完成这个任务 。
importpandasaspdfromsklearn.neighborsimportNearestNeighborsdf=pd.read_csv('data.csv')#输入是CSV文件 , 包含20万个向量 , 每个向量10个元素nn=NearestNeighbors(n_neighbors=10)nn.fit(df)neighbors=nn.kneighbors(df)fit接口就是scikit-learn里最常用的用来学习的接口 。 可以看到整个过程非常简单易懂 。
Mars——Numpy、pandas和scikit-learn的并行和分布式加速器Python数据科学栈非常强大 , 但它们有如下几个问题:
现在是多核时代 , 这几个库里鲜有操作能利用得上多核的能力 。 随着深度学习的流行 , 用来加速数据科学的新的硬件层出不穷 , 这其中最常见的就是GPU , 在深度学习前序流程中进行数据处理 , 我们是不是也能用上GPU来加速呢?这几个库的操作都是命令式的(imperative) , 和命令式相对应的就是声明式(declarative) 。 命令式的更关心howtodo , 每一个操作都会立即得到结果 , 方便对结果进行探索 , 优点是很灵活;缺点则是中间过程可能占用大量内存 , 不能及时释放 , 而且每个操作之间就被割裂了 , 没有办法做算子融合来提升性能;那相对应的声明式就刚好相反 , 它更关心whattodo , 它只关心结果是什么 , 中间怎么做并没有这么关心 , 典型的声明式像SQL、TensorFlow1.x , 声明式可以等用户真正需要结果的时候才去执行 , 也就是lazyevaluation , 这中间过程就可以做大量的优化 , 因此性能上也会有更好的表现 , 缺点自然也就是命令式的优点 , 它不够灵活 , 调试起来比较困难 。为了解决这几个问题 , Mars被我们开发出来 , Mars在MaxCompute团队内部诞生 , 它的主要目标就是让Numpy、pandas和scikit-learn等数据科学的库能够并行和分布式执行 , 充分利用多核和新的硬件 。
Mars的开发过程中 , 我们核心关注的几点包括:
我们希望Mars足够简单 , 只要会用Numpy、pandas或scikit-learn就会用Mars 。 避免重复造轮子 , 我们希望能利用到这些库已有的成果 , 只需要能让他们被调度到多核/多机上即可 。 声明式和命令式兼得 , 用户可以在这两者之间自由选择 , 灵活度和性能兼而有之 。 足够健壮 , 生产可用 , 能应付各种failover的情况 。当然这些是我们的目标 , 也是我们一直努力的方向 。
Marstensor:Numpy的并行和分布式加速器上面说过 , 我们的目标之一是 , 只要会用Numpy等数据科学包 , 就会用Mars 。 我们直接来看代码 , 还是以蒙特卡洛为例 。 变成Mars的代码是什么样子呢?
importmars.tensorasmtN=10**10data=https://pcff.toutiao.jxnews.com.cn/p/20200416/mt.random.uniform(-1,1,size=(N,2))inside=(mt.sqrt((data**2).sum(axis=1))<1).sum()pi=(4*inside/N).execute()print('pi:%.5f'%pi)可以看到 , 区别就只有两处:importnumpyasnp变成importmars.tensorasmt , 后续的np.都变成mt.;pi在打印之前调用了一下.execute()方法 。
也就是默认情况下 , Mars会按照声明式的方式 , 代码本身移植的代价极低 , 而在真正需要一个数据的时候 , 通过.execute()去触发执行 。 这样能最大限度得优化性能 , 以及减少中间过程内存消耗 。
这里 , 我们还将数据的规模扩大了1000倍 , 来到了100亿个点 。 之前1/1000的数据量的时候 , 在我的笔记本上需要757ms;而现在数据扩大一千倍 , 光data就需要150G的内存 , 这用Numpy本身根本无法完成 。 而使用Mars , 计算时间只需要3min44s , 而峰值内存只需要1G左右 。 假设我们认为内存无限大 , Numpy需要的时间也就是之前的1000倍 , 大概是12min多 , 可以看到Mars充分利用了多核的能力 , 并且通过声明式的方式 , 极大减少了中间内存占用 。
前面说到 , 我们试图让声明式和命令式兼得 , 而使用命令式的风格 , 只需要在代码的开始配置一个选项即可 。
importmars.tensorasmtfrommars.configimportoptionsoptions.eager_mode=True#打开eagermode后 , 每一次调用都会立即执行 , 行为和Numpy就完全一致N=10**7data=https://pcff.toutiao.jxnews.com.cn/p/20200416/mt.random.uniform(-1,1,size=(N,2))inside=(mt.linalg.norm(data,axis=1)<1).sum()pi=4*inside/N#不需要调用.execute()了print('pi:%.5f'%pi.fetch())#目前需要fetch()来转成float类型 , 后续我们会加入自动转换MarsDataFrame:pandas的并行和分布式加速器看过怎么样轻松把Numpy代码迁移到Marstensor , 想必读者也知道怎么迁移pandas代码了 , 同样也只有两个区别 。 我们还是以movielens的代码为例 。
importmars.dataframeasmdratings=md.read_csv('ml-20m/ratings.csv')ratings.groupby('userId').agg({'rating':['sum','mean','max','min']}).execute()MarsLearn:scikit-learn的并行和分布式加速器MarsLearn也同理 , 这里就不做过多阐述了 。 但目前Marslearn支持的scikit-learn算法还不多 , 我们也在努力移植的过程中 , 这需要大量的人力和时间 , 欢迎感兴趣的同学一起参与 。
importmars.dataframeasmdfrommars.learn.neighborsimportNearestNeighborsdf=md.read_csv('data.csv')#输入是CSV文件 , 包含20万个向量 , 每个向量10个元素nn=NearestNeighbors(n_neighbors=10)nn.fit(df)#这里fit的时候也会整体触发执行 , 因此机器学习的高层接口都是立即执行的neighbors=nn.kneighbors(df).fetch()#kneighbors也已经触发执行 , 只需要fetch数据这里要注意的是 , 对于机器学习的fit、predict等高层接口 , MarsLearn也会立即触发执行 , 以保证语义的正确性 。
RAPIDS:GPU上的数据科学相信细心的观众已经发现 , GPU好像没有被提到 。 不要着急 , 这就要说到RAPIDS 。
在之前 , 虽然CUDA已经将GPU编程的门槛降到相当低的一个程度了 , 但对于数据科学家们来说 , 在GPU上处理Numpy、pandas等能处理的数据无异于天方夜谭 。 幸运的是 , NVIDIA开源了RAPIDS数据科学平台 , 它和Mars的部分思想高度一致 , 即使用简单的import替换 , 就可以将Numpy、pandas和scikit-learn的代码移植到GPU上 。

文章图片
其中 , RAPIDScuDF用来加速pandas , 而RAPIDScuML用来加速scikit-learn 。
对于Numpy来说 , CuPy已经很好地支持用GPU来加速了 , 这样RAPIDS也得以把重心放在数据科学的其他部分 。
CuPy:用GPU加速Numpy还是蒙特卡洛求解Pi 。
importcupyascpN=10**7data=https://pcff.toutiao.jxnews.com.cn/p/20200416/cp.random.uniform(-1,1,size=(N,2))inside=(cp.sqrt((data**2).sum(axis=1))<1).sum()pi=4*inside/Nprint('pi:%.5f'%pi)在我的测试中 , 它将CPU的757ms , 降到只有36ms , 提升超过20倍 , 可以说效果非常显著 。 这正是得益于GPU非常适合计算密集型的任务 。
RAPIDScuDF:用GPU加速pandas将importpandasaspd替换成importcudf , GPU内部如何并行 , CUDA编程这些概念 , 用户都不再需要关心 。
importcudfratings=cudf.read_csv('ml-20m/ratings.csv')ratings.groupby('userId').agg({'rating':['sum','mean','max','min']})运行时间从CPU上的18s提升到GPU上的1.66s , 提升超过10倍 。
RAPIDScuML:用GPU加速scikit-learn同样是k-最邻近问题 。
importcudffromcuml.neighborsimportNearestNeighborsdf=cudf.read_csv('data.csv')nn=NearestNeighbors(n_neighbors=10)nn.fit(df)neighbors=nn.kneighbors(df)运行时间从CPU上1min52s , 提升到GPU上17.8s 。
Mars和RAPIDS结合能带来什么?RAPIDS将Python数据科学带到了GPU , 极大地提升了数据科学的运行效率 。 它们和Numpy等一样 , 是命令式的 。 通过和Mars结合 , 中间过程将会使用更少的内存 , 这使得数据处理量更大;Mars也可以将计算分散到多机多卡 , 以提升数据规模和计算效率 。
在Mars里使用GPU也很简单 , 只需要在对应函数上指定gpu=True 。 例如创建tensor、读取CSV文件等都适用 。
importmars.tensorasmtimportmars.dataframeasmda=mt.random.uniform(-1,1,size=(1000,1000),gpu=True)df=md.read_csv('ml-20m/ratings.csv',gpu=True)下图是用Mars分别在Scaleup和Scaleout两个维度上加速蒙特卡洛计算Pi这个任务 。 一般来说 , 我们要加速一个数据科学任务 , 可以有这两种方式 , Scaleup是指可以使用更好的硬件 , 比如用更好的CPU、更大的内存、使用GPU替代CPU等;Scaleout就是指用更多的机器 , 用分布式的方式提升效率 。

文章图片
可以看到在一台24核的机器上 , Mars计算需要25.8s , 而通过分布式的方式 , 使用4台24核的机器的机器几乎以线性的时间提升 。 而通过使用一个NVIDIATESLAV100显卡 , 我们就能将单机的运行时间提升到3.98s , 这已经超越了4台CPU机器的性能 。 通过再将单卡拓展到多卡 , 时间进一步降低 , 但这里也可以看到 , 时间上很难再线性扩展了 , 这是因为GPU的运行速度提升巨大 , 这个时候网络、数据拷贝等的开销就变得明显 。
性能测试我们使用了https://github.com/h2oai/db-benchmark的数据集 , 测试了三个数据规模的groupby和一个数据规模的join 。 而我们主要对比了pandas和DASK 。 DASK和Mars的初衷很类似 , 也是试图并行和分布式化Python数据科学 , 但它们的设计、实现、分布式都存在较多差异 , 这个后续我们再撰文进行详细对比 。
测试机器配置是500G内存、96核、NVIDIAV100显卡 。 Mars和DASK在GPU上都使用RAPIDS执行计算 。
Groupby数据有三个规模 , 分别是500M、5G和20G 。
查询也有三组 。
查询一df=read_csv('data.csv')df.groupby('id1').agg({'v1':'sum'})查询二df=read_csv('data.csv')df.groupby(['id1','id2']).agg({'v1':'sum'})查询三df=read_csv('data.csv')df.gropuby(['id6']).agg({'v1':'sum','v2':'sum','v3':'sum'})数据大小500M , 性能结果
文章图片
数据大小5G , 性能结果【战“疫”期,阿里云云效团队在家高效开发实录】
文章图片
数据大小20G , 性能结果
文章图片
数据大小到20G时 , pandas在查询2会内存溢出 , 得不出结果 。
可以看到 , 随着数据增加 , Mars的性能优势会愈发明显 。
得益于GPU的计算能力 , GPU运算性能相比于CPU都有数倍的提升 。 如果单纯使用RAPIDScuDF , 由于显存大小的限制 , 数据来到5G都难以完成 , 而由于Mars的声明式的特点 , 中间过程对显存的使用大幅得到优化 , 所以整组测试来到20G都能轻松完成 。 这正是Mars+RAPIDS所能发挥的威力 。
Join测试查询:
x=read_csv('x.csv')y=read_csv('y.csv')x.merge(y,on='id1')测试数据x为500M , y包含10行数据 。

文章图片
总结RAPIDS将Python数据科学带到了GPU , 极大提升了数据分析和处理的效率 。 Mars的注意力更多放在并行和分布式 。 相信这两者的结合 , 在未来会有更多的想象空间 。
推荐阅读
- 百亿富豪遇“麻烦”,相中老牌百货,举牌后遭警示,恐添变数?
- 通达信精选指标:价位时空主图,画线战法是技术派的核心内功
- 疫情冲击经济,第一个“破产”的国家出现!今年5次调查自华产品
- 美国用“核试验”来恫吓中国“核裁军”,那是赤裸裸的核讹诈
- 颠覆未来战场?美军成功测试新武器,但中国早用来砍树了
- “泡芙消极事件”引关注,Curse:大家都不ping人,除非特别恶心
- 云顶日报0516 史上最“短命”套路?炸弹人惨遭热修
- 三国杀:卡牌是否应该“界限突破”?且看这版「界闪电」设计如何
- 讲解bjl下三路打法原理及个人实战技巧心得经验分享【许祥孜】
- 这3个星座爱得忠诚,弱水三千只取一瓢,用情至深断绝“暧昧”
