Concurrent Programming Case Study: S3 Metadata Requests

Results

Code Walkthroughs

Python Baseline

# Install boto3, the package imported by the Python snippet that follows.
pip install boto3
import boto3

# Address that every S3 request in this script is sent to.
FB_DATAVIP = '10.62.64.200'
s3 = boto3.resource('s3', endpoint_url='http://' + FB_DATAVIP)

# Walk the listing one page at a time and issue a HEAD request per key.
# NOTE(review): `kwargs` and `bucketname` are not defined in this excerpt;
# they are expected to be set up before the loop runs.
while True:
    objlist = s3.meta.client.list_objects_v2(**kwargs)
    keys = [o['Key'] for o in objlist.get('Contents', [])]
    for key in keys:
        check_for_custom_metadata(s3, bucketname, key)

    # A response without 'NextContinuationToken' is the last page.
    try:
        kwargs['ContinuationToken'] = objlist['NextContinuationToken']
    except KeyError:
        break
def check_for_custom_metadata(s3, bucketname, key):
    """Print `key` and its metadata when the HEAD response carries any.

    Args:
        s3: object exposing `meta.client.head_object(Bucket=..., Key=...)`.
        bucketname: bucket passed to the HEAD request.
        key: object key passed to the HEAD request.

    Returns:
        None. The result is written to stdout as "<key> <metadata>".
    """
    response = s3.meta.client.head_object(Bucket=bucketname, Key=key)
    # An empty metadata mapping is falsy, so such objects print nothing.
    if response['Metadata']:
        print("{} {}".format(key, response['Metadata']))

Python Multiprocessing

# Import the submodule explicitly: `import multiprocessing` alone does not
# guarantee that `multiprocessing.pool` is available as an attribute.
import multiprocessing.pool

# Thread pool sized to the CPU count reported by the machine.
p = multiprocessing.pool.ThreadPool(multiprocessing.cpu_count())

# … same listing logic as above
keys = [o['Key'] for o in objlist.get('Contents', [])]
# start HEAD operations asynchronously
for k in keys:
    p.apply_async(check_for_custom_metadata, (s3, bucketname, k))
# make a per process s3_client
s3_client = None


def initialize():
    """Create an S3 resource and store it in the module-level `s3_client`.

    Takes no arguments and returns None; its only effect is rebinding the
    global. Credentials and endpoint come from AWS_KEY, AWS_SECRET and
    FB_DATAVIP.
    """
    global s3_client
    s3_client = boto3.resource(
        's3',
        aws_access_key_id=AWS_KEY,
        aws_secret_access_key=AWS_SECRET,
        use_ssl=False,
        endpoint_url='http://' + FB_DATAVIP,
    )
import multiprocessing

# `initialize` is passed as the pool's initializer, so each worker process
# calls it when it starts.
p = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)

# … Listing logic repeated here
keys = [o['Key'] for o in objlist.get('Contents', [])]
# Start async HEAD ops and then continue listing.
for k in keys:
    p.apply_async(check_for_custom_metadata, (bucketname, k))

# Stop accepting new tasks, then wait for the submitted ones to finish.
p.close()
p.join()

Go: Goroutines and Channels

“A goroutine is a lightweight thread of execution.”

“Channels are the pipes that connect concurrent goroutines.”

Connecting to FlashBlade S3

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// Address every S3 request is sent to.
endpointUrl := "http://10.62.64.200"

s3Config := &aws.Config{
	Region:           aws.String("us-east-1"), // ignored
	DisableSSL:       aws.Bool(true),
	S3ForcePathStyle: aws.Bool(true),
	HTTPClient:       httpClient,
}
// Only override the endpoint when one is configured.
if endpointUrl != "" {
	s3Config.Endpoint = &endpointUrl
}
sess := session.Must(session.NewSession(s3Config))
svc := s3.New(sess)

Concurrency Logic

