我们想跟踪日志的摄取是否有超出我们 Elasticsearch 可接受延迟的额外延迟。 因此,我们已按照之前文章 “Elasticsearch:在 Elasticsearch 中计算摄取延迟并存储摄取时间以提高可观察性” 中提供的步骤进行操作。

1. 创建如下的一个 ingest pipeline


  1. PUT _ingest/pipeline/calculate_lag
  2. {
  3. “description”: “Add an ingest timestamp and calculate ingest lag”,
  4. “processors”: [
  5. {
  6. “set”: {
  7. “field”: “_source.ingest_time”,
  8. “value”: “{{_ingest.timestamp}}”
  9. }
  10. },
  11. {
  12. “script”: {
  13. “lang”: “painless”,
  14. “source”: “””
  15. if(ctx.containsKey(“ingest_time”) && ctx.containsKey(“event_timestamp”)) {
  16. ctx[‘lag_in_seconds’] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx[‘event_timestamp’]), ZonedDateTime.parse(ctx[‘ingest_time’]))/1000;
  17. }
  18. “””
  19. }
  20. }
  21. ]
  22. }

注意:此脚本是用 Painless 脚本编写的,它以秒为单位跟踪摄取时间延迟。更多关于 Painless 的编程请参考 “Elastic:开发者上手指南” 中的 “Painless 编程” 部分。

2. 添加如下的 pipeline 到你想要的索引配置中

如果你的索引是新的并且你正在尝试这些东西,则可以执行此步骤。 如果你在运行以下命令之前已经有一个包含数据的索引,请阅读下面的注释。


  1. PUT my_index/_settings
  2. {
  3. “index.default_pipeline”: “calculate_lag”
  4. }

注意:

  • 如果你只是继续并仅将上述 PUT 方法与 index.default_pipeline 一起应用,它可能会弄乱你已经存在的索引设置。 确保首先获得索引的设置并将此管道添加为 ndex.final_pipeline 以及你已有的任何设置,然后进行应用
  • 下面是一个名为 my_index 的示例索引设置

  1. PUT my_index/_settings
  2. {
  3. “index”: {
  4. “routing”: {
  5. “allocation”: {
  6. “total_shards_per_node”: “3”
  7. }
  8. },
  9. “mapping”: {
  10. “total_fields”: {
  11. “limit”: “2000”
  12. }
  13. },
  14. “refresh_interval”: “30s”,
  15. “default_pipeline”: “pipeline_default”,
  16. “number_of_replicas”: “1”
  17. }
  18. }
  • 再次应用完整设置以及添加新字段以将管道添加为 final_pipeline。

  1. PUT my_index/_settings
  2. {
  3. “index”: {
  4. “routing”: {
  5. “allocation”: {
  6. “total_shards_per_node”: “3”
  7. }
  8. },
  9. “mapping”: {
  10. “total_fields”: {
  11. “limit”: “2000”
  12. }
  13. },
  14. “refresh_interval”: “30s”,
  15. “final_pipeline”: “calculate_lag”,
  16. “default_pipeline”: “pipeline_default”,
  17. “number_of_replicas”: “1”
  18. }
  19. }

注意:使用 index.final_pipeline 索引设置来设置最终管道。 Elasticsearch 在请求或默认管道之后应用此管道,即使两者均未指定。如果你之前已经有一个 ingest pipeline,设置 calculate_tag 将使得我们计算出摄入的时间差。

这将在获取的日志记录中添加名为 lag_in_seconds 的新字段。 你可以再次查看索引设置的变化以进行交叉验证。 基本上,这表示记录创建的 event_timestamp与它被摄取的时间之间的时间差,即 _ingest.timestamp。 如果你的日志记录具有不同的时间戳文件名,请相应地进行修改。