pandas分块读取

reader=pd.read_csv("test.csv",encoding="gbk",chunksize=5000,names=["UID","Date","Numb"])
  1. chunksize设置过后reader变为迭代,在循环中不断调用next()方法。

  2. 还有一个参数iterator,当为True时候,也可以生成一个可迭代对象

    reader=pd.read_csv(“test.csv”,encoding=”gbk”,iterator=True,names=[“UID”,”Date”,”Numb”])

多进程运行

1. 导入模块

from multiprocessing import freeze_support,Pool

2. 多进程调用

def SayHello():
    print("Hello")

if __name__=="__main__":
    freeze_support()
    with Pool() as p:
        for i in range(12):
            p.apply_async(SayHello)
        p.close()
        p.join()

数据分析中的运用

计算test.csv中每个员工,在2016年的输入代码总量。点击下载数据

1. 导入库

import pandas as pd
from multiprocessing import freeze_support,Pool
import time

2. 调用函数处理数据部分

def cl(read):

    read = pd.DataFrame(read)
    read.Date = read.Date.apply(lambda x: pd.to_datetime(x, format="%Y-%m-%d"))
    read["Year"] = read.Date.apply(lambda x: x.year)
    read.drop("Date", axis=1, inplace=True)
    read = read[read.Year == 2016]
    read.drop("Year", inplace=True, axis=1)
    c = read.groupby(by="UID", axis=0).sum()
    print c.head()
    c.sort_values(by="Numb", ascending=False, inplace=True)
    c.to_csv("temp.csv", encoding="gbk", mode="a+", header=None)

3. 主体代码

if __name__=="__main__":
    st=time.time()
    p=Pool()
    reader=pd.read_csv("test.csv",encoding="gbk",chunksize=5000,names=["UID","Date","Numb"])
    f=open("temp.csv","w")
    f.close()
    for read in reader:
        p.apply_async(cl,(read,))
    p.close()
    p.join()
    datare=pd.read_csv("temp.csv",encoding="gbk",names=["UID","Numb"])
    datare=datare.groupby(by="UID").sum()
    datare.sort_values(by="Numb",ascending=False,inplace=True)
    datare=datare.head(5)
    datare.to_csv("result.csv",encoding="gbk",header=None)
    print("已结束")
    eT=time.time()
    print(eT-st)

4. 运行结果

5. 总结

  1. 较单线程而言,多进程可以合理分配cpu,加快程序的执行效率。
  2. 缩短执行用时,经过测试,程序成功的从10s的执行时长,降低到1.66s。降幅达8秒多。