Logstash Filter to run ElasticSearch Queries Dynamically on Events in Scala
Logstash filter for ElasticSearch queries in Scala
This filter is not to be confused with Logstash inbuilt filter ElasticSearch, which can be used to load fields from existing events(or any other object) in ElasticSearch(ES) into current event. Logstash filter explained here is to check if a event matches a given ES query and take any action depending on whether event satisfies the query or not. Also this logstash filter is developed in Scala(except the plugin linking part which is in java).
So this article will be relevant to whoever interested in either designing solution to take some action by running ES style queries on events or just developing Logstash filter in scala. This article first focuses on how to develop Logstash filter in scala and then on how to implement Logstash filter to filter events by ES style queries
How to develop Logstash filter in Scala
Logstash 7.2.0 includes native support for Java plugins(GA). Beta support is available from earlier release(6.6.0). Logstash documentation on how to write java filter plugin has clear instructions on how to write plugin in Java. I would only focus on additional changes needed to develop the plugin in scala.
Below are the instructions on how to link scala source files to Java example filter plugin
creating scala source module
You would need to create a scala directory under your project src/main and mark that as source folder(These are instructions for IntelliJ, other IDE’s may have different steps that needs to be followed). Make sure to include scala library dependencies in IntelliJ module settings. Under scala source module, packages and source files can be created as desired. Project structure may look like this
gradle build configuration
Gradle build configuration file available here needs to be changed to include
-
scala library dependencies
-
configuration to compile scala code first then java code
For step 1, when building the plugin, make sure scala library jars are included in the final jar(gradle dependencies.implementation option compiles the code, but needed scala libraries wont be included unless dependencies.compile option is used), otherwise you would run into issues, where your scala part of plugin is not executed inside Logstash. This is because Logstash run time environment doesn’t have scala libraries needed
apply plugin: 'java'
// gradle plugin for scala
apply plugin: 'scala'
// step 1 to include scala library into final jar
dependencies {
compile 'org.scala-lang:scala-library:2.11.12'
}
// step 2 to comple scala code first
tasks.compileJava.dependsOn compileScala
tasks.compileScala.dependsOn.remove("compileJava")
sourceSets {
main {
scala {
srcDirs = ["src/main/scala"]
outputDir = file("$buildDir/classes/scala/main")
}
java {
srcDirs = ["src/main/java"]
outputDir = file("$buildDir/classes/java/main")
}
}
}
With above changes to the gradle.build file, scala source code inside the src/main/scala should compile before java and the fat jar generated inside the gem file(after ./gradlew gem) should contain all libraries needed to run plugin inside the Logstash
Using scala classes inside java source files
Using scala classes/objects/methods inside java classes is pretty straightforward and there are several examples here
Logstash filter for running ES style queries against events
ElasticSearch queries support a variety of operators including aggregations to search data. We only focus on boolean queries that tests event fields here. Sample queries that tests on event fields
Query A
{
"query": {
"range": {
"time": {
"gte": "2022-01-17T18:00:00.000"
}
}
}
}
Query B
{
"query": {
"bool": {
"must": [
{
"bool": {
"must": [
{
"term": {
"source.type": "Moesif"
}
}
]
}
},
{
"exists": {
"version": "api_version"
}
}
]
}
}
}
What should be done once a event matches a given ES query, depends on the project you are working on. In this blog we explain, assuming every query has unique tag, how events can be tagged with a query tag if event satisfies given query.
Our approach involves two steps
Optimize the ES query into simple flat expression with and, or etc
Above sample queries will be transformed into flat expressions as below with query tags prepended
A:( range [time] gte 2021-07-17T18:00:00.000 )
Query B flat expression equivalent
B:( and ( eq [source][type] Moesif ) ( defined [api_version] )
Scala code that does the transformation from ES queries into flat expressions is available here ElasticSearchQueryTransformer. ElasticSearchQueryTransformer not only transforms the query but also optimizes it by removing unnecessary expressions still maintaining the logical equivalence
{
"query": {
"bool": {
"must": [
{
"bool": {
"must": [
{
"match": {
"source.type": "Moesif"
}
}
]
}
}
]
}
}
}
Above query transformed into flat expressions
Flat expression without optimization
( and ( and ( eq [source][type] Moesif ) ) )
Flat expression with optimization
( eq [source][type] Moesif )
Logstash filter elasticsearch_query_filter
elasticsearch_query_filter available under Apache license here uses the simple flat expressions to evaluate events. Flat expressions from step above needs to be made available to the filter, which can be done in several ways
- Store the flat expressions in ES and load the flat expressions into event itself using inbuilt filter elasticsearch (which is what we did and what elasticsearch_query_filter expects unchanged)
- Flat expressions could be saved into any other database and can be loaded into event using a different plugin specific to that database
- Change ElasticSearchQueryFilter class to load flat expressions into memory on plugin initialization
ElasticSearchQueryFilter evaluates incoming events against the flat expressions and tag events with matched query tags
Testing the Logstash filter plugin
Logstash filter needs to installed first
bin/logstash-plugin install logstash-filter-elasticsearch_query_filter-1.0.0.gem
logstash_test.conf has test logstash configuration
input {
generator {
message => "Hello world!"
add_field => {
"es_query_config" => ["tag1:( defined [request][time] )", "tag2:( and ( or ( eq testField 10 ) ( defined [request][verb][field] ) ) ( range [request][time] gt 2021-01-01 ) )"]
"[request][verb][field]" => "GET"
"[request][status]" => "404"
}
count => 1
}
}
filter {
ruby {
code => "event.set('[request][time]', Time.now());"
}
elastic_query_filter {}
}
output {
stdout { codec => rubydebug }
}
Once the filter is installed, logstash can be run using above configuration to test the filter plugin, and the output event should contain field matched_query_tags with values tag1 and tag2
bin/logstash -f logstash_test_conf.conf
Conclusion
Logstash Java execution engine performance is on par or better than Ruby execution engine. Now we can use java for faster plugin development along with other benefits that come from its rich ecosystem or scala if you enjoy coding in scala like me. Above instructions should guide you on how to write the plugin in scala. And if your plugin project requires taking some actions on events based on whether events match Elasticsearch style queries, you can make use of code here