Забавно, но не так просто понять и найти оказалось как из cli сделать просто пустой ключ в S3.
У меня есть задача создавать партиции в Athena когда создается определенный ключ (hour=00 например). И в этот ключ будут падать файлы статистики в количестве сотен и тысяч штук.
MSCK REPAIR TABLE дело хорошее на на наших объемах и количествах это всегда работает максимально долго и валится не закончив дело.
Появилась идея навесить Event на S3 bucket, и вызывать лямбду для создания партиции. Но тут как раз и появляется тот самый момент, когда вешать событие на создание файла нет никакого смысла ибо это будет вызывать лямбду на каждый вызов PUT или CreateObject.
Моя цепочка это такой вид:
BUCKET/stat/year=YYYY/month=MM/day=DD/hour=HH/
Реагировать я хочу на создание любого нового ключа после /stat/.
Если создать событие такого типа:
Events: либо Put либо ObjectCreate(All) - тут нет разницы по-моему для этого случая. Я поставил PUT
Prefix: указываю ключ начала цепочки. В моем случае это stat/ В конце "/" прям обязателен
Suffix: /
SendTo: Lambda
Lambda ARN
Создавать ключ получилось вот так:
aws s3api put-object --bucket BUCKET_NAME --key stat/NEW_KEY --body ./null
важно так же, что ./null это просто пустой файл (touch ./null). /dev/null не сработает - зашито в cli и требует файла. И важно, после NEW_KEY не должно быть слэша, иначе null появится как файл
Лямбда: python 2.7
import logging, json
import boto3, botocore
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
athena = boto3.client('athena')
database = 'MY_DATABASE'
table_name = 'MY_TABLE'
s3_output = 's3://MY_ATHENA_RESULTS'
#Executing the athena query:
def run_query(query, database):
query_response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database
},
ResultConfiguration={
'OutputLocation': s3_output
}
)
print('Execution ID: ' + query_response['QueryExecutionId'])
return query_response
def lambda_handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
file_key = event['Records'][0]['s3']['object']['key']
logger.info('Reading {} from {}'.format(file_key, bucket_name))
checkIsHour="hour="
if file_key.find(checkIsHour) != -1:
logger.info('Found the hour key!')
athena_prefix,athena_year,athena_month,athena_day,athena_hour,athena_null = file_key.split("/")
logger.info('Prefix {} Year {} Month {} Day {} Hour {}'.format(athena_prefix,athena_year,athena_month,athena_day,athena_hour))
query = str("ALTER TABLE " + table_name +" ADD IF NOT EXISTS PARTITION (" +
athena_year + "," +
athena_month + "," +
athena_day + "," +
athena_hour +
") location \"s3://"+ bucket_name
+ "/" + athena_year + "/" + athena_month + "/" + athena_day + "/"
+ athena_hour + "\";")
logger.info('Query {}'.format(query))
#print(query)
run_query(query, database)
У меня есть задача создавать партиции в Athena когда создается определенный ключ (hour=00 например). И в этот ключ будут падать файлы статистики в количестве сотен и тысяч штук.
MSCK REPAIR TABLE дело хорошее на на наших объемах и количествах это всегда работает максимально долго и валится не закончив дело.
Появилась идея навесить Event на S3 bucket, и вызывать лямбду для создания партиции. Но тут как раз и появляется тот самый момент, когда вешать событие на создание файла нет никакого смысла ибо это будет вызывать лямбду на каждый вызов PUT или CreateObject.
Моя цепочка это такой вид:
BUCKET/stat/year=YYYY/month=MM/day=DD/hour=HH/
Реагировать я хочу на создание любого нового ключа после /stat/.
Если создать событие такого типа:
Events: либо Put либо ObjectCreate(All) - тут нет разницы по-моему для этого случая. Я поставил PUT
Prefix: указываю ключ начала цепочки. В моем случае это stat/ В конце "/" прям обязателен
Suffix: /
SendTo: Lambda
Lambda ARN
Создавать ключ получилось вот так:
aws s3api put-object --bucket BUCKET_NAME --key stat/NEW_KEY --body ./null
важно так же, что ./null это просто пустой файл (touch ./null). /dev/null не сработает - зашито в cli и требует файла. И важно, после NEW_KEY не должно быть слэша, иначе null появится как файл
Лямбда: python 2.7
import logging, json
import boto3, botocore
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
athena = boto3.client('athena')
database = 'MY_DATABASE'
table_name = 'MY_TABLE'
s3_output = 's3://MY_ATHENA_RESULTS'
#Executing the athena query:
def run_query(query, database):
query_response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database
},
ResultConfiguration={
'OutputLocation': s3_output
}
)
print('Execution ID: ' + query_response['QueryExecutionId'])
return query_response
def lambda_handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
file_key = event['Records'][0]['s3']['object']['key']
logger.info('Reading {} from {}'.format(file_key, bucket_name))
checkIsHour="hour="
if file_key.find(checkIsHour) != -1:
logger.info('Found the hour key!')
athena_prefix,athena_year,athena_month,athena_day,athena_hour,athena_null = file_key.split("/")
logger.info('Prefix {} Year {} Month {} Day {} Hour {}'.format(athena_prefix,athena_year,athena_month,athena_day,athena_hour))
query = str("ALTER TABLE " + table_name +" ADD IF NOT EXISTS PARTITION (" +
athena_year + "," +
athena_month + "," +
athena_day + "," +
athena_hour +
") location \"s3://"+ bucket_name
+ "/" + athena_year + "/" + athena_month + "/" + athena_day + "/"
+ athena_hour + "\";")
logger.info('Query {}'.format(query))
#print(query)
run_query(query, database)
Комментариев нет:
Отправить комментарий