在AirFlow 2.0中运行多个雅典娜查询

2022-03-12 00:00:00 python boto3 airflow amazon-athena

问题描述

我正在尝试创建一个DAG,其中一个任务使用boto3执行athena查询。它对一个查询有效,但是当我尝试运行多个雅典娜查询时遇到问题。

此问题可以按如下方式解决:-

  1. 翻阅thisblog可以看到,athena使用start_query_execution触发查询,get_query_execution获取statusqueryExecutionId等查询数据(athena的文档)

遵循上述模式后,我有以下代码:-

import json
import time
import asyncio
import boto3
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator


def execute_query(client, query, database, output_location):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': output_location
        }
    )

    return response['QueryExecutionId']


async def get_ids(client_athena, query, database, output_location):
    query_responses = []
    for i in range(5):
        query_responses.append(execute_query(client_athena, query, database, output_location))    

    res = await asyncio.gather(*query_responses, return_exceptions=True)

    return res

def run_athena_query(query, database, output_location, region_name, **context):
    BOTO_SESSION = boto3.Session(
        aws_access_key_id = 'YOUR_KEY',
        aws_secret_access_key = 'YOUR_ACCESS_KEY')
    client_athena = BOTO_SESSION.client('athena', region_name=region_name)

    loop = asyncio.get_event_loop()
    query_execution_ids = loop.run_until_complete(get_ids(client_athena, query, database, output_location))
    loop.close()

    repetitions = 900
    error_messages = []
    s3_uris = []

    while repetitions > 0 and len(query_execution_ids) > 0:
        repetitions = repetitions - 1
        
        query_response_list = client_athena.batch_get_query_execution(
            QueryExecutionIds=query_execution_ids)['QueryExecutions']
      
        for query_response in query_response_list:
            if 'QueryExecution' in query_response and 
                    'Status' in query_response['QueryExecution'] and 
                    'State' in query_response['QueryExecution']['Status']:
                state = query_response['QueryExecution']['Status']['State']

                if state in ['FAILED', 'CANCELLED']:
                    error_reason = query_response['QueryExecution']['Status']['StateChangeReason']
                    error_message = 'Final state of Athena job is {}, query_execution_id is {}. Error: {}'.format(
                            state, query_execution_id, error_message
                        )
                    error_messages.append(error_message)
                    query_execution_ids.remove(query_response['QueryExecutionId'])
                
                elif state == 'SUCCEEDED':
                    result_location = query_response['QueryExecution']['ResultConfiguration']['OutputLocation']
                    s3_uris.append(result_location)
                    query_execution_ids.remove(query_response['QueryExecutionId'])
                 
                    
        time.sleep(2)
    
    logging.exception(error_messages)
    return s3_uris


DEFAULT_ARGS = {
    'owner': 'ubuntu',
    'depends_on_past': True,
    'start_date': datetime(2021, 6, 8),
    'retries': 0,
    'concurrency': 2
}

with DAG('resync_job_dag', default_args=DEFAULT_ARGS, schedule_interval=None) as dag:

    ATHENA_QUERY = PythonOperator(
        task_id='athena_query',
        python_callable=run_athena_query,
        provide_context=True,
        op_kwargs={
            'query': 'SELECT request_timestamp FROM "sampledb"."elb_logs" limit 10;', # query provide in athena tutorial
            'database':'sampledb',
            'output_location':'YOUR_BUCKET',
            'region_name':'YOUR_REGION'
        }
    )

    ATHENA_QUERY

运行上述代码时,我收到以下错误:-

[2021-06-16 20:34:52,981] {taskinstance.py:1455} ERROR - An asyncio.Future, a coroutine or an awaitable is required
Traceback (most recent call last):
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/ubuntu/iac-airflow/dags/helper/tasks.py", line 93, in run_athena_query
    query_execution_ids = loop.run_until_complete(get_ids(client_athena, query, database, output_location))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/home/ubuntu/iac-airflow/dags/helper/tasks.py", line 79, in get_ids
    res = await asyncio.gather(*query_responses, return_exceptions=True)
  File "/usr/lib/python3.6/asyncio/tasks.py", line 602, in gather
    fut = ensure_future(arg, loop=loop)
  File "/usr/lib/python3.6/asyncio/tasks.py", line 526, in ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
TypeError: An asyncio.Future, a coroutine or an awaitable is required

我不能到达我错的地方。如能就此问题给予一些提示

,我将不胜感激。

解决方案

我认为您在这里做的事情并不是真的需要。 您的问题是:

  1. 并行执行多个查询。
  2. 能够恢复每个查询queryExecutionId

这两个问题只需使用AWSAthenaOperator即可解决。操作员已经为您处理了您提到的所有事情。

示例:

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator


with DAG(
    dag_id="athena",
    schedule_interval='@daily',
    start_date=days_ago(1),
    catchup=False,
) as dag:

    start_op = DummyOperator(task_id="start_task")
    query_list = ["SELECT 1;", "SELECT 2;" "SELECT 3;"]

    for i, sql in enumerate(query_list):
        run_query = AWSAthenaOperator(
            task_id=f'run_query_{i}',
            query=sql,
            output_location='s3://my-bucket/my-path/',
            database='my_database'
        )
        start_op >> query_op

只需向query_list添加更多查询即可动态创建雅典娜任务:

请注意,QueryExecutionId是pushed to xcom,因此您可以根据需要访问下游任务中的。

相关文章