观测云采集 AWS OpenSearch logs 解决方案

    观测云采集-AWS-OpenSearch-logs-解决方案.png

    一、 简介

    为方便 AWS OpenSearch 日志更方便查看和告警,我们观测云提供了 Func 采集 AWS OpenSearch 日志的完整解决方案。

    二、 前置条件

    三、设置 AWS OpenSearch 访问密钥

    3.1 登录IAM控制台

    3.2 创建账户

    设置用户名称、凭证类型, 选择「访问密钥」 - 「编程访问」

    3.3 设置 AK 权限

    设置 CloudWatchLogsReadOnlyAccess、CloudWatchEventsReadOnlyAccess 权限

    3.4 复制 AK 密钥

    四、配置 Func 脚本和定时任务

    4.1 添加环境变量

    设置 AWS_ACCESS_KEY、AWS_REGION_NAME、AWS_SECRET_ACCESS_KEY

    4.2 添加 Datakit 数据源

    字段
    ID datakit
    主机 DataKit 的主机地址(注意设置HTTP 服务
    端口 9529

    4.3 添加 AWS SDK

    4.3.1 开启PIP工具模块

    「管理」- 「实验功能」-「PIP工具」- 「开启PIP工具模块」

    4.3.2 安装 boto3

    4.4 添加脚本集

    4.5 添加 Func 脚本

    4.6 添加采集代码

    import boto3
    import json
    import time
    
    
    @DFF.API('aws log', timeout=500, api_timeout=180)
    def run(measurement, logGroupName, interval):
        print(measurement, logGroupName, interval)
        get_log_data(measurement, logGroupName, interval)
        # if data is not None:
        #     push_log(data)
        # else:
        #     print("None")
    
    
    def get_cron_time(interval, measurement):
        cache = DFF.CACHE.get('last_time_%s' %measurement,scope='aws_log')
        if cache == None:
            currentTime = int(round(time.time() * 1000))
            startTime = currentTime - int(interval) * 1000
            endTime = currentTime
        else:
            currentTime = int(round(time.time() * 1000))
            if currentTime - int(cache) > 10 * 60 * 1000:
                startTime = currentTime - int(interval) * 1000
                endTime = currentTime
            else:
                startTime = int(cache) + 1
                endTime = currentTime
        print(startTime, endTime)
        return  startTime, endTime
    
    def get_log_data(measurement, logGroupName, interval):
        logTime = get_cron_time(interval, measurement)
        startTime = logTime[0]
        endTime = logTime[1]
        isPush = False
        client = boto3.client(
            'logs',
            aws_access_key_id=DFF.ENV('AWS_ACCESS_KEY'),
            aws_secret_access_key=DFF.ENV('AWS_SECRET_ACCESS_KEY'),
            region_name=DFF.ENV('AWS_REGION_NAME')
        )# print(client.meta.config)
        try:
            nextToken = 'frist'
            logData = []
            while nextToken != '':
                if nextToken == 'frist':
                    nextToken = ''
                    response = client.filter_log_events(
                        logGroupName=logGroupName,
                        startTime=startTime,
                        endTime=endTime,
                        limit=1000,
                        #filterPattern="?ERROR ?WARN ?error ?warn",
                        interleaved=False
                    )
                else:
                    response = client.filter_log_events(
                        logGroupName=logGroupName,
                        startTime=startTime,
                        endTime=endTime,
                        nextToken=nextToken,
                        limit=1000,
                        #filterPattern="?ERROR ?WARN ?error ?warn",
                        interleaved=False
                    )
                try:
                    if len(response['events']) > 0:
                        data = []
                        lastTimeList = []
                        for i in response['events']:
                            # print("hii", i['logStreamName'])
                            log = {
                                'measurement': measurement,
                                'tags': {
                                    'logGroupName': logGroupName,
                                    'logStreamName': i['logStreamName'],
                                    'host': '127.0.0.1'
                                },
                                'fields': {
                                    'message': i['message'],
                                    'time': i['timestamp']
                                }
                            }
                            data.append(log)
                            lastTimeList.append(i['timestamp'])
                        push_log(data)
                        print("max %s"  % max(lastTimeList))
                        DFF.CACHE.set('last_time_%s' % measurement, max(lastTimeList), scope='aws_log', expire=None)
                        isPush = True
                    else:
                        DFF.CACHE.set('last_time_%s' % measurement, endTime , scope='aws_log', expire=None)
                    nextToken = response['nextToken']
                except:
                    nextToken = ''
        except Exception as  e:
            print('Error: %s' % e )
            return None
        if not isPush:
            DFF.CACHE.set('last_time_%s' % measurement, endTime , scope='aws_log', expire=None)
    
    def push_log(data):
        datakit = DFF.SRC('datakit')
        status_code, result = datakit.write_logging_many(data=data)
        if status_code == 200:
            print("total %d"  % len(data))
            print(status_code, result)
    

    4.7 发布代码

    发布代码后,才能在自动触发配置关联脚本

    4.8 新建自动触发配置

    「管理」-「自动触发配置」

    4.9 设置触发配置

    字段 描述
    measurement 观测云 source名称
    logGroupName AWS 日志组
    interval 初次启动的间隔时间(s)不宜太大
    {
      "measurement": "es-search-log",
      "logGroupName": "/aws/OpenSearchService/domains/df-prd-es/search-logs",
      "interval": 60
    }
    

    4.10 设置自动触发频率

    4.11 查看任务情况

    五、验证日志

    登录观测云,查看日志是否采集成功

    联系我们

    加入社区

    微信扫码
    加入官方交流群

    立即体验

    在线开通,按量计费,真正的云服务!

    立即开始

    选择观测云版本

    代码托管平台