EFK Logstash Kafka

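This example is a complete EFK + Logstash + Kafka setup: Filebeat ships logs to Kafka, Logstash consumes them from Kafka and writes them to Elasticsearch, and Kibana reads from Elasticsearch.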

Elasticsearch

Nothing special here; see the EFK installation guide.

Kibana

Nothing special here; see the EFK installation guide.

Filebeat

Previously Filebeat wrote directly to Elasticsearch; in this setup it must write to Kafka instead.

/usr/share/filebeat/filebeat.yml

filebeat.inputs:
- type: log
  paths:
    - /run/containerd/io.containerd.runtime.v1.linux/**/*.log
    - /run/containerd/io.containerd.runtime.v1.linux/k8s.io/*/rootfs/usr/local/tomcat/app.log

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

processors:
  - add_cloud_metadata: ~
  - add_docker_metadata: ~

output.kafka:
  hosts: ["kafkasvc.efk.svc:9093"]
  topic: 'estopic'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
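
Because the inputs above read log files under /run/containerd on the node, the Filebeat pod needs that host directory mounted. The fragment below is a minimal sketch, assuming Filebeat runs as a Kubernetes DaemonSet; the volume names and the ConfigMap name filebeat-config are hypothetical and do not come from the original setup.

```yaml
# Hypothetical DaemonSet pod spec fragment; all names are illustrative assumptions.
containers:
  - name: filebeat
    volumeMounts:
      # Host directory that holds the log files matched by filebeat.inputs
      - name: containerd-run
        mountPath: /run/containerd
        readOnly: true
      # filebeat.yml supplied from a ConfigMap (assumed name)
      - name: filebeat-config
        mountPath: /usr/share/filebeat/filebeat.yml
        subPath: filebeat.yml
        readOnly: true
volumes:
  - name: containerd-run
    hostPath:
      path: /run/containerd
  - name: filebeat-config
    configMap:
      name: filebeat-config
```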

Kafka

Kafka itself needs no special configuration, but pay attention to how its ports are mapped.

Also, the default ZooKeeper connection timeout of 6 seconds is a bit short; it can reasonably be extended to 60 seconds:

- name: KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS
  value: "60000"
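
The port mapping mentioned above can be sketched as a Kubernetes Service. This is an assumption-based example: the Service name kafkasvc and the namespace efk are inferred from the host names used in this document, while the selector label and the port names are hypothetical. Filebeat connects on 9093 and Logstash on 9092, so both ports have to be exposed, and each must correspond to a listener that is actually configured on the broker.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: kafkasvc        # inferred from kafkasvc.efk.svc
  namespace: efk
spec:
  selector:
    app: kafka          # hypothetical label; match your Kafka pod labels
  ports:
    - name: kafka-9092  # port used by Logstash in this document
      port: 9092
      targetPort: 9092
    - name: kafka-9093  # port used by Filebeat in this document
      port: 9093
      targetPort: 9093
```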

Logstash

Logstash is the most troublesome part; two files need to be configured.

/usr/share/logstash/pipeline/logstash.conf

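input {
  kafka {
    # Comma-separated host:port list, given as a string
    bootstrap_servers => "kafkasvc.efk.svc:9092"
    group_id => "es-group"
    # Must match the topic set in Filebeat's output.kafka
    topics => ["estopic"]
    # Filebeat publishes events to Kafka as JSON
    codec => json
  }
}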
filter {

}
output {
  elasticsearch {
    hosts => "http://elasticsearch.efk.svc:9200"
    user => "elastic"
    password => 'xxxxxxxxxxxxx'
    # One index per day, e.g. kafka-2024.01.31
    index => "kafka-%{+YYYY.MM.dd}"
  }
}
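
The password above is stored in plain text. Logstash can substitute environment variables written as ${VAR} in its configuration, so one option is to inject the password from a Kubernetes Secret. A minimal sketch, assuming a Secret named es-credentials with a key named password (both hypothetical):

```yaml
# Hypothetical fragment of the Logstash container spec.
env:
  - name: ES_PASSWORD
    valueFrom:
      secretKeyRef:
        name: es-credentials   # assumed Secret name
        key: password          # assumed key inside the Secret
```

With this in place, the elasticsearch output could use password => "${ES_PASSWORD}" instead of a literal value.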

/usr/share/logstash/config/logstash.yml

http.host: "0.0.0.0"
xpack.monitoring.enabled: false
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: xxxxxxxxxxxx
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch.efk.svc:9200"]
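
Both files have to end up at the paths shown above inside the Logstash container. One way to do that on Kubernetes is a ConfigMap mounted with subPath. The sketch below assumes a ConfigMap named logstash-config with the keys logstash.conf and logstash.yml; these names are hypothetical.

```yaml
# Hypothetical Logstash pod spec fragment; names are illustrative assumptions.
containers:
  - name: logstash
    volumeMounts:
      # Pipeline definition (input / filter / output)
      - name: logstash-config
        mountPath: /usr/share/logstash/pipeline/logstash.conf
        subPath: logstash.conf
      # Logstash settings file
      - name: logstash-config
        mountPath: /usr/share/logstash/config/logstash.yml
        subPath: logstash.yml
volumes:
  - name: logstash-config
    configMap:
      name: logstash-config
```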

/run/containerd/io.containerd.runtime.v1.linux/k8s.io
