mysql
  • python-3.x
  • airflow
  • 2017-10-10 4313 views 5 likes 
    5
    def mysql_operator_test(): 
        DEFAULT_DATE = datetime(2017, 10, 9) 
        t = MySqlOperator(
         task_id='basic_mysql', 
         sql="SELECT count(*) from table 1 where id>100;", 
         mysql_conn_id='mysql_default', 
         dag=dag) 
        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False) 
    
    run_this = PythonOperator(
        task_id='getRecoReq', 
        python_callable=mysql_operator_test, 
        # xcom_push=True, 
        dag=dag) 
    
    task2 = PythonOperator(
        task_id= 'mysql_select', 
        provide_context=True, 
        python_callable = blah, 
        templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" }, 
        dag=dag) 
    
    run_this.set_downstream(task2) 
    

    Tôi muốn ghi lại số đếm được trả về bởi MySqlOperator bằng cách sử dụng xcoms. Có thể ai đó xin vui lòng hướng dẫn về cùng?Cách sử dụng các luồng không khí xcom với MySqlOperator

    Trả lời

    1

    Bạn đang ở rất gần! Tuy nhiên, cách bạn hỏi câu hỏi này là một kiểu chống mẫu. Bạn không muốn chia sẻ dữ liệu giữa các tác vụ trong luồng không khí. Ngoài ra, bạn không muốn sử dụng toán tử như bạn đang ở trong mysql_operator_test. Thật là hấp dẫn, tôi đã làm điều tương tự khi tôi bắt đầu.

    Tôi đã thử một cái gì đó rất giống với điều này nhưng với kết nối SFTP. Cuối cùng tôi đã làm mọi thứ bên trong của PythonOperator và sử dụng các móc cơ bản.

    Tôi khuyên bạn nên sử dụng MySQLHook bên trong của python_callable. Một cái gì đó như thế này:

    def count_mysql_and_then_use_the_count(): 
        """ 
        Returns an SFTP connection created using the SSHHook 
        """ 
        mysql_hook = MySQLHook(...) 
        cur = conn.cursor() 
        cur.execute("""SELECT count(*) from table 1 where id>100""") 
        for count in cur: 
         # Do something with the count... 
    

    Tôi không chắc chắn nếu điều này sẽ làm việc như là nhưng ý tưởng là sử dụng một cái móc bên trong Python của bạn callable, tôi không sử dụng MySQLHook thường nhưng tôi đã làm điều này với SSHHook và nó hoạt động rất tốt.

    Các vấn đề liên quan