ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

如何写一个转发日志的fluentd插件?

2021-07-04 11:31:13  阅读:237  来源: 互联网

标签:fluentd 插件 end plugin chunk record tag 日志


如何写一个转发日志的fluentd插件?

上篇介绍了logging-operator依赖于自定义的fluentd插件,实现了根据指定的namespaceslabels转发日志,本篇将从以下几个方面介绍如何编写一个具有该功能集成的fluentd插件:

  • 确定要扩展的插件类型

  • 相关语法词法介绍

  • 学习如何编写一个fluentd插件

确定要扩展的插件类型

根据我们的需求, 需要按照namespaceslabels来完成日志的转发,这依赖于kubernetes元数据。kubernetes元数据的获取并不在fluentd阶段配置,而是在转发给fluentd之前,依赖于fluent-bit的配置。

https://docs.fluentbit.io/manual/pipeline/filters/kubernetes#workflow-of-tail-kubernetes-filter

$ kubectl get secrets defaultlogging-fluentbit  -o json | jq '.data."fluent-bit.conf"' | xargs echo | base64 --decode
[SERVICE]
    Flush        1
    Grace        5
    Daemon       Off
    Log_Level    info
    Parsers_File parsers.conf
    Coro_Stack_Size    24576
    storage.path  /buffers

