跳转至

观测云采集 Amazon ECS 日志


简介

Amazon Elastic Container Service (Amazon ECS) 是一项高度可扩展的快速容器管理服务,可以使用它轻松运行、停止和管理群集上的容器。这些容器可以运行在自己的 EC2 服务器上,也可以运行在由 AWS Fargate 托管的无服务器基础设施。针对任务使用 Fargate 的启动类型,需要启动容器的 awslogs 日志驱动程序, 运行在容器中的应用以 STDOUT 和 STDERR I/O 流的方式输出的日志,会被发送到 CloudWatch Logs 的日志组中,再通过 Func 采集这些日志,Func 再把日志通过 EC2 上部署的 DataKit 写入观测云中。本文的日志采集就是针对 AWS Fargate 托管的容器。

image

环境版本

  • DataKit 1.4.18

前置条件

这里使用到的 ECS 集群名称是 cluster-docker,下面查看示例的日志及日志组。登录『AWS』,进入『Elastic Container Service』,点击 『集群』->『cluster-docker』。

image

点击服务名称。

image

进入任务。

image

在详细信息标签的容器下面找到日志配置。

image

点击日志标签,里面是应用的日志,接下来采集这些日志。

image

操作步骤

步骤 1: AWS 配置

1.1 用户密钥

使用部署 ECS 用到的账号,AWS 创建该用户时提供的 Access key IDSecret access key 后面会使用到。

1.2 设置 AWS 用户权限

登录 AWS 的 IAM 控制台,在用户下面找到 ECS 所在的用户,点击“添加权限”。

image

点击“直接附加现有策略”。筛选策略输入 CloudWatchLogsReadOnlyAccess、CloudWatchEventsReadOnlyAccess,选中,然后点击“下一步:审核”。

image

步骤 2: Func 配置

2.1 配置环境变量

登录『Func』-> 『开发』-> 『环境变量』-> 『添加环境变量』。这里添加三个环境变量,AWS_LOG_KEY 值对应步骤 1 中 AWS 用户的 Access key ID,AWS_LOG_SECRET_ACCESS_KEY 值对应步骤 1 中 AWS 用户的 Secret access key,AWS_REGION_NAME 值对应 AWS 用户所在的 REGION。

image

2.2 配置连接器

登录『Func』->『开发』-> 『连接器』-> 『添加连接器』。这里 ID 必须填 datakit,主机对应已安装 DataKit 的地址,端口是 DataKit 的端口,本示例直接用 IP,所以协议填 HTTP,点击“测试连通性”,有对号返回,说明 DataKit 可用。

image

2.3 PIP 工具配置

登录『Func』-> 『管理』-> 『实验性功能』,右侧选中“开启 PIP 工具模块”。

image

点击左侧的“PIP 工具”,选择“阿里云镜像”,输入“boto3”,点击“安装”。

image

2.4 脚本库

登录『Func』->『开发』-> 『脚本库』-> 『添加脚本集』,ID 可以自定义,点击“保存”。

image

找到“AWS 日志采集”,点击“添加脚本”。

image

输入 ID 这里定义为“aws_ecs__log”,点击“保存”。

image

点击“编辑”。

image

输入如下内容。

import boto3
import json
import time
scope_id='ecs_log'

@DFF.API('aws_ecs log', timeout=500, api_timeout=180)
def run(measurement, logGroupName, interval):
    print(measurement, logGroupName, interval)
    get_log_data(measurement, logGroupName, interval)
    # if data is not None:
    #     push_log(data)
    # else:
    #     print("None")


def get_cron_time(interval, measurement):
    cache = DFF.CACHE.get('last_time_%s' %measurement,scope=scope_id)
    if cache == None:
        currentTime = int(round(time.time() * 1000))
        startTime = currentTime - int(interval) * 1000
        endTime = currentTime
    else:
        currentTime = int(round(time.time() * 1000))
        if currentTime - int(cache) > 10 * 60 * 1000:
            startTime = currentTime - int(interval) * 1000
            endTime = currentTime
        else:
            startTime = int(cache) + 1
            endTime = currentTime
    print(startTime, endTime)
    return  startTime, endTime

