Logstash

ES 消费 Kafka 数据

# One Kafka consumer per source topic. Each input uses its own consumer group,
# so offsets are tracked independently per topic, and decodes every message
# body as JSON.
#
# Each input also sets a unique client_id. Without it every kafka input in
# this pipeline would use the plugin default ("logstash"); the Kafka client
# would then try to register the same JMX MBean several times
# (InstanceAlreadyExistsException warnings) and the consumers could not be
# told apart in broker-side metrics and logs.
#
# auto_offset_reset => "earliest" only takes effect when the consumer group
# has no committed offset yet; after that, consumption resumes from the
# committed position.
input {
    kafka {
        bootstrap_servers => "10.27.240.159:9092"
        topics => ["weibo"]
        auto_offset_reset => "earliest"
        codec => "json"
        group_id => "weibo_group_1"
        client_id => "logstash_weibo"
    }
    kafka {
        bootstrap_servers => "10.27.240.159:9092"
        topics => ["weixin"]
        auto_offset_reset => "earliest"
        codec => "json"
        group_id => "weixin_group_1"
        client_id => "logstash_weixin"
    }
    kafka {
        bootstrap_servers => "10.27.240.159:9092"
        topics => ["toutiao"]
        auto_offset_reset => "earliest"
        codec => "json"
        group_id => "toutiao_group_1"
        client_id => "logstash_toutiao"
    }
    kafka {
        bootstrap_servers => "10.27.240.159:9092"
        topics => ["baidu"]
        auto_offset_reset => "earliest"
        codec => "json"
        group_id => "baidu_group_1"
        client_id => "logstash_baidu"
    }
    kafka {
        bootstrap_servers => "10.27.240.159:9092"
        topics => ["zhihu"]
        auto_offset_reset => "earliest"
        codec => "json"
        group_id => "zhihu_group_1"
        client_id => "logstash_zhihu"
    }
    kafka {
        bootstrap_servers => "10.27.240.159:9092"
        topics => ["kejizixun"]
        auto_offset_reset => "earliest"
        codec => "json"
        group_id => "kejizixun_group_1"
        client_id => "logstash_kejizixun"
    }
    kafka {
        bootstrap_servers => "10.27.240.159:9092"
        topics => ["sogouweixin"]
        auto_offset_reset => "earliest"
        codec => "json"
        group_id => "sogouweixin_group_1"
        client_id => "logstash_sogouweixin"
    }
}
# Copy the routing fields carried in each message (id, index_name, type_name)
# into [@metadata] and then strip them, together with Logstash's own
# @timestamp / @version, from the event body. The output section routes on
# [@metadata][index_name].
filter {
    mutate {
        # Values are copied via sprintf references, so they are stored as
        # strings. NOTE(review): if a source field is absent, the reference
        # is left unexpanded (e.g. the literal text "%{id}") - confirm that
        # every producer always sends all three fields.
        add_field => {
            "[@metadata][id]"         => "%{id}"
            "[@metadata][index_name]" => "%{index_name}"
            "[@metadata][type_name]"  => "%{type_name}"
        }
        remove_field => [ "@timestamp", "@version", "index_name", "type_name", "id" ]
    }
}
# Print events to stdout (rubydebug format) when their index name, as stored
# in [@metadata][index_name] by the filter section, is one of the known
# indices. Events with any other index name are not written anywhere.
output {
    if [@metadata][index_name] in [
        "weibo_articles_and_weiboers",
        "weixin_articles_and_weixiners",
        "toutiao_articles_and_users",
        "baidunews_news",
        "zhihu_questions",
        "tech_news",
        "sogou_weixin_articles"
    ] {
        stdout { codec => rubydebug }
    }
}

results matching ""

    No results matching ""