rsyslogを非リアルタイムでELK(logstash
, elastic search
, kibana
)に送る。データの加工がメイン。
ELK?
ELKがなにか知るには、公式日本語サイトを見るのが分かりやすいです。あまり個人向けという感じがしませんが、個人でも使えます。
Elasticsearch
,Logstash
,Kibana
という3つのOSSプロジェクトの頭文字をとったもので、簡潔にいうとそれぞれ、データの貯蔵、データの加工、データの可視化といった役割をすることで、データの分析を楽にしようというものです。
自前でデータベースに格納してもいいのですが、定義と特に管理が面倒でやりませんでした。ELKに溜め込むだけなら管理はかなり楽だと思います。
目的(logstashがメイン)
通常ELKはリアルタイムデータを見ることが出来る点が大きなメリットですが、今回は、貯まっている圧縮されたサーバーのログファイルを貯蔵しておきたいということで、ELKを使っています。
とりあえず貯蔵できればいいので、可視化を担うkibana
の出番は今回はないです。
貯蔵するまでが大変だったので、今回のメインは加工を担うlogstash
になります。リアルタイムが主流なので、結構調査が大変でした。公式ドキュメントの検索が結構不便なのが痛かったです。logstash
の公式ドキュメントはこちら: Logstash Reference [current version] | Elastic
ログファイルは、rsyslogにjson形式の文字列で出力されたもので、gz圧縮された複数のファイルを圧縮されたまま読み込む予定です。
環境
まずは環境から。この環境で貯めたサーバーログを:
- centos 7
- rsyslogd 8.24
macOSに移してから、macOS上の次の環境で作業します:
- logstash 7.15.0
- elastic search 7.15.0
- kibana 7.15.0
- Docker version 19.03 (Docker Desktop)
docker-compose
でelkスタックを立ち上げます。volumeは使わずにホストのディレクトリをマウントして使っています。docker-compose.yml
は後述します。
logstash
logstashは、ログの取得・加工・次の段階への受け渡しを行います。その処理内容はconfファイルで設定できます。
confファイルでは、取得・加工・受け渡しの3つのステップをそれぞれinput
, filter
, output
に分けて定義していきます。filter
は任意なので無くても問題はありません。
logstashでは、それぞれのステップで多種多様なプラグインを使用することで柔軟な処理を行うことができます。
ファイルの取得・読み込み
最初はinput
、データの取得を定義します。今回はファイルの読み取りを行いたいので、file
プラグインを利用します。file
プラグインでは.gz
ファイルの読み込みもサポートしています。公式ドキュメント: File input plugin | Logstash Reference [current] | Elastic
confファイルには次のように定義します:
# apipeline.conf
input {
file {
mode => read
path => ["/var/log/shipper/*.gz"]
file_completed_action => delete
check_archive_validity => true
}
}
#...
mode
をread
にして1つのファイルを先頭からEOFまで読んでくれるようにします。もう1つのモードtail
は追記を監視して追記内容について以降の処理を行います。tail
使用時はファイルローテーションには注意しましょう。
デフォルト設定では、読み取りが終わると対象だったファイルは削除されるので、それを意図しない場合は設定を忘れないようにしましょう。削除しない場合は、File input plugin | Logstash Reference [current] | Elasticにあるように、delete
ではなくlog
を指定しておきます。加えて、file_completed_log_path
の指定も必要になります。
path
には、対象ファイルのパターンを書いておきます。exclude
で例外として特定ファイル(パターン)を外すこともできます。
加工
2段階目の加工ステップfilter
では、結構やることが多いです。データの整形などのために、各データ(イベント)のフィールドを操作していきます。
色々なプラグインを使って次のことをやっていきます:
- grokでsyslogのメッセージ部分を抽出
- jsonとして読む
- fingerprintで衝突の回避(2重読み込みの防止)
- mutateで不要なフィールドを削除
confファイルはこのようになります:
# apipeline.conf
#...
filter {
# syslog形式を分解。json文字列がsyslog_messageになる。
grok {
match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" }
}
json {
source => "syslog_message"
}
fingerprint {
source => ["time", "source_ip", "status_code"]
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "1234"
concatenate_sources => true
}
mutate {
remove_field => ["syslog_message", "message"]
}
}
#...
grok
プラグインは文字列を解析して構造化するのに非常に便利なプラグインです。%{SYNTAX:SEMANTIC}
の形式で任意のフィールドへ選択的に格納できます。%{SYSLOGBASE}
だけであれば、フィールドに格納しないでおけます。主なパターンはこちら: logstash-patterns-core/patterns at master · logstash-plugins/logstash-patterns-core · GitHub。
プラグインのドキュメントはこちら: Grok filter plugin | Logstash Reference [current] | Elastic。
ここでやっていることは、message
フィールドを解析し、rsyslogのメッセージ部分をsyslog_message
フィールドとして取り出しています。この部分はログの設定によりjson
形式の文字列になっています。
続くjson
プラグインでは、先のsyslog_message
フィールドを解析し、json
形式の文字列中の全てのフィールドと値をイベントに追加します。プラグインのドキュメントはこちら: JSON filter plugin | Logstash Reference [current] | Elastic
そしてfingerprint
プラグインを使って、うっかり同じファイルを読み込んでしまった時に備えるため、データの重複を防ぎます。ユニークになるハッシュを生成しておきます。
ハッシュに使うフィールドを指定するsource
には、ユニークになるようなフィールドの組み合わせを配列で指定します。jsonプラグインで解析した結果のフィールドを利用します。そのままだと各フィールドのハッシュが生成されるので、concatenate_sources
をtrue
にして、連結した値のハッシュ値を使うようにしています。プラグインのドキュメントはこちら: Fingerprint filter plugin | Logstash Reference [current] | Elastic
最後のmutate
プラグインでは、リネーム・削除・置換などフィールドの細かい操作ができます。今回は、不要なフィールドの削除のために使用します。syslog_message
とmessage
フィールドは解析が終わった後は不要になるので、ストレージの容量を考慮して削除しておくことにしました。プラグインのドキュメントはこちら: Mutate filter plugin | Logstash Reference [current] | Elastic
これで一通りの加工・整形は終わりました。残りはelastic search
へのアウトプットです。
加工の注意点
調査不足なところもあるような気がしますが、confファイルのfilter
部分の作成にあたって気になった部分を残しておきます。
fingerprint
プラグインの処理でハッシュは[@metadata][fingerprint]
フィールドに格納しますが、[@metadata][<field_name>]
の形式を使うことで、アウトプットはされないけど処理の中では使える、というフィールドを指定できます。なので、このフィールドは後で使用します。[@metadata]
の参考はこちら: Accessing event data and fields in the configuration | Logstash Reference [current] | Elastic。
また、fingerprint
プラグインのよくわかってない点としてsource
にmessage
フィールドを指定して、後述するmutate
を使ってハッシュに使ったmessage
フィールドを削除すると、正しく出力されていないことがありました。そのため、上述のような指定をしています。このフィールドならほぼ間違いなく単独でユニークになるのに…
もしかすると、filter
は上から順に適用ではないのかもしれないです。調べた限りでは順序については記載がありませんでした。
出力(elasticsearchに格納)
最後のステップであるoutput
では、elastic search
への格納を行います。使用するプラグインはそのままelasticsearch
という名前です。次のようにoutput
を定義します:
# apipeline.conf
#...
output {
elasticsearch {
hosts => ["es:9200"]
index => "somelogsrc_ls-%{+yyyy.MM.dd}"
document_id => "%{[@metadata][fingerprint]}"
}
stdout { codec => dots }
}
elasticsearch
プラグインのドキュメントはこちら: Elasticsearch output plugin | Logstash Reference [current] | Elastic
index
は、elastic search
でのインデックスを指定できます。デフォルトでは、logstash-%{+yyyy.MM.dd}
なので区別したい場合は変更します。
elastic search
ではひとつのデータをひとつのドキュメントとして扱い、ドキュメントはユニークなIDで識別されます。logstashからの出力時にこのIDを先程のfingerprint
で生成したハッシュ値にすることで重複データの挿入を防ぎます。防ぐといっても、上書きはされるようです。このIDはプラグインではdocument_id
として扱います。
%{}
の有無などfingerprint
のハッシュ値の指定には注意しましょう。若干の参考: Accessing event data and fields in the configuration | Logstash Reference [current] | Elastic
docker
のlog
で確認できるようにするため、追加としてstdout
プラグインでイベント1つを処理するごとに.
をdocker
コンテナから標準出力するようにしています。
これらをまとめる
以上の3つのステップをまとめて、1つのconfファイルを作成します:
# logstash/pipeline/apipeline.conf
input {
file {
mode => read
path => ["/var/log/shipper/*.gz"]
file_completed_action => delete
check_archive_validity => true
}
}
filter {
grok {
match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" }
}
json {
source => "syslog_message"
}
fingerprint {
source => ["time", "source_ip", "status_code"]
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "1234"
concatenate_sources => true
}
mutate {
remove_field => ["syslog_message", "message"]
}
}
output {
elasticsearch {
hosts => ["es:9200"]
index => "somelogsrc_ls-%{+yyyy.MM.dd}"
document_id => "%{[@metadata][fingerprint]}"
}
stdout { codec => dots }
}
以降はこのファイルをdocker-compose
で使っていきます。
docker-composeで起動
docker-compose
でまとめて起動させます。
ディレクトリ構造
これまでに必要なファイルとそのディレクトリ構造は、このようになります:
$ tree --dirsfirst -I nodes**
.
├── esdata # elasticsearchの中身
├── logshipper # ログ転送用フォルダ
├── logstash # logstashのDockerfileと設定類用フォルダ
│ ├── config
│ │ ├── logstash.yml
│ │ └── pipelines.yml
│ ├── pipeline
│ │ └── apipeline.conf
│ └── Dockerfile
└── docker-compose.yml # docker-composeで使う
6 directories, 6 files
esdata
とlogshipper
は空のディレクトリを作成しておきます。予めlogshipper
のなかにログファイル(のコピー)を格納しても問題ないです。elastic search
に格納されたら削除されることには注意します。
ファイルを移す都合上、logstashにはDockerfile
を用意します。また、ログの読み取り用のファイルの置き場logshipper
をlogstashのコンテナにマウントさせます。Dockerfile
はこのようになります:
# logstash/Dockerfile
FROM docker.elastic.co/logstash/logstash:7.15.0
RUN rm -f /usr/share/logstash/pipeline/logstash.conf
ADD pipeline/ /usr/share/logstash/pipeline/
ADD config/ /usr/share/logstash/config/
設定ファイルたちを適切な場所に配置するだけです。まだ説明してない2つの設定ファイルlogstash/config/logstash.yml
とlogstash/config/pipelines.yml
については、次節のように最小限度のことだけ記述しておきます。
残りの設定ファイル
まずは、logstash.yml
から。これはlogstash
プロセス全体の設定を行います:
# logstash/config/logstash.yml
node.name: "docker-logstash"
config.reload.automatic: true
ノードの名前だけ設定します。
最後の行の設定ファイルを監視して反映の機能は使わなかったので、false
でもいいです。docker exec
から使えばよかったなあと後で思いました。このファイルのドキュメントはこちら: logstash.yml | Logstash Reference [current] | Elastic
次はpipelines.yml
。これはデータ(イベント)を処理するパイプラインごとの個別な設定を指定できます(マルチパイプライン)。パイプラインがひとつだけでも後から増やすかもしれないので、これを使っています:
# logstash/conf/pipelines.yml
- pipeline.id: somelog
path.config: "/usr/share/logstash/pipeline/apipeline.conf"
pipeline.ecs_compatibility: disabled
path.config
で、パイプラインのconfファイルの位置を指定しています。ディレクトリやパターンも指定できるようです。
ecs_compatibility
を指定しないと、明記したほうがいいよという内容の警告メッセージがlogstash
のログに表示されるので明記しています。
マルチパイプラインのドキュメントはこちら: Multiple Pipelines | Logstash Reference [current] | Elastic
logstashについての設定はこれで全部です。各ファイルを適切に配置しておきます。
あとはdocker-compose
で立ち上げます。
docker-compose.ymlでまとめる
docker-compose
でELK要素を全てup
します。そのためのyml
ファイルは次のようになります:
version: '3'
services:
es:
image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
ports:
- 9200:9200
environment:
- discovery.type=single-node
- node.master=true
- node.data=true
- xpack.security.enabled=false
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms2048m -Xmx2048m"
volumes:
- ./esdata:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
kibana:
image: docker.elastic.co/kibana/kibana:7.15.0
ports:
- 5601:5601
environment:
ELASTICSEARCH_HOSTS: http://es:9200
logstash:
build: ./logstash
volumes:
- ./logshipper:/var/log/shipper
elasticsearchについては、環境変数で設定することがあります。基本的には、Install Elasticsearch with Docker | Elasticsearch Guide [current] | Elasticを参考にしています。
シングルノードにして、メモリ不足での起動失敗を防ぐためES_JAVA_OPTS
でヒープサイズを指定しています。この時、Docker Desktop
でのメモリ割り当てにも注意します。(Docker Desktop
では5GBを指定していました。)
kibanaについてはそれほど設定することはありません。logstashは設定ファイルの配置くらいでそれもDockerfile
で行うので、ここではすっきりしています。
ここまできたら、docker-compose up -d
でログを読み取ってくれる状態になります。起動が遅いのは残念ですが…
docker-compose logs logstash
でログの読み取り具合が分かります。http://localhost:5601
でkibanaにアクセスできます。当環境だと初回のup
のときはなぜかkibanaが落ちてしまうことがありました。2回目以降は問題ないようです。
これで今回したかったことは完了です。
その他いろいろ
作業中の種々の問題などはこちらに書いていきます。
2つのディレクトリをマウントしましたが、volumeを使うこともできます。記憶が曖昧ですが、volumeにgz
ファイルを移しても状況によってはlogstashが読み取ってくれない場合があるようです。
どうやら権限がよろしくないようで読み取ってくれなかったようでした。(記憶曖昧)777ならまあ大丈夫だと思います。(未確認)
一応のメモとして残しておきます。
ホストのディレクトリをマウントしていると、ホストのFinder
でディレクトリに対象ログを移しても認識してくれないことがありました。面倒ですがVSCodeのウィンドウのマウント元のディレクトリへD&Dすると確実に認識してくれました。
macOSでelasticsearch用のディレクトリをマウントして作業を終えたあと、windows10の方にディレクトリをコピーしてkibana
で覗こうと思ったらkibana
が開けませんでした。elastic did not load properly
といった表示が出ました。理由が不明のままです…。ファイルシステムの問題か?
おわり
ELKって順番考えたらLEKじゃないかなと思いながら作業してました。kibanaを使うのに慣れたい、elasticsearchのAPIもさくっと使えるようにしたいなと思いました。初期段階だと削除の頻度が多くてなかなか大変でした。
あとは起動が早くなって容量をもう少し取らない感じになるとすごいなって思います。今のままでも十分すごいんですが。
logstash
の部分をfluentd
にしている例が日本語資料だと多いのでlogstash
の場合の資料を探すのが少し大変かもしれないです。ないわけでもないですが、公式ドキュメントを読むのが結局は最短だと思います。公式のは検索が使いづらい以外は良いドキュメントです。検索結果を個別ページで表示できれば満点だと思います。
随分長文になりましたが以上です。ありがとうございました。
Amazonアソシエイト
コメント