Vấn đề tôi đang cố gắng giải quyết như sau: Tôi có một danh sách trainimgs
của tên tệp. Tôi đã xác định một sốTensorFlow: Enqueuing và dequeuing một hàng đợi từ nhiều chủ đề
tf.RandomShuffleQueue
vớicapacity=len(trainimgs)
vàmin_after_dequeue=0
.- Điều này
tf.RandomShuffleQueue
dự kiến sẽ được lấp đầy bởitrainimgs
cho số lượngepochlimit
được chỉ định. - Một số chuỗi được dự kiến sẽ hoạt động song song. Mỗi thread dequeues một yếu tố từ
tf.RandomShuffleQueue
và thực hiện một số hoạt động trên nó và enqueues nó vào hàng đợi khác. Tôi đã có một phần đúng. - Tuy nhiên khi
1 epoch
củatrainimgs
đã được xử lý vàtf.RandomShuffleQueue
trống, với điều kiện epoch hiện tạie < epochlimit
, hàng đợi phải được lấp đầy và chủ đề phải hoạt động trở lại.
Tin tốt là: Tôi đã nhận nó làm việc trong một trường hợp nào đó (Xem PS ở cuối !!)
Tin xấu là: Tôi nghĩ rằng có một cách tốt hơn để làm điều này.
Phương pháp tôi đang sử dụng để làm điều này tại là như sau (tôi đã đơn giản hóa các chức năng và đã gỡ bỏ xử lý hình ảnh điện tử dựa trên tiền xử lý và enqueuing tiếp theo nhưng trung tâm của quá trình xử lý vẫn là !! giống nhau):
with tf.Session() as sess:
train_filename_queue = tf.RandomShuffleQueue(capacity=len(trainimgs), min_after_dequeue=0, dtypes=tf.string, seed=0)
queue_size = train_filename_queue.size()
trainimgtensor = tf.constant(trainimgs)
close_queue = train_filename_queue.close()
epoch = tf.Variable(initial_value=1, trainable=False, dtype=tf.int32)
incrementepoch = tf.assign(epoch, epoch + 1, use_locking=True)
supplyimages = train_filename_queue.enqueue_many(trainimgtensor)
value = train_filename_queue.dequeue()
init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
sess.run(init_op)
coord = tf.train.Coordinator()
tf.train.start_queue_runners(sess, coord)
sess.run(supplyimages)
lock = threading.Lock()
threads = [threading.Thread(target=work, args=(coord, value, sess, epoch, incrementepoch, supplyimages, queue_size, lock, close_queue)) for i in range(200)]
for t in threads:
t.start()
coord.join(threads)
chức năng công việc là như sau:
def work(coord, val, sess, epoch, incrementepoch, supplyimg, q, lock,\
close_op):
while not coord.should_stop():
if sess.run(q) > 0:
filename, currepoch = sess.run([val, epoch])
filename = filename.decode(encoding='UTF-8')
print(filename + ' ' + str(currepoch))
elif sess.run(epoch) < 2:
lock.acquire()
try:
if sess.run(q) == 0:
print("The previous epoch = %d"%(sess.run(epoch)))
sess.run([incrementepoch, supplyimg])
sz = sess.run(q)
print("The new epoch = %d"%(sess.run(epoch)))
print("The new queue size = %d"%(sz))
finally:
lock.release()
else:
try:
sess.run(close_op)
except tf.errors.CancelledError:
print('Queue already closed.')
coord.request_stop()
return None
vì vậy, mặc dù việc này, tôi có cảm giác rằng có một cách tốt hơn và sạch hơn để đạt được điều này. Vì vậy, trong một nutshell câu hỏi của tôi là:
- Có một cách đơn giản và sạch hơn để đạt được nhiệm vụ này trong TensorFlow?
- Có vấn đề gì với logic của mã này không? Tôi không có kinh nghiệm với các kịch bản đa luồng, vì vậy bất kỳ lỗi rõ ràng nào đã bỏ qua sự chú ý của tôi sẽ rất hữu ích đối với tôi.
P.S: Dường như mã này không hoàn hảo sau tất cả. Khi tôi chạy với 1,2 triệu hình ảnh và 200 chủ đề, nó chạy. Tuy nhiên khi tôi chạy nó cho 10 hình ảnh và 20 chủ đề, nó mang lại cho các lỗi sau:
CancelledError (see above for traceback): RandomShuffleQueue '_0_random_shuffle_queue' is closed.
[[Node: random_shuffle_queue_EnqueueMany = QueueEnqueueManyV2[Tcomponents=[DT_STRING], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](random_shuffle_queue, Const)]]
tôi nghĩ rằng tôi đã bao phủ bởi except tf.errors.CancelledError
. Cái quái gì đang diễn ra ở đây?
Cảm ơn bạn nhưng tôi muốn sử dụng nhiều chủ đề để tăng tốc vì có các bước tiền xử lý phức tạp tôi phải làm – Ujjwal
Bạn có thể sử dụng một chuỗi để chỉ đặt tên tệp vào hàng đợi chính và sau đó là nhiều chủ đề để loại bỏ các tên tệp đó , preprocess, và enqueue chúng trong hàng đợi cuối cùng. –