为了进行并行处理,效处我们将任务划分为子单元。文件它增加了程序处理的效处作业数量,减少了整体处理时间。文件
例如,效处如果你正在处理一个大的文件CSV文件,你想修改一个单列。效处我们将把数据以数组的文件形式输入函数,它将根据可用的效处进程数量,一次并行处理多个值。文件这些进程是效处基于你的处理器内核的数量。
在这篇文章中,文件我们将学习如何使用multiprocessing、效处joblib和tqdm Python包减少大文件的文件处理时间。这是效处一个简单的教程,服务器托管可以适用于任何文件、数据库、图像、视频和音频。
开始我们将使用来自 Kaggle 的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。
https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents
我们将导入multiprocessing、joblib和tqdm用于并行处理,pandas用于数据导入,re、nltk和string用于文本处理。
# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
# Data Ingestion
import pandas as pd
# Text Processing
import re
from nltk.corpus import stopwords
import string在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers。
n_workers = 2 * mp.cpu_count()
print(f"{ n_workers} workers are available")
>>> 8 workers are available下一步,我们将使用pandas read_csv函数读取大型CSV文件。然后打印出dataframe的形状、列的名称和处理时间。
%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)
print(f"Shape:{ df.shape}\n\nColumn Names:\n{ df.columns}\n")输出:
Shape:(2845342, 47)
Column Names:
Index([ID, Severity, Start_Time, End_Time, Start_Lat, Start_Lng,
End_Lat, End_Lng, Distance(mi), Description, Number, Street,
Side, City, County, State, Zipcode, Country, Timezone,
Airport_Code, Weather_Timestamp, Temperature(F), Wind_Chill(F),
Humidity(%), Pressure(in), Visibility(mi), Wind_Direction,
Wind_Speed(mph), Precipitation(in), Weather_Condition, Amenity,
Bump, Crossing, Give_Way, Junction, No_Exit, Railway,
Roundabout, Station, Stop, Traffic_Calming, Traffic_Signal,
Turning_Loop, Sunrise_Sunset, Civil_Twilight, Nautical_Twilight,
Astronomical_Twilight],
dtype=object)
CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s处理文本clean_text是云服务器一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。
def clean_text(text):
# Remove stop words
stops = stopwords.words("english")
text = " ".join([word for word in text.split() if word
not in stops])
# Remove Special Characters
text = text.translate(str.maketrans(, , string.punctuation))
# removing the extra spaces
text = re.sub( +, , text)
return text串行处理对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。
我们将处理280万条记录,并将结果保存回 “Description” 列中。
%%time
tqdm.pandas()
df[Description] = df[Description].progress_apply(clean_text)输出高端处理器串行处理280万行花了9分5秒。
100%云南idc服务商