2017-12-24 104 views
5

Trong Luồng không khí, tôi đang gặp phải sự cố mà tôi cần phải chuyển số job_flow_id đến một trong các bước emr của tôi. Tôi có khả năng truy xuất số job_flow_id từ toán tử nhưng khi tôi sẽ tạo các bước để gửi cho cụm, giá trị task_instance là không đúng. Tôi có đoạn mã sau:Luồng không khí - Thao tác tác vụ trong nhà điều hành EMR

def issue_step(name, args): 
    return [ 
     { 
      "Name": name, 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": { 
       "Jar": "s3://....", 
       "Args": args 
      } 
     } 
    ] 

dag = DAG('example', 
      description='My dag', 
      schedule_interval='0 8 * * 6', 
      dagrun_timeout=timedelta(days=2)) 

try: 

    create_emr = EmrCreateJobFlowOperator(
     task_id='create_job_flow', 
     aws_conn_id='aws_default',   
     dag=dag 
    ) 

    load_data_steps = issue_step('load', ['arg1', 'arg2']) 

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id') 
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id 

    load_data = EmrAddStepsOperator(
     task_id='load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others 
     aws_conn_id='aws_default', 
     steps=load_data_steps, 
     dag=dag 
    ) 

    check_load_data = EmrStepSensor(
     task_id='watch_load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    cluster_remover = EmrTerminateJobFlowOperator(
     task_id='remove_cluster', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    create_emr_recommendations >> load_data 
    load_data >> check_load_data 
    check_load_data >> cluster_remover 

except AirflowException as ae: 
    print ae.message 

Vấn đề là, khi tôi kiểm tra EMR, thay vì thấy --cluster-id j-1234 trong bước load_data, tôi thấy --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}", gây bước của tôi thất bại.

Làm cách nào để có được giá trị thực tế bên trong chức năng bước của tôi?

Cảm ơn và ngày lễ hạnh phúc

+0

bạn đã thử thêm giá trị không có dấu ngoặc kép chưa? load_data_steps [0] ["HadoopJarStep"] ["Args"]. chắp thêm ( {{task_instance.xcom_pull ('create_job_flow', key = 'return_value')}}) –

+0

nơi nào tôi nhận được '' 'task_instance''' đối tượng từ đâu? Tôi vẫn đang học cách sử dụng nó. – davideberdin

Trả lời

3

tôi phát hiện ra rằng có PR trên kho luồng không khí về this. Vấn đề là không có templating cho các bước trong EmrAddStepsOperator. Để khắc phục vấn đề này, tôi đã làm như sau:

  • Tạo một nhà điều hành tùy chỉnh được thừa kế từ EmrAddStepsOperator
  • Added toán tử này như Plugin
  • gọi là các nhà điều hành mới trong DAG tôi nộp

đây mã cho toán tử tùy chỉnh và plugin trong tệp custom_emr_add_step_operator.py (xem cây bên dưới)

from __future__ import division, absolute_import, print_function 

from airflow.plugins_manager import AirflowPlugin 
from airflow.utils import apply_defaults 

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator 


class CustomEmrAddStepsOperator(EmrAddStepsOperator): 
    template_fields = ['job_flow_id', 'steps'] # override with steps to solve the issue above 

    @apply_defaults 
    def __init__(
      self, 
      *args, **kwargs): 
     super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs) 

    def execute(self, context): 
     super(CustomEmrAddStepsOperator, self).execute(context=context) 


# Defining the plugin class 
class CustomPlugin(AirflowPlugin): 
    name = "custom_plugin" 
    operators = [CustomEmrAddStepsOperator] 

Trong file DAG của tôi, tôi gọi các plugin theo cách này

from airflow.operators import CustomEmrAddStepsOperator 

Cấu trúc của dự án và các plugin của tôi trông như thế này:

├── config 
│   └── airflow.cfg 
├── dags 
│   ├── __init__.py 
│   └── my_dag.py 
├── plugins 
│   ├── __init__.py 
│   └── operators 
│    ├── __init__.py 
│    └── custom_emr_add_step_operator.py 
└── requirements.txt 

Nếu bạn đang sử dụng một IDE như PyCharm, điều này sẽ phàn nàn vì nó nói rằng nó không thể tìm thấy mô-đun. Nhưng khi bạn chạy Luồng không khí, vấn đề này sẽ không xuất hiện. Hãy nhớ cũng đảm bảo rằng trong airflow.cfg bạn sẽ trỏ đến thư mục plugins đúng để luồng không khí có thể đọc plugin mới được tạo của bạn.

Các vấn đề liên quan