분할된 Amazon S3 액세스 로그를 사용하여 Athena 쿼리 시간 초과를 방지하려면 어떻게 해야 합니까?
Amazon Simple Storage Service(Amazon S3) 액세스 로그에 대한 Amazon Athena 쿼리를 실행할 때 쿼리 시간이 초과됩니다. 이 문제를 해결하고 싶습니다.
해결 방법
Amazon S3 액세스 로그는 동일한 접두사로 저장됩니다. 데이터의 양이 많으면 Athena가 모든 데이터를 읽기 전에 Athena 쿼리 시간이 초과될 수 있습니다. 이 문제를 방지하려면 AWS Glue ETL 작업을 사용하여 Amazon S3 데이터를 분할합니다. 이후 제한된 파티션에서 Athena 쿼리를 실행합니다.
참고: 테이블, 스크립트 및 명령 예시에서 필요한 경우 다음 값을 원하는 값으로 바꾸십시오.
- s3_access_logs_db를 데이터베이스의 이름으로 바꿉니다.
- **s3://awsexamplebucket1-logs/prefix/**를 Amazon S3 액세스 로그를 저장하는 경로로 바꿉니다.
- s3_access_logs를 테이블 이름으로 바꿉니다.
- s3_access_logs_partitioned를 파티션된 테이블 이름으로 바꿉니다.
- 2023, 03 및 04를 파티션 값으로 바꿉니다.
Amazon S3 데이터 파티션
Athena에서 다음 테이블을 생성합니다.
CREATE EXTERNAL TABLE `s3_access_logs_db.s3_access_logs`( `bucketowner` string, `bucket_name` string, `requestdatetime` string, `remoteip` string, `requester` string, `requestid` string, `operation` string, `key` string, `request_uri` string, `httpstatus` string, `errorcode` string, `bytessent` string, `objectsize` string, `totaltime` string, `turnaround_time` string, `referrer` string, `useragent` string, `version_id` string, `hostid` string, `sigv` string, `ciphersuite` string, `authtype` string, `endpoint` string, `tlsversion` string, `accesspoint_arn` string, `aclrequired` string) ROW FORMAT SERDE 'com.amazonaws.glue.serde.GrokSerDe' WITH SERDEPROPERTIES ( 'input.format'='%{NOTSPACE:bucketowner} %{NOTSPACE:bucket_name} \\[%{INSIDE_BRACKETS:requestdatetime}\\] %{NOTSPACE:remoteip} %{NOTSPACE:requester} %{NOTSPACE:requestid} %{NOTSPACE:operation} %{NOTSPACE:key} \"%{INSIDE_QS:request_uri}\" %{NOTSPACE:httpstatus} %{NOTSPACE:errorcode} %{NOTSPACE:bytes_sent} %{NOTSPACE:objectsize} %{NOTSPACE:totaltime} %{NOTSPACE:turnaround_time} \"?%{INSIDE_QS:referrer}\"? \"%{INSIDE_QS:useragent}\" %{NOTSPACE:version_id} %{NOTSPACE:hostid} %{NOTSPACE:sigv} %{NOTSPACE:ciphersuite} %{NOTSPACE:authtype} %{NOTSPACE:endpoint} %{NOTSPACE:tlsversion}( %{NOTSPACE:accesspoint_arn} %{NOTSPACE:aclrequired})?', 'input.grokCustomPatterns'='INSIDE_QS ([^\"]*)\nINSIDE_BRACKETS ([^\\]]*)') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://awsexamplebucket1-logs/prefix/';
AWS Glue ETL 작업 생성
다음 단계를 완료합니다.
-
AWS Glue 콘솔을 엽니다.
-
ETL jobs(ETL 작업)를 선택한 다음 Spark script editor(Spark 스크립트 편집기)를 선택합니다.
-
Create(생성)를 선택합니다.
-
스크립트 탭에서 다음 스크립트를 입력합니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import split, col, size from awsglue.dynamicframe import DynamicFrame ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db', table_name='s3_access_logs', transformation_ctx = 'dyf',additional_options = {"attachFilename": "s3path"}) df = dyf.toDF() df2=df.withColumn('filename',split(col("s3path"),"/"))\ .withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\ .withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\ .withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\ .drop('s3path','filename') output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf') partitionKeys = ['year', 'month', 'day'] sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/', enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=partitionKeys) sink.setFormat("glueparquet") sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned') sink.writeFrame(output_dyf) job.commit() -
Job details(작업 세부 정보) 탭에서 작업 이름을 입력한 다음 IAM role(IAM 역할)을 선택합니다.
-
Save(저장)를 선택하고 Run(실행)을 선택합니다.
참고: Amazon S3 액세스 로그는 정기적으로 전달됩니다. AWS Glue ETL 작업에 대한 시간 기반 일정을 설정하려면 트리거를 추가하십시오. 또한 작업 북마크를 켜십시오.
DynamicFrame 객체 및 파티션된 테이블 생성
Amazon S3 액세스 로그 테이블을 생성한 후 Amazon S3 액세스 로그를 포함하는 DynamicFrame 객체를 생성합니다. 그런 다음 DynamicFrame 객체를 기반으로 연도, 월, 일을 표시하는 키를 포함하는 파티션된 테이블을 생성할 수 있습니다.
다음 단계를 완료합니다.
-
다음 명령을 실행하여 DynamicFrame 객체를 생성하고 Amazon S3 액세스 로그 테이블을 스캔합니다.
dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db, table_name='s3_access_logs', transformation_ctx = 'dyf',additional_options = {"attachFilename": "s3path"})참고: attachFilename 파라미터는 열 이름으로 사용됩니다.
-
다음 명령을 실행하여 Amazon S3 액세스 로그 경로에서 연도, 월, 일 열을 생성합니다.
df = dyf.toDF()df2=df.withColumn('filename',split(col("s3path"),"/"))\ .withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\ .withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\ .withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\ .drop('s3path','filename') -
다음 명령을 실행하여 Amazon S3 액세스 로그에 대한 파티션된 테이블을 생성합니다.
output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf') partitionKeys = ['year', 'month', 'day'] sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/', enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=partitionKeys) sink.setFormat("glueparquet") sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned') sink.writeFrame(output_dyf)
파티션된 테이블 쿼리
- Athena 콘솔을 엽니다.
- 다음 명령을 실행하여 테이블을 쿼리하고 테이블이 파티션되었는지 확인합니다.
SELECT * FROM "s3_access_logs_db"."s3_access_logs_partitioned" WHERE year = '2023' AND month = '03' AND day = '04'
- 언어
- 한국어
