내용으로 건너뛰기

분할된 Amazon S3 액세스 로그를 사용하여 Athena 쿼리 시간 초과를 방지하려면 어떻게 해야 합니까?

3분 분량
0

Amazon Simple Storage Service(Amazon S3) 액세스 로그에 대한 Amazon Athena 쿼리를 실행할 때 쿼리 시간이 초과됩니다. 이 문제를 해결하고 싶습니다.

해결 방법

Amazon S3 액세스 로그는 동일한 접두사로 저장됩니다. 데이터의 양이 많으면 Athena가 모든 데이터를 읽기 전에 Athena 쿼리 시간이 초과될 수 있습니다. 이 문제를 방지하려면 AWS Glue ETL 작업을 사용하여 Amazon S3 데이터를 분할합니다. 이후 제한된 파티션에서 Athena 쿼리를 실행합니다.

참고: 테이블, 스크립트 및 명령 예시에서 필요한 경우 다음 값을 원하는 값으로 바꾸십시오.

  • s3_access_logs_db를 데이터베이스의 이름으로 바꿉니다.
  • **s3://awsexamplebucket1-logs/prefix/**를 Amazon S3 액세스 로그를 저장하는 경로로 바꿉니다.
  • s3_access_logs를 테이블 이름으로 바꿉니다.
  • s3_access_logs_partitioned를 파티션된 테이블 이름으로 바꿉니다.
  • 2023, 0304를 파티션 값으로 바꿉니다.

Amazon S3 데이터 파티션

Athena에서 다음 테이블을 생성합니다.

CREATE EXTERNAL TABLE `s3_access_logs_db.s3_access_logs`(    
  `bucketowner` string,
  `bucket_name` string,
  `requestdatetime` string,
  `remoteip` string,
  `requester` string,
  `requestid` string,
  `operation` string,
  `key` string,
  `request_uri` string,
  `httpstatus` string,
  `errorcode` string,
  `bytessent` string,
  `objectsize` string,
  `totaltime` string,
  `turnaround_time` string,
  `referrer` string,
  `useragent` string,
  `version_id` string,
  `hostid` string,
  `sigv` string,
  `ciphersuite` string,
  `authtype` string,
  `endpoint` string,
  `tlsversion` string,
  `accesspoint_arn` string,
  `aclrequired` string)
ROW FORMAT SERDE
  'com.amazonaws.glue.serde.GrokSerDe'
WITH SERDEPROPERTIES (
  'input.format'='%{NOTSPACE:bucketowner} %{NOTSPACE:bucket_name} \\[%{INSIDE_BRACKETS:requestdatetime}\\] %{NOTSPACE:remoteip} %{NOTSPACE:requester} %{NOTSPACE:requestid} %{NOTSPACE:operation} %{NOTSPACE:key} \"%{INSIDE_QS:request_uri}\" %{NOTSPACE:httpstatus} %{NOTSPACE:errorcode} %{NOTSPACE:bytes_sent} %{NOTSPACE:objectsize} %{NOTSPACE:totaltime} %{NOTSPACE:turnaround_time} \"?%{INSIDE_QS:referrer}\"? \"%{INSIDE_QS:useragent}\" %{NOTSPACE:version_id} %{NOTSPACE:hostid} %{NOTSPACE:sigv} %{NOTSPACE:ciphersuite} %{NOTSPACE:authtype} %{NOTSPACE:endpoint} %{NOTSPACE:tlsversion}( %{NOTSPACE:accesspoint_arn} %{NOTSPACE:aclrequired})?',
  'input.grokCustomPatterns'='INSIDE_QS ([^\"]*)\nINSIDE_BRACKETS ([^\\]]*)')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://awsexamplebucket1-logs/prefix/';

AWS Glue ETL 작업 생성

다음 단계를 완료합니다.

  1. AWS Glue 콘솔을 엽니다.

  2. ETL jobs(ETL 작업)를 선택한 다음 Spark script editor(Spark 스크립트 편집기)를 선택합니다.

  3. Create(생성)를 선택합니다.

  4. 스크립트 탭에서 다음 스크립트를 입력합니다.

    import sys  
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import split, col, size
    from awsglue.dynamicframe import DynamicFrame
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db', table_name='s3_access_logs', transformation_ctx = 'dyf',additional_options = {"attachFilename": "s3path"})
    
    df = dyf.toDF()
    df2=df.withColumn('filename',split(col("s3path"),"/"))\
            .withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\
            .withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\
            .withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\
            .drop('s3path','filename')
    
    output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf')
    
    partitionKeys = ['year', 'month', 'day']
    sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/',
                               enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
                               partitionKeys=partitionKeys)
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned')
    sink.writeFrame(output_dyf)
    
    job.commit()
  5. Job details(작업 세부 정보) 탭에서 작업 이름을 입력한 다음 IAM role(IAM 역할)을 선택합니다.

  6. Save(저장)를 선택하고 Run(실행)을 선택합니다.

참고: Amazon S3 액세스 로그는 정기적으로 전달됩니다. AWS Glue ETL 작업에 대한 시간 기반 일정을 설정하려면 트리거를 추가하십시오. 또한 작업 북마크를 켜십시오.

DynamicFrame 객체 및 파티션된 테이블 생성

Amazon S3 액세스 로그 테이블을 생성한 후 Amazon S3 액세스 로그를 포함하는 DynamicFrame 객체를 생성합니다. 그런 다음 DynamicFrame 객체를 기반으로 연도, 월, 일을 표시하는 키를 포함하는 파티션된 테이블을 생성할 수 있습니다.

다음 단계를 완료합니다.

  1. 다음 명령을 실행하여 DynamicFrame 객체를 생성하고 Amazon S3 액세스 로그 테이블을 스캔합니다.

      dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db, table_name='s3_access_logs', transformation_ctx = 'dyf',additional_options = {"attachFilename": "s3path"})

    참고: attachFilename 파라미터는 열 이름으로 사용됩니다.

  2. 다음 명령을 실행하여 Amazon S3 액세스 로그 경로에서 연도, 월, 일 열을 생성합니다.

    df = dyf.toDF()df2=df.withColumn('filename',split(col("s3path"),"/"))\
            .withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\
            .withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\
            .withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\
            .drop('s3path','filename')
  3. 다음 명령을 실행하여 Amazon S3 액세스 로그에 대한 파티션된 테이블을 생성합니다.

    output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf')
    partitionKeys = ['year', 'month', 'day']
    sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/',
                               enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
                               partitionKeys=partitionKeys)
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned')
    sink.writeFrame(output_dyf)

파티션된 테이블 쿼리

  1. Athena 콘솔을 엽니다.
  2. 다음 명령을 실행하여 테이블을 쿼리하고 테이블이 파티션되었는지 확인합니다.
    SELECT * FROM "s3_access_logs_db"."s3_access_logs_partitioned" WHERE year = '2023' AND month = '03' AND day = '04'
AWS 공식업데이트됨 일 년 전
댓글 없음

관련 콘텐츠