域名

用 Python 高效处理大文件

时间:2010-12-5 17:23:32  作者:数据库   来源:人工智能  查看:  评论:0
内容摘要:为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程

为了进行并行处理,效处我们将任务划分为子单元。文件它增加了程序处理的效处作业数量,减少了整体处理时间。文件

例如,效处如果你正在处理一个大的文件CSV文件,你想修改一个单列。效处我们将把数据以数组的文件形式输入函数,它将根据可用的效处进程数量,一次并行处理多个值。文件这些进程是效处基于你的处理器内核的数量。

在这篇文章中,文件我们将学习如何使用multiprocessing、效处joblib和tqdm Python包减少大文件的文件处理时间。这是效处一个简单的教程,服务器托管可以适用于任何文件、数据库、图像、视频和音频。

开始

我们将使用来自 Kaggle 的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。

https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents

我们将导入multiprocessing、joblib和tqdm用于并行处理,pandas用于数据导入,re、nltk和string用于文本处理。

# Parallel Computing

import multiprocessing as mp

from joblib import Parallel, delayed

from tqdm.notebook import tqdm

# Data Ingestion

import pandas as pd

# Text Processing

import re

from nltk.corpus import stopwords

import string

在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers。

n_workers = 2 * mp.cpu_count()

print(f"{ n_workers} workers are available")

>>> 8 workers are available

下一步,我们将使用pandas read_csv函数读取大型CSV文件。然后打印出dataframe的形状、列的名称和处理时间。

%%time

file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"

df = pd.read_csv(file_name)

print(f"Shape:{ df.shape}\n\nColumn Names:\n{ df.columns}\n")

输出:

Shape:(2845342, 47)

Column Names:

Index([ID, Severity, Start_Time, End_Time, Start_Lat, Start_Lng,

End_Lat, End_Lng, Distance(mi), Description, Number, Street,

Side, City, County, State, Zipcode, Country, Timezone,

Airport_Code, Weather_Timestamp, Temperature(F), Wind_Chill(F),

Humidity(%), Pressure(in), Visibility(mi), Wind_Direction,

Wind_Speed(mph), Precipitation(in), Weather_Condition, Amenity,

Bump, Crossing, Give_Way, Junction, No_Exit, Railway,

Roundabout, Station, Stop, Traffic_Calming, Traffic_Signal,

Turning_Loop, Sunrise_Sunset, Civil_Twilight, Nautical_Twilight,

Astronomical_Twilight],

dtype=object)

CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s

Wall time: 46.9 s处理文本

clean_text是云服务器一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。

def clean_text(text):

# Remove stop words

stops = stopwords.words("english")

text = " ".join([word for word in text.split() if word

not in stops])

# Remove Special Characters

text = text.translate(str.maketrans(, , string.punctuation))

# removing the extra spaces

text = re.sub( +, , text)

return text串行处理

对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。

我们将处理280万条记录,并将结果保存回 “Description” 列中。

%%time

tqdm.pandas()

df[Description] = df[Description].progress_apply(clean_text)输出

高端处理器串行处理280万行花了9分5秒。

100%云南idc服务商
copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap