Examples of Putting ECK+PSO to Work

Joshua Robinson
10 min read · Dec 16, 2019


Elasticsearch-as-a-Service on Pure by Example

Building on Elasticsearch clusters deployed with ECK and PSO, this blog walks through three different uses for Elasticsearch in Kubernetes. My Elasticsearch clusters are backed by Elastic Cloud on Kubernetes (ECK) with storage automatically provisioned by Pure Service Orchestrator (PSO). These use cases highlight how to take advantage of Elasticsearch clusters running in Kubernetes. In order from simplest to most advanced, they are: 1) Fluentd for centralized Kubernetes logging, 2) esrally benchmarking, and 3) Rapidfile to list large directory hierarchies and upload entries through the bulk API.

The complete yaml for all of these examples is in this GitHub repo.

Use Case 1: Collecting Kubernetes Logs with Fluentd and Elasticsearch

While working with Kubernetes, I often find myself searching across different nodes and pods for logs to help me debug. One immediate way that ECK can be useful is as a central location for collecting all Kubernetes logs using Fluentd and Elasticsearch. Instead of looking across many different components for log files, I now use a Kibana dashboard to see all my Kubernetes-related logs in one place.

There are great general tutorials on how to set up Fluentd with Elasticsearch, so I will focus specifically on using Fluentd and ECK together. You can find the full yaml files for the Elasticsearch cluster and fluentd daemonset on GitHub.

With ECK and PSO, setting up a Fluentd log collector is as simple as creating the fluentd daemonset with the environment variables needed to connect to your Elasticsearch cluster. Fluentd connects to the existing ECK cluster with the password injected into an environment variable via a secretKeyRef. The settings below were the ones required for the fluentd daemonset:

- name: FLUENT_ELASTICSEARCH_HOST
  value: "elastic-internal-es-http"
- name: FLUENT_ELASTICSEARCH_PORT
  value: "9200"
- name: FLUENT_ELASTICSEARCH_SCHEME
  value: "https"
- name: FLUENT_ELASTICSEARCH_SSL_VERIFY
  value: "false"
- name: FLUENT_ELASTICSEARCH_SSL_VERSION
  value: "TLSv1_2"
- name: FLUENT_ELASTICSEARCH_USER
  value: "elastic"
- name: FLUENT_ELASTICSEARCH_PASSWORD
  valueFrom:
    secretKeyRef:
      name: elastic-internal-es-elastic-user
      key: elastic

Most of the above settings are straightforward, except FLUENT_ELASTICSEARCH_SSL_VERSION, which was required because the default SSL version used by Fluentd is rejected by Elasticsearch as insecure.

Then simply deploy the fluentd daemonset. Note that this requires the Elasticsearch cluster to be in the same namespace as fluentd, otherwise the password secret is not visible.
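
Assuming the daemonset manifest is saved locally (the filename below is my own choice), deployment is a single command, followed by a quick check that the pods are running:

> kubectl apply -f fluentd-daemonset.yaml
> kubectl get pods -n kube-system | grep fluentd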

To view the indices created by Fluentd, I leverage a standalone pod as a terminal, though there are other options to expose the service externally. I use the fully-qualified name of my service (including namespace) because I am accessing it from a different namespace than the fluentd Elasticsearch cluster:

> kubectl exec -it pod/terminal -- curl -u "elastic:$PASSWORD" -k "https://elastic-internal-es-http.kube-system:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open logstash-2019.11.12 bEhCOp0USeyXriNNPDFLnQ 1 1 2133199 0 1.8gb 970.6mb
green open logstash-2019.10.21 JYmmHE-EQeKDfaImkWh3Tg 1 1 3429081 0 2.1gb 1gb
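
For reference, the terminal pod itself is nothing special; a minimal sketch looks like the following (the pod name and image are arbitrary choices, any image with curl available works):

apiVersion: v1
kind: Pod
metadata:
  name: terminal
spec:
  containers:
  - name: terminal
    # curlimages/curl is one convenient image that ships with curl
    image: curlimages/curl
    command: ["sleep", "999999"]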

To view and analyze the indexed logs, I connect to the Kibana instance by forwarding connections as follows:

> kubectl port-forward --address 0.0.0.0 -n kube-system service/elastic-internal-kb-http 5601

Here again, there are more robust solutions for accessing the Kibana UI from outside Kubernetes (i.e., from the browser on your laptop), such as a load balancer service.
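
For example, ECK can expose Kibana through a LoadBalancer by setting the service type on the Kibana resource itself. The sketch below assumes a Kibana resource named elastic-internal; the apiVersion and version fields depend on your ECK and Elastic Stack releases:

apiVersion: kibana.k8s.elastic.co/v1beta1
kind: Kibana
metadata:
  name: elastic-internal
  namespace: kube-system
spec:
  version: 7.4.2
  count: 1
  elasticsearchRef:
    name: elastic-internal
  http:
    service:
      spec:
        # expose Kibana externally via a cloud or on-prem load balancer
        type: LoadBalancer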

Log in to the Kibana UI using the ‘elastic’ username and the associated password retrieved from a Kubernetes secret:

> echo $(kubectl get secret -n kube-system elastic-internal-es-elastic-user -o=jsonpath='{.data.elastic}' | base64 --decode)

Once in Kibana, create an index pattern for the fluentd data so that it can be easily searched. By default, Fluentd creates one index per day following the template “logstash-YYYY.MM.DD”; the index pattern “logstash-*” makes searching across all time straightforward. Another advantage of one index per day is that it enables flexible lifecycle rules, for example automatically deleting indices older than 60 days.
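
As a brief aside, a delete-after-60-days rule can be expressed as an ILM policy like the sketch below; the policy name is my own, and the daily logstash-* indices would still need an index template that attaches the policy to them:

> curl -X PUT -u "elastic:$PASSWORD" -k "https://elastic-internal-es-http.kube-system:9200/_ilm/policy/delete-old-logs" -H 'Content-Type: application/json' -d '
{
  "policy": {
    "phases": {
      "delete": {
        "min_age": "60d",
        "actions": { "delete": {} }
      }
    }
  }
}'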

Then, back in the index pattern setup, select the timestamp field for time-based filtering.

Next time your complicated, distributed Kubernetes application does not work as expected, you can use the single Kibana dashboard to navigate through the log messages across multiple components. A key benefit of this centralized logging is the ability to filter using the timestamp field synchronized across multiple, independent components.

This Elastic cluster only accumulates about 1–2 GB of stored indices per day for my 8-node Kubernetes cluster, so a cluster this small can easily fit alongside other, larger production clusters. Normally, each cluster incurs an extra burden of provisioning and management, regardless of size, but the ECK operator makes it easy to spin up Elastic clusters of various sizes as needed, creating a flexible Elastic-as-a-service instead of purpose-building and manually managing DAS clusters.

Use Case 2: Benchmarking ECK with Esrally

Elastic’s benchmarking tool for Elasticsearch clusters, Esrally, makes it very easy to test various settings, configurations, and workloads. I run esrally against ECK clusters as a Kubernetes job, so the load generator itself also runs in Kubernetes. My job configuration is based on the Docker image distributed by Elastic (“elastic/rally”) with no extra customization, and I leveraged the simple Docker instructions to build my Kubernetes esrally job.

The rally process downloads the source data for a benchmark track before running it. To avoid re-downloading large datasets to a pod’s local storage for each run, I use a PVC, shared across jobs, to store the track data. The PVC is attached to the job, and the source data is downloaded only on the first run of each track; because the PersistentVolume outlives each benchmark run, the data is reused for all subsequent runs.

I store the PVC definition for rally data as a separate file to reduce the chance of accidental deletion:

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: esrally-claim
spec:
  storageClassName: pure-file
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 2Ti

The batch job definition for running a rally benchmark has several components. First, I use a configmap to store the json track parameters, which will be mounted in the job container as a json file.

kind: ConfigMap
apiVersion: v1
metadata:
  name: esrally-trackparams-cfg
data:
  params.json: |
    {
      "number_of_shards": 20,
      "number_of_replicas": 0,
      "ingest_percentage": 100,
      "bulk_size": 10000,
      "bulk_indexing_clients": 8,
      "index_settings": { "refresh_interval": -1 }
    }

In order to securely access the Elasticsearch cluster, the password is again retrieved and inserted as an environment variable using a secretKeyRef.

The configmap and secret key are combined into a job, with the remaining configuration done in the “args” keyword to control the esrally binary. In particular, “client-options” controls the security settings, and I specify the username/password as part of the target URL:

apiVersion: batch/v1
kind: Job
metadata:
  name: esrally
spec:
  template:
    spec:
      containers:
      - name: esrally
        image: elastic/rally
        env:
        - name: ELASTICPASS
          valueFrom:
            secretKeyRef:
              name: quickstart-es-elastic-user
              key: elastic
        args: ['--track=nyc_taxis',
               '--pipeline=benchmark-only',
               '--challenge=append-no-conflicts-index-only',
               '--client-options="use_ssl:true, verify_certs:false, basic_auth_user:''elastic''"',
               '--track-params=/rally/params.json',
               '--target-hosts=https://elastic:$(ELASTICPASS)@quickstart-es-http:9200']
        volumeMounts:
        - name: esrally-data
          mountPath: /rally/.rally
        - name: trackparams-cfg-vol
          mountPath: /rally/params.json
          subPath: params.json
        imagePullPolicy: Always
      restartPolicy: Never
      volumes:
      - name: esrally-data
        persistentVolumeClaim:
          claimName: esrally-claim
      - name: trackparams-cfg-vol
        configMap:
          name: esrally-trackparams-cfg

Note, I also find it useful to add an initContainer to the above job to rotate log files so that each individual benchmark run has a separate log file:

initContainers:
- name: logrotate
  image: ubuntu:18.04
  command: ["/bin/sh"]
  args: ["-c", "mv /data/logs/rally.log /data/logs/rally.log.$(date +%s) || true"]
  volumeMounts:
  - name: esrally-data
    mountPath: /data/

My complete yaml file for the job can be found on GitHub here.

For subsequent runs of the job, I find it easier to use the “kubectl replace” command with force option to recreate the job and start a new run:

> kubectl replace --force -f job-esrally.yaml

Logs for previous runs can still be found by directly mounting the PVC mentioned earlier.
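
For example, a throwaway pod that mounts the same claim makes the old log files browsable; the pod name and image below are arbitrary choices:

apiVersion: v1
kind: Pod
metadata:
  name: esrally-logs
spec:
  containers:
  - name: inspect
    image: ubuntu:18.04
    command: ["sleep", "infinity"]
    volumeMounts:
    - name: esrally-data
      mountPath: /data
  volumes:
  - name: esrally-data
    persistentVolumeClaim:
      claimName: esrally-claim

> kubectl exec -it pod/esrally-logs -- ls /data/logs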

With the ability to run Esrally against ECK clusters, you can experiment and validate various settings to understand the performance of your infrastructure. For serious performance benchmarking, the distributed mode of esrally might be required.

Use Case 3: Filesystem Listings with Rapidfile

Tracking the directory and file hierarchy of an actively shared filesystem can be made easier and more useful with Elasticsearch. I use the Rapidfile toolkit to produce the equivalent of a recursive “ls” listing, but highly parallelized and much faster. This data is then bulk indexed into Elasticsearch each day using a Kubernetes cronjob.

Filesystem metadata is useful in a security context in many ways, for example 1) checking for improper files/permissions, 2) understanding the blast radius of a security incident, or 3) tracking per-user or group usage statistics.

Walking a large directory structure can be very slow with traditional Linux utilities due to the serial nature of tools like “ls” and “find.” The Rapidfile toolkit solves this problem by rewriting the coreutils with concurrency in mind, resulting in an order-of-magnitude performance improvement. As a useful bonus, the parallelized “ls” (pls) includes a command line option to output file and directory information in json format, making it easily consumable by Elasticsearch.

Example output of “pls --json” contains the same information as “ls -l” and looks as follows:

> pls --json /mnt/acadia
{"dirid": 2, "fileid": 69805796216956995, "filetype": 40000, "mode": 775, "nlink": 1, "uid": "ir", "gid": "ir", "size": 0, "atime": 1506496625, "mtime": 1506496625, "ctime": 1515971643, "path": "\/mnt\/acadia\/mysql_testing"}
{"dirid": 2, "fileid": 18014400631903067, "filetype": 100000, "mode": 664, "nlink": 1, "uid": "ir", "gid": "ir", "size": 79802445255, "atime": 1574891755, "mtime": 1517266800, "ctime": 1517266800, "path": "\/mnt\/acadia\/documents.json"}

These json fields are immediately ready to ingest into Elasticsearch as one document per entry in the filesystem hierarchy. The output of the Rapidfile listing can be fed into Elasticsearch using the bulk API as follows:

TODAY=$(date +"%Y-%m-%d")
IDXNAME="plsidx-$EXPORTNAME-$TODAY"
pls --ipaddr $IPADDR --export /$EXPORTNAME --json -R | sed 's/^/{ "index": {} }\n/g' > work.json
curl -S -XPOST -u "elastic:$ESPASSWORD" -H 'Content-Type: application/x-ndjson' -k https://quickstart-es-http:9200/$IDXNAME/_bulk --data-binary @work.json

The least obvious part of the above command is the special sed invocation, which inserts the extra action lines Elasticsearch expects. The bulk API format is a sequence of “index” action lines, each followed by the document to be indexed on the next line:

{ "index": {} }
{"dirid": 3, "fileid": 54043195528445954, "filetype": 40000, "mode": 755, "nlink": 1, "uid": "ir", "gid": "ir", "size": 0, "atime": 1572642063, "mtime": 1572642063, "ctime": 1572642063, "path": "\/mnt\/irp210\/ravi"}
{ "index": {} }
{"dirid": 3, "fileid": 13510798882114014, "filetype": 40000, "mode": 777, "nlink": 1, "uid": "ir", "gid": "ir", "size": 0, "atime": 1568831459, "mtime": 1568831459, "ctime": 1568831459, "path": "\/mnt\/irp210\/ivan"}

Note also that the above command does not set the target index explicitly in each action line; instead, the index is specified in the target REST endpoint:

https://quickstart-es-http:9200/$IDXNAME/_bulk

If the index $IDXNAME does not already exist, Elasticsearch will automatically create it using default settings.

The code shown so far works for small filesystems, but real-world directory listings can often contain millions to billions of entries. A single call to the bulk API with that many documents would run out of memory. Instead, I chunk the data using the Linux “split” utility to carve the output into smaller files:

pls --ipaddr $IPADDR --export /$EXPORTNAME -R --json | sed 's/^/{ "index": {} }\n/g' | split -a 5 -l 2000 - /scratch/
for fname in /scratch/*; do
  curl -S -s -XPOST -u "elastic:$ESPASSWORD" -H 'Content-Type: application/x-ndjson' -k https://quickstart-es-http:9200/$IDXNAME/_bulk --data-binary @$fname
  rm $fname
done

Storage challenges show up in many different places; for example, the intermediate json data output from “pls” can temporarily consume 10s to 100s of GBs of capacity. Instead of hoping each Kubernetes node has sufficient local storage, a PersistentVolume backed by FlashArray or FlashBlade removes this concern. The PVC is mounted at “/scratch” and the output of “pls” is redirected there instead.

To make the indexed data even more useful, I explicitly create the index instead of relying on automatic creation with default settings. This allows me to control settings like the shard count, as well as define mappings for some of the fields. Specifically, the atime/mtime/ctime fields are auto-interpreted as numbers by Elasticsearch, when in fact we know that they are unix-epoch timestamps. The following custom mapping is able to encode this extra information about the fields:

> curl -X PUT -u "elastic:$ESPASSWORD" -k "https://quickstart-es-http:9200/$IDXNAME?pretty" -H 'Content-Type: application/json' -d '
{
  "settings" : {
    "number_of_shards" : 3
  },
  "mappings" : {
    "properties" : {
      "atime" : { "type" : "date", "format" : "epoch_second" },
      "ctime" : { "type" : "date", "format" : "epoch_second" },
      "mtime" : { "type" : "date", "format" : "epoch_second" }
    }
  }
}'

With these fields correctly defined as timestamps, it becomes easier in Kibana to search on these time-based fields, for example, to create a histogram of when files were created (ctime).
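
The same kind of histogram can also be requested directly through the search API with a date_histogram aggregation; the sketch below runs against the index created above (the calendar_interval parameter assumes Elasticsearch 7.2 or later):

> curl -X GET -u "elastic:$ESPASSWORD" -k "https://quickstart-es-http:9200/$IDXNAME/_search?size=0&pretty" -H 'Content-Type: application/json' -d '
{
  "aggs": {
    "created_per_month": {
      "date_histogram": {
        "field": "ctime",
        "calendar_interval": "month"
      }
    }
  }
}'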

Finally, the full yaml file for the cronjob that wraps the above script can be found on GitHub here. The cronjob itself is a straightforward combination of a PersistentVolumeClaim for scratch data and a ConfigMap to store a small script. The end result is a daily job that collects filesystem hierarchy information and ingests it into Elasticsearch with the bulk API.
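
Structurally, the cronjob looks roughly like the sketch below; the schedule, names, image, and script path are placeholders, so refer to the repo for the real version:

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: pls-indexer
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
          - name: pls-indexer
            # placeholder image; the real container needs the Rapidfile toolkit and curl
            image: my-registry/pls-tools
            command: ["/bin/sh", "/scripts/pls-index.sh"]
            volumeMounts:
            - name: scratch
              mountPath: /scratch
            - name: script
              mountPath: /scripts
          volumes:
          - name: scratch
            persistentVolumeClaim:
              claimName: pls-scratch-claim
          - name: script
            configMap:
              name: pls-index-script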

Summary

These three example use cases illustrate the usefulness of Elasticsearch running in Kubernetes. Each use case may be relatively small and scattered across different teams. The ability to run multiple clusters with different sizes, configurations, and uses highlights the advantage of Elasticsearch-as-a-service infrastructure. Additionally, the ability to easily provision PVCs on shared storage avoids the headaches of local storage constraints on your Kubernetes nodes. Use these examples as inspiration and go start using Elasticsearch-as-a-service in Kubernetes!
