2017-02-28 21 views
8

Vấn đề tôi đang cố gắng giải quyết như sau: Tôi có một danh sách trainimgs của tên tệp. Tôi đã xác định một sốTensorFlow: Enqueuing và dequeuing một hàng đợi từ nhiều chủ đề

  • tf.RandomShuffleQueue với capacity=len(trainimgs)min_after_dequeue=0.
  • Điều này tf.RandomShuffleQueue dự kiến ​​sẽ được lấp đầy bởi trainimgs cho số lượng epochlimit được chỉ định.
  • Một số chuỗi được dự kiến ​​sẽ hoạt động song song. Mỗi thread dequeues một yếu tố từ tf.RandomShuffleQueue và thực hiện một số hoạt động trên nó và enqueues nó vào hàng đợi khác. Tôi đã có một phần đúng.
  • Tuy nhiên khi 1 epoch của trainimgs đã được xử lý và tf.RandomShuffleQueue trống, với điều kiện epoch hiện tại e < epochlimit, hàng đợi phải được lấp đầy và chủ đề phải hoạt động trở lại.

Tin tốt là: Tôi đã nhận nó làm việc trong một trường hợp nào đó (Xem PS ở cuối !!)

Tin xấu là: Tôi nghĩ rằng có một cách tốt hơn để làm điều này.

Phương pháp tôi đang sử dụng để làm điều này tại là như sau (tôi đã đơn giản hóa các chức năng và đã gỡ bỏ xử lý hình ảnh điện tử dựa trên tiền xử lý và enqueuing tiếp theo nhưng trung tâm của quá trình xử lý vẫn là !! giống nhau):

with tf.Session() as sess: 
    train_filename_queue = tf.RandomShuffleQueue(capacity=len(trainimgs), min_after_dequeue=0, dtypes=tf.string, seed=0) 
    queue_size = train_filename_queue.size() 
    trainimgtensor = tf.constant(trainimgs) 
    close_queue = train_filename_queue.close() 
    epoch = tf.Variable(initial_value=1, trainable=False, dtype=tf.int32) 
    incrementepoch = tf.assign(epoch, epoch + 1, use_locking=True) 
    supplyimages = train_filename_queue.enqueue_many(trainimgtensor) 
    value = train_filename_queue.dequeue() 

    init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()) 
    sess.run(init_op) 
    coord = tf.train.Coordinator() 
    tf.train.start_queue_runners(sess, coord) 
    sess.run(supplyimages) 
    lock = threading.Lock() 
    threads = [threading.Thread(target=work, args=(coord, value, sess, epoch, incrementepoch, supplyimages, queue_size, lock, close_queue)) for i in range(200)] 
    for t in threads: 
     t.start() 
    coord.join(threads) 

chức năng công việc là như sau:

def work(coord, val, sess, epoch, incrementepoch, supplyimg, q, lock,\ 
     close_op): 
while not coord.should_stop(): 
    if sess.run(q) > 0: 
     filename, currepoch = sess.run([val, epoch]) 
     filename = filename.decode(encoding='UTF-8') 
     print(filename + ' ' + str(currepoch)) 
    elif sess.run(epoch) < 2: 
     lock.acquire() 
     try: 
      if sess.run(q) == 0: 
       print("The previous epoch = %d"%(sess.run(epoch))) 
       sess.run([incrementepoch, supplyimg]) 
       sz = sess.run(q) 
       print("The new epoch = %d"%(sess.run(epoch))) 
       print("The new queue size = %d"%(sz)) 
     finally: 
      lock.release() 
    else: 
     try: 
      sess.run(close_op) 
     except tf.errors.CancelledError: 
      print('Queue already closed.') 
     coord.request_stop() 
return None 

vì vậy, mặc dù việc này, tôi có cảm giác rằng có một cách tốt hơn và sạch hơn để đạt được điều này. Vì vậy, trong một nutshell câu hỏi của tôi là:

  1. Có một cách đơn giản và sạch hơn để đạt được nhiệm vụ này trong TensorFlow?
  2. Có vấn đề gì với logic của mã này không? Tôi không có kinh nghiệm với các kịch bản đa luồng, vì vậy bất kỳ lỗi rõ ràng nào đã bỏ qua sự chú ý của tôi sẽ rất hữu ích đối với tôi.

