我正在尝试使用 Logstash 从 Elasticsearch 服务器获取不同的最新数据,消除一些字段和零值,然后将其插入 Redis。对于弹性搜索数据,我有一个名称字段、一些描述字段和 2 个位置值——纬度和经度。还有其他人管理弹性服务器,所以我不能用弹性更改任何配置。
我发现这个 Stackoverflow 问题我认为符合我的问题:如何使用 Elasticsearch 查询获取每个组的最新值?
因此,我尝试在我的 logstash 配置中使用所选答案中的查询。下面是我的配置文件:
input {
elasticsearch {
hosts => "elasticdb"
size => 1000
index => "logstash-db"
query =>'{"aggs":{"group":{"terms":{"field":"name.raw"},"aggs":{"group_docs":{"top_hits":{"size":1,"sort":[{"@timestamp":{"order":"desc"}}]}}}}}}'
}
}
filter {
if [latitude] = 0 and [longitude] = 0 {
drop { }
}
mutate {
remove_field => ["country","message","type","@timestamp","@version"]
}
}
output {
redis {
data_type => "list"
key => "%{name}"
}
stdout {
codec => rubydebug
}
}
弹性查询(为了更好的查看):
{
"aggs": {
"group": {
"terms": {
"field": "name.raw"
},
"aggs": {
"group_docs": {
"top_hits": {
"size": 1,
"sort": [{
"@timestamp": {
"order": "desc"
}
}
]
}
}
}
}
}
}
我尝试使用上述配置运行logstash,但logstash 会多次发送一些具有完全相同数据的行。有什么帮助吗?