четверг, 1 ноября 2018 г.

AWS. Create S3 folder (key), invoking Lambda for add partition for Athena

Забавно, но не так просто понять и найти оказалось как из cli сделать просто пустой ключ в S3.
У меня есть задача создавать партиции в Athena когда создается определенный ключ (hour=00 например). И в этот ключ будут падать файлы статистики в количестве сотен и тысяч штук.
MSCK REPAIR TABLE дело хорошее на на наших объемах и количествах это всегда работает максимально долго и валится не закончив дело.
Появилась идея навесить Event на S3 bucket, и вызывать лямбду для создания партиции. Но тут как раз и появляется тот самый момент, когда вешать событие на создание файла нет никакого смысла ибо это будет вызывать лямбду на каждый вызов PUT или CreateObject.
Моя цепочка это такой вид:

BUCKET/stat/year=YYYY/month=MM/day=DD/hour=HH/
Реагировать я хочу на создание любого нового ключа после /stat/.

Если создать событие такого типа:

Events: либо Put либо ObjectCreate(All) - тут нет разницы по-моему для этого случая. Я поставил PUT
Prefix: указываю ключ начала цепочки. В моем случае это stat/ В конце "/" прям обязателен
Suffix: /
SendTo: Lambda
Lambda ARN

Создавать ключ получилось вот так:
aws s3api put-object --bucket BUCKET_NAME --key stat/NEW_KEY --body ./null

важно так же, что ./null это просто пустой файл (touch ./null). /dev/null не сработает - зашито в cli и требует файла. И важно, после NEW_KEY не должно быть слэша, иначе null появится как файл

Лямбда: python 2.7

import logging, json
import boto3, botocore

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client('s3')
athena = boto3.client('athena')
database = 'MY_DATABASE'
table_name = 'MY_TABLE'
s3_output = 's3://MY_ATHENA_RESULTS'

#Executing the athena query:
def run_query(query, database):
        query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output
            }
        )
        print('Execution ID: ' + query_response['QueryExecutionId'])
        return query_response


def lambda_handler(event, context):
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    file_key = event['Records'][0]['s3']['object']['key']
    logger.info('Reading {} from {}'.format(file_key, bucket_name))

    checkIsHour="hour="
    if file_key.find(checkIsHour) != -1:
       logger.info('Found the hour key!')
       athena_prefix,athena_year,athena_month,athena_day,athena_hour,athena_null = file_key.split("/")
       logger.info('Prefix {} Year {} Month {} Day {} Hour {}'.format(athena_prefix,athena_year,athena_month,athena_day,athena_hour))
       query = str("ALTER TABLE " + table_name +" ADD IF NOT EXISTS PARTITION (" +
       athena_year + "," +
       athena_month + "," +
       athena_day + "," +
       athena_hour +
       ") location \"s3://"+ bucket_name
       + "/" + athena_year + "/" + athena_month + "/" + athena_day + "/"
       + athena_hour + "\";")
       logger.info('Query {}'.format(query))
       #print(query)
       run_query(query, database)



    

Комментариев нет: