Concurrent Programming Case Study: S3 Metadata Requests

  • The performance gain (requests/sec) of Go and Rust relative to Python.
  • How to use FlashBlade as an S3 endpoint when coding in Python, Go, or Rust.
  • Basic concurrency patterns in these languages.

Results

Code Walkthroughs

Python Baseline

pip install boto3
import boto3

# FlashBlade data VIP used as the S3 endpoint (plain http).
FB_DATAVIP = '10.62.64.200'
s3 = boto3.resource('s3', endpoint_url='http://' + FB_DATAVIP)

# NOTE(review): `kwargs` (at least {'Bucket': bucketname}) and `bucketname`
# are assumed to be defined before this loop runs, and check_for_custom_metadata
# must already be defined -- TODO confirm.
# Page through the listing, issuing one HEAD per key, sequentially.
while True:
    objlist = s3.meta.client.list_objects_v2(**kwargs)
    keys = [o['Key'] for o in objlist.get('Contents', [])]
    for key in keys:
        check_for_custom_metadata(s3, bucketname, key)

    # A response without NextContinuationToken is the last page.
    try:
        kwargs['ContinuationToken'] = objlist['NextContinuationToken']
    except KeyError:
        break
def check_for_custom_metadata(s3, bucketname, key):
    """Print an object's key and user metadata if it has any.

    Issues a single HEAD request through ``s3.meta.client.head_object``.
    Nothing is printed when the object has no user metadata.

    Args:
        s3: S3 resource whose ``meta.client`` performs the HEAD request.
        bucketname: bucket that contains the object.
        key: key of the object to inspect.
    """
    response = s3.meta.client.head_object(Bucket=bucketname, Key=key)
    # .get: tolerate a response that omits the Metadata entry entirely.
    metadata = response.get('Metadata')
    if metadata:
        print("{} {}".format(key, metadata))

Python Multiprocessing

import multiprocessing
# `import multiprocessing` alone does not load the `pool` submodule, so import
# it explicitly before touching multiprocessing.pool.ThreadPool.
import multiprocessing.pool

# Worker threads share this process's memory, so the existing `s3` object is
# passed to every task.
p = multiprocessing.pool.ThreadPool(multiprocessing.cpu_count())
# ... same listing logic as above; each page yields `objlist` ...
keys = [o['Key'] for o in objlist.get('Contents', [])]
# start HEAD operations asynchronously
for k in keys:
    p.apply_async(check_for_custom_metadata, (s3, bucketname, k))

# Once the listing loop has finished: stop accepting work and wait for every
# queued HEAD, otherwise the script can exit with requests still pending.
p.close()
p.join()
# make a per process s3_client
s3_client = None


def initialize():
    """Pool initializer: build one S3 resource per worker process.

    The resource is stored in the module-level ``s3_client`` global. Tasks
    running in the same worker process presumably read it instead of
    creating a new connection per request.
    """
    global s3_client
    # Parentheses already continue the call; no backslashes needed.
    s3_client = boto3.resource(
        's3',
        aws_access_key_id=AWS_KEY,
        aws_secret_access_key=AWS_SECRET,
        use_ssl=False,
        endpoint_url='http://' + FB_DATAVIP,
    )
import multiprocessing

# One worker process per CPU; `initialize` runs once inside each worker.
p = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
# ... listing logic repeated here; each page yields `objlist` ...
keys = [o['Key'] for o in objlist.get('Contents', [])]
# Start async HEAD ops and then continue listing.
# NOTE(review): only (bucketname, key) is passed, so this assumes a variant of
# check_for_custom_metadata that uses the per-process global `s3_client`
# rather than the 3-argument (s3, bucketname, key) version -- TODO confirm.
for k in keys:
    p.apply_async(check_for_custom_metadata, (bucketname, k))
# No more tasks will be submitted; block until every queued HEAD has finished.
p.close()
p.join()

Go: Goroutines and Channels

Connecting to FlashBlade S3

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
endpointUrl := "http://10.62.64.200"s3Config := &aws.Config{
Region: aws.String("us-east-1"), // ignored
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
HTTPClient: httpClient,
}
if endpointUrl != "" {
s3Config.Endpoint = &endpointUrl
}
sess := session.Must(session.NewSession(s3Config))
svc := s3.New(sess)

Concurrency Logic