// listToChannelAndClose lists the objects in bucketname whose keys start with
// pfix and sends every key on channel. After the listing call returns, its
// error value is handed to reportAWSError and channel is closed.
func listToChannelAndClose(svc *s3.S3, bucketname *string, pfix string, channel chan string) {
	request := &s3.ListObjectsInput{
		Bucket: bucketname,
		Prefix: &pfix,
	}

	// Page callback: forward each key of the page, then ask for the next page.
	forwardKeys := func(page *s3.ListObjectsOutput, _ bool) bool {
		for _, object := range page.Contents {
			channel <- *object.Key
		}
		return true
	}

	reportAWSError(svc.ListObjectsPages(request, forwardKeys))
	close(channel)
}
// Buffered channel carrying keys from the lister goroutine to the workers.
channel := make(chan string, 2048)
go listToChannelAndClose(svc, &bucketname, prefix, channel)

var wg sync.WaitGroup

// workerFunc issues a HEAD request for each key received on channel and
// prints the metadata of objects that have any. It returns once channel is
// closed and drained.
workerFunc := func() {
	defer wg.Done()
	for k := range channel {
		input := &s3.HeadObjectInput{
			Bucket: &bucketname,
			// Copy the key so the request does not alias the loop variable.
			Key: aws.String(k),
		}
		res, err := svc.HeadObject(input)
		reportAWSError(err)
		// Do not inspect the response of a failed request.
		if err != nil || res == nil {
			continue
		}
		if len(res.Metadata) > 0 {
			printMetadata(k, res.Metadata)
		}
	}
}

// Register every worker before starting any, so Wait cannot return early.
workerCount := 2 * runtime.NumCPU()
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
	go workerFunc()
}
wg.Wait()

Sidebar: TCP Connection Reuse

Rust: Tokio

// Open the file; the open is awaited and `?` passes any error to the caller.
let mut f = File::open("foo.txt").await?;
// Fixed 10-element buffer, so one read can fill at most 10 bytes.
let mut buffer = [0; 10];
// `n` holds whatever `read` returns — presumably the number of bytes read;
// confirm against the `read` documentation.
let n = f.read(&mut buffer[..]).await?;

Connecting to FlashBlade S3

[dependencies]
aws-config = "0.6.0"
aws-sdk-s3 = "0.6.0"
aws-endpoint = "0.6.0"
tokio = { version = "1", features = ["full"] }
http = "0.2"
futures = "0.3"
awaitgroup = "0.6"
let endpoint = "http://10.62.64.200";
let region = Region::new("us-west-2"); // Value ignored
// load S3 access and secret keys from environment variables
let conf = aws_config::load_from_env().await;
// Resolve every request to the fixed endpoint above.
let ep = Endpoint::immutable(Uri::from_static(endpoint));
let s3_conf = aws_sdk_s3::config::Builder::from(&conf)
    .endpoint_resolver(ep)
    .region(region)
    .build();
let client = Client::from_conf(s3_conf);

Concurrency Logic

let mut wg = WaitGroup::new();
let mut continuation_token = String::from("");
loop {
let resp = client.list_objects_v2().bucket(bucket).prefix(prefix).continuation_token(continuation_token).send().await?;
for object in resp.contents().unwrap_or_default() {
let key = object.key().unwrap_or_default();
… // start new task here
}
if resp.is_truncated() {
continuation_token = resp.next_continuation_token().unwrap().to_string();
} else {
break;
}
}

// Build the HEAD request here; it is awaited inside the spawned task.
let req = client.head_object().bucket(bucket).key(key).send();
let worker = wg.worker();
tokio::spawn(async move {
    let resp = match req.await {
        Ok(r) => r,
        Err(e) => panic!("HeadObject Error: {:?}", e),
    };
    // An object without metadata is skipped instead of panicking the task.
    if let Some(info) = resp.metadata() {
        for (key, value) in info {
            println!("{}: {}", key, value);
        }
    }
    worker.done();
});
  // Await the wait group before returning — presumably this resolves once
  // every worker obtained from `wg` has called `done()`; confirm against the
  // awaitgroup documentation.
  wg.wait().await;
Ok(())
}

Summary

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store