Reading a file in chunks with pandas
reader=pd.read_csv("test.csv",encoding="gbk",chunksize=5000,names=["UID","Date","Numb"])
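Once chunksize is set, read_csv returns an iterator instead of a DataFrame. A for loop over reader calls next() on it repeatedly, and each call yields a DataFrame of at most 5000 rows.
There is also an iterator parameter: when it is True, read_csv likewise returns an iterable reader, and rows are fetched from it with get_chunk().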
reader=pd.read_csv("test.csv",encoding="gbk",iterator=True,names=["UID","Date","Numb"])
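The two modes can be compared on a small file. The sketch below is only illustrative: the sample file, its contents, and the helper names (write_sample, chunk_lengths, first_rows) are made up for the example and are not part of the original data.

```python
import os
import tempfile

import pandas as pd


def write_sample(path, rows=12):
    # Hypothetical data in the same three-column layout (no header row).
    with open(path, "w", encoding="gbk") as f:
        for i in range(rows):
            f.write(f"{i % 3},2016-01-{i + 1:02d},{i}\n")


def chunk_lengths(path, size):
    # chunksize: iterate with a for loop, one DataFrame per step.
    reader = pd.read_csv(path, encoding="gbk", chunksize=size,
                         names=["UID", "Date", "Numb"])
    return [len(chunk) for chunk in reader]


def first_rows(path, n):
    # iterator=True: pull rows on demand with get_chunk(n).
    reader = pd.read_csv(path, encoding="gbk", iterator=True,
                         names=["UID", "Date", "Numb"])
    return reader.get_chunk(n)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "sample.csv")
    write_sample(path)
    print(chunk_lengths(path, 5))  # 12 rows split into chunks of 5, 5 and 2
    print(first_rows(path, 3))
```

With chunksize the chunk size is fixed up front, while get_chunk lets the caller decide how many rows to read at each step.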
Running with multiple processes
1. Import the modules
from multiprocessing import freeze_support,Pool
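2. Call a function from multiple processes
def SayHello():
    print("Hello")

if __name__ == "__main__":
    freeze_support()
    with Pool() as p:
        # Submit 12 tasks; apply_async returns without waiting for them.
        for i in range(12):
            p.apply_async(SayHello)
        # Stop accepting new tasks, then wait for the submitted ones to finish.
        p.close()
        p.join()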
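apply_async returns immediately with an AsyncResult object. The worker's return value, or any exception it raised, only surfaces when get() is called on that object, so in the example above an error inside SayHello would pass unnoticed. A minimal sketch of collecting the results, using a hypothetical square worker and collect helper:

```python
from multiprocessing import Pool, freeze_support


def square(x):
    # Hypothetical worker; it runs in a child process.
    return x * x


def collect(values):
    with Pool() as p:
        # Submit every task first so they can run in parallel.
        pending = [p.apply_async(square, (v,)) for v in values]
        # get() blocks until the task is done and re-raises worker exceptions.
        return [r.get() for r in pending]


if __name__ == "__main__":
    freeze_support()
    print(collect(range(5)))
```

Results come back in submission order here because get() is called on the AsyncResult objects in the order they were created.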
Application in data analysis
For each employee in test.csv, compute the total amount of code entered in 2016.
1. Import the libraries
import pandas as pd
from multiprocessing import freeze_support,Pool
import time
2. The function that processes one chunk
def cl(read):
    read = pd.DataFrame(read)
    # Parse the dates in one vectorized call and extract the year.
    read["Year"] = pd.to_datetime(read["Date"], format="%Y-%m-%d").dt.year
    # Keep only the 2016 rows and the two columns needed for the sum.
    read = read.loc[read["Year"] == 2016, ["UID", "Numb"]]
    c = read.groupby(by="UID").sum()
    print(c.head())
    c = c.sort_values(by="Numb", ascending=False)
    # Append this chunk's partial sums; all workers write to the same file.
    c.to_csv("temp.csv", encoding="gbk", mode="a", header=False)
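Because every worker appends to temp.csv at the same time, rows written by different processes could in principle interleave. One possible alternative, which is not the approach this post uses, is to have the worker return its partial sums so that the parent process receives them through AsyncResult.get(). A sketch under that assumption, with a hypothetical partial_sums function and made-up sample rows:

```python
import pandas as pd


def partial_sums(chunk, year=2016):
    # Same filtering and grouping as cl, but the result is returned
    # to the caller rather than written to a shared file.
    dates = pd.to_datetime(chunk["Date"], format="%Y-%m-%d")
    kept = chunk.loc[dates.dt.year == year, ["UID", "Numb"]]
    return kept.groupby("UID")["Numb"].sum()


if __name__ == "__main__":
    # Made-up rows: one of them falls outside 2016 and is dropped.
    demo = pd.DataFrame({
        "UID": [1, 2, 1, 2],
        "Date": ["2016-01-05", "2016-03-01", "2015-12-31", "2016-07-09"],
        "Numb": [10, 20, 30, 40],
    })
    print(partial_sums(demo))  # UID 1 -> 10, UID 2 -> 60
```

The returned Series is pickled back to the parent, which is cheap here because each chunk collapses to one row per UID.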
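3. Main code
if __name__ == "__main__":
    freeze_support()
    st = time.time()
    p = Pool()
    reader = pd.read_csv("test.csv", encoding="gbk", chunksize=5000, names=["UID", "Date", "Numb"])
    # Create temp.csv, or empty it if it is left over from an earlier run.
    f = open("temp.csv", "w")
    f.close()
    # Hand each chunk to a worker process.
    for read in reader:
        p.apply_async(cl, (read,))
    p.close()
    p.join()
    # A UID can occur in several chunks, so sum the partial results once more.
    datare = pd.read_csv("temp.csv", encoding="gbk", names=["UID", "Numb"])
    datare = datare.groupby(by="UID").sum()
    datare.sort_values(by="Numb", ascending=False, inplace=True)
    # Keep the five employees with the highest totals.
    datare = datare.head(5)
    datare.to_csv("result.csv", encoding="gbk", header=False)
    print("Finished")
    eT = time.time()
    print(eT - st)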
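The second groupby in the main code is needed because the same UID can appear in several chunks, so temp.csv holds more than one partial sum per UID. A small sketch of that merge step on in-memory data (the helper name combine and the sample values are made up for illustration):

```python
import pandas as pd


def combine(partials, top=5):
    # Stack the per-chunk sums, then add up the entries sharing a UID.
    total = pd.concat(partials).groupby(level=0).sum()
    return total.sort_values(ascending=False).head(top)


if __name__ == "__main__":
    # Hypothetical per-chunk results, indexed by UID.
    a = pd.Series({1: 10, 2: 60}, name="Numb")
    b = pd.Series({2: 5, 3: 100}, name="Numb")
    print(combine([a, b], top=2))  # UID 3 -> 100, UID 2 -> 65
```

Summing partial sums gives the same totals as summing the raw rows, which is why the chunks can be processed independently.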
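4. Result
5. Summary
- Compared with running in a single process, multiple processes spread the work across the CPU cores and speed the program up.
- Run time is shorter: in testing, the program's run time dropped from 10 s to 1.66 s, a reduction of more than 8 seconds.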