背景说明
应用系统的日志收集与分析工作对运维来说至关重要。常见的系统解决方案中开源技术栈ELK(Elastic Stack: Elasticsearch, Logstash, Kibana)是当前比较流行的选择。下面我们会讨论另一种构建于云原生设计的类似于ELK的解决方案EKK(Amazon Elasticsearch Service, Amazon Kinesis, and Kibana)。
EKK的优势在于组件是AWS托管服务,不必自己安装、运维,并且与AWS的其它服务轻松集成,可以很轻松的部署一套可靠、可扩展、安全、容错以及解耦和基于事件的解决方案。
传统的Elasticsearch中,日志数据的不断膨胀,对数据的生命周期管理越来越重要(应对此需求的新功能ILM(index lifecycle management)在Elasticsearch 7.0中闪亮登场)。本文不介绍ILM,介绍另一种解决方案:使用Lambda配合实现数据的轮换。
文中使用的AWS服务简介
1. Amazon Elasticsearch Service
是一项完全托管的服务,方便您部署、保护和运行大量 Elasticsearch 操作,且不用停机。该服务提供开源 Elasticsearch API、受托管的 Kibana 以及与 Logstash 和其他 AWS 服务的集成,支持您安全获取任何来源的数据,并开展实时搜索、分析和可视化。
2. Amazon Kinesis Firehose
Firehose 是将流数据可靠地加载到数据湖、数据存储和分析工具的最简单方式。它可以捕获、转换流数据并将其加载到 Amazon S3、Amazon Redshift、Amazon Elasticsearch Service 和 Splunk,让您可以借助正在使用的现有商业智能工具和控制面板进行近乎实时的分析。这是一项完全托管的服务,可以自动扩展以匹配数据吞吐量,并且无需持续管理。它还可以在加载数据前对其进行批处理、压缩和加密,从而最大程度地减少目的地使用的存储量,同时提高安全性。
本文中,Firehouse会获取并自动加载日志的流式数据到Amazon ES里,并在S3中还会再进行一次备份。
3. AWS Lambda
通过 AWS Lambda,无需预置或管理服务器即可运行代码。您只需按使用的计算时间付费 – 代码未运行时不产生费用。
借助 Lambda,您几乎可以为任何类型的应用程序或后端服务运行代码,而且完全无需管理。只需上传您的代码,Lambda 会处理运行和扩展高可用性代码所需的一切工作。您可以将您的代码设置为自动从其他 AWS 产品触发,或者直接从任何 Web 或移动应用程序调用。
本文中使用Lambda实现Elasticsearch Index的rotate功能,将历史数据从ES中移除,解决由历史数据太大造成的性能下降,成本上升等问题。
并使用Lambda实现了,将S3中的历史数据文件Load回ES,借助ES进行临时性历史日志分析。
4. Amazon CloudWatch
Amazon CloudWatch 是一种面向开发运营工程师、开发人员、站点可靠性工程师 (SRE) 和 IT 经理的监控和可观测性服务。CloudWatch 为您提供相关数据和切实见解,以监控应用程序、响应系统范围的性能变化、优化资源利用率,并在统一视图中查看运营状况。CloudWatch 以日志、指标和事件的形式收集监控和运营数据,让您能够在统一查看在 AWS 和本地服务器上运行的资源、应用程序和服务。您可以使用 CloudWatch 检测环境中的异常行为、设置警报、并排显示日志和指标、执行自动化操作、排查问题,以及发现可确保应用程序正常运行的见解。
5. Amazon Athena
Amazon Athena 是一种交互式查询服务,让您能够轻松使用标准 SQL 分析 Amazon S3 中的数据。Athena 没有服务器,因此您无需管理任何基础设施,且只需为您运行的查询付费。
Athena 简单易用。只需指向您存储在 Amazon S3 中的数据,定义架构并使用标准 SQL 开始查询。就可在数秒内获取最多的结果。使用 Athena,无需执行复杂的 ETL 作业来为数据分析做准备。这样一来,具备 SQL 技能的任何人都可以轻松快速地分析大规模数据集。
本文最后简单介绍实用Athena服务,如何直接进行基于S3全量日志的分析工作。
6. AWS Identity and Access Management (IAM)
AWS Identity and Access Management (IAM) 使您能够安全地管理对 AWS 服务和资源的访问。您可以使用 IAM 创建和管理 AWS 用户和组,并使用各种权限来允许或拒绝他们对 AWS 资源的访问。
架构
1. EKK 和 数据Rotate架构
2. Load 历史数据到ES架构
在IAM服务中创建身份授权IAM Role
1. 为需要收集日志的EC2,创建Role:EKK-EC2
2. 为Kinesis Firehose 创建Role: EKK-Firehose
设置权限
在Amazon Elasticsearch服务中创建创建 ES cluster
1. 计算存储要求
源数据 * (1 + 副本数量) * 1.45 = 最小存储要求
(元数据 + 增长空间) * (1 + 索引开销) / 所需的分片大小 = 主分片的大约数量
尽量使分片大小保持在 10–50GiB 之间是一种比较好的做法。
2. 实例类型
尝试从每 100GiB 存储要求更接近 2 个 vCPU 核心和 8GiB 内存的配置开始。
3. 专用主节点
实例计数 |
推荐的最小专用主实例类型 |
1–10 |
c5.large.elasticsearch |
10–30 |
c5.xlarge.elasticsearch |
30–75 |
c5.2xlarge.elasticsearch |
75–200 |
r5.4xlarge.elasticsearch |
参考https://docs.amazonaws.cn/elasticsearch-service/latest/developerguide/sizing-domains.html
4. 控制台创建集群
IP地址设置为自己的IP地址CIDR段,此CIDR段之外地址无权访问ES、Kibana。
5. 可获得ES Cluster 和 Kibana的地址
在Kinesis服务中创建Kinesis Firehose
配置需要收集日志服务器
1. 在EC2服务中为需要收集日志的EC2分配角色
分配角色
Linux日志环境
安装Agent
$sudo yum install –y aws-kinesis-agent
或者
$sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
打开并编辑配置文件:/etc/aws-kinesis/agent.json
{
"cloudwatch.emitMetrics": true,
"firehose.endpoint": "firehose.cn-north-1.amazonaws.com.cn",
"flows": [
{
"filePattern": "/var/log/httpd/access_log*",
"deliveryStream": "EKK-LogFirehose-apachelog",
"dataProcessingOptions": [
{
"optionName": "LOGTOJSON",
"logFormat": "COMMONAPACHELOG"
}
]
}
]
}
$sudo service aws-kinesis-agent start
$sudo chkconfig aws-kinesis-agent on
2. Windows日志环境
下载安装Agent
https://s3-us-west-2.amazonaws.com/kinesis-agent-windows/downloads/AWSKinesisTap.1.1.168.1.msi
C:\Program Files\Amazon\AWSKinesisTap\appsettings.json
{
"Sources": [
{
"Id": "PerformanceCounter",
"SourceType": "WindowsPerformanceCounterSource",
"Categories": [
{
"Category": "Server",
"Counters": [
"Files Open",
"Logon Total"
]
},
{
"Category": "LogicalDisk",
"Instances": "*",
"Counters": [
"% Free Space",
{
"Counter": "Disk Reads/sec",
"Unit": "Count/Second"
}
]
}
],
}
],
"Sinks": [
{
"Namespace": "MyServiceMetrics",
"Region": "cn-north-1",
"Id": "CloudWatchSink",
"SinkType": "CloudWatch"
},
{
"Id": "WindowsLogKinesisFirehoseSink",
"SinkType": "KinesisFirehose",
"StreamName": "EKK-LogFirehose-iislog",
"Region": "cn-north-1",
"QueueType": "file"
}
],
"Pipes": [
{
"Id": "PerformanceCounterToCloudWatch",
"SourceRef": "PerformanceCounter",
"SinkRef": "WindowsLogKinesisFirehoseSink"
}
]
}
参考: https://docs.thinkwithwp.com/zh_cn/kinesis-agent-windows/latest/userguide/getting-started.html
【可选】模拟一个Apache Log环境为后面的测试验证
在EC2服务中创建一个Linux EC2
chmod 400 EKK-test.pem
ssh -i EKK-test.pem ec2-user@52.81.85.86
安装Kinesis Agent
$sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
配置Agent
$sudo vi /etc/aws-kinesis/agent.json
{
"cloudwatch.emitMetrics": true,
"firehose.endpoint": "firehose.cn-north-1.amazonaws.com.cn",
"flows": [
{
"filePattern": "/var/log/httpd/access_log*",
"deliveryStream": "EKK-LogFirehose-apachelog",
"dataProcessingOptions": [
{
"optionName": "LOGTOJSON",
"logFormat": "COMMONAPACHELOG"
}
]
}
]
}
$sudo service aws-kinesis-agent start
$sudo chkconfig aws-kinesis-agent on
$sudo mkdir /var/log/httpd
安装Fake日志生成程序
参考 https://github.com/kiritbasu/Fake-Apache-Log-Generator
$sudo yum install -y git
$git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
$sudo yum install python-pip -y
$cd Fake-Apache-Log-Generator/
$sudo pip install -r requirements.txt
产生日志脚本
$cd ~
$sudo vi test.sh
#!/bin/bash # chkconfig: 2345 10 90 cd /var/log/httpd/ while true do sudo python /home/ec2-user/Fake-Apache-Log-Generator/apache-fake-log-gen.py -n 100 -o LOG sleep 10 done |
后台运行脚本:
$sudo sh ./test.sh &
Fake日志产生在目录 /var/log/httpd 中。
【可选】可使用Systems Manager(SSM)安装Agent
1. 为EC2的Role EKK-EC2 附加SSM需要的权限
2. 使用SSM服务
Linux 可使用AWS-RunShellScript
运行Shell的目标EC2 可直接选择,或者按Tag筛选
Windows 类似,可使用AWS-RunPowerShellScript 运行powershell script
使用Lambda在ES轮换数据数据(Rotate)
1. 为Lambda设置IAM Role
2. 在Lambda中创建个Layer(层)
把项目需要的依赖包放到层里,方便Lambda的使用。
层参考: https://docs.thinkwithwp.com/zh_cn/lambda/latest/dg/configuration-layers.html
参考命令:
$mkdir python $pip3 install elasticsearch-curator -t ./python/ $zip -q -r layer.zip ./python |
3. 创建Lambda: ES-Rotate
设置Lambda运行的Layers
保存
设置运行代码,点击
在下面Function code区域更新代码,host地址换成我们创建的ES Cluster地址
import json
import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import curator
host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4.cn-north-1.es.amazonaws.com.cn' # For example, search-my-domain.region.es.amazonaws.com
region = 'cn-north-1' # For example, us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
def lambda_handler(event, context):
es = Elasticsearch(
hosts = [{'host': host, 'port': 443}],
http_auth = awsauth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection
)
index_list = curator.IndexList(es)
index_list.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=1)
print("Found %s indices to delete" % len(index_list.indices))
# If our filtered list contains any indices, delete them.
if index_list.indices:
curator.DeleteIndices(index_list).do_action()
# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
修改Lambda 运行时内存和超时时间
4. 测试
定义测试用例输入参数,
因此Lambda测试用例,不需输入参数,可使用默认设置,
运行测试用例
在ES中查看
参考 https://docs.thinkwithwp.com/zh_cn/elasticsearch-service/latest/developerguide/curator.html
5. 在CloudWatch设置定时轮换
Load to ES from S3
1. 创建Lambad s3-to-es-bulk-by-hour
2. 设置Layer
3. 设置Code
Bucket 设置为Kinesis Firehose 中设置的bucket
Host 设置为ES 的endpoint
import boto3
import re
import requests
from requests_aws4auth import AWS4Auth
import json
from elasticsearch.helpers import bulk
import time
from elasticsearch import Elasticsearch, RequestsHttpConnection
region = 'cn-north-1' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
bucket = 'zhnc-ekk-full-log'
host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4.cn-north-1.es.amazonaws.com.cn'
type = 'log'
headers = { "Content-Type": "application/json" }
s3 = boto3.client('s3', region_name=region)
def get_all_s3_keys(bucket, prefix):
keys = []
kwargs = {'Bucket': bucket, 'Prefix':prefix}
while True:
resp = s3.list_objects_v2(**kwargs)
for obj in resp['Contents']:
keys.append(obj['Key'])
try:
kwargs['ContinuationToken'] = resp['NextContinuationToken']
except KeyError:
break
return keys
# Lambda execution starts here
def lambda_handler(event, context):
print(event)
for record in event['Records']:
msg = json.loads(json.dumps(eval(record['Sns']['Message'])))
year=msg['year']
month=msg['month']
day=msg['day']
hour=msg['hour']
index = 'apachelog-{}-{}-{}-{}'.format(year,month,day,hour)
print(index)
keys = get_all_s3_keys(bucket,"apachelog/{}/{}/{}/{}".format(year,month,day,hour))
print(len(keys));
es = Elasticsearch(
hosts = [{'host': host, 'port': 443}],
http_auth = awsauth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection
)
for key in keys:
ACTIONS = []
obj = s3.get_object(Bucket=bucket, Key=key)
body = obj['Body'].read()
lines = body.splitlines()
for line in lines:
document = json.loads(line)
action = {
"_index": index,
"_type": type,
"_source": document
}
ACTIONS.append(action)
success, _ = bulk(es, ACTIONS, index=index, raise_on_error=True)
print('Performed %d actions' % success)
|
4. 设置运行内存,超时时间
5. 测试
这个Lambda将被SNS触发,创建一个模拟SNS的事件,
消息Message 设置为将要测试的事件,Lambda会读取对应的S3文件到ES中,
消息内容为,
{
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws-cn:sns:cn-north-1: 725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
"Sns": {
"Type": "Notification",
"MessageId": "71c6da65-49bf-5301-8270-8ff199faaa1b",
"TopicArn": "arn:aws-cn:sns:cn-north-1: 725362542198:s3-to-es-by-day",
"Subject": "None",
"Message": "{'year':'2019','month':'06','day':'30','hour':'02'}",
"Timestamp": "2019-07-02T03: 22: 45.020Z",
"SignatureVersion": "1",
"Signature": "ihaGN/JL8u/v57xEY1RTFekpUpgVukM9Ebj9IIM9Rr9KGkUMe6dO7hze7estD0yM9K0QRQAreQ5XiB0Tfj/jOCvyjL9IrRcTplQcWPzMHmVqd4C3942gduFkHyul2+lYa0DJZM46J/Yy7mihe9EfXUySf2Eyok4NsUC6WtnbyJPN17FG1t4fnEWpRwU2Yg+MLM4bJWr3sK5/6xRnUVerLlMm5tCsynybW6FQCYsVgl7SJLW6nBmbCe3v6jRMuKCNW8xptVyEAnII4h5uPVElts0IWhnE+EQG3FNFmOZmj8OLZutRadSrNFexRMZebmKwRZRD5dTaCoD5E6v6TTYGbQ==",
"SigningCertUrl": "https: //sns.cn-north-1.amazonaws.com.cn/SimpleNotificationService-3250158c6506d40f628c21ed8dad1787.pem",
"UnsubscribeUrl": "https://sns.cn-north-1.amazonaws.com.cn/?Action=Unsubscribe&SubscriptionArn=arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
"MessageAttributes": {}
}
}
]
}
此事例将读取以下位置的日志文件
点击测试
ES Index已经导入
创建一个SNS Topic(s3-to-es-by-hour) 触发Lambda(s3-to-es-bulk-by-hour)
1. 创建Topic
进入SNS服务页面
2. 订阅事件
3. 测试
消息
{"year":"2019","month":"06","day":"30","hour":"05"}
复制Topic的ARN,下面Lambda会发送消息进来
创建Lambda split-day-to-24-hour
负责把按天ES index的导入事件拆分成24个按小时导入的事件
1. 更新代码
替换SNS Topic ARN 为上面创建的Topic
import json
import boto3
sns = boto3.client('sns')
def lambda_handler(event, context):
for record in event['Records']:
msg = json.loads(json.dumps(eval(record['Sns']['Message'])))
# TODO implement
for i in range(0,24) :
msg['hour'] = "%02d" % i
print(msg)
response = sns.publish(
TopicArn='arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-hour',
Message=json.dumps(msg),
)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
2. 创建SNS Topic 用于触发此Lambda
3. 订阅Lambda
4. 测试
{"year":"2019","month":"06","day":"30"}
2019年6月30日的日志文件,会并发为24个Lambda按小时导入到ES
Athena query data in S3 using SQL
1. 基于S3路径,创建Table
CREATE EXTERNAL TABLE IF NOT EXISTS log.apachelogtest (
'bytes' string,
'datetime' string,
'host' string,
'request' string,
'response' string,
'datetime' string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://poc-zhnc-hive/log/'
TBLPROPERTIES ('has_encrypted_data'='false');
2. 使用Table查询
本篇作者