Do you know: how to map a field from one document to another using Logstash?

Each week, a new “Do You Know” is posted on our Elastic Technical Knowledge Hub to share practical knowledge for improving observability with Elasticsearch. The topics originate from day-to-day challenges we have solved for our clients. A step-by-step description helps you implement solutions that improve the performance of your deployment and get the best monitoring of your applications through dashboards and alerting.

This week I will discuss: how to map a field from one document to another using Logstash.

Background

In some situations, important information is missing from a log event even though it was present in an earlier step of the transaction flow. For example, an error occurs in your transaction flow and ideally you want the OrderID in your error log, but this OrderID is only available in the first step (request-in) of the flow. To carry it over, we perform a map function in the Logstash pipeline using the aggregate filter.

Solution

Add a field in the Logstash pipeline that is unique to the transaction flow. In our case, that is the concatenation of businessTransactionID and OperationName.

filter {
  mutate {
    add_field => { "uniqueId" => "%{businessTransactionID}%{OperationName}" }
  }
}

Define the condition under which the OrderID is available, and create the mapping action. In our case, that is Type: request-in with OperationName: Ordering.

  if [Type] == "request-in" and [OperationName] == "Ordering" {
    aggregate {
      task_id => "%{uniqueId}"
      code => "map['Documentnummer1'] = event.get('OrderID')"
      map_action => "create"
    }
  }

Write the condition that identifies the document to which you want to map the OrderID. In our case, that is Type: ERROR with OperationName: Ordering.

  if [Type] == "ERROR" and [OperationName] == "Ordering" {
    aggregate {
      task_id => "%{uniqueId}"
      code => "event.set('OrderID', map['Documentnummer1'])"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
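
With map_action => "update", the aggregate filter only runs its code when a map already exists for the task_id. An ERROR event whose request-in was never seen, or whose map has already timed out, therefore passes through without an OrderID. A minimal sketch, assuming you want to flag such events, is to follow the aggregate block with a conditional inside the same filter section. The tag name orderid_not_mapped is a hypothetical choice and not part of the original pipeline:

  if [Type] == "ERROR" and [OperationName] == "Ordering" and ![OrderID] {
    mutate {
      # Hypothetical tag name, chosen for illustration only
      add_tag => ["orderid_not_mapped"]
    }
  }

Tagged events can then be filtered in a dashboard to see how often the mapping misses.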

Remove the field “uniqueId”. This field was only used for the mapping and can be removed.

filter {
  mutate {
    remove_field => ["uniqueId"]
  }
}

Now we can put the pieces of the previous four steps together and add them to the Logstash pipeline.

filter {
  # Step 1: build a key that is unique for the transaction flow
  mutate {
    add_field => { "uniqueId" => "%{businessTransactionID}%{OperationName}" }
  }

  # Step 2: store the OrderID from the request-in document in the map
  if [Type] == "request-in" and [OperationName] == "Ordering" {
    aggregate {
      task_id => "%{uniqueId}"
      code => "map['Documentnummer1'] = event.get('OrderID')"
      map_action => "create"
    }
  }

  # Step 3: copy the stored OrderID onto the ERROR document
  if [Type] == "ERROR" and [OperationName] == "Ordering" {
    aggregate {
      task_id => "%{uniqueId}"
      code => "event.set('OrderID', map['Documentnummer1'])"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
}

# Step 4: drop the helper field, which was only needed for the mapping
filter {
  mutate {
    remove_field => ["uniqueId"]
  }
}
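
To try the pipeline locally, the filters can be wrapped with a stdin input and a stdout output, as in the sketch below. This is a test harness under stated assumptions, not part of the original setup: the file name and the sample events further down are hypothetical. The aggregate filter depends on events being processed in order, so the Logstash documentation advises running it with a single pipeline worker (the -w 1 flag).

input {
  # Read one JSON document per line from standard input
  stdin { codec => json }
}

# ... the filter sections from this article go here ...

output {
  # Print every resulting event in a readable form
  stdout { codec => rubydebug }
}

Saved as, for example, map-orderid.conf, this can be started with bin/logstash -f map-orderid.conf -w 1. Pasting the two hypothetical events below, in this order, should show the second one coming out with the OrderID of the first:

{"businessTransactionID":"bt-1","OperationName":"Ordering","Type":"request-in","OrderID":"12345"}
{"businessTransactionID":"bt-1","OperationName":"Ordering","Type":"ERROR"}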
