Guance (观测云) Solution for Collecting AWS OpenSearch Logs
1. Introduction
To make AWS OpenSearch logs easier to view and alert on, Guance provides a complete solution that collects them with Func.
2. Prerequisites
3. Set up the AWS OpenSearch access key
3.1 Log in to the IAM console

3.2 Create a user
Set the user name and the credential type. For the credential type, select "Access key" - "Programmatic access".

3.3 Set the AK permissions
Attach the CloudWatchLogsReadOnlyAccess and CloudWatchEventsReadOnlyAccess policies to the user.
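If the managed policies above are broader than you would like, a narrower custom policy is one possible alternative. The sketch below only builds such a policy document as JSON. It assumes the collector needs nothing beyond `logs:FilterLogEvents` on a single log group, and the function name, region, and account ID are hypothetical placeholders, so treat it as a starting point rather than a verified replacement for the managed policies.

```python
import json


def build_log_read_policy(region, account_id, log_group_name, partition="aws"):
    """Return an IAM policy document (as a dict) that allows reading one log group.

    Assumption: the only CloudWatch Logs action the collector calls is
    FilterLogEvents. Use partition="aws-cn" for the China regions.
    """
    arn = "arn:%s:logs:%s:%s:log-group:%s:*" % (
        partition, region, account_id, log_group_name)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["logs:FilterLogEvents"],
                "Resource": [arn],
            }
        ],
    }


# Placeholder region and account ID; replace them with your own values.
policy = build_log_read_policy(
    "us-east-1", "123456789012",
    "/aws/OpenSearchService/domains/df-prd-es/search-logs")
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the IAM policy editor. Whether it is sufficient depends on what else the access key is used for.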

3.4 Copy the AK

4. Configure the Func script and the scheduled task
4.1 Add environment variables
Set AWS_ACCESS_KEY, AWS_REGION_NAME, and AWS_SECRET_ACCESS_KEY.
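A missing variable usually surfaces later as an opaque authentication error, so it can help to check the three names up front. The sketch below is a minimal check written against a generic lookup callable. `missing_settings` and `example_env` are hypothetical names, and the assumption that `DFF.ENV` returns an empty value for an unset variable should be verified in your Func version.

```python
REQUIRED_SETTINGS = ("AWS_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY", "AWS_REGION_NAME")


def missing_settings(lookup):
    """Return the required names for which lookup(name) is None or empty."""
    return [name for name in REQUIRED_SETTINGS if not lookup(name)]


# Stand-in for the Func environment, used only for illustration.
# Inside a Func script, DFF.ENV would be passed instead (assumption).
example_env = {"AWS_ACCESS_KEY": "example-key", "AWS_REGION_NAME": "cn-north-1"}
print(missing_settings(example_env.get))
```

With the stand-in dictionary above, the secret key is the only name reported as missing.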

4.2 Add a DataKit data source
| Field | Value |
|---|---|
| ID | datakit |
| Host | Host address of DataKit (remember to configure its HTTP service) |
| Port | 9529 |
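Before saving the data source, it can be worth confirming that the DataKit HTTP service is reachable from the Func host. The sketch below builds the URL from the host and port in the table and issues a single GET. The `/v1/ping` path is an assumption about the DataKit HTTP API, and the host `10.0.0.5` and both function names are hypothetical.

```python
import http.client
import urllib.request
from urllib.parse import urlunsplit

DATAKIT_PORT = 9529  # port from the data source table


def datakit_url(host, port=DATAKIT_PORT, path="/v1/ping"):
    """Build an HTTP URL for the DataKit API on the given host."""
    return urlunsplit(("http", "%s:%d" % (host, port), path, "", ""))


def is_reachable(url, timeout=3.0):
    """Return True if an HTTP GET to url gets a 2xx response within timeout."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (OSError, http.client.HTTPException):
        # Connection refused, timeout, DNS failure, or a non-2xx status.
        return False


# Hypothetical host; replace it with the address entered in the table.
print(datakit_url("10.0.0.5"))
```

Calling `is_reachable(datakit_url(host))` from the Func host returns `False` when DataKit listens only on localhost, which is the usual reason the HTTP service setting needs attention.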


4.3 Add the AWS SDK
4.3.1 Enable the PIP tool module
"Management" - "Experimental Features" - "PIP Tool" - "Enable PIP Tool Module"

4.3.2 Install boto3
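After the installation finishes, a quick import check can confirm that the package is visible to the Python environment that runs the scripts. This is a minimal sketch using only the standard library, and `is_installed` is a hypothetical helper name.

```python
import importlib.util


def is_installed(module_name):
    """Return True if a top-level module can be found without importing it."""
    return importlib.util.find_spec(module_name) is not None


print("boto3 installed:", is_installed("boto3"))
```

If this prints `False` inside Func, the collection script will fail at its `import boto3` line.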

4.4 Add a script set

4.5 Add a Func script

4.6 Add the collection code
```python
import time

import boto3

# DFF is injected by the Func runtime; it is not imported.


@DFF.API('aws log', timeout=500, api_timeout=180)
def run(measurement, logGroupName, interval):
    """Entry point called by the auto-trigger configuration."""
    print(measurement, logGroupName, interval)
    get_log_data(measurement, logGroupName, interval)


def get_cron_time(interval, measurement):
    """Return (startTime, endTime) in epoch milliseconds for this run."""
    cache = DFF.CACHE.get('last_time_%s' % measurement, scope='aws_log')
    currentTime = int(round(time.time() * 1000))
    if cache is None or currentTime - int(cache) > 10 * 60 * 1000:
        # First run, or the cached position is more than 10 minutes old:
        # fall back to the last `interval` seconds.
        startTime = currentTime - int(interval) * 1000
    else:
        # Resume right after the last collected event.
        startTime = int(cache) + 1
    endTime = currentTime
    print(startTime, endTime)
    return startTime, endTime


def get_log_data(measurement, logGroupName, interval):
    """Page through CloudWatch Logs and push each page to DataKit."""
    startTime, endTime = get_cron_time(interval, measurement)
    cacheKey = 'last_time_%s' % measurement
    isPush = False
    client = boto3.client(
        'logs',
        aws_access_key_id=DFF.ENV('AWS_ACCESS_KEY'),
        aws_secret_access_key=DFF.ENV('AWS_SECRET_ACCESS_KEY'),
        region_name=DFF.ENV('AWS_REGION_NAME')
    )
    params = {
        'logGroupName': logGroupName,
        'startTime': startTime,
        'endTime': endTime,
        'limit': 1000,
        # 'filterPattern': '?ERROR ?WARN ?error ?warn',
        'interleaved': False,
    }
    try:
        while True:
            response = client.filter_log_events(**params)
            events = response.get('events', [])
            if events:
                data = []
                lastTimeList = []
                for i in events:
                    data.append({
                        'measurement': measurement,
                        'tags': {
                            'logGroupName': logGroupName,
                            'logStreamName': i['logStreamName'],
                            'host': '127.0.0.1'
                        },
                        'fields': {
                            'message': i['message'],
                            'time': i['timestamp']
                        }
                    })
                    lastTimeList.append(i['timestamp'])
                push_log(data)
                print("max %s" % max(lastTimeList))
                # Remember the newest pushed timestamp so the next run resumes after it.
                DFF.CACHE.set(cacheKey, max(lastTimeList), scope='aws_log', expire=None)
                isPush = True
            else:
                DFF.CACHE.set(cacheKey, endTime, scope='aws_log', expire=None)
            # The last page carries no nextToken; stop there.
            nextToken = response.get('nextToken')
            if not nextToken:
                break
            params['nextToken'] = nextToken
    except Exception as e:
        # Leave the cached position untouched so the next run retries from it.
        print('Error: %s' % e)
        return None
    if not isPush:
        DFF.CACHE.set(cacheKey, endTime, scope='aws_log', expire=None)


def push_log(data):
    """Write one batch of log points through the 'datakit' data source."""
    datakit = DFF.SRC('datakit')
    status_code, result = datakit.write_logging_many(data=data)
    if status_code == 200:
        print("total %d" % len(data))
    print(status_code, result)
```
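The collection code walks through result pages by passing each response's `nextToken` back into `filter_log_events` until no token is returned. The sketch below isolates that loop so it can be exercised without AWS credentials. `iter_log_events` and `FakeLogsClient` are hypothetical names, and the fake client only imitates the `events` and `nextToken` fields of the real response.

```python
def iter_log_events(client, log_group_name, start_ms, end_ms, page_size=1000):
    """Yield every event in the time range, following nextToken across pages."""
    kwargs = {
        "logGroupName": log_group_name,
        "startTime": start_ms,
        "endTime": end_ms,
        "limit": page_size,
    }
    while True:
        response = client.filter_log_events(**kwargs)
        for event in response.get("events", []):
            yield event
        token = response.get("nextToken")
        if not token:
            break  # the last page carries no token
        kwargs["nextToken"] = token


class FakeLogsClient:
    """Hypothetical stand-in for boto3.client('logs'), used only to test the loop."""

    def __init__(self, pages):
        self._pages = pages
        self.calls = []

    def filter_log_events(self, **kwargs):
        self.calls.append(dict(kwargs))
        index = int(kwargs.get("nextToken", 0))
        response = {"events": self._pages[index]}
        if index + 1 < len(self._pages):
            response["nextToken"] = str(index + 1)
        return response


# An empty page in the middle must not end the iteration early.
pages = [[{"timestamp": 1, "message": "a"}], [], [{"timestamp": 2, "message": "b"}]]
fake = FakeLogsClient(pages)
messages = [e["message"] for e in iter_log_events(fake, "/example/group", 0, 10)]
print(messages)
```

The empty middle page matters because an empty `events` list does not by itself mean the results are exhausted; only a missing `nextToken` does. boto3 also offers `client.get_paginator('filter_log_events')`, which may be a simpler option if per-page cache updates are not needed.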
4.7 Publish the code
The script can be associated with an auto-trigger configuration only after the code has been published.

4.8 Create an auto-trigger configuration
"Management" - "Auto-trigger Configuration"


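4.9 Set the trigger parameters
| Field | Description |
|---|---|
| measurement | Guance source name |
| logGroupName | AWS log group |
| interval | Look-back window in seconds for the first run (also used when the cached position is more than 10 minutes old); it should not be too large |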
```json
{
  "measurement": "es-search-log",
  "logGroupName": "/aws/OpenSearchService/domains/df-prd-es/search-logs",
  "interval": 60
}
```
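To see how `interval` interacts with the cached position, the window selection from `get_cron_time` can be written as a pure function and tried with the example parameters. This is a sketch of the same rule without the Func cache. `compute_window` and `STALE_AFTER_MS` are hypothetical names and the timestamp is arbitrary.

```python
import json

STALE_AFTER_MS = 10 * 60 * 1000  # same 10-minute threshold as the collection code


def compute_window(now_ms, interval_s, last_ms=None):
    """Return (start_ms, end_ms) for one run.

    Without a cached position, or with one older than the threshold, the window
    is the last `interval_s` seconds. Otherwise it resumes just after `last_ms`.
    """
    if last_ms is None or now_ms - int(last_ms) > STALE_AFTER_MS:
        return now_ms - int(interval_s) * 1000, now_ms
    return int(last_ms) + 1, now_ms


kwargs = json.loads("""
{
  "measurement": "es-search-log",
  "logGroupName": "/aws/OpenSearchService/domains/df-prd-es/search-logs",
  "interval": 60
}
""")
now = 1_700_000_000_000  # arbitrary epoch milliseconds for the example

first_run = compute_window(now, kwargs["interval"])
resumed = compute_window(now, kwargs["interval"], last_ms=now - 30_000)
stale = compute_window(now, kwargs["interval"], last_ms=now - 11 * 60 * 1000)
print(first_run, resumed, stale)
```

The stale case returns the same window as the first run, so anything logged between the cached position and `now - interval` is skipped. Keeping the trigger frequency well under 10 minutes avoids that gap.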


4.10 Set the auto-trigger frequency

4.11 View the task status

5. Verify the logs
Log in to Guance and check whether the logs have been collected.
