背景介绍
在数据库的安全合规中,通过记录操作日志来进行数据库审计是一种常规的方法。Amazon Aurora MySQL 的高级审计功能,可以高效地记录对数据库的各种操作日志。
这些日志文件的大小与对数据库的操作频繁程度成正比,当日志文件的总大小超过了 Amazon Aurora 实例的本地可用存储空间后,系统会自动删除这些文件,我们可以配置日志文件到 Amazon CloudWatch 进行存储、分析等,安全合规中一般要求的日志保存时间都比较久,大量日志文件长时间在 Amazon CloudWatch 中存储成本较高,我们可以将审计日志文件直接备份到 Amazon S3 中存储,从而降低存储成本,同时也方便对审计日志的后续使用。
本篇博客主要会聚焦备份 Amazon Aurora MySQL 高级审计日志到 S3 进行展开,先介绍 Amazon Aurora MySQL 高级审计功能的实现及日志文件的属性、内容结构,再针对审计日志文件备份的整体架构来详细说明备份的方法及实现逻辑,最后介绍通过配置 Amazon Athena 进行查询的具体方法。
Amazon Aurora MySQL 高级审计介绍
Amazon Aurora MySQL 高级审计功能介绍
您可以在 Amazon Aurora MySQL 中使用高性能的高级审计功能来审核数据库活动。要启用该功能,您可以通过设置多个数据库集群参数来启用审计日志的收集。在启用了高级审计时,您可以用它来记录任意支持事件的组合。
开启高级审计功能后,审计日志会记录到审计日志文件中,审计日志文件可以通过 AWS 数据库控制台日志界面中查看到相应的文件,并可以对文件进行下载或查看。
Amazon Aurora MySQL 高级审计日志介绍
审计日志分别存储在每个实例的本地存储中。每个 Amazon Aurora 实例一次将写入分布在四个日志文件中。日志的最大大小总计为 100 MB。当达到这个不可配置的限制时,Amazon Aurora 将轮换文件并生成四个新文件。
- 文件编码:UTF-8
- 文件名格式:audit.log.[0-3].[%Y-%m-%d-%H-%M][-rotation]
- 文件位置:/rdsdbdata/log/audit/(在每个主机上)
- 文件轮换:每个日志文件的最大大小为 100 MB,目前客户无法配置。当四个日志文件中最大的一个达到 100 MB 时,系统将轮换到一组新的日志文件。
- 文件顺序:日志文件条目不按先后顺序排列。要对条目进行排序,请使用时间戳值(lastWritten)。要查看最新事件,您可能需要查看所有日志文件。
- 文件清理:系统将清理较旧的审计文件,以释放超过一定空间消耗和/或年限的空间。
架构设计
通过以上的架构设计可以看到通过 EC2 进行进行备份程序部署,获取 Amazon Aurora MySQL 的审计日志列表,然后通过 Amazon DynamoBD 存储审计日志文件的时间戳,来记录上次获取文件的时间条件,然后通过将当前获取到的日志文件依次上传到 Amazon S3 来进行备份,最后使用 Amazon Athena 来对 Amazon S3 中的文件进行查询分析。详细步骤如下:
1. python 配置定时任务(历史失败任务及新任务)定期执行下载日志。
2. 上传到 Amazon S3。
3. 在 Amazon DynomoDB 的记录中,查询最近一次所获取到的日志文件的时间戳的最新值。
4. 根据时间戳筛选未备份的 logs 文件列表。并且分别通过 rest 接口获取每个文件的url地址。根据 log 文件的 rest 地址将文件上传到 Amazon S3。
5. 如有失败的文件,重试上传3次,如依然失败记录失败的文件记录到 Amazon Dynamodb,获取上传成功的文件的最后时间戳,更新 Amazon Dynamodb 内时间戳的值。
6. 通过 Amazon Athena 查询日志内容。
具体实现
先决条件
- Amazon Aurora MySQL 已经开启高级审计功能。
- 存储 Amazon Aurora MySQL 审计日志的 S3 存储桶已创建。
- 本代码运行需要具有以下权限:
- 访问 Amazon Aurora MySQL 数据库 API 的权限;
- 存储 Amazon Aurora MySQL 审计日志的 Amazon S3 存储桶的上传及读取权限;
- Amazon DynamoDB 数据库的建表、表数据写入、查询、更新权限。
实例代码
代码从 logjob 方法为入口,定义了相关的参数,aurora_log_upload_s3 方法为总逻辑执行方法。
def logjob():
#DynamoDB中表名称
ddbTableName = 'rds_log_record'
#表的key id
id = 'auroraauditlog'
#数据库实例的名称
dbInstanceName = 'database-1-instance-1'
#日志文件的前缀,固定格式
logPrifix = 'audit/audit.log'
fileLastWritten = 1716452565044
#s3存储同名称
bucketname = 'rds-aurit-logs'
#数据库所在region
region = 'cn-northwest-1'
#逻辑方法
aurora_log_upload_s3(region,dbInstanceName,logPrifix,bucketname)
def aurora_log_upload_s3(region,dbInstanceName,logPrifix,bucketname):
ddbTableName = 'rds_log_record'
id = 'auroraauditlog'
#bucketname = 'rds-aurit-logs'
#检查ddb中是否有日志表,没有创建日志表auroraauditlog
check_db_tabel(ddbTableName)
#获取上次导出的日志的最后写入时间
lastWritten = int(get_lastwritten(ddbTableName,id))
#根据上次最后日志写入时间戳,获取新的日志列表'audit/audit.log'
describeDBLogFileslist = describe_bd_logfiles(dbInstanceName,lastWritten,logPrifix)
if describeDBLogFileslist:
#获取本次获取到到日志的最新的时间戳
newlastWritten = describeDBLogFileslist[0]['LastWritten']
print(f"newlastWritten:{newlastWritten}")
#根据获取列表上传log文件到s3
upload_to_s3_batch(region,dbInstanceName,bucketname,describeDBLogFileslist)
#更新ddb中上传记录的最后上传时间
save_or_update(ddbTableName,id,newlastWritten)
详细方法及代码如下:
1. 查找和记录日志文件的 lastWritten 值
lastWritten 是每个日志文写入最后一条日志的时间戳 ,通过此值可以记录每次查找日志列表的范围,本方案中首次获取全部审计日志的列表并且按照 lastWritten 排序后取得当前的最大值,作为本次获取日志列表的筛选条件,会将此值记录到 Amazon DynamoDB 中,下一次获取列表时会取出此值,获取大于此值的审计文件列表。
参数说明:
参数名称 |
值说明 |
备注 |
tablename |
Amazon Dynamo BD中表名 |
记录lastWritten值的表 |
lastWritten |
日志文件时间戳 |
日志文件时间戳 |
#查询Dynamo BD中是否存在记录如果不存在插入记录,如果存在更新记录
def save_or_update(tablename,id,lastWritten):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(tablename)
try:
response = table.get_item(Key={'id': id})
#return response
if 'Item' not in response:
print(f"insert beging........")
# 数据不存在,插入新数据
response = table.put_item(
Item = {
'id': id,
'last_written': lastWritten,
'created_time': str(datetime.datetime.now())
}
)
if response['ResponseMetadata']['HTTPStatusCode']==200:
print(f"insert success!")
else:
print(f"insert fail!")
else:
createdTime = str(datetime.datetime.now())
# 数据存在,可以选择更新数据
print(response['Item'])
response = table.update_item(
Key={'id': id},
UpdateExpression='SET last_written = :last_written_val,created_time = :created_time_val',
ExpressionAttributeValues={':last_written_val':lastWritten,':created_time_val':createdTime}
)
except Exception as e:
print(f".......{e}")
#获取上次获取日志的lastWritten时间戳
def get_lastwritten(tablename,id):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(tablename)
response = table.get_item(Key={'id': id})
# return response
lastWritten = 0;
if 'Item' in response:
lastWritten = response['Item']['last_written']
return lastWritten
2. 获取日志文件列表
通过 Amazon RDS SDK 的 describe_db_log_files方法获取日志文件列表。
参数说明:
参数名称 |
值说明 |
备注 |
instance_identifier |
数据库名称 |
数据库 |
lastWritten |
日志文件时间戳 |
日志文件时间戳 |
logPrifix |
audit/audit.log |
审计日志固定前缀 |
获取amazon aurora mysql日志列表
def describe_bd_logfiles(instance_identifier,lastWritten,logPrifix):
# 创建RDS客户端
rds = boto3.client('rds')
# 获取Aurora集群的日志文件列表
if lastWritten>0:
response = rds.describe_db_log_files(
DBInstanceIdentifier=instance_identifier,
FilenameContains=logPrifix,
FileLastWritten=lastWritten
)
else:
response = rds.describe_db_log_files(
DBInstanceIdentifier=instance_identifier,
FilenameContains=logPrifix
)
#对结果进行排序
sorted_list = sorted(response['DescribeDBLogFiles'], key=itemgetter('LastWritten'), reverse=True)
return sorted_list
3. 使用签名获取日志文件地址
通过 Amazon RDS REST 接口/v13/downloadCompleteLogFile/DBInstanceIdentifier/LogFileName 使用 Signature Version 4 签名获取的日志文件的下载地址。
参数说明:
参数名称 |
值说明 |
备注 |
instance |
数据库名称 |
数据库名称 |
filename |
审计日志文件名称 |
审计日志文件名称 |
region |
AWS 区域 |
AWS 区域 |
def sig_v4_url(instance, filename, region):
session = boto3.session.Session()
client = session.client(service_name='sts')
response = client.assume_role(RoleArn='arn:aws-cn:iam::542618119936:role/aurora-test-audit-role', RoleSessionName='admin')
aws_access_key_id = response['Credentials']['AccessKeyId']
aws_secret_access_key = response['Credentials']['SecretAccessKey']
aws_session_token = response['Credentials']['SessionToken']
# session = boto3.Session()
# cred = session.get_credentials()
# aws_access_key_id = cred.access_key
# aws_secret_access_key = cred.secret_key
# aws_session_token = cred.token
if aws_access_key_id is None or aws_secret_access_key is None or aws_session_token is None:
logger.error('Could not get AWS credentials')
return
# Step 1: Create a canonical request
## Ref: http://docs.thinkwithwp.com/general/latest/gr/sigv4-create-canonical-request.html
## HTTPMethod
method = 'GET'
service = 'rds'
#host = 'rds.' + region + '.amazonaws.com'
host = 'rds.' + region + '.amazonaws.com.cn'
## CanonicalURI
canonical_uri = '/v13/downloadCompleteLogFile/' + instance + '/' + filename
##SignedHeaders
signed_headers = 'host'
##CanonicalQueryString
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d')
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = '/'.join([datestamp, region, service, 'aws4_request'])
canonical_querystring = '&'.join(map(lambda p: '='.join(p), sorted({
'X-Amz-Algorithm': algorithm,
'X-Amz-Credential': urllib.parse.quote_plus(aws_access_key_id + '/' + credential_scope),
'X-Amz-Date': amz_date,
'X-Amz-Expires': '30',
'X-Amz-Security-Token': urllib.parse.quote_plus(aws_session_token),
'X-Amz-SignedHeaders': signed_headers
}.items())))
##CanonicalHeaders
canonical_headers = 'host:' + host + '\n'
##HashedPayload
hashed_payload = hashlib.sha256(''.encode("utf-8")).hexdigest()
canonical_request = '\n'.join([method, canonical_uri, canonical_querystring,
canonical_headers, signed_headers, hashed_payload])
# Step 2: Create a string to sign
string_to_sign = '\n'.join([algorithm, amz_date,
credential_scope, hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()])
# Step 3: Calculate Signature
kDate = sign_msg(('AWS4' + aws_secret_access_key).encode('utf-8'), datestamp)
kRegion = sign_msg(kDate, region)
kService = sign_msg(kRegion, service)
kSigning = sign_msg(kService, 'aws4_request')
signature = hmac.new(kSigning, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
# Step 4: Add the signature to the request
canonical_querystring += '&X-Amz-Signature=' + signature
endpoint = 'https://' + host
return endpoint + canonical_uri + "?" + canonical_querystring
4. 分段上传文件到 S3
获取到每个审计日志文件的地址后,通过文件地址获取文件流,以流的形式将文件分段上传到 S3 中。
参数说明:
参数名称 |
值说明 |
备注 |
file_url |
审计日志地址 |
审计日志地址 |
bucket_name |
备份日志的 Amazon S3 存储桶 |
备份日志的 Amazon S3 存储桶 |
object_key |
Amazon S3 中文件对象名 |
此处为审计日志文件名 |
#分段上传日志文件到s3
def upload_to_s3(file_url, bucket_name, object_key, chunk_size=5242880):
# 创建S3客户端
s3_client = boto3.client('s3')
# 从URL获取文件流
response = requests.get(file_url, stream=True)
# 检查请求是否成功
if response.status_code == 200:
# 创建一个MultiPart上传
upload_id = s3_client.create_multipart_upload(Bucket=bucket_name, Key=object_key)['UploadId']
parts = []
part_number = 1
chunk = BytesIO()
total_size = 0
for data in response.iter_content(chunk_size=chunk_size):
chunk.write(data)
total_size += len(data)
# 当块达到指定大小时,上传块
if chunk.tell() >= chunk_size:
chunk.seek(0)
part = s3_client.upload_part(
Body=chunk,
Bucket=bucket_name,
Key=object_key,
PartNumber=part_number,
UploadId=upload_id
)
parts.append({'PartNumber': part_number, 'ETag': part['ETag']})
part_number += 1
chunk = BytesIO()
# 上传剩余的数据作为最后一个部分
if chunk.tell() > 0:
chunk.seek(0)
part = s3_client.upload_part(
Body=chunk,
Bucket=bucket_name,
Key=object_key,
PartNumber=part_number,
UploadId=upload_id
)
parts.append({'PartNumber': part_number, 'ETag': part['ETag']})
# 完成MultiPart上传
s3_client.complete_multipart_upload(
Bucket=bucket_name,
Key=object_key,
UploadId=upload_id,
MultipartUpload={'Parts': parts}
)
print(f"文件成功上传到S3: {bucket_name}/{object_key}")
else:
print("无法获取文件流")
其余方法:
#根据log文件列表获取
def upload_to_s3_batch(region,dbInstanceName,bucketname,describeDBLogFiles):
number = 0
for log_file in describeDBLogFiles:
logfilename = log_file['LogFileName']
logsize = log_file['Size']
if logsize > 10000000:#测试效果用,实际生产去掉次判断
Logfileresturl = get_log_file_url(dbInstanceName, logfilename,region);
upload_to_s3(Logfileresturl, bucketname, logfilename)
number = number + 1;
print(f"第: {number}个文件上传完成[ {logfilename} ] /{logsize}")
日志查询
Amazon Athena 是一种交互式查询服务,允许您使用标准 SQL 语言直接分析存储在 Amazon S3 中的数据。
1. 在 Glue 中创建数据库、表
(1)在 Amazon 控制台进入 Amazon Glue 界面,点击右侧 Data Catalog 下的 Databases 菜单进入 Databases 列表页,点击 Add database 按钮,创建 database
(2)创建 database
(3)使用 SQL 创建数据表
进入 Amazon Athena 控制台,使用默认的数据源 Amazon DataCatalog,选择刚刚创建的 database,在查询编辑器中输入建表 SQL 并执行。
CREATE EXTERNAL TABLE IF NOT EXISTS `aurora-audit_logs`.`audit_logs` (
`timestamp` bigint,
`serverhost` string,
`username` string,
`host` string,
`connectionid` int,
`queryid` int,
`operation` string,
`database` string,
'object' string
`retcode` int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'S3://BUCKET/OBJECT/'
TBLPROPERTIES ('classification' = 'csv');
执行成功后,在右侧会显示创建好的表。
Amazon Aurora audit log 日志字段解释:
字段 |
描述 |
timestamp |
所记录事件的 Unix 时间戳,精度为微秒。 |
serverhost |
记录了其事件的实例的名称。 |
username |
已连接用户的用户名。 |
host |
用户发起连接时所在的主机。 |
connectionid |
所记录操作的连接 ID 号。 |
queryid |
查询 ID 号,可用于查找关系表事件和相关查询。对于 TABLE 事件,添加多行。 |
operation |
记录的操作类型。可能值为:CONNECT、QUERY、READ、WRITE、CREATE、ALTER、RENAME 和 DROP。 |
database |
活动数据库,由 USE 命令设置。 |
object |
对于 QUERY 事件,此值指示数据库执行的查询。对于 TABLE 事件,它指示表名。 |
retcode |
所记录操作的返回代码。 |
2. 查询日志
在查询器中我们可以使用 SQL 查询我们想了解的内容,以下是两个测试 SQL。
(1)统计各种操作的 SQL 数量
SQL:
select count(1),operation from audit_logs group by operation;
结果:
(2)查询非管理员账号,最后连接数据库的用户信息
SQL:
select * from audit_logs where operation = ‘CONNECT’ and username!=’rdsadmin’ order by timestamp desc limit 5;
结果:
总结
审计日志不论是在安全合规中还是在日常的业务处理中都是一个非常重要的资产,及时、快速、方便的备份审计日志是非常必要的。使用本方案可以满足 Amazon Aurora MySQL 高级审计日志在不同场景中的备份及查询需求,不论是在已存在一定量的日志文件场景下,首次需要长时间将已存在的日志文件上传至 Amazon S3,还是业务比较繁忙频繁产生大量的日志文件的场景下,都可以通过本方案来实现相关的需求。
本篇作者