// listToChannelAndClose pages through every object under pfix in bucketname
// and sends each object key on channel. It closes channel when it returns, so
// consumers ranging over channel terminate. A listing error is handed to
// reportAWSError; the channel is closed either way.
func listToChannelAndClose(svc *s3.S3, bucketname *string, pfix string, channel chan string) {
	// Deferred so the close happens on every return path, after reportAWSError.
	defer close(channel)

	err := svc.ListObjectsPages(&s3.ListObjectsInput{
		Bucket: bucketname,
		Prefix: &pfix,
	}, func(page *s3.ListObjectsOutput, _ bool) bool {
		for _, obj := range page.Contents {
			if obj.Key == nil {
				continue // nothing to send; avoid a nil dereference
			}
			channel <- *obj.Key
		}
		return true // keep paging until the last page
	})
	reportAWSError(err)
}
// One lister goroutine streams keys into a buffered channel; a fixed pool of
// workers drains it, issuing one HeadObject per key.
channel := make(chan string, 2048)
go listToChannelAndClose(svc, &bucketname, prefix, channel)

var wg sync.WaitGroup
workerFunc := func() {
	defer wg.Done()
	// The range ends once the lister has closed channel and it is drained.
	for k := range channel {
		res, err := svc.HeadObject(&s3.HeadObjectInput{
			Bucket: &bucketname,
			Key:    aws.String(k),
		})
		reportAWSError(err)
		if err != nil {
			continue // don't inspect the output of a failed request
		}
		if len(res.Metadata) > 0 {
			printMetadata(k, res.Metadata)
		}
	}
}

// More workers than CPUs: presumably the HEAD requests are network-bound.
workerCount := 2 * runtime.NumCPU()
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
	go workerFunc()
}
wg.Wait()

Sidebar: TCP Connection Reuse

Rust: Tokio

// Tokio's async file API mirrors std::io, except that each call yields a
// future that must be `.await`ed; `?` propagates any I/O error upward.
let mut file = File::open("foo.txt").await?;
let mut buf = [0; 10];
let bytes_read = file.read(&mut buf[..]).await?;

Connecting to FlashBlade S3

[dependencies]
aws-config = "0.6.0"
aws-sdk-s3 = "0.6.0"
aws-endpoint = "0.6.0"
tokio = { version = "1", features = ["full"] }
http = "0.2"
futures = "0.3"
awaitgroup = "0.6"
// FlashBlade data VIP, reached over plain http.
let endpoint = "http://10.62.64.200";
let region = Region::new("us-west-2"); // Value ignored
// load S3 access and secret keys from environment variables
let conf = aws_config::load_from_env().await;
// Route every request to the fixed FlashBlade endpoint.
let ep = Endpoint::immutable(Uri::from_static(endpoint));
let s3_conf = aws_sdk_s3::config::Builder::from(&conf)
    .endpoint_resolver(ep)
    .region(region)
    .build();
let client = Client::from_conf(s3_conf);

Concurrency Logic

let mut wg = WaitGroup::new();
let mut continuation_token = String::from("");
loop {
let resp = client.list_objects_v2().bucket(bucket).prefix(prefix).continuation_token(continuation_token).send().await?;
for object in resp.contents().unwrap_or_default() {
let key = object.key().unwrap_or_default();
… // start new task here
}
if resp.is_truncated() {
continuation_token = resp.next_continuation_token().unwrap().to_string();
} else {
break;
}
}

let req = client.head_object().bucket(bucket).key(key).send();
let worker = wg.worker();
tokio::spawn(async move {
let resp = match req.await {
Ok(r) => r,
Err(e) => panic!("HeadObject Error: {:?}", e),
};
let info = resp.metadata().unwrap();
for (key, value) in info.iter() {
println!("{}: {}", key, value);
}
worker.done();
});
  wg.wait().await;
Ok(())
}

Summary

--

--

--

Data science, software engineering, hacking

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

PyMC3 and Bayesian inference for Parameter Uncertainty Quantification Towards Non-Linear Models…

Happy boss — Happy developer? Or the other way around?

Laying Bricks vs. Building Cathedrals?

Solving pyproj install error — ERROR: PROJ_DIR dir not found. Please set PROJ_DIR

Fixing Docker Hub rate limit issue with ECR source image

Why we’re backing Metaflow

jobSort() Story

Microservices plumbing

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Joshua Robinson

Joshua Robinson

Data science, software engineering, hacking

More from Medium

Introducing Credmark’s Senior Data Engineer

Taking a FastAPI App to Production on AWS

PyTorch Inference Server on GKE

Data Warehouse on Kubernetes