Pulsar

简介

flusher_pulsar flusher插件可以实现将采集到的数据,经过处理后,发送到Pulsar

版本

Alpha

配置参数

参数 类型 是否必选 说明
Type String 插件类型
Url String Pulsar url,多地址用逗号分隔,可以参考本文中的用例配置
Topic String Pulsar Topic,支持动态topic, 例如: test_%{contents.appname}
Name String producer名称,默认ilogtail
Convert Struct ilogtail数据转换协议配置
Convert.Protocol String ilogtail数据转换协议,kafka flusher 可选值:custom_single,custom_single_flatten,otlp_log_v1。默认值:custom_single
Convert.Encoding String ilogtail flusher数据转换编码,可选值:jsonnoneprotobuf,默认值:json
Convert.TagFieldsRename Map 对日志中tags中的json字段重命名
Convert.ProtocolFieldsRename Map ilogtail日志协议字段重命名,可当前可重命名的字段:contents,tagstime
EnableTLS Boolean 是否启用TLS安全连接,对应采用TLS和Athenz两种认证模式都需要设置为true,默认值:false
TLSTrustCertsFilePath String TLS CA根证书文件路径,对应采用TLS和Athenz认证时需要指定
Authentication Struct Pulsar连接访问认证配置
Authentication.TLS.CertFile String TLS连接Pulsar证书文件路径
Authentication.TLS.KeyFile String TLS连接Pulsar私钥文件路径
Authentication.Token.Token String 采用JWT 认证方式的token
Authentication.Athenz.ProviderDomain String Provider domain name
Authentication.Athenz.TenantDomain String 租户域
Authentication.Athenz.TenantService String 租户服务
Authentication.Athenz.PrivateKey String Tenant private key path
Authentication.Athenz.KeyID String Key id for the tenant private key
Authentication.Athenz.PrincipalHeader String
Authentication.Athenz.ZtsURL String ZTS server的地址
Authentication.OAuth2.Enabled Boolean 是否启用OAuth2认证
Authentication.OAuth2.IssuerURL String 认证提供商的URL,OAuth2.Enabled开启时必填
Authentication.OAuth2.PrivateKey String JSON 凭据文件的 URL,OAuth2.Enabled开启时必填
Authentication.OAuth2.Audience String Pulsar 集群的 OAuth 2.0 “资源服务” 的标识符
Authentication.OAuth2.Scope String 访问范围
CompressionType String 压缩算法,NONE,LZ4,ZLIB,ZSTD,默认值NONE
BlockIfQueueFull Boolean 队列满的时候是否阻塞,默认值:false
SendTimeout Int 发送超时时间,默认30s
OperationTimeout Int pulsar producer创建、订阅、取消订阅的超时时间,默认30s
ConnectionTimeout Int tcp连接建立超时时间,默认5s
MaxConnectionsPerBroker Int 单个broker连接池保持的连接数,默认1
MaxReconnectToBroker Int 重连broker的最大重试次数,默认为无限
HashingScheme Int 消息push分区的分发方式:JavaStringHash,Murmur3_32Hash,默认值:JavaStringHash
BatchingMaxPublishDelay int 提交时延,默认值:1ms
BatchingMaxMessages int 批量提交最大消息数,默认值:1000
MaxCacheProducers int 动态topic情况下最大Producer数量 ,默认最大数量:8,使用动态topic的使用可以根据自己的情况调整。
PartitionKeys String数组 指定消息分区分发的key。
ClientID String 写入Pulsar的Client ID,默认取值:iLogtail

样例

采集/home/test-log/路径下的所有文件名匹配*.log规则的文件,并将采集结果发送到Pulsar。

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic

进阶配置

以下面的一段日志为例,后来将展开介绍ilogtail pulsar flusher的一些高阶配置

2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log

以上面这行日志为例 , 我们通ilogtailprocessor_regex插件,将上面的日志提取处理后几个关键字段:

  • time
  • loglevel
  • appname
  • thread
  • class
  • message

最后推送到kafka的数据样例如下:

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

动态topic