def get_log_data(measurement, logGroupName, interval):
    logTime = get_cron_time(interval, measurement)
    startTime = logTime[0]
    endTime = logTime[1]
    isPush = False
    client = boto3.client(
        'logs',
        aws_access_key_id=DFF.ENV('AWS_LOG_KEY'),
        aws_secret_access_key=DFF.ENV('AWS_LOG_SECRET_ACCESS_KEY'),
        region_name=DFF.ENV('AWS_REGION_NAME')
    )# print(client.meta.config)
    try:
        nextToken = 'frist'
        logData = []
        while nextToken != '':
            if nextToken == 'frist':
                nextToken = ''
                response = client.filter_log_events(
                    logGroupName=logGroupName,
                    startTime=startTime,
                    endTime=endTime,
                    limit=1000,
                    #filterPattern="?ERROR ?WARN ?error ?warn",
                    interleaved=False
                )
            else:
                response = client.filter_log_events(
                    logGroupName=logGroupName,
                    startTime=startTime,
                    endTime=endTime,
                    nextToken=nextToken,
                    limit=1000,
                    #filterPattern="?ERROR ?WARN ?error ?warn",
                    interleaved=False
                )
            try:
                if len(response['events']) > 0:
                    data = []
                    lastTimeList = []
                    for i in response['events']:
                        # print("hii", i['logStreamName'])
                        log = {
                            'measurement': measurement,
                            'tags': {
                                'logGroupName': logGroupName,
                                'logStreamName': i['logStreamName'],
                                'host': '127.0.0.1'
                            },
                            'fields': {
                                'message': i['message'],
                                'time': i['timestamp']
                            }
                        }
                        data.append(log)
                        lastTimeList.append(i['timestamp'])
                    push_log(data)
                    print("max %s"  % max(lastTimeList))
                    DFF.CACHE.set('last_time_%s' % measurement, max(lastTimeList), scope=scope_id, expire=None)
                    isPush = True
                else:
                    DFF.CACHE.set('last_time_%s' % measurement, endTime , scope=scope_id, expire=None)
                nextToken = response['nextToken']
            except:
                nextToken = ''
    except Exception as  e:
        print('Error: %s' % e )
        return None
    if not isPush:
        DFF.CACHE.set('last_time_%s' % measurement, endTime , scope=scope_id, expire=None)

def push_log(data):
    datakit = DFF.SRC('datakit')
    status_code, result = datakit.write_logging_many(data=data)
    if status_code == 200:
        print("total %d"  % len(data))
        print(status_code, result)

注意这里第四行的“ecs_log”,需要确保同一个 Func 中唯一,可以改成其它字母。第六行的“awc_ecs”即是刚才添加的脚本集 ID,第 40、41、42 行中的 AWS_LOG_KEY、AWS_LOG_SECRET_ACCESS_KEY、AWS_REGION_NAME 对应步骤 2.1 中的环境变量名,如果环境变量名变了,需要做对应修改.

2.5 测试脚本

如下图选择“run”,第二个框中,measurement 值输入 “ecs_log_source”,这个值对应观测云日志中的日志来源,logGroupName 对应前置条件中查到的 “awslogs-group”。interval 值对应采集频率,这里是 60 秒。

image

点击“执行”,输出“total 8”,即上报八条日志。

image

登录『观测云』,进入『日志』模块,数据源选择“ecs_log_source”,即可看到日志。

image

点击右上角的“发布”。

image

点击右上角的“结束编辑图标”。

image

2.6 自动采集日志

登录『Func』->『管理』-> 『自动触发配置』-> 『新建』,参数输入刚才执行的内容。

{
  "measurement": "ecs_log_source",
  "logGroupName": "/ecs/demo-task",
  "interval": 60
}

image

时间选择每分钟或者每5分钟,点击“保存”。

image

在“自动触发配置”列表中存在“aws_ecs log”的记录,点击“近期执行”查看执行情况。

image

image