Faster Data Loading for Pandas on S3

Dataset and Test Scenario Introduction

The test dataset is a collection of CSV files with four columns per row: project, url (a percent-encoded page title), count, and bytes. A sample of the first rows:

project  url                                          count  bytes
aa       %CE%92%CE%84_%CE%95%CF%80%CE%B9%CF%83%CF…    1      4854
aa       %CE%98%CE%B5%CF%8C%CE%B4%CF%89%CF%81%CE…     1      4917
aa       %CE%9C%CF%89%CE%AC%CE%BC%CE%B5%CE%B8_%CE…    1      4832
aa       %CE%A0%CE%B9%CE%B5%CF%81_%CE%9B%27_%CE%91…   1      4828
aa       %CE%A3%CE%A4%CE%84_%CE%A3%CF%84%CE%B1%CF…    1      4819
pip install boto3 pandas s3fs
import pandas as pd

ENDPOINT_URL = "http://10.62.64.200"

# point s3fs at the FlashBlade S3 endpoint instead of AWS
storage_opts = {'client_kwargs': {'endpoint_url': ENDPOINT_URL}}
df = pd.read_csv("s3://" + BUCKETPATH, storage_options=storage_opts)
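Even before parallelizing, it helps to put a number on the baseline. A minimal timing wrapper; the time calls here are my addition, not part of the original test harness, and BUCKETPATH is the same placeholder used above:

import time

import pandas as pd

ENDPOINT_URL = "http://10.62.64.200"
storage_opts = {'client_kwargs': {'endpoint_url': ENDPOINT_URL}}

start = time.perf_counter()
df = pd.read_csv("s3://" + BUCKETPATH, storage_options=storage_opts)
print(f"loaded {len(df)} rows in {time.perf_counter() - start:.1f}s")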

Step 1, Using Parallelization

Details on How to Run Each Test

pip install boto3 dask dask[distributed] pandas s3fs
import dask.dataframe as dd

ENDPOINT_URL = "http://10.62.64.200"

storage_opts = {'client_kwargs': {'endpoint_url': ENDPOINT_URL}}

# read_csv builds a lazy, partitioned dataframe; compute() loads it in parallel
ddf = dd.read_csv("s3://" + BUCKETPATH, storage_options=storage_opts)
df = ddf.compute(scheduler='processes')
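The scheduler='processes' argument uses Dask's local multiprocessing scheduler. For explicit control over the degree of parallelism, a dask.distributed local cluster can be used instead; a sketch, with illustrative worker counts rather than the sizing used in these tests:

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

ENDPOINT_URL = "http://10.62.64.200"
storage_opts = {'client_kwargs': {'endpoint_url': ENDPOINT_URL}}

# illustrative sizing: scale n_workers to the client's cores
cluster = LocalCluster(n_workers=8, threads_per_worker=2)
client = Client(cluster)  # registers itself as the default scheduler

ddf = dd.read_csv("s3://" + BUCKETPATH, storage_options=storage_opts)
df = ddf.compute()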
wget -N https://github.com/kahing/goofys/releases/latest/download/goofys
chmod a+x goofys
sudo mkdir -p /mnt/fuse_goofys && sudo chown $USER /mnt/fuse_goofys
./goofys --endpoint=http://$FLASHBLADE_IP $BUCKETNAME /mnt/fuse_goofys
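With the bucket mounted, the S3 objects look like ordinary local files, so the pandas code needs no S3-specific options. A sketch, assuming the CSV files sit at the top level of the bucket; adjust the glob to the real layout:

import glob

import pandas as pd

# goofys exposes each object as a file under the mountpoint
files = sorted(glob.glob("/mnt/fuse_goofys/*.csv"))
df = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)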
apt install -y openjdk-11-jdk
pip install pandas pyspark
spark-submit \
--packages org.apache.hadoop:hadoop-aws:3.2.2 \
--conf spark.hadoop.fs.s3a.endpoint=10.62.64.200 \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
import pyspark
import pyspark.pandas as ps

pdf = ps.read_csv("s3a://" + BUCKETPATH)

# persist keeps the frame in executor memory while we operate on it
with pdf.spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as df:
    print(df.count())
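pyspark.pandas keeps the data distributed across executors; if a result is small enough for driver memory, it converts back to a plain pandas DataFrame. This final step is my addition, not part of the original listing:

# collects all partitions to the driver; only safe for small results
local_df = pdf.to_pandas()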
pip install pandas pyarrow
import pyarrow.fs
import pyarrow.dataset as ds

ENDPOINT = "10.62.64.200"

fbs3 = pyarrow.fs.S3FileSystem(access_key=ACCESS_KEY, secret_key=SECRET_KEY,
                               endpoint_override=ENDPOINT, scheme='http')
dataset = ds.dataset(BUCKETPATH, filesystem=fbs3)
df = dataset.to_table().to_pandas()
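to_table() scans all files in parallel on PyArrow's internal thread pool (use_threads defaults to true). When only a few columns are needed, projecting them keeps the resulting table small, though for CSV the files themselves still have to be read and parsed. A sketch with hypothetical column choices:

# materialize only the columns the analysis needs
tab = dataset.to_table(columns=['url', 'count'])
df = tab.to_pandas()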

Is Storage the Bottleneck?

AWS Error [code 99]: curlCode: 28, Timeout was reached
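Timeouts like this can indicate that the client or network path is saturated rather than the storage itself. One way to confirm is to back off the concurrency; another, if they appear during PyArrow reads, is to lengthen the timeouts that recent PyArrow releases expose on the S3 filesystem. The values below are illustrative, not tuned:

import pyarrow.fs

fbs3 = pyarrow.fs.S3FileSystem(
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    endpoint_override="10.62.64.200",
    scheme='http',
    connect_timeout=30,   # seconds to wait for a connection
    request_timeout=60,   # seconds to wait for each request
)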

Step 2, Beyond Pandas

# pyspark
c = df.filter(df["count"] == 1).count()
# pyarrow
tab = dataset.scanner(filter=ds.field("count") == 1).to_table()
# dask
c = len(df[df['count'] == 1].index)
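For comparison, plain pandas can only apply the same filter after the entire dataset has been downloaded and parsed into memory, which is exactly the work the frameworks above parallelize or push down:

# pandas equivalent: cheap once df is loaded, but loading is the bottleneck
c = len(df[df["count"] == 1])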

Conclusion

Plain pandas reads an S3 object serially, leaving most of the available network and storage bandwidth idle. Parallelizing the load with Dask, PySpark, or PyArrow puts that bandwidth to work and speeds up loading dramatically, and each of those frameworks can also push simple operations like filters down to the parallel workers.