针对上面写入的这种日志格式,如果想根据application名称针对不用的应用推送到不通的topic, 则topic可以这样配置。

Topic: test_%{content.application}

最后ilogtail就自动将日志推送到test_springboot-docker这个topic中。

topic动态表达式规则:

  • %{content.fieldname}content代表从contents中取指定字段值
  • %{tag.fieldname},tag表示从tags中取指定字段值,例如:%{tag.k8s.namespace.name}
  • ${env_name}, 读取系统变量绑定到动态topic上,ilogtail 1.5.0开始支持。可以参考flusher-kafka_v2中的使用。
  • 其它方式暂不支持

TagFieldsRename

例如将tags中的host.name重命名为hostname,配置参考如下:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Convert:
      TagFieldsRename:
        host.name: hostname
    Topic: PulsarTestTopic

ProtocolFieldsRename

ilogtail协议字段重命名,在ilogtail的数据转换协议中, 最外层三个字段contents,tagstime属于协议字段。ProtocolFieldsRename只能对 contents,tagstime这个三个字段进行重命名。 例如在使用Elasticsearch你可能想直接将time重命名为@timestamp,则配置参考如下:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Convert:
      TagFieldsRename:
        host.name: hostname
      ProtocolFieldsRename:
        time: '@timestamp'
    Topic: PulsarTestTopic

指定分区分发

ilogtail flusher pulsar使用的官方SDK只支持hash方式分区投递,通过HashingScheme来选择不同的hash算法。 分发是可以指定PartitionKeysPartitionKeys的中配置的字段名只能是contents中的字段属性。

配置用例:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    PartitionKeys:
      - content.application  
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic
  • content.application中表示从contents中取数据application字段数据,如果对contents协议字段做了重命名, 例如重名为messege,则应该配置为messege.application

数据平铺

ilogtail 1.8.0新增数据平铺协议custom_single_flattencontentstagstime三个convert层的协议字段中数据做一级打平。 当前convert协议在单条数据处理仅支持json编码,因此custom_single_flatten需要配合json编码一起使用。

配置用例:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    Convert:
      Protocol: custom_single_flatten
      Encoding: json
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic

非平铺前写入pulsar的消息格式

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "@time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

使用平铺协议后custom_single_flattenjson全部被一级平铺。

{
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "@time": "2022-07-20 16:55:05.415",
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log",
    "time": 1664435098
}

安全连接配置

flusher_pulsar支持多种安全认证连接pulsar服务端。

  • TLS认证;
  • TokenJWT Token认证;
  • Athenz pulsar租户域认证;
  • OAuth2认证;

JWT Token认证配置比较简单,参照前面的配置表配置即可,下面主要介绍下OAuth2,TLSAthenz两种认证的配置。

OAuth2认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Authentication:
      OAuth2:
        Enabled: true
        IssuerURL: https://accounts.google.com
        PrivateKey: file:/path/to/file/credentials_file.json
        Audience: https://broker.example.com
        Scope: api://pulsar-cluster-1/.default
    Topic: PulsarTestTopic

credentials_file.json配置内容样例

{
  "type": "client_credentials",
  "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
  "client_secret": "on1uJ...k6F6R",
  "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
  "issuer_url": "https://accounts.google.com"
}

TLS配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
    EnableTLS: true
    TLSTrustCertsFilePath: /data/cert/ca.crt
    Authentication:
      TLS:
        CertFile: /data/cert/client.crt
        KeyFile: /data/cert/client.key
    Topic: PulsarTestTopic
  • EnableTLS 如果要启用TLS必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://
  • TLSTrustCertsFilePath根证书需要设置。 注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。

Athenz认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
    EnableTLS: true
    TLSTrustCertsFilePath: /data/cert/ca.crt
    Authentication:
      Athenz:
        ProviderDomain: pulsar
        TenantDomain: shopping
        TenantService: some_app
        PrivateKey: file:///path/to/client-key.pem
        KeyID: v1
    Topic: PulsarTestTopic
  • EnableTLS 如果要启用Athenz认证必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://
  • TLSTrustCertsFilePath根证书需要设置。

results matching ""

    No results matching ""