观测云采集 Amazon ECS 日志¶
简介¶
Amazon Elastic Container Service (Amazon ECS) 是一项高度可扩展的快速容器管理服务,可以使用它轻松运行、停止和管理群集上的容器。这些容器可以运行在自己的 EC2 服务器上,也可以运行在由 AWS Fargate 托管的无服务器基础设施。针对任务使用 Fargate 的启动类型,需要启动容器的 awslogs 日志驱动程序, 运行在容器中的应用以 STDOUT 和 STDERR I/O 流的方式输出的日志,会被发送到 CloudWatch Logs 的日志组中,再通过 Func 采集这些日志,Func 再把日志通过 EC2 上部署的 DataKit 写入观测云中。本文的日志采集就是针对 AWS Fargate 托管的容器。
环境版本¶
- DataKit 1.4.18
前置条件¶
- 您需要先创建一个观测云账号
- 安装 DataKit
- 安装 Func 携带版
- 已经拥有运行在 ECS 的 Java 应用
这里使用到的 ECS 集群名称是 cluster-docker,下面查看示例的日志及日志组。登录『AWS』,进入『Elastic Container Service』,点击 『集群』->『cluster-docker』。
点击服务名称。
进入任务。
在详细信息标签的容器下面找到日志配置。
点击日志标签,里面是应用的日志,接下来采集这些日志。
操作步骤¶
步骤 1: AWS 配置¶
1.1 用户密钥¶
使用部署 ECS 用到的账号,AWS 创建该用户时提供的 Access key ID 和 Secret access key 后面会使用到。
1.2 设置 AWS 用户权限¶
登录 AWS 的 IAM 控制台,在用户下面找到 ECS 所在的用户,点击“添加权限”。
点击“直接附加现有策略”。筛选策略输入 CloudWatchLogsReadOnlyAccess、CloudWatchEventsReadOnlyAccess,选中,然后点击“下一步:审核”。
步骤 2: Func 配置¶
2.1 配置环境变量¶
登录『Func』-> 『开发』-> 『环境变量』-> 『添加环境变量』。这里添加三个环境变量,AWS_LOG_KEY 值对应步骤 1 中 AWS 用户的 Access key ID,AWS_LOG_SECRET_ACCESS_KEY 值对应步骤 1 中 AWS 用户的 Secret access key,AWS_REGION_NAME 值对应 AWS 用户所在的 REGION。
2.2 配置连接器¶
登录『Func』->『开发』-> 『连接器』-> 『添加连接器』。这里 ID 必须填 datakit,主机对应已安装 DataKit 的地址,端口是 DataKit 的端口,本示例直接用 IP,所以协议填 HTTP,点击“测试连通性”,有对号返回,说明 DataKit 可用。
2.3 PIP 工具配置¶
登录『Func』-> 『管理』-> 『实验性功能』,右侧选中“开启 PIP 工具模块”。
点击左侧的“PIP 工具”,选择“阿里云镜像”,输入“boto3”,点击“安装”。
2.4 脚本库¶
登录『Func』->『开发』-> 『脚本库』-> 『添加脚本集』,ID 可以自定义,点击“保存”。
找到“AWS 日志采集”,点击“添加脚本”。
输入 ID 这里定义为“aws_ecs__log”,点击“保存”。
点击“编辑”。
输入如下内容。
import boto3
import json
import time
scope_id='ecs_log'
@DFF.API('aws_ecs log', timeout=500, api_timeout=180)
def run(measurement, logGroupName, interval):
print(measurement, logGroupName, interval)
get_log_data(measurement, logGroupName, interval)
# if data is not None:
# push_log(data)
# else:
# print("None")
def get_cron_time(interval, measurement):
cache = DFF.CACHE.get('last_time_%s' %measurement,scope=scope_id)
if cache == None:
currentTime = int(round(time.time() * 1000))
startTime = currentTime - int(interval) * 1000
endTime = currentTime
else:
currentTime = int(round(time.time() * 1000))
if currentTime - int(cache) > 10 * 60 * 1000:
startTime = currentTime - int(interval) * 1000
endTime = currentTime
else:
startTime = int(cache) + 1
endTime = currentTime
print(startTime, endTime)
return startTime, endTime
def get_log_data(measurement, logGroupName, interval):
logTime = get_cron_time(interval, measurement)
startTime = logTime[0]
endTime = logTime[1]
isPush = False
client = boto3.client(
'logs',
aws_access_key_id=DFF.ENV('AWS_LOG_KEY'),
aws_secret_access_key=DFF.ENV('AWS_LOG_SECRET_ACCESS_KEY'),
region_name=DFF.ENV('AWS_REGION_NAME')
)# print(client.meta.config)
try:
nextToken = 'frist'
logData = []
while nextToken != '':
if nextToken == 'frist':
nextToken = ''
response = client.filter_log_events(
logGroupName=logGroupName,
startTime=startTime,
endTime=endTime,
limit=1000,
#filterPattern="?ERROR ?WARN ?error ?warn",
interleaved=False
)
else:
response = client.filter_log_events(
logGroupName=logGroupName,
startTime=startTime,
endTime=endTime,
nextToken=nextToken,
limit=1000,
#filterPattern="?ERROR ?WARN ?error ?warn",
interleaved=False
)
try:
if len(response['events']) > 0:
data = []
lastTimeList = []
for i in response['events']:
# print("hii", i['logStreamName'])
log = {
'measurement': measurement,
'tags': {
'logGroupName': logGroupName,
'logStreamName': i['logStreamName'],
'host': '127.0.0.1'
},
'fields': {
'message': i['message'],
'time': i['timestamp']
}
}
data.append(log)
lastTimeList.append(i['timestamp'])
push_log(data)
print("max %s" % max(lastTimeList))
DFF.CACHE.set('last_time_%s' % measurement, max(lastTimeList), scope=scope_id, expire=None)
isPush = True
else:
DFF.CACHE.set('last_time_%s' % measurement, endTime , scope=scope_id, expire=None)
nextToken = response['nextToken']
except:
nextToken = ''
except Exception as e:
print('Error: %s' % e )
return None
if not isPush:
DFF.CACHE.set('last_time_%s' % measurement, endTime , scope=scope_id, expire=None)
def push_log(data):
datakit = DFF.SRC('datakit')
status_code, result = datakit.write_logging_many(data=data)
if status_code == 200:
print("total %d" % len(data))
print(status_code, result)
注意这里第四行的“ecs_log”,需要确保同一个 Func 中唯一,可以改成其它字母。第六行的“awc_ecs”即是刚才添加的脚本集 ID,第 40、41、42 行中的 AWS_LOG_KEY、AWS_LOG_SECRET_ACCESS_KEY、AWS_REGION_NAME 对应步骤 2.1 中的环境变量名,如果环境变量名变了,需要做对应修改.
2.5 测试脚本¶
如下图选择“run”,第二个框中,measurement 值输入 “ecs_log_source”,这个值对应观测云日志中的日志来源,logGroupName 对应前置条件中查到的 “awslogs-group”。interval 值对应采集频率,这里是 60 秒。
点击“执行”,输出“total 8”,即上报八条日志。
登录『观测云』,进入『日志』模块,数据源选择“ecs_log_source”,即可看到日志。
点击右上角的“发布”。
点击右上角的“结束编辑图标”。
2.6 自动采集日志¶
登录『Func』->『管理』-> 『自动触发配置』-> 『新建』,参数输入刚才执行的内容。
时间选择每分钟或者每5分钟,点击“保存”。
在“自动触发配置”列表中存在“aws_ecs log”的记录,点击“近期执行”查看执行情况。