2016-11-17 21 views
28

Tôi đã khởi động máy chủ web luồng không khí và lên lịch một số dấu gạch ngang. Tôi có thể thấy các dấu gạch ngang trên GUI web.Luồng không khí: cách xóa DAG?

Làm cách nào để xóa một DAG cụ thể khỏi hoạt động và được hiển thị trong GUI web? Có một lệnh CLI luồng không khí để làm điều đó?

Tôi đã xem xét xung quanh nhưng không thể tìm thấy câu trả lời cho một cách đơn giản để xóa DAG khi nó đã được tải và lên lịch.

+0

Không có CLI cho việc này. Nhưng có một yêu cầu kéo đã bị hủy bỏ nếu bạn muốn thử và làm sống lại nó: https://github.com/apache/incubator-airflow/pull/1344 – TheF1rstPancake

Trả lời

2

Không có gì có sẵn trong luồng không khí cho bạn. Để xóa DAG, xóa nó khỏi kho lưu trữ và xóa các mục cơ sở dữ liệu trong bảng metastore luồng không khí - dag.

+0

Tôi cũng phải khởi động lại máy mà lịch biểu và máy chủ web chạy để kết thúc dọn dẹp. Chỉ cần khởi động lại máy chủ web và trình lên lịch là không đủ. –

7

Tôi vừa viết một tập lệnh xóa tất cả mọi thứ liên quan đến một thẻ đặc biệt, nhưng điều này chỉ dành cho MySQL. Bạn có thể viết một phương thức kết nối khác nếu bạn đang sử dụng PostgreSQL. Ban đầu các lệnh được đăng bởi Lance trên https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 Tôi chỉ cần đặt nó trong kịch bản. Hi vọng điêu nay co ich. Format: python script.py dag_id

import sys 
import MySQLdb 

dag_input = sys.argv[1] 

query = {'delete from xcom where dag_id = "' + dag_input + '"', 
     'delete from task_instance where dag_id = "' + dag_input + '"', 
     'delete from sla_miss where dag_id = "' + dag_input + '"', 
     'delete from log where dag_id = "' + dag_input + '"', 
     'delete from job where dag_id = "' + dag_input + '"', 
     'delete from dag_run where dag_id = "' + dag_input + '"', 
     'delete from dag where dag_id = "' + dag_input + '"' } 

def connect(query): 
     db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database") 
     cur = db.cursor() 
     cur.execute(query) 
     db.commit() 
     db.close() 
     return 

for value in query: 
     print value 
     connect(value) 
10

Không chắc chắn tại sao Apache Airflow không có một cách rõ ràng và dễ dàng để xóa một DAG

Filed https://issues.apache.org/jira/browse/AIRFLOW-1002

+2

PR cho điều này là mở nhưng chưa được sáp nhập. Liên kết dành cho những người quan tâm - https://github.com/apache/incubator-airflow/pull/2199. –

14

Đây là mã chuyển thể của tôi sử dụng PostgresHook với mặc định connection_id.

import sys 
from airflow.hooks.postgres_hook import PostgresHook 

dag_input = sys.argv[1] 
hook=PostgresHook(postgres_conn_id= "airflow_db") 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    sql="delete from {} where dag_id='{}'".format(t, dag_input) 
    hook.run(sql, True) 
+2

Tôi nghĩ rằng bạn cũng có thể thêm 'task_fail' và' dag_stats' vào danh sách các bảng – marengaz

4

Tôi đã viết một tập lệnh xóa tất cả siêu dữ liệu liên quan đến một thẻ cụ thể liên quan đến DB mặc định SQLite. Điều này được dựa trên câu trả lời của Chúa Giêsu ở trên nhưng điều chỉnh từ Postgres sang SQLite. Người dùng nên đặt ../airflow.db ở bất cứ nơi nào script.py được lưu trữ liên quan đến tệp airflow.db mặc định (thường là ~/airflow). Để thực thi, hãy sử dụng python script.py dag_id.

import sqlite3 
import sys 

conn = sqlite3.connect('../airflow.db') 
c = conn.cursor() 

dag_input = sys.argv[1] 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    query = "delete from {} where dag_id='{}'".format(t, dag_input) 
    c.execute(query) 

conn.commit() 
conn.close() 
+0

công trình này và là một giải pháp tốt ít nhất cho đến khi PR được hợp nhất –

1

Bạn có thể xóa một tập các ví dụ nhiệm vụ, như thể họ không bao giờ chạy với:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31 

Và sau đó loại bỏ tập tin từ thư mục dag DAG

+1

Điều này có thể dẫn đến một số dữ liệu không sạch trong các bảng 'dag' – Chengzhi

Các vấn đề liên quan