2017-05-08 20 views
6

Chúng tôi đang cố gắng chạy một đường ống đơn giản được phân phối trên cụm cụm bến tàu. Các công nhân luigi được triển khai như các dịch vụ docker nhân bản. Họ bắt đầu thành công và sau một vài giây yêu cầu làm việc với máy chủ luigi, họ bắt đầu chết vì không có công việc nào được giao cho họ và tất cả các nhiệm vụ đều được giao cho một công nhân.Công nhân chết sớm do phân phối công việc không đồng đều ở Luigi (2.6.1)

Chúng tôi phải đặt keep_alive = True trong luigi.cfg nhân viên của chúng tôi để buộc họ không chết, nhưng giữ công nhân xung quanh sau khi đường ống được thực hiện có vẻ là một ý tưởng tồi. Có cách nào để kiểm soát việc phân phối công việc không?

đường ống thử nghiệm của chúng tôi:

class RunAllTasks(luigi.Task): 

    tasks = luigi.IntParameter() 
    sleep_time = luigi.IntParameter() 

    def requires(self): 
     for i in range(self.tasks): 
      yield RunExampleTask(i, self.sleep_time) 

    def run(self): 
     with self.output().open('w') as f: 
      f.write('All done!') 

    def output(self): 
     return LocalTarget('/data/RunAllTasks.txt') 


class RunExampleTask(luigi.Task): 

    number = luigi.IntParameter() 
    sleep_time = luigi.IntParameter() 

    @property 
    def cmd(self): 
     return """ 
       docker run --rm --name example_{number} hello-world 
      """.format(number=self.number) 

    def run(self): 
     time.sleep(self.sleep_time) 
     logger.debug(self.cmd) 
     out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True) 
     logger.debug(out) 
     with self.output().open('w') as f: 
      f.write(str(out)) 

    def output(self): 
     return LocalTarget('/data/{number}.txt'.format(number=self.number)) 


if __name__ == "__main__": 
    luigi.run() 
+0

Dường như không phải là một câu hỏi về docker? – johnharris85

+0

Bạn có bắt đầu 'RunAllTasks' trên mỗi nút không? – MattMcKnight

+0

@MattMcKnight Có, chúng tôi đóng gói đường ống như một dịch vụ docker, do đó, bầy đàn bắt đầu một bản sao trên mỗi nút (vòng robin). – fcisneros

Trả lời

1

vấn đề của bạn là kết quả của yield ing một yêu cầu duy nhất tại một thời gian, thay vào đó bạn muốn yield tất cả chúng cùng một lúc, như sau:

def requires(self): 
    reqs = [] 
    for i in range(self.tasks): 
     reqs.append(RunExampleTask(i, self.sleep_time)) 
    yield reqs 
Các vấn đề liên quan