P.S: Dường như mã này không hoàn hảo sau tất cả. Khi tôi chạy với 1,2 triệu hình ảnh và 200 chủ đề, nó chạy. Tuy nhiên khi tôi chạy nó cho 10 hình ảnh và 20 chủ đề, nó mang lại cho các lỗi sau:

CancelledError (see above for traceback): RandomShuffleQueue '_0_random_shuffle_queue' is closed. 
    [[Node: random_shuffle_queue_EnqueueMany = QueueEnqueueManyV2[Tcomponents=[DT_STRING], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](random_shuffle_queue, Const)]] 

tôi nghĩ rằng tôi đã bao phủ bởi except tf.errors.CancelledError. Cái quái gì đang diễn ra ở đây?

Trả lời

3

Cuối cùng tôi đã tìm ra câu trả lời. Vấn đề là nhiều chủ đề đã va chạm với nhau trên các điểm khác nhau trong hàm work(). Chức năng work() sau hoạt động hoàn hảo.

def work(coord, val, sess, epoch, maxepochs, incrementepoch, supplyimg, q, lock, close_op): 
    print('I am thread number %s'%(threading.current_thread().name)) 
    print('I can see a queue with size %d'%(sess.run(q))) 
    while not coord.should_stop(): 
     lock.acquire() 
     if sess.run(q) > 0: 
      filename, currepoch = sess.run([val, epoch]) 
      filename = filename.decode(encoding='UTF-8') 
      tid = threading.current_thread().name 
      print(filename + ' ' + str(currepoch) + ' thread ' + str(tid)) 
     elif sess.run(epoch) < maxepochs: 
      print('Thread %s has acquired the lock'%(threading.current_thread().name)) 
      print("The previous epoch = %d"%(sess.run(epoch))) 
      sess.run([incrementepoch, supplyimg]) 
      sz = sess.run(q) 
      print("The new epoch = %d"%(sess.run(epoch))) 
      print("The new queue size = %d"%(sz)) 
    else: 
      coord.request_stop() 
     lock.release() 

    return None 
1

Tôi khuyên bạn nên có một chủ đề gọi enqueue_many lần epochs để enqueue số lượng hình ảnh chính xác. Sau đó nó có thể đóng hàng đợi. Điều này sẽ cho phép bạn đơn giản hóa chức năng công việc của bạn và các chủ đề khác.

+0

Cảm ơn bạn nhưng tôi muốn sử dụng nhiều chủ đề để tăng tốc vì có các bước tiền xử lý phức tạp tôi phải làm – Ujjwal

+0

Bạn có thể sử dụng một chuỗi để chỉ đặt tên tệp vào hàng đợi chính và sau đó là nhiều chủ đề để loại bỏ các tên tệp đó , preprocess, và enqueue chúng trong hàng đợi cuối cùng. –

1

Tôi nghĩ GIL sẽ ngăn bất kỳ thực tế song song nào được thực hiện trong các chủ đề đó.

Để có được hiệu suất với lưu lượng, bạn cần lưu dữ liệu của bạn trong lưu lượng.

Flowor Flow's reading data guide giải thích cách giải quyết một loại vấn đề rất giống nhau.

Cụ thể hơn, bạn dường như đã viết lại một đoạn quan trọng là string_input_producer.

+0

Tôi đang sử dụng dữ liệu thực tế. 'string_input_producer()' không cho biết rằng epoch dữ liệu nào được trích xuất tại bất kỳ thời điểm nào. Nó chỉ làm cho sue rằng bất kỳ mẩu dữ liệu nào được trích xuất số lần 'epoch'. Vì vậy, thực hiện của tôi không phải là một viết lại của 'string_input_producer()'. Tôi hiểu rằng cách tôi làm nó có lẽ không phải là cách tốt nhất, nhưng tôi cần phải kiểm tra trực tiếp và chính xác về thời đại và lặp lại một dữ liệu đến và dường như tôi không tìm thấy bất kỳ điều gì trong hướng dẫn dữ liệu về nó. Tôi sẽ đọc nó một lần nữa mặc dù. – Ujjwal

+0

Ah, cảm ơn vì đã làm rõ. Tôi đoán tôi không hoàn toàn hiểu được vấn đề. Tại sao không sử dụng (chuỗi, epoch) cặp? – mdaoust

+0

Tôi đã sử dụng nó. Tôi đã sử dụng biến 'epoch' để tính toán' epoch' – Ujjwal

Các vấn đề liên quan