[INPUT]
    Name         tail
    DB  /tail-db/tail-containers-state.db
    Mem_Buf_Limit  5MB
    Parser  docker
    Path  /var/log/containers/*.log
    Refresh_Interval  5
    Skip_Long_Lines  On
    Tag  kubernetes.*
[FILTER]
    Name        kubernetes
    Buffer_Size  0
    Kube_CA_File  /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Tag_Prefix  kubernetes.var.log.containers
    Kube_Token_File  /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_URL  https://kubernetes.default.svc:443
    Match  kubernetes.*
    Merge_Log  On

在确定好该配置后激活后,我们来到fluentd这一层,需要编写一个output插件来完成过滤、转发功能。

相关语法词法介绍

详细样例参考:https://docs.fluentd.org/plugin-development/api-plugin-output

上面链接中搬运过来就是这样的:

require 'fluent/plugin/output'

module Fluent::Plugin
  class SomeOutput < Output
    # First, register the plugin. 'NAME' is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_output('NAME', self)

    # Enable threads if you are writing an async buffered plugin.
    helpers :thread

    # Define parameters for your plugin.
    config_param :path, :string

    #### Non-Buffered Output #############################
    # Implement `process()` if your plugin is non-buffered.
    # Read "Non-Buffered output" for details.
    ######################################################
    def process(tag, es)
      es.each do |time, record|
        # output events to ...
      end
    end

    #### Sync Buffered Output ##############################
    # Implement `write()` if your plugin uses normal buffer.
    # Read "Sync Buffered Output" for details.
    ########################################################
    def write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'writing data to file', chunk_id: dump_unique_id_hex(chunk.unique_id)

      # For standard chunk format (without `#format()` method)
      chunk.each do |time, record|
        # output events to ...
      end

      # For custom format (when `#format()` implemented)
      # File.open(real_path, 'w+')

      # or `#write_to(io)` is available
      # File.open(real_path, 'w+') do |file|
      #   chunk.write_to(file)
      # end
    end

    #### Async Buffered Output #############################
    # Implement `try_write()` if you want to defer committing
    # chunks. Read "Async Buffered Output" for details.
    ########################################################
    def try_write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'sending data to server', chunk_id: dump_unique_id_hex(chunk.unique_id)

      send_data_to_server(@host, real_path, chunk.read)

      chunk_id = chunk.unique_id

      # Create a thread for deferred commit.
      thread_create(:check_send_result) do
        while thread_current_running?
          sleep SENDDATA_CHECK_INTERVAL # == 5

          if check_data_on_server(real_path, chunk_id)
            # commit chunk
            # chunk will be deleted and not be retried anymore by this call
            commit_write(chunk_id)
            break
          end
        end
      end
    end

    # Override `#format` if you want to customize how Fluentd stores
    # events. Read the section "How to Customize the Serialization
    # Format for Chunks" for details.
    def format(tag, time, record)
      [tag, time, record].to_json
    end
  end
end

我将一个插件的编写规范整理为两类,一类是骨架定义,一类是子类逻辑实现:

  • 骨架定义部分包括requiremoduleclass definition
  • 子类逻辑实现又包括插件注册、参数定义、激活配置等前置逻辑和具体接口实现和内置方法调用的逻辑。
require

根据需要编写的插件类型导入依赖:

require 'fluent/plugin/output' # input, filter, output, parser, formatter, storage or buffer
subclass

所有的插件都是Fluent::Plugin::Base的子类。

class definition
module Fluent::Plugin
    class SomeOutput < Output
        ...
    end
end
register

注册插件的名称类别,需要根据这个来识别该插件,这里我们注册了一个名为NAME类别的output插件

Fluent::Plugin.register_output('NAME', self)
helpers

https://docs.fluentd.org/plugin-helper-overview

以下的语法激活了线程helper, 可以调用 thread_create(:check_send_result)thread_current_running?

# Load thread helper
helpers :thread
----
 thread_create(:check_send_result) do
   while thread_current_running?
     sleep SENDDATA_CHECK_INTERVAL # == 5
     if check_data_on_server(real_path, chunk_id)
        # commit chunk
        # chunk will be deleted and not be retried anymore by this call
         commit_write(chunk_id)
         break
     end
   end
 end
----
config_param && desc

config_param定义插件的参数, desc定义描述:

desc 'The port number'
# `config_param` Defines a parameter. You can refer the following parameter via @port instance variable.
# Without `:default`, a parameter is required.
config_param :port, :integer
config_section

定义一个可以嵌套的参数结构:

name: 名称.

options:

  • root: 是否激活为root配置区域,内部使用;
  • param_name: 子区域的名称;
  • final: 激活后子类无法修改, buffer配置区域就是通过这种方法实现。
  • init:激活后,必须要有初始默认值;
  • required: 激活后,整个配置区域会被设为必须配置项, 否则会报错;
  • multi: 激活后可以多次配置该配置区域;
  • alias: Alias for this section.

参考:

config_section :user, param_name: :users, multi: true, required: false do
  desc 'Username for authentication'
  config_param :username, :string
  desc 'Password for authentication'
  config_param :password, :string, secret: true
end
接口实现和内置方法调用

如果output没有使用buffer就需要实现process(tag, es)方法,反之,则需要实现write(同步)和try_write方法(异步)。

#### Non-Buffered Output #############################
# Implement `process()` if your plugin is non-buffered.
# Read "Non-Buffered output" for details.
######################################################
def process(tag, es)

#### Sync Buffered Output ##############################
# Implement `write()` if your plugin uses normal buffer.
# Read "Sync Buffered Output" for details.
########################################################
def write(chunk)

#### Async Buffered Output #############################
# Implement `try_write()` if you want to defer committing
# chunks. Read "Async Buffered Output" for details.
########################################################
def try_write(chunk)

# Override `#format` if you want to customize how Fluentd stores
# events. Read the section "How to Customize the Serialization
# Format for Chunks" for details.
def format(tag, time, record)

更多接口实现和内置方法可以访问上文提到的链接。

补充介绍下configure(conf)方法, confFluent::Config::Element的一个实例,实例变量和可访问的方法需要super调用之后才能可用。

def configure(conf)
  super

  # cache_default_value is created/configured by config_param
  @in_memory_cache = Hash.new(@cache_default_value)
end

学习如何编写一个fluentd插件

掌握相关语法后,我们试着分析下上篇文章提到的fluentd插件如何实现根据namespaceslabels转发日志的功能。

https://github.com/banzaicloud/fluent-plugin-label-router/blob/master/lib/fluent/plugin/out_label_router.rb#L22:11

require
require "fluent/plugin/output"
require 'prometheus/client'
class定义

按照官方的说法, 这里继承Output即可,如果不是做了巨大的改变,一般不推荐直接继承BareOutput

class LabelRouterOutput < BareOutput
register

注册了一个名为label_routertype

Fluent::Plugin.register_output("label_router", self)
helpers

激活event_emitterrecord_accessor两个helper api

helpers :event_emitter, :record_accessor
---
# event_emitter
# 1. emit event
router.emit(tag, time, record)
# 2. emit event stream
router.emit_stream(tag, es)
---
# record_accessor
# 1. Call `record_accessor_create` to create object
 @accessor = record_accessor_create('$.user.name')
# 2. Call `call` method to get value
value = @accessor.call(record) # With `$.user.name`, access to record["user"]["name"]
---
config_param

emit_mode: list类型,可选值为batch或者record;

sticky_tags: bool类型,默认为true, 相同的tag使用相同的方法;

default_routestring类型,默认为空,无法匹配时使用默认标签;

default_tagstring类型,默认为空, 无法匹配时使用默认tag

metrics: bool类型,默认为false,是否激活监控;

config_section

定义了两层嵌套配置区域。

image-20210703114719912

第一层,子嵌套配置区域名称为routes,可以配置多个routeroute详细参数如下:

@label: 类型为string,默认为nil,如果子区域的选择器命中匹配到,则会新建一个名为@label值的label给该record;

tag: 类型为string, 如果子区域匹配到,则会新建一个名为tag值的tag给给该record,前提是这个新tag不为空;

metrics_labels: 类型为string, 配置额外的metrics labels;

第二层子嵌套配置区域名称为matches,可以配置多个matchmatch详细参数如下:

labels : hash 类型, 例如app:nginx

namespaces: array类型,默认是[], 需要过滤的命名空间在这里定义;

hostsarray类型,默认是[], 需要过滤的hosts在这里定义;

container_names: array类型,默认是[], 需要过滤的container_names在这里定义;

negate: bool类型,用来标记为反选,默认为false;

接口实现和内置方法

首先,定义了一个Route类共给初始化配置时调用,具体的逻辑可以不用看,只需要注意它实现了两个方法,分别用于逐个处理和批处理,处理完毕后将计数器增加size个计数:

image-20210703134141141

下面直接看configure(conf)部分:

def configure(conf)
    super
    @registry = (::Prometheus::Client.registry if @metrics)
    @route_map = Hash.new { |h, k| h[k] = Set.new }
    @mutex = Mutex.new
    @routers = []
    @default_router = nil
    @routes.each do |rule|
       route_router = event_emitter_router(rule['@label'])
       @routers << Route.new(rule, route_router, @registry)
    end

    if @default_route != '' or @default_tag != ''
       default_rule = { 'matches' => nil, 'tag' => @default_tag, '@label' => @default_route}
       @default_router = Route.new(default_rule, event_emitter_router(@default_route), @registry)
    end

    @access_to_labels = record_accessor_create("$.kubernetes.labels")
    @access_to_namespace = record_accessor_create("$.kubernetes.namespace_name")
    @access_to_host = record_accessor_create("$.kubernetes.host")
    @access_to_container_name = record_accessor_create("$.kubernetes.container_name")

    @batch = @emit_mode == :batch
end

这里定义了一些初始化默认值和实例变量,需要注意的是routers这个数组的值,存放的是定义的Route实例, 其中, event_emitter_routerhelpers api导入的函数。

https://github.com/fluent/fluentd/blob/5844f7209fec154a4e6807eb1bee6989d3f3297f/lib/fluent/plugin_helper/event_emitter.rb#L71

 @routes.each do |rule|
    route_router = event_emitter_router(rule['@label'])
    @routers << Route.new(rule, route_router, @registry)
 end

参考上文,由于没有定义buffer组件,只需要实现process方法即可:

image-20210703133736264

上面这个函数基本上囊括了整个处理逻辑,无非是做一些匹配以及根据参数做一些控制流,来触发router实例中emitemit_es方法。

整个逻辑很简单的。如果开启了强制匹配tag的模式,会在route_map中寻找该tag,做一次快速处理,否则会拿着组装的input_metadata去做匹配,如果匹配到则触发上面的两个emit方法,没有一个批次全部没匹配到就会判断有没有默认router来触发,最后,会触发一次批量emit_es

至此,我们探讨了一下如果编写fluentd插件的流程,希望对你有所帮助!

PS: 码字不易,欢迎点赞收藏~

本文由博客一文多发平台 OpenWrite 发布!

标签:fluentd,插件,end,plugin,chunk,record,tag,日志
来源: https://blog.csdn.net/ss__zz/article/details/118460135

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有