rsyslogを非リアルタイムでELK(logstash, elastic search, kibana)に送る。データの加工がメイン。
ELK?
ELKがなにか知るには、公式日本語サイトを見るのが分かりやすいです。あまり個人向けという感じがしませんが、個人でも使えます。

Elasticsearch,Logstash,Kibanaという3つのOSSプロジェクトの頭文字をとったもので、簡潔にいうとそれぞれ、データの貯蔵、データの加工、データの可視化といった役割をすることで、データの分析を楽にしようというものです。
自前でデータベースに格納してもいいのですが、定義と特に管理が面倒でやりませんでした。ELKに溜め込むだけなら管理はかなり楽だと思います。
目的(logstashがメイン)
通常ELKはリアルタイムデータを見ることが出来る点が大きなメリットですが、今回は、貯まっている圧縮されたサーバーのログファイルを貯蔵しておきたいということで、ELKを使っています。
とりあえず貯蔵できればいいので、可視化を担うkibanaの出番は今回はないです。
貯蔵するまでが大変だったので、今回のメインは加工を担うlogstashになります。リアルタイムが主流なので、結構調査が大変でした。公式ドキュメントの検索が結構不便なのが痛かったです。logstashの公式ドキュメントはこちら: Logstash Reference [current version] | Elastic
ログファイルは、rsyslogにjson形式の文字列で出力されたもので、gz圧縮された複数のファイルを圧縮されたまま読み込む予定です。
環境
まずは環境から。この環境で貯めたサーバーログを:
- centos 7
- rsyslogd 8.24
macOSに移してから、macOS上の次の環境で作業します:
- logstash 7.15.0
- elastic search 7.15.0
- kibana 7.15.0
- Docker version 19.03 (Docker Desktop)
docker-composeでelkスタックを立ち上げます。volumeは使わずにホストのディレクトリをマウントして使っています。docker-compose.ymlは後述します。
logstash
logstashは、ログの取得・加工・次の段階への受け渡しを行います。その処理内容はconfファイルで設定できます。
confファイルでは、取得・加工・受け渡しの3つのステップをそれぞれinput, filter, outputに分けて定義していきます。filterは任意なので無くても問題はありません。
logstashでは、それぞれのステップで多種多様なプラグインを使用することで柔軟な処理を行うことができます。
ファイルの取得・読み込み
最初はinput、データの取得を定義します。今回はファイルの読み取りを行いたいので、fileプラグインを利用します。fileプラグインでは.gzファイルの読み込みもサポートしています。公式ドキュメント: File input plugin | Logstash Reference [current] | Elastic
confファイルには次のように定義します:
# apipeline.conf
input {
file {
mode => read
path => ["/var/log/shipper/*.gz"]
file_completed_action => delete
check_archive_validity => true
}
}
#...
modeをreadにして1つのファイルを先頭からEOFまで読んでくれるようにします。もう1つのモードtailは追記を監視して追記内容について以降の処理を行います。tail使用時はファイルローテーションには注意しましょう。
デフォルト設定では、読み取りが終わると対象だったファイルは削除されるので、それを意図しない場合は設定を忘れないようにしましょう。削除しない場合は、File input plugin | Logstash Reference [current] | Elasticにあるように、deleteではなくlogを指定しておきます。加えて、file_completed_log_pathの指定も必要になります。
pathには、対象ファイルのパターンを書いておきます。excludeで例外として特定ファイル(パターン)を外すこともできます。
加工
2段階目の加工ステップfilterでは、結構やることが多いです。データの整形などのために、各データ(イベント)のフィールドを操作していきます。
色々なプラグインを使って次のことをやっていきます:
- grokでsyslogのメッセージ部分を抽出
- jsonとして読む
- fingerprintで衝突の回避(2重読み込みの防止)
- mutateで不要なフィールドを削除
confファイルはこのようになります:
# apipeline.conf
#...
filter {
# syslog形式を分解。json文字列がsyslog_messageになる。
grok {
match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" }
}
json {
source => "syslog_message"
}
fingerprint {
source => ["time", "source_ip", "status_code"]
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "1234"
concatenate_sources => true
}
mutate {
remove_field => ["syslog_message", "message"]
}
}
#...
grokプラグインは文字列を解析して構造化するのに非常に便利なプラグインです。%{SYNTAX:SEMANTIC}の形式で任意のフィールドへ選択的に格納できます。%{SYSLOGBASE}だけであれば、フィールドに格納しないでおけます。主なパターンはこちら: logstash-patterns-core/patterns at master · logstash-plugins/logstash-patterns-core · GitHub。
プラグインのドキュメントはこちら: Grok filter plugin | Logstash Reference [current] | Elastic。
ここでやっていることは、messageフィールドを解析し、rsyslogのメッセージ部分をsyslog_messageフィールドとして取り出しています。この部分はログの設定によりjson形式の文字列になっています。
続くjsonプラグインでは、先のsyslog_messageフィールドを解析し、json形式の文字列中の全てのフィールドと値をイベントに追加します。プラグインのドキュメントはこちら: JSON filter plugin | Logstash Reference [current] | Elastic
そしてfingerprintプラグインを使って、うっかり同じファイルを読み込んでしまった時に備えるため、データの重複を防ぎます。ユニークになるハッシュを生成しておきます。
ハッシュに使うフィールドを指定するsourceには、ユニークになるようなフィールドの組み合わせを配列で指定します。jsonプラグインで解析した結果のフィールドを利用します。そのままだと各フィールドのハッシュが生成されるので、concatenate_sourcesをtrueにして、連結した値のハッシュ値を使うようにしています。プラグインのドキュメントはこちら: Fingerprint filter plugin | Logstash Reference [current] | Elastic
最後のmutateプラグインでは、リネーム・削除・置換などフィールドの細かい操作ができます。今回は、不要なフィールドの削除のために使用します。syslog_messageとmessageフィールドは解析が終わった後は不要になるので、ストレージの容量を考慮して削除しておくことにしました。プラグインのドキュメントはこちら: Mutate filter plugin | Logstash Reference [current] | Elastic
これで一通りの加工・整形は終わりました。残りはelastic searchへのアウトプットです。
加工の注意点
調査不足なところもあるような気がしますが、confファイルのfilter部分の作成にあたって気になった部分を残しておきます。
fingerprintプラグインの処理でハッシュは[@metadata][fingerprint]フィールドに格納しますが、[@metadata][<field_name>]の形式を使うことで、アウトプットはされないけど処理の中では使える、というフィールドを指定できます。なので、このフィールドは後で使用します。[@metadata]の参考はこちら: Accessing event data and fields in the configuration | Logstash Reference [current] | Elastic。
また、fingerprintプラグインのよくわかってない点としてsourceにmessageフィールドを指定して、後述するmutateを使ってハッシュに使ったmessageフィールドを削除すると、正しく出力されていないことがありました。そのため、上述のような指定をしています。このフィールドならほぼ間違いなく単独でユニークになるのに…
もしかすると、filterは上から順に適用ではないのかもしれないです。調べた限りでは順序については記載がありませんでした。
出力(elasticsearchに格納)
最後のステップであるoutputでは、elastic searchへの格納を行います。使用するプラグインはそのままelasticsearchという名前です。次のようにoutputを定義します:
# apipeline.conf
#...
output {
elasticsearch {
hosts => ["es:9200"]
index => "somelogsrc_ls-%{+yyyy.MM.dd}"
document_id => "%{[@metadata][fingerprint]}"
}
stdout { codec => dots }
}
elasticsearchプラグインのドキュメントはこちら: Elasticsearch output plugin | Logstash Reference [current] | Elastic
indexは、elastic searchでのインデックスを指定できます。デフォルトでは、logstash-%{+yyyy.MM.dd}なので区別したい場合は変更します。
elastic searchではひとつのデータをひとつのドキュメントとして扱い、ドキュメントはユニークなIDで識別されます。logstashからの出力時にこのIDを先程のfingerprintで生成したハッシュ値にすることで重複データの挿入を防ぎます。防ぐといっても、上書きはされるようです。このIDはプラグインではdocument_idとして扱います。
%{}の有無などfingerprintのハッシュ値の指定には注意しましょう。若干の参考: Accessing event data and fields in the configuration | Logstash Reference [current] | Elastic
dockerのlogで確認できるようにするため、追加としてstdoutプラグインでイベント1つを処理するごとに.をdockerコンテナから標準出力するようにしています。
これらをまとめる
以上の3つのステップをまとめて、1つのconfファイルを作成します:
# logstash/pipeline/apipeline.conf
input {
file {
mode => read
path => ["/var/log/shipper/*.gz"]
file_completed_action => delete
check_archive_validity => true
}
}
filter {
grok {
match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" }
}
json {
source => "syslog_message"
}
fingerprint {
source => ["time", "source_ip", "status_code"]
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "1234"
concatenate_sources => true
}
mutate {
remove_field => ["syslog_message", "message"]
}
}
output {
elasticsearch {
hosts => ["es:9200"]
index => "somelogsrc_ls-%{+yyyy.MM.dd}"
document_id => "%{[@metadata][fingerprint]}"
}
stdout { codec => dots }
}
以降はこのファイルをdocker-composeで使っていきます。
docker-composeで起動
docker-composeでまとめて起動させます。
ディレクトリ構造
これまでに必要なファイルとそのディレクトリ構造は、このようになります:
$ tree --dirsfirst -I nodes**
.
├── esdata # elasticsearchの中身
├── logshipper # ログ転送用フォルダ
├── logstash # logstashのDockerfileと設定類用フォルダ
│ ├── config
│ │ ├── logstash.yml
│ │ └── pipelines.yml
│ ├── pipeline
│ │ └── apipeline.conf
│ └── Dockerfile
└── docker-compose.yml # docker-composeで使う
6 directories, 6 files
esdataとlogshipperは空のディレクトリを作成しておきます。予めlogshipperのなかにログファイル(のコピー)を格納しても問題ないです。elastic searchに格納されたら削除されることには注意します。
ファイルを移す都合上、logstashにはDockerfileを用意します。また、ログの読み取り用のファイルの置き場logshipperをlogstashのコンテナにマウントさせます。Dockerfileはこのようになります:
# logstash/Dockerfile
FROM docker.elastic.co/logstash/logstash:7.15.0
RUN rm -f /usr/share/logstash/pipeline/logstash.conf
ADD pipeline/ /usr/share/logstash/pipeline/
ADD config/ /usr/share/logstash/config/
設定ファイルたちを適切な場所に配置するだけです。まだ説明してない2つの設定ファイルlogstash/config/logstash.ymlとlogstash/config/pipelines.ymlについては、次節のように最小限度のことだけ記述しておきます。
残りの設定ファイル
まずは、logstash.ymlから。これはlogstashプロセス全体の設定を行います:
# logstash/config/logstash.yml
node.name: "docker-logstash"
config.reload.automatic: true
ノードの名前だけ設定します。
最後の行の設定ファイルを監視して反映の機能は使わなかったので、falseでもいいです。docker execから使えばよかったなあと後で思いました。このファイルのドキュメントはこちら: logstash.yml | Logstash Reference [current] | Elastic
次はpipelines.yml。これはデータ(イベント)を処理するパイプラインごとの個別な設定を指定できます(マルチパイプライン)。パイプラインがひとつだけでも後から増やすかもしれないので、これを使っています:
# logstash/conf/pipelines.yml
- pipeline.id: somelog
path.config: "/usr/share/logstash/pipeline/apipeline.conf"
pipeline.ecs_compatibility: disabled
path.configで、パイプラインのconfファイルの位置を指定しています。ディレクトリやパターンも指定できるようです。
ecs_compatibilityを指定しないと、明記したほうがいいよという内容の警告メッセージがlogstashのログに表示されるので明記しています。
マルチパイプラインのドキュメントはこちら: Multiple Pipelines | Logstash Reference [current] | Elastic
logstashについての設定はこれで全部です。各ファイルを適切に配置しておきます。
あとはdocker-composeで立ち上げます。
docker-compose.ymlでまとめる
docker-composeでELK要素を全てupします。そのためのymlファイルは次のようになります:
version: '3'
services:
es:
image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
ports:
- 9200:9200
environment:
- discovery.type=single-node
- node.master=true
- node.data=true
- xpack.security.enabled=false
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms2048m -Xmx2048m"
volumes:
- ./esdata:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
kibana:
image: docker.elastic.co/kibana/kibana:7.15.0
ports:
- 5601:5601
environment:
ELASTICSEARCH_HOSTS: http://es:9200
logstash:
build: ./logstash
volumes:
- ./logshipper:/var/log/shipper
elasticsearchについては、環境変数で設定することがあります。基本的には、Install Elasticsearch with Docker | Elasticsearch Guide [current] | Elasticを参考にしています。
シングルノードにして、メモリ不足での起動失敗を防ぐためES_JAVA_OPTSでヒープサイズを指定しています。この時、Docker Desktopでのメモリ割り当てにも注意します。(Docker Desktopでは5GBを指定していました。)
kibanaについてはそれほど設定することはありません。logstashは設定ファイルの配置くらいでそれもDockerfileで行うので、ここではすっきりしています。
ここまできたら、docker-compose up -dでログを読み取ってくれる状態になります。起動が遅いのは残念ですが…
docker-compose logs logstashでログの読み取り具合が分かります。http://localhost:5601でkibanaにアクセスできます。当環境だと初回のupのときはなぜかkibanaが落ちてしまうことがありました。2回目以降は問題ないようです。
これで今回したかったことは完了です。
その他いろいろ
作業中の種々の問題などはこちらに書いていきます。
2つのディレクトリをマウントしましたが、volumeを使うこともできます。記憶が曖昧ですが、volumeにgzファイルを移しても状況によってはlogstashが読み取ってくれない場合があるようです。
どうやら権限がよろしくないようで読み取ってくれなかったようでした。(記憶曖昧)777ならまあ大丈夫だと思います。(未確認)
一応のメモとして残しておきます。
ホストのディレクトリをマウントしていると、ホストのFinderでディレクトリに対象ログを移しても認識してくれないことがありました。面倒ですがVSCodeのウィンドウのマウント元のディレクトリへD&Dすると確実に認識してくれました。
macOSでelasticsearch用のディレクトリをマウントして作業を終えたあと、windows10の方にディレクトリをコピーしてkibanaで覗こうと思ったらkibanaが開けませんでした。elastic did not load properlyといった表示が出ました。理由が不明のままです…。ファイルシステムの問題か?
おわり
ELKって順番考えたらLEKじゃないかなと思いながら作業してました。kibanaを使うのに慣れたい、elasticsearchのAPIもさくっと使えるようにしたいなと思いました。初期段階だと削除の頻度が多くてなかなか大変でした。
あとは起動が早くなって容量をもう少し取らない感じになるとすごいなって思います。今のままでも十分すごいんですが。
logstashの部分をfluentdにしている例が日本語資料だと多いのでlogstashの場合の資料を探すのが少し大変かもしれないです。ないわけでもないですが、公式ドキュメントを読むのが結局は最短だと思います。公式のは検索が使いづらい以外は良いドキュメントです。検索結果を個別ページで表示できれば満点だと思います。
随分長文になりましたが以上です。ありがとうございました。
Amazonアソシエイト



コメント