2016-07-25 38 views
6

Cách truyền tham số vào các nhiệm vụ phụ thuộc trong luồng không khí là gì? Tôi có rất nhiều tệp bị chặn và tôi đang cố gắng di chuyển phương pháp này vào luồng không khí, nhưng tôi không biết cách chuyển một số thuộc tính giữa các tác vụ.Tham số truyền luồng không khí vào tác vụ phụ thuộc

Đây là một ví dụ thực tế:

#sqoop bash template 
sqoop_template = """ 
     sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/ 
    """ 

s3_template = """ 
     s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}} 
    """ 



#Task of extraction in EMR 
t1 = BashOperator(
     task_id='extract_account', 
     bash_command=sqoop_template, 
     params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")}, 
     dag=dag) 
#Task to upload in s3 backup. 
t2 = BashOperator(
     task_id='s3_upload', 
     bash_command=s3_template, 
     params={}, #here i need the dir name created in t1 
     depends_on_past=True 
    ) 

t2.set_upstream(t1) 

Trong t2 tôi cần phải truy cập vào tên dir tạo ra trong t1.

Giải pháp

#Execute a valid job sqoop 
def sqoop_import(table_name, job_name): 
    s3, hdfs = dirpath(table_name) 
    sqoop_job = job_default_config(job_name, hdfs) 
    #call(sqoop_job) 
    return {'hdfs_dir': hdfs, 's3_dir': s3} 

def s3_upload(**context): 
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir'] 
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir'] 
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)] 
    #call(s3_cpdist_job) 
    return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import') 

def sns_notify(**context): 
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir'] 
    client = boto3.client('sns') 
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg' 
    response = client.publish(TargetArn=arn, Message=s3) 
    return response 

Đó không phải là giải pháp dứt khoát, vì vậy cải thiện được hoan nghênh. Cảm ơn.

+0

Một giải pháp, theo ý kiến ​​của tôi, là tạo một số tệp với các thuộc tính được tạo trong t1 và sử dụng cùng tệp này trong t2. –

Trả lời

8

Khám phá XComs - http://airflow.incubator.apache.org/concepts.html#xcoms. Chúng được sử dụng để giao tiếp trạng thái giữa các tác vụ.

+0

Tôi giải quyết bằng cách sử dụng phương pháp này, nhưng quên hoàn toàn để thêm giải pháp ở đây. Cảm ơn. –

Các vấn đề liên quan