Subscribe to Data from Kafka
Author: 宋龙奇
Datakit can subscribe to messages from Kafka to collect tracing, metrics, and logging data. Currently, only SkyWalking is supported.
SkyWalking
Java Agent Startup Configuration
The Kafka reporter plugin sends traces, JVM metrics, logging, instance properties, and profiled snapshots to the Kafka cluster. The feature is disabled by default; to enable it, move kafka-reporter-plugin-x.y.z.jar from agent/optional-reporter-plugins to agent/plugins.
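For example, run the following from the unpacked skywalking-agent directory (the wildcard avoids hard-coding the x.y.z version; adjust the path to your install):

cd /path/to/skywalking-agent
# move the Kafka reporter from the optional directory into the active plugin directory
mv optional-reporter-plugins/kafka-reporter-plugin-*.jar plugins/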
Edit the configuration file agent/config/agent.config:
# Service name: shown in the UI; make sure it is unique
agent.service_name=${SW_AGENT_NAME:myApp}
# Kafka address
plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:<ip>:<port>}
Make sure Kafka is up and running before starting the agent.
Alternatively, pass the settings as JVM system properties:
-Dskywalking.agent.service_name=myApp
-Dskywalking.plugin.kafka.bootstrap_servers=10.200.14.114:9092
Start the Java application:
- Linux Tomcat 7, Tomcat 8, Tomcat 9
  Change the first line of tomcat/bin/catalina.sh:
  CATALINA_OPTS="$CATALINA_OPTS -javaagent:/path/to/skywalking-agent/skywalking-agent.jar"; export CATALINA_OPTS
- Windows Tomcat 7, Tomcat 8, Tomcat 9
  Change the first line of tomcat/bin/catalina.bat (see the sketch after this list).
- JAR file
  Add the -javaagent argument to the command line that starts your app (see the sketch after this list).
- Jetty
  Modify jetty.sh and add the -javaagent argument to the command line that starts your app (see the sketch after this list).
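The exact lines depend on your installation; the following are sketches based on the standard SkyWalking agent setup, with /path/to/skywalking-agent and yourApp.jar as placeholders.

Windows Tomcat (first line of tomcat/bin/catalina.bat):

set "CATALINA_OPTS=-javaagent:/path/to/skywalking-agent/skywalking-agent.jar"

JAR file:

java -javaagent:/path/to/skywalking-agent/skywalking-agent.jar -jar yourApp.jar

Jetty (in jetty.sh, extend the JVM options):

export JAVA_OPTIONS="${JAVA_OPTIONS} -javaagent:/path/to/skywalking-agent/skywalking-agent.jar"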
Configure Datakit
Copy the sample configuration file and edit it.
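A minimal sketch, assuming a default Linux install of Datakit (adjust the paths for your environment):

cd /usr/local/datakit/conf.d/kafkamq
cp kafkamq.conf.sample kafkamq.conf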
Configuration file reference:
[[inputs.kafkamq]]
  # Kafka address
  addr = "localhost:9092"
  # consumer group ID
  group_id = "datakit-group"

  [inputs.kafkamq.skywalking]
    topics = [
      "skywalking-metrics",
      "skywalking-profilings",
      "skywalking-segments",
      "skywalking-managements",
      "skywalking-meters",
      "skywalking-logging",
    ]
    # Required if the skywalking-agent has a namespace configured.
    namespace = ""

  # [inputs.kafkamq.threads]
  #   buffer = 100
  #   threads = 8

  plugins = ["db.type"]

  # customer_tags = ["key1", "key2", ...]

  ## Keep rare tracing resources list switch.
  ## If some resources are rare enough (not present in 1 hour), those resources will always be sent
  ## to the data center, regardless of samplers and filters.
  # keep_rare_resource = false

  ## Ignore tracing resources map like service:[resources...].
  ## The service name is the full service name in the current application.
  ## The resource list is a set of regular expressions used to block resource names.
  ## If you want to block some resources universally under all services, you can set the
  ## service name to "*". Note: the double quotes "" cannot be omitted.
  # [inputs.kafkamq.close_resource]
  #   service1 = ["resource1", "resource2", ...]
  #   service2 = ["resource1", "resource2", ...]
  #   "*" = ["close_resource_under_all_services"]
  #   ...

  ## Sampler config used to set the global sampling strategy.
  ## sampling_rate sets the global sampling rate.
  # [inputs.kafkamq.sampler]
  #   sampling_rate = 1.0

  # [inputs.kafkamq.tags]
  #   key1 = "value1"
  #   key2 = "value2"
  #   ...

  ## local disk cache
  # [inputs.kafkamq.storage]
  #   path = "./skywalking_storage"
  #   capacity = 5120
Restart Datakit.
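For example, on a host install where the datakit CLI is available:

datakit service -R
# or, on systemd-managed hosts:
systemctl restart datakit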
Send Logs to Kafka
- log4j2
Add the toolkit dependency to your Maven or Gradle build:
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-log4j-2.x</artifactId>
<version>{project.release.version}</version>
</dependency>
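If you build with Gradle instead, the equivalent declaration would be along these lines (replace {project.release.version} with the toolkit version you use):

implementation 'org.apache.skywalking:apm-toolkit-log4j-2.x:{project.release.version}'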
Print the trace ID in your logs:
<Configuration>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d [%traceId] %-5p %c{1}:%L - %m%n"/>
</Console>
</Appenders>
<Loggers>
<AsyncRoot level="INFO">
<AppenderRef ref="Console"/>
</AsyncRoot>
</Loggers>
</Configuration>
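With this pattern, a log line emitted inside an instrumented request looks roughly like the illustrative sample below; the TID value comes from the active SkyWalking trace context, and outside a trace it prints as TID: N/A:

2023-01-01 12:00:00,000 [TID:a1b2c3d4e5f6...] INFO  DemoService:25 - handling request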
Send the logs to Kafka:
<GRPCLogClientAppender name="grpc-log">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</GRPCLogClientAppender>
Complete configuration:
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%traceId] [%t] %-5level %logger{36} %msg%n"/>
</Console>
<GRPCLogClientAppender name="grpc-log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%traceId] [%t] %-5level %logger{36} %msg%n"/>
</GRPCLogClientAppender>
<RandomAccessFile name="fileAppender" fileName="${sys:user.home}/tmp/skywalking-logs/log4j2/e2e-service-provider.log" immediateFlush="true" append="true">
<PatternLayout>
<Pattern>[%sw_ctx] [%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c:%L - %m%n</Pattern>
</PatternLayout>
</RandomAccessFile>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="grpc-log"/>
</Root>
<Logger name="fileLogger" level="info" additivity="false">
<AppenderRef ref="fileAppender"/>
</Logger>
</Loggers>
</Configuration>
At this point, the agent will send logs to Kafka.
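For illustration, a hypothetical service class: any ordinary log4j2 call made inside an instrumented request is decorated with the trace ID and shipped through the grpc-log appender configured above.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DemoService {
    private static final Logger LOGGER = LogManager.getLogger(DemoService.class);

    public void handle() {
        // Inside a traced request, %traceId in the patterns above resolves to the
        // active SkyWalking trace ID; the grpc-log appender hands this record to the
        // agent, which forwards it to Kafka through the kafka-reporter plugin.
        LOGGER.info("handling request");
    }
}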
For more logging configuration options, see the SkyWalking documentation.
Logs collected from Kafka do not need to be processed by a Pipeline; the fields are already fully extracted.