Trong Luồng không khí, tôi đang gặp phải sự cố mà tôi cần phải chuyển số job_flow_id
đến một trong các bước emr của tôi. Tôi có khả năng truy xuất số job_flow_id
từ toán tử nhưng khi tôi sẽ tạo các bước để gửi cho cụm, giá trị task_instance
là không đúng. Tôi có đoạn mã sau:Luồng không khí - Thao tác tác vụ trong nhà điều hành EMR
def issue_step(name, args):
return [
{
"Name": name,
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "s3://....",
"Args": args
}
}
]
dag = DAG('example',
description='My dag',
schedule_interval='0 8 * * 6',
dagrun_timeout=timedelta(days=2))
try:
create_emr = EmrCreateJobFlowOperator(
task_id='create_job_flow',
aws_conn_id='aws_default',
dag=dag
)
load_data_steps = issue_step('load', ['arg1', 'arg2'])
load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
load_data_steps[0]["HadoopJarStep"]["Args"].append(
"{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id
load_data = EmrAddStepsOperator(
task_id='load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others
aws_conn_id='aws_default',
steps=load_data_steps,
dag=dag
)
check_load_data = EmrStepSensor(
task_id='watch_load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
create_emr_recommendations >> load_data
load_data >> check_load_data
check_load_data >> cluster_remover
except AirflowException as ae:
print ae.message
Vấn đề là, khi tôi kiểm tra EMR, thay vì thấy --cluster-id j-1234
trong bước load_data
, tôi thấy --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}"
, gây bước của tôi thất bại.
Làm cách nào để có được giá trị thực tế bên trong chức năng bước của tôi?
Cảm ơn và ngày lễ hạnh phúc
bạn đã thử thêm giá trị không có dấu ngoặc kép chưa? load_data_steps [0] ["HadoopJarStep"] ["Args"]. chắp thêm ( {{task_instance.xcom_pull ('create_job_flow', key = 'return_value')}}) –
nơi nào tôi nhận được '' 'task_instance''' đối tượng từ đâu? Tôi vẫn đang học cách sử dụng nó. – davideberdin