Elasticsearch: A Short Introduction to Ingest Pipelines

Image by Gerd Altmann from Pixabay

While indexing data, we often need to apply changes that are common to all documents. We could implement that with custom logic in the application before indexing, but that takes more effort and is error-prone. Elasticsearch also provides built-in functionality, called ingest pipelines, to perform some of these transformations.

Introduction

Ingest pipelines provide a means to apply common transformations to documents before they are indexed, for example, setting a field called writeTime in every document or converting a field to uppercase.

Prerequisite

To use ingest pipelines, at least one node in the cluster must have the ingest role. By default, every node has this role. For more details, see the ingest prerequisites and node documentation listed in the References.
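As a quick sanity check, the sketch below reads the plain-text output of the cat nodes API, requested as _cat/nodes?h=name,node.role, and looks for a role string containing the letter i, which is the abbreviation that API uses for the ingest role. It uses only the JDK's java.net.http client and assumes an unsecured cluster at http://localhost:9200; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: checks whether any node in the cluster has the ingest role.
final class IngestNodeCheck {

    // Fetches the output of GET _cat/nodes?h=name,node.role (assumes no auth, no TLS).
    static String fetchNodeRoles(String baseUrl) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(baseUrl + "/_cat/nodes?h=name,node.role"))
            .GET()
            .build();
        return HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString())
            .body();
    }

    // Each line is "<name> <roles>"; roles is a string of one-letter abbreviations,
    // where 'i' means ingest and "-" means a coordinating-only node.
    static boolean anyIngestNode(String catNodesOutput) {
        for (String line : catNodesOutput.split("\\R")) {
            String[] columns = line.trim().split("\\s+");
            if (columns.length < 2) {
                continue; // skip blank or malformed lines
            }
            String roles = columns[columns.length - 1]; // role column is last
            if (roles.indexOf('i') >= 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:9200";
        try {
            System.out.println("Ingest node available: " + anyIngestNode(fetchNodeRoles(baseUrl)));
        } catch (IOException e) {
            System.err.println("Could not reach the cluster: " + e.getMessage());
        }
    }
}
```

If this prints false, pipelines cannot run until at least one node is given the ingest role.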

Ingest Pipeline Structure

The following block shows the structure of an ingest pipeline. The description is optional, followed by a list of one or more processors. A processor is a configurable task that performs a specific transformation on the document.

{
  "description" : "This is optional",
  "processors": [
    {
      // processor 1
    },
    {
      // processor 2
    }
    // ... more processors
  ]
}

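The processors run sequentially, in the order in which they are defined, and each processor works on the document as modified by the previous one.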
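The sequential behavior can be pictured with the toy, in-memory model below. It is plain Java, not Elasticsearch's implementation or API: each hypothetical processor is a function from document to document, and the pipeline applies them in list order. The set and uppercase helpers loosely mirror the two examples from the introduction.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model of an ingest pipeline; all names are hypothetical.
final class PipelineModel {

    // Applies each processor in order; each one sees the output of the previous one.
    static Map<String, Object> run(List<UnaryOperator<Map<String, Object>>> processors,
                                   Map<String, Object> source) {
        Map<String, Object> doc = new HashMap<>(source); // leave the caller's map untouched
        for (UnaryOperator<Map<String, Object>> processor : processors) {
            doc = processor.apply(doc);
        }
        return doc;
    }

    // Loosely analogous to the "set" processor: assigns a value to a field.
    static UnaryOperator<Map<String, Object>> set(String field, Object value) {
        return doc -> {
            doc.put(field, value);
            return doc;
        };
    }

    // Loosely analogous to the "uppercase" processor: upper-cases a string field.
    static UnaryOperator<Map<String, Object>> uppercase(String field) {
        return doc -> {
            Object value = doc.get(field);
            if (!(value instanceof String)) {
                throw new IllegalArgumentException("field [" + field + "] is not a string");
            }
            doc.put(field, ((String) value).toUpperCase(Locale.ROOT));
            return doc;
        };
    }

    public static void main(String[] args) {
        Map<String, Object> source = Map.of("reason", "foo");
        Map<String, Object> result = run(
            List.of(set("writeTime", "2020-01-01T00:00:00Z"), uppercase("reason")), source);
        System.out.println(result); // HashMap iteration order is unspecified
    }
}
```

Order matters when processors touch the same field: set followed by uppercase yields an upper-cased value, while uppercase followed by set leaves the newly set value unchanged.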

Use case

Suppose we need to add a field called writeTime to every document indexed into an Elasticsearch index. The following pipeline does that:

PUT _ingest/pipeline/write_time_pipeline
{
  "description": "Pipeline to add writeTime in the documents",
  "processors": [
    {
      "set": {
        "field": "writeTime",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
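The same pipeline can also be registered from Java. The sketch below uses only the JDK's java.net.http client (Java 11+) rather than an Elasticsearch client library; the class and method names are hypothetical, and it assumes an unsecured cluster at http://localhost:9200.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that sends PUT _ingest/pipeline/<id> with a JSON body.
final class PutPipeline {

    // The write_time_pipeline definition from the console request, as a Java string.
    static final String WRITE_TIME_PIPELINE =
        "{"
        + "\"description\": \"Pipeline to add writeTime in the documents\","
        + "\"processors\": [{\"set\": {\"field\": \"writeTime\", \"value\": \"{{_ingest.timestamp}}\"}}]"
        + "}";

    static HttpRequest buildRequest(String baseUrl, String pipelineId, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_ingest/pipeline/" + pipelineId))
            .header("Content-Type", "application/json") // Elasticsearch expects an explicit content type
            .PUT(HttpRequest.BodyPublishers.ofString(json))
            .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = buildRequest(
            "http://localhost:9200", "write_time_pipeline", WRITE_TIME_PIPELINE);
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            System.err.println("Could not reach the cluster: " + e.getMessage());
        }
    }
}
```

Building the request separately from sending it keeps the URL and body easy to inspect before anything touches the cluster.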

Once the pipeline is created, we can use it in an index request by passing its ID in the pipeline query parameter, as shown below:

POST my-events/_doc?pipeline=write_time_pipeline
{
  "reason": "foo"
}

If you are using the Java High Level REST Client, we can set the pipeline on the IndexRequest as follows:

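import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;

// restHighLevelClient is an already initialized RestHighLevelClient
final Map<String, Object> source = new HashMap<>();
source.put("reason", "foo");
IndexRequest indexRequest = new IndexRequest("my-events");
// set the ingest pipeline that should process this document
indexRequest.setPipeline("write_time_pipeline");
indexRequest.source(source);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);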

Elasticsearch provides many built-in processors. To list the processors available in the cluster, we can use the nodes info API as shown below; the full list of processors and their options is in the processors reference listed in the References.

curl -X GET "localhost:9200/_nodes/ingest?filter_path=nodes.*.ingest.processors&pretty"
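To do the same from Java, the sketch below calls the nodes info API with the JDK HTTP client and collects the distinct processor types across nodes. To stay dependency-free it extracts the "type" values with a regular expression instead of a JSON parser, which is only reasonable for this narrow, filtered response shape; with a JSON library available, parsing the response properly is the better choice. The class and method names are hypothetical, and an unsecured cluster at http://localhost:9200 is assumed.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ListProcessors {

    // Matches "type" : "<name>"; filter_path keeps only the processor objects in the response.
    private static final Pattern TYPE = Pattern.compile("\"type\"\\s*:\\s*\"([^\"]+)\"");

    // Returns the distinct processor types from all nodes, sorted alphabetically.
    static Set<String> processorTypes(String nodesInfoJson) {
        Set<String> types = new TreeSet<>();
        Matcher m = TYPE.matcher(nodesInfoJson);
        while (m.find()) {
            types.add(m.group(1));
        }
        return types;
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(
                "http://localhost:9200/_nodes/ingest?filter_path=nodes.*.ingest.processors"))
            .GET()
            .build();
        try {
            String body = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
            System.out.println(processorTypes(body));
        } catch (IOException e) {
            System.err.println("Could not reach the cluster: " + e.getMessage());
        }
    }
}
```

Using a sorted set removes the duplicates that appear because every ingest node reports its own processor list.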

I hope this is helpful. If you have any feedback, please feel free to comment.

References

  • https://pixabay.com/illustrations/digitization-transformation-binary-5140055/
  • https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest.html#ingest-prerequisites
  • https://www.elastic.co/guide/en/elasticsearch/reference/master/put-pipeline-api.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html
