Cách truyền tham số vào các nhiệm vụ phụ thuộc trong luồng không khí là gì? Tôi có rất nhiều tệp bị chặn và tôi đang cố gắng di chuyển phương pháp này vào luồng không khí, nhưng tôi không biết cách chuyển một số thuộc tính giữa các tác vụ.Tham số truyền luồng không khí vào tác vụ phụ thuộc
Đây là một ví dụ thực tế:
#sqoop bash template
sqoop_template = """
sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
"""
s3_template = """
s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}}
"""
#Task of extraction in EMR
t1 = BashOperator(
task_id='extract_account',
bash_command=sqoop_template,
params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")},
dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
task_id='s3_upload',
bash_command=s3_template,
params={}, #here i need the dir name created in t1
depends_on_past=True
)
t2.set_upstream(t1)
Trong t2 tôi cần phải truy cập vào tên dir tạo ra trong t1.
Giải pháp
#Execute a valid job sqoop
def sqoop_import(table_name, job_name):
s3, hdfs = dirpath(table_name)
sqoop_job = job_default_config(job_name, hdfs)
#call(sqoop_job)
return {'hdfs_dir': hdfs, 's3_dir': s3}
def s3_upload(**context):
hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir']
s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir']
s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
#call(s3_cpdist_job)
return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import')
def sns_notify(**context):
s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir']
client = boto3.client('sns')
arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'
response = client.publish(TargetArn=arn, Message=s3)
return response
Đó không phải là giải pháp dứt khoát, vì vậy cải thiện được hoan nghênh. Cảm ơn.
Một giải pháp, theo ý kiến của tôi, là tạo một số tệp với các thuộc tính được tạo trong t1 và sử dụng cùng tệp này trong t2. –