A Presto Data Pipeline with S3

The Application: Tracking Filesystem Metadata

Presto+Hive Concept 1: External Tables

> cat people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
> s5cmd cp people.json s3://joshuarobinson/people.json/1
CREATE TABLE people (name varchar, age int) WITH (format = 'json', external_location = 's3a://joshuarobinson/people.json/');
presto:default> SELECT * FROM people;
  name   | age
---------+------
 Michael | NULL
 Andy    |   30
 Justin  |   19
(3 rows)
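To see why Michael's age comes back as NULL, it can help to model how a JSON external table projects each record onto the declared columns. The following is a minimal Python sketch of that idea, not Presto's actual reader. The function name `rows_from_json_lines` is hypothetical, and the sample records are chosen to match the query output above.

```python
import json

# Declared columns, mirroring the CREATE TABLE statement (name varchar, age int).
COLUMNS = ["name", "age"]


def rows_from_json_lines(text, columns=COLUMNS):
    """Project each JSON object onto the declared columns.

    A key that is absent from a record becomes None, which plays the
    role of SQL NULL in this sketch.
    """
    rows = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        record = json.loads(line)
        rows.append(tuple(record.get(column) for column in columns))
    return rows


# Sample input: one JSON object per line, as in people.json.
sample = (
    '{"name":"Michael"}\n'
    '{"name":"Andy", "age":30}\n'
    '{"name":"Justin", "age":19}\n'
)

rows = rows_from_json_lines(sample)
for row in rows:
    print(row)
```

The table definition, not the file, decides which columns exist: a record that lacks a declared key still produces a row, with that column left empty.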

Presto+Hive Concept 2: Partitioned Tables

bucketname/people.json/school=central/1
bucketname/people.json/school=west/1
bucketname/people.json/school=west/2
CREATE TABLE people (name varchar, age int, school varchar) WITH (format = 'json', external_location = 's3a://joshuarobinson/people.json/', partitioned_by=ARRAY['school'] );
CALL system.sync_partition_metadata(schema_name=>'default', table_name=>'people', mode=>'FULL');
presto:default> SELECT * FROM people;
  name   | age  | school
---------+------+---------
 Andy    |   30 | west
 Justin  |   19 | west
 Michael | NULL | central
(3 rows)
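The `school` column in this result does not come from the JSON records. It comes from the `school=...` directory in each object's key. The sketch below illustrates that naming convention in Python, assuming Hive-style `key=value` path segments; `partition_values` is a hypothetical helper, not part of Presto or Hive.

```python
def partition_values(object_key, table_prefix):
    """Return {column: value} for each key=value directory in an object key.

    Only the directories between the table prefix and the file name are
    inspected, which mirrors the Hive-style partition layout.
    """
    if object_key.startswith(table_prefix):
        relative = object_key[len(table_prefix):]
    else:
        relative = object_key
    directories = relative.strip("/").split("/")[:-1]  # drop the file name
    values = {}
    for directory in directories:
        if "=" in directory:
            column, value = directory.split("=", 1)
            values[column] = value
    return values


# Object keys from the listing above.
keys = [
    "bucketname/people.json/school=central/1",
    "bucketname/people.json/school=west/1",
    "bucketname/people.json/school=west/2",
]

for key in keys:
    print(key, "->", partition_values(key, "bucketname/people.json/"))
```

Because the value lives in the path, every record in a given directory shares the same `school` value, and a query that filters on `school` can skip directories that do not match.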

Pipeline Implementation

Data Collector

{"dirid": 3, "fileid": 54043195528445954, "filetype": 40000, "mode": 755, "nlink": 1, "uid": "ir", "gid": "ir", "size": 0, "atime": 1584074484, "mtime": 1584074484, "ctime": 1584074484, "path": "\/mnt\/irp210\/ravi"}
{"dirid": 3, "fileid": 13510798882114014, "filetype": 40000, "mode": 777, "nlink": 1, "uid": "ir", "gid": "ir", "size": 0, "atime": 1568831459, "mtime": 1568831459, "ctime": 1568831459, "path": "\/mnt\/irp210\/ivan"}
pls --ipaddr $IPADDR --export /$EXPORTNAME -R --json > /$TODAY.json
s5cmd --endpoint-url http://$S3_ENDPOINT:80 -uw 32 mv /$TODAY.json s3://joshuarobinson/acadia_pls/raw/$TODAY/ds=$TODAY/data
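The upload destination places each day's dump under a `ds=<date>` directory, so the file lands in a Hive-style partition. A small Python sketch of how that key is composed follows. It assumes `$TODAY` is an ISO `YYYY-MM-DD` date, which the commands above do not state, and `raw_object_key` is a hypothetical helper name.

```python
from datetime import date


def raw_object_key(day, prefix="acadia_pls/raw"):
    """Build the object key for one day's dump.

    Assumption: $TODAY is formatted as YYYY-MM-DD, so that the ds=...
    directory can later be read as a date partition value.
    """
    today = day.isoformat()
    return f"{prefix}/{today}/ds={today}/data"


key = raw_object_key(date(2019, 10, 5))
print(f"s3://joshuarobinson/{key}")
```

The date appears twice in the key: once as a plain per-day directory and once as the `ds=` partition directory beneath it.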

The Destination: Presto Data Warehouse

> CREATE SCHEMA IF NOT EXISTS hive.pls WITH (location = 's3a://joshuarobinson/warehouse/pls/');
> CREATE TABLE IF NOT EXISTS pls.acadia (atime bigint, ctime bigint, dirid bigint, fileid decimal(20), filetype bigint, gid varchar, mode bigint, mtime bigint, nlink bigint, path varchar, size bigint, uid varchar, ds date) WITH (format='parquet', partitioned_by=ARRAY['ds']);

ETL Logic: Ingest via External Table on S3

1> CREATE TABLE IF NOT EXISTS $TBLNAME (atime bigint, ctime bigint, dirid bigint, fileid decimal(20), filetype bigint, gid varchar, mode bigint, mtime bigint, nlink bigint, path varchar, size bigint, uid varchar, ds date) WITH (format='json', partitioned_by=ARRAY['ds'], external_location='s3a://joshuarobinson/pls/raw/$src/');
2> CALL system.sync_partition_metadata(schema_name=>'default', table_name=>'$TBLNAME', mode=>'FULL');
3> INSERT INTO pls.acadia SELECT * FROM $TBLNAME;
4> DROP TABLE $TBLNAME;
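The four statements are templates over `$TBLNAME` and `$src`, so a driver script has to render them before handing them to Presto. Here is one way that rendering might look in Python. This is a sketch rather than the author's actual script: `ingest_statements`, the table name `tmp_acadia`, and the source value `2019-10-05` are all hypothetical, and the identifier check is an added precaution.

```python
import re

# Column list shared by the temporary external table and the warehouse table.
COLUMNS = (
    "atime bigint, ctime bigint, dirid bigint, fileid decimal(20), "
    "filetype bigint, gid varchar, mode bigint, mtime bigint, nlink bigint, "
    "path varchar, size bigint, uid varchar, ds date"
)


def ingest_statements(tblname, src):
    """Render the four ingest statements, in execution order."""
    # Reject anything that is not a plain identifier, since the name is
    # interpolated directly into SQL text.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", tblname):
        raise ValueError(f"unsafe table name: {tblname!r}")
    location = f"s3a://joshuarobinson/pls/raw/{src}/"
    return [
        # 1. Temporary external table over the raw JSON objects.
        f"CREATE TABLE IF NOT EXISTS {tblname} ({COLUMNS}) "
        f"WITH (format='json', partitioned_by=ARRAY['ds'], "
        f"external_location='{location}')",
        # 2. Discover the ds=... partition directories under that location.
        f"CALL system.sync_partition_metadata(schema_name=>'default', "
        f"table_name=>'{tblname}', mode=>'FULL')",
        # 3. Copy the rows into the Parquet warehouse table.
        f"INSERT INTO pls.acadia SELECT * FROM {tblname}",
        # 4. Drop the temporary table definition.
        f"DROP TABLE {tblname}",
    ]


statements = ingest_statements("tmp_acadia", "2019-10-05")
for number, statement in enumerate(statements, start=1):
    print(f"{number}> {statement};")
```

The order matters: the partition sync has to run before the INSERT, because a freshly created partitioned external table has no registered partitions and would otherwise return no rows.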

Using the Warehouse

presto:default> SELECT ds, COUNT(*) AS filecount, SUM(size)/(1024*1024*1024) AS size_gb FROM pls.acadia GROUP BY ds ORDER BY ds;
    day     | filecount | size_gb
------------+-----------+---------
 2019-10-05 | 417905229 |   44354
 2019-10-06 | 417905350 |   44467
 2019-10-07 | 417905603 |   44586
 2019-10-08 | 712302359 |   45610
 2019-10-20 | 377790894 |   47421
 2019-10-28 | 482777169 |   50193
 2019-10-29 | 482552818 |   50275
presto:default> SELECT COUNT (DISTINCT uid) as active_users FROM pls.acadia WHERE ds > date_add('day', -7, now());
 active_users
--------------
           16
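The per-day summary query groups rows by `ds`, counts them, and divides the summed sizes by 1024^3. Since both operands are integers, Presto performs integer division, which is why `size_gb` shows whole numbers. A small Python sketch of the same aggregation over made-up rows is shown here; `daily_summary` and the sample data are hypothetical.

```python
from collections import defaultdict

GIB = 1024 * 1024 * 1024


def daily_summary(rows):
    """Aggregate (ds, size_bytes) pairs into (ds, filecount, size_gb) tuples.

    Floor division stands in for SQL integer division here; the two
    agree for the non-negative sizes used in this sketch.
    """
    counts = defaultdict(int)
    sizes = defaultdict(int)
    for ds, size in rows:
        counts[ds] += 1
        sizes[ds] += size
    return [(ds, counts[ds], sizes[ds] // GIB) for ds in sorted(counts)]


# Made-up rows: two files on the first day, one on the second.
sample_rows = [
    ("2019-10-05", GIB),
    ("2019-10-05", GIB // 2),
    ("2019-10-06", 3 * GIB),
]

summary = daily_summary(sample_rows)
for ds, filecount, size_gb in summary:
    print(ds, filecount, size_gb)
```

The first day totals 1.5 GiB but reports 1, which shows the truncation. Casting one operand to `double` in the SQL would keep the fractional part.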

Even More: Combining Presto and Spark

df = spark.read.parquet("s3a://joshuarobinson/warehouse/pls/acadia/")
df.printSchema()
root
 |-- atime: long (nullable = true)
 |-- ctime: long (nullable = true)
 |-- dirid: long (nullable = true)
 |-- fileid: decimal(20,0) (nullable = true)
 |-- filetype: long (nullable = true)
 |-- gid: string (nullable = true)
 |-- mode: long (nullable = true)
 |-- mtime: long (nullable = true)
 |-- nlink: long (nullable = true)
 |-- path: string (nullable = true)
 |-- size: long (nullable = true)
 |-- uid: string (nullable = true)
 |-- ds: date (nullable = true)

Summary
