More and more enterprises are building data lakes to accelerate digital transformation and strengthen their competitiveness. The data in a data lake is an important corporate asset; making sure it is accessed only by the right users is a key part of protecting that asset, and analyzing access behavior is an important way to do so.
Intended audience: data lake architects, cloud cost administrators, security administrators.
1. Methods for logging S3 bucket access
AWS provides two methods for recording access to an S3 bucket, and also publishes a detailed comparison of them:
Performance and Cost | AWS CloudTrail | Amazon S3 Server Logs
Price | Management events (first delivery) are free; data events incur a fee, in addition to storage of logs | No additional cost in addition to storage of logs
Speed of log delivery | Data events every 5 mins; management events every 15 mins | Within a few hours
Log format | JSON | Log file with space-separated, newline-delimited records
The two offer similar functionality but differ in latency and cost. For customers who are cost-sensitive about monitoring access to their data assets, this article presents a low-cost solution with relatively good timeliness.
2. Implementation details
Athena can query and analyze S3 Server Logs directly; AWS documents the approach in detail:
CREATE EXTERNAL TABLE `s3_access_logs_db.mybucket_logs`(
`bucketowner` STRING,
`bucket_name` STRING,
`requestdatetime` STRING,
`remoteip` STRING,
`requester` STRING,
`requestid` STRING,
`operation` STRING,
`key` STRING,
`request_uri` STRING,
`httpstatus` STRING,
`errorcode` STRING,
`bytessent` BIGINT,
`objectsize` BIGINT,
`totaltime` STRING,
`turnaroundtime` STRING,
`referrer` STRING,
`useragent` STRING,
`versionid` STRING,
`hostid` STRING,
`sigv` STRING,
`ciphersuite` STRING,
`authtype` STRING,
`endpoint` STRING,
`tlsversion` STRING)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
'input.regex'='([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://awsexamplebucket1-logs/prefix/'
After creating this table, users can query the S3 bucket's access logs directly. Inside the logging bucket, the delivered log objects are named [TargetPrefix]YYYY-mm-DD-HH-MM-SS-UniqueString.
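For example, a per-requester summary can now be produced with an ordinary Athena query. The sketch below runs such a query through boto3 (the same client the Lambda functions later use); the result location s3://athena-query-results-bucket/ is a placeholder, not part of the original setup.

import boto3

athena = boto3.client('athena')

# Count requests per requester and operation over the raw access-log table.
query = """
SELECT requester, operation, count(*) AS request_count
FROM s3_access_logs_db.mybucket_logs
GROUP BY requester, operation
ORDER BY request_count DESC
"""

athena.start_query_execution(
    QueryString=query,
    # Placeholder query-result location; replace with your own Athena output bucket.
    ResultConfiguration={'OutputLocation': 's3://athena-query-results-bucket/'}
)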
Careful engineers will notice that:
- As data assets multiply and are accessed more frequently, the volume of logs keeps growing.
- Because the data is not partitioned, querying a given time range becomes slow.
- Creating a separate access-log table for every data asset also carries a high management overhead.
To address these problems, this article adds the following enhancements:
- Centralized management of data-asset access logs.
- Partitioning of the data, enabling fine-grained analysis by data asset and by time: year, month, day, and hour (see the example query after the partitioned table DDL in Step 1).
The implementation architecture is as follows: S3 server access logs from all data-asset buckets are delivered to a central logging bucket; a scheduled Lambda function (fn-reorg) reorganizes the log objects into a Hive-style partition layout; a second Lambda function (fn-refresh-partition) loads the new partitions into a partitioned Athena table; and EventBridge rules trigger both functions every 15 minutes.
Step 1: Create a partitioned S3 Server Logs table
Create the bucket that will store the S3 Server Logs, and configure a lifecycle policy on it according to your internal data governance standards.
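A minimal sketch of such a lifecycle configuration through boto3 follows; the bucket name serveraccesslog-bucket matches the table LOCATION below, while the 180-day retention is only an assumed value to be replaced by your own governance standard.

import boto3

s3 = boto3.client('s3')

# Expire reorganized server access logs after an assumed retention period of 180 days.
s3.put_bucket_lifecycle_configuration(
    Bucket='serveraccesslog-bucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'expire-server-access-logs',
            'Status': 'Enabled',
            'Filter': {'Prefix': ''},      # apply to the whole bucket
            'Expiration': {'Days': 180}    # assumed retention period
        }]
    }
)

The partitioned table over this bucket is then created with the following DDL: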
CREATE EXTERNAL TABLE `salog_rawdata_4`(
`bucketowner` string COMMENT '',
`bucket_name` string COMMENT '',
`requestdatetime` string COMMENT '',
`remoteip` string COMMENT '',
`requester` string COMMENT '',
`requestid` string COMMENT '',
`operation` string COMMENT '',
`objkey` string COMMENT '',
`request_uri` string COMMENT '',
`httpstatus` string COMMENT '',
`errorcode` string COMMENT '',
`bytessent` string COMMENT '',
`objectsize` string COMMENT '',
`totaltime` string COMMENT '',
`turnaroundtime` string COMMENT '',
`referrer` string COMMENT '',
`useragent` string COMMENT '',
`versionid` string COMMENT '',
`hostid` string COMMENT '',
`sigv` string COMMENT '',
`ciphersuite` string COMMENT '',
`authtype` string COMMENT '',
`request_endpoint` string COMMENT '',
`tlsversion` string COMMENT '')
PARTITIONED BY (
`reqbucket` string,
`reqyear` bigint,
`reqmonth` bigint,
`reqday` bigint,
`reqhour` bigint)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
'input.regex'='([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://serveraccesslog-bucket/'
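Once the partitions are loaded (Step 3 below), a query can be restricted to a single data asset and time slice so that Athena scans only the matching prefixes. A sketch in the same boto3 style, again with a placeholder result location:

import boto3

athena = boto3.client('athena')

# Analyze one hour of access to the 'airports' bucket; the partition predicates prune the scan.
query = """
SELECT requester, operation, objkey, httpstatus
FROM salog_rawdata_4
WHERE reqbucket = 'airports'
  AND reqyear = 2022 AND reqmonth = 4 AND reqday = 7 AND reqhour = 3
"""

athena.start_query_execution(
    QueryString=query,
    ResultConfiguration={'OutputLocation': 's3://athena-query-results-bucket/'}  # placeholder
)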
Step 2: Enable S3 bucket server access logging
Set the target bucket to:
s3://serveraccesslog-bucket/reqbucket={current-bucket-name}/
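This can be configured in the S3 console, or scripted when many data-asset buckets are involved. A sketch with boto3, where the source bucket airports is only an illustrative name; note that the target bucket must already allow the S3 log delivery service to write to it.

import boto3

s3 = boto3.client('s3')
source_bucket = 'airports'  # illustrative data-asset bucket

# Deliver server access logs under reqbucket=<bucket-name>/ so that the prefix
# matches the first partition column of the salog_rawdata_4 table.
s3.put_bucket_logging(
    Bucket=source_bucket,
    BucketLoggingStatus={
        'LoggingEnabled': {
            'TargetBucket': 'serveraccesslog-bucket',
            'TargetPrefix': 'reqbucket=%s/' % source_bucket
        }
    }
)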
Step 3: Create a Lambda function that reorganizes the log data according to the partition layout
For example: fn-reorg
import json
import boto3
import re
import os

def lambda_handler(event, context):
    # Logging bucket and Athena table are passed in as environment variables
    bucketName = os.getenv('bucketName')
    salogTableName = os.getenv('salogTableName')
    athena_cache = os.getenv('AthenaCache')
    # List of data-asset buckets whose logs should be reorganized
    dsBucketName_List = event['dsBucketNameList']
    client = boto3.client('s3')
    for dsBucketName in dsBucketName_List:
        # Raw logs are delivered directly under reqbucket=<data-asset-bucket>/
        l2_prefix = client.list_objects_v2(Bucket=bucketName, Delimiter='/', Prefix='reqbucket=' + dsBucketName + '/')
        print("----------------------%s----------------------" % dsBucketName)
        if l2_prefix.get('Contents', False):
            for log in l2_prefix['Contents']:
                log_name = log['Key']
                # Extract the partition values and the short log file name from the object key,
                # e.g. reqbucket=airports/2022-04-07-03-...-UniqueString
                match = re.search(r'(?P<requestedbucket>reqbucket=.*)/(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})-(?P<hour>\d{2})', log_name, re.ASCII)
                match1 = re.search(r'reqbucket=.*/(?P<logshortname>\d{4}-.*)', log_name, re.ASCII)
                if match and match1:
                    requested_bucket = match.group('requestedbucket')
                    requested_year = 'reqyear=' + match.group('year')
                    requested_month = 'reqmonth=' + match.group('month')
                    requested_day = 'reqday=' + match.group('day')
                    requested_hour = 'reqhour=' + match.group('hour')
                    log_short_name = match1.group('logshortname')
                    # Copy the log object into the Hive-style partition path ...
                    client.copy({'Bucket': bucketName, 'Key': log_name},
                                Bucket=bucketName,
                                Key=requested_bucket + '/' + requested_year + '/' + requested_month + '/'
                                    + requested_day + '/' + requested_hour + '/' + log_short_name)
                    print("The log file: %s has been moved!" % log_name)
                    # ... then delete the original object
                    client.delete_object(Bucket=bucketName, Key=log_name)
        print('--------------------------------------------------------')
    return {
        'statusCode': 200,
        'body': json.dumps('S3 Server Log re-organize function!')
    }
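Before wiring up the schedule, the function can be exercised with a one-off test invocation. A sketch, using the same payload the EventBridge rule in Step 4 will send; it assumes fn-reorg has already been deployed with its bucketName environment variable pointing at the logging bucket.

import json
import boto3

lambda_client = boto3.client('lambda')

# Invoke fn-reorg synchronously with a sample list of data-asset buckets.
response = lambda_client.invoke(
    FunctionName='fn-reorg',
    InvocationType='RequestResponse',
    Payload=json.dumps({'dsBucketNameList': ['airports', 'airport-data', 'airport-analysis']})
)
print(response['Payload'].read().decode())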
The reorganized S3 bucket structure:
serveraccesslog-bucket/reqbucket=airports/reqyear=2022/reqmonth=04/reqday=07/reqhour=03/
Create the Lambda function that refreshes the partitions
For example: fn-refresh-partition
import json
import boto3
import os
import time

def lambda_handler(event, context):
    # Partitioned Athena table and query-result location come from environment variables
    salogTableName = os.getenv('salogTableName')
    athena_cache = os.getenv('AthenaCache')
    client1 = boto3.client('athena')
    # Register any newly created partition directories with the table
    rsp = client1.start_query_execution(
        QueryString='msck repair table ' + salogTableName,
        ResultConfiguration={'OutputLocation': athena_cache})
    query_execution_id = rsp["QueryExecutionId"]
    # Poll until the query finishes
    print("Refresh Partition", end="", flush=True)
    while True:
        print(".", end="", flush=True)
        query_status = client1.get_query_execution(QueryExecutionId=query_execution_id)
        state = query_status["QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            print("\nDone", flush=True)
            break
        if state in ("FAILED", "CANCELLED"):
            print("\n%s: %s" % (state, query_status["QueryExecution"]["Status"].get("StateChangeReason", "")), flush=True)
            break
        time.sleep(1)
    return {
        'statusCode': 200,
        'body': json.dumps('S3 Server Log partition refresh function!')
    }
Step 4: Create EventBridge rules
fn-reorg
Schedule frequency: every 15 minutes, with the following constant input:
{
"dsBucketNameList": ["airports", "airport-data", "airport-analysis"]
}
fn-refresh-partition
Schedule frequency: every 15 minutes.
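Both rules can be created in the EventBridge console; the sketch below shows the equivalent boto3 calls for fn-reorg, with a placeholder Lambda ARN. The rule for fn-refresh-partition is created the same way, without the constant input.

import json
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

fn_reorg_arn = 'arn:aws:lambda:<region>:<account-id>:function:fn-reorg'  # placeholder ARN

# Schedule rule that fires every 15 minutes.
rule = events.put_rule(
    Name='fn-reorg-every-15min',
    ScheduleExpression='rate(15 minutes)',
    State='ENABLED'
)

# Allow EventBridge to invoke the function.
lambda_client.add_permission(
    FunctionName='fn-reorg',
    StatementId='allow-eventbridge-fn-reorg',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)

# Attach fn-reorg as the target and pass the list of data-asset buckets as constant input.
events.put_targets(
    Rule='fn-reorg-every-15min',
    Targets=[{
        'Id': 'fn-reorg',
        'Arn': fn_reorg_arn,
        'Input': json.dumps({'dsBucketNameList': ['airports', 'airport-data', 'airport-analysis']})
    }]
)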