2017-05-31 44 views
6

Tôi có một SubDAG trong luồng không khí với bước dài (thường khoảng 2 giờ, mặc dù nó thay đổi dựa trên đơn vị nào đang được chạy). Dưới 1.7.1.3, bước này sẽ liên tục gây ra AIRFLOW-736 và SubDAG sẽ dừng ở trạng thái 'đang chạy' khi tất cả các bước trong thành công. Chúng ta có thể làm việc xung quanh điều này vì chúng ta không có các bước sau SubDAG bằng cách đánh dấu thủ công SubDagOperator thành công (thay vì chạy) trong cơ sở dữ liệu.Luồng không khí - tác vụ chạy trong SubDag được đánh dấu là không thành công sau một giờ

Chúng tôi đang thử nghiệm Airflow 1.8.1 bây giờ, nâng cấp bằng cách làm như sau:

  1. Shuting xuống lên lịch và công nhân
  2. Via pip của chúng tôi, gỡ bỏ cài đặt luồng không khí và lắp đặt apache-luồng không khí (phiên bản 1.8.1)
  3. runing luồng không khí upgradedb
  4. Chạy lên lịch luồng không khí và công nhân

với o hệ thống không bị ảnh hưởng, DAG tương tự hiện không đạt 100% thời gian sau khi nhiệm vụ chạy dài đánh dấu 1 giờ (mặc dù kỳ lạ, không chính xác 3600 giây sau đó - nó có thể ở bất cứ đâu từ 30 đến 90 giây sau khi đánh dấu giờ) với thông báo "Ví dụ nhiệm vụ của báo cáo Executor hoàn thành (không thành công) mặc dù nhiệm vụ cho biết nó đang chạy. Một cách nào đó, có sự bất đồng giữa người lên lịch bị nhầm lẫn trong suy nghĩ nhiệm vụ thất bại (xem this line của jobs.py) dựa trên cơ sở dữ liệu, mặc dù nhiệm vụ thực tế

Tôi đã xác nhận rằng, bằng cách nào đó, trạng thái là 'không thành công' trong bảng task_instance của cơ sở dữ liệu luồng không khí.Vì vậy, tôi muốn biết điều gì có thể đặt trạng thái tác vụ thành không thành công khi tác vụ bản thân vẫn chạy

Dưới đây là một dag mẫu mà gây nên vấn đề:.

from datetime import datetime 
from airflow.models import DAG 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.subdag_operator import SubDagOperator 

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)} 

def define_sub(dag, step_name, sleeptime): 
    op = BashOperator(
     task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag 
    ) 
    return dag 

def gen_sub_dag(parent_name, step_name, sleeptime): 
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS) 
    define_sub(sub, step_name, sleeptime) 
    return sub 

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None) 

long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent 
) 
+0

Hôm nay, tôi đã gặp phải sự cố tương tự, Thẻ con với một tác vụ chạy dài, sau hơn một giờ, tôi đã nhận được thông báo lỗi. Điều thú vị là, người lên lịch đã cố gắng khởi động lại tác vụ, điều đó đã không thành công do một luồng tài nguyên bị chặn của luồng không khí bị chặn. Tác vụ gốc tiếp tục chạy và kết thúc chính xác, luồng không khí đánh dấu thẻ phụ là không thành công, trước khi tác vụ kết thúc. –

+0

Bạn đang sử dụng trình xử lý nào. Có cần Celery + Redis không? –

Trả lời

0

Nếu bạn đang thực sự chạy với Celery và Redis có một cái nhìn tại visibility timeout setting cho Celery và tăng nó vượt quá thời gian kết thúc dự kiến ​​của nhiệm vụ của bạn.

Mặc dù chúng tôi định cấu hình Celery thành tác vụ-ack-late nhưng vẫn có vấn đề với nhiệm vụ biến mất. Chúng tôi xem xét điều này a bug trong Celery.

Các vấn đề liên quan