Tôi đang gặp phải một nút cổ chai trên đơn đăng ký của mình và gặp khó khăn trong việc tìm kiếm giải pháp xung quanh nó. Một chút nền:Cách đáng tin cậy để thực thi hàng nghìn giao dịch độc lập?
- ping Ứng dụng của tôi một API để thu thập thông tin về hàng trăm ngàn sản phẩm và lưu trữ chúng vào kho dữ liệu
- Chúng tôi cần phải thực hiện quy tụ đơn giản trên một kết hợp của kích thước của các mặt hàng này, mà chúng tôi cố gắng và tính toán trong thời gian chúng tôi lưu trữ các mục
thực hiện hiện tại:
- Chúng tôi bắt đầu tải xuống các mục này theo cách thủ công khi cần, tạo các tác vụ trên chương trình phụ trợ dành riêng để tải xuống các mục này. Mỗi tác vụ sẽ khởi chạy nhiều tác vụ hơn tùy thuộc vào số lượng cuộc gọi API được yêu cầu để phân trang và thu thập mọi mục.
- Mỗi tác vụ sẽ tải xuống, phân tích cú pháp và lưu trữ hàng loạt các mục, trong khi vẫn giữ tập hợp chúng tôi muốn trong bộ nhớ bằng cách sử dụng từ điển.
- Khi kết thúc mỗi lần thực hiện tác vụ, chúng tôi viết từ điển tổng hợp vào hàng đợi kéo.
- Khi chúng tôi phát hiện chúng tôi sắp kết thúc cuộc gọi API, chúng tôi sẽ khởi chạy tác vụ tổng hợp thành cấu hình phụ trợ thứ hai
- "Nhiệm vụ tổng hợp" này kéo từ hàng đợi kéo (20 lần) và hợp nhất từ điển được tìm thấy trong mỗi tác vụ (làm việc trong bộ nhớ tổng hợp hơn), trước khi cố gắng lưu trữ mỗi tập hợp. Nhiệm vụ này cũng sẽ tung ra các nhiệm vụ khác để thực hiện quy tụ cho các nhiệm vụ còn lại trong hàng đợi kéo (hàng trăm)
- Chúng tôi sử dụng cách tiếp cận sharded counter để giúp làm giảm bớt bất kỳ tranh chấp khi lưu trữ vào kho dữ liệu
- Mỗi nhiệm vụ tập hợp có thể thử và lưu trữ 500 1500 tập hợp, tất cả nên được độc lập với nhau,
Có thêm kiểm tra và như vậy để đảm bảo tất cả các nhiệm vụ xếp hàng được xử lý đúng và tất cả các mục được tải xuống.
Vấn đề:
Chúng tôi muốn tải về và lưu trữ tất cả các mục và tập hợp càng nhanh càng tốt. Tôi có 20 trường hợp được kích hoạt cho mỗi cấu hình phụ trợ được mô tả (tôi sẽ gọi chúng là phụ trợ "aggregator" và phần phụ trợ "downloader"). Chương trình phụ trợ tải xuống dường như nhận được thông qua các cuộc gọi API khá nhanh. Tôi sử dụng nhiều thư viện NDB và các URL không đồng bộ Tìm nạp/Lưu trữ dữ liệu để có được điều này. Tôi cũng đã kích hoạt threadsafe: true để không có cá thể nào chờ đợi các cuộc gọi RPC kết thúc trước khi bắt đầu nhiệm vụ tiếp theo (tất cả các tác vụ có thể hoạt động độc lập với nhau và không có giá trị).
Phần phụ trợ tổng hợp là nơi có thời gian chờ đợi lớn. Lưu trữ 500-1500 của các tập hợp không đồng bộ thông qua các giao dịch mất 40 giây hoặc hơn (và tôi thậm chí không nghĩ rằng tất cả các giao dịch đang được cam kết đúng). Tôi giữ backend này với threadsafe: false vì tôi sử dụng thời hạn hết hạn hàng đợi kéo dài 300 giây, nhưng nếu tôi cho phép nhiều hơn một nhiệm vụ thực thi trên một cá thể, chúng có thể hạ xuống và đẩy một số nhiệm vụ trong 300 giây đánh dấu, do đó cho phép tác vụ khác kéo cùng một tác vụ lần thứ hai và có thể đếm hai lần.
Nhật ký hiển thị BadRequestError: Nested transactions are not supported.
có lỗi trước đó (trong ngăn xếp ngăn xếp) của TransactionFailedError: too much contention on these datastore entities. please try again.
. Một lỗi khác mà tôi thường thấy là BadRequestError(The referenced transaction has expired or is no longer valid.)
Từ hiểu biết của tôi, đôi khi các lỗi này có nghĩa là giao dịch vẫn có thể được cam kết mà không cần tương tác thêm. Làm thế nào để tôi biết nếu điều này đã được cam kết đúng? Tôi đang làm điều này một cách hợp lý/hiệu quả hoặc có nhiều phòng cho đồng thời mà không có nguy cơ rối tung mọi thứ lên?
Mã liên quan:
class GeneralShardConfig(ndb.Model):
"""Tracks the number of shards for each named counter."""
name = ndb.StringProperty(required=True)
num_shards = ndb.IntegerProperty(default=4)
class GeneralAggregateShard(ndb.Model):
"""Shards for each named counter"""
name = ndb.StringProperty(name='n', required=True)
count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now
@ndb.tasklet
def increment_batch(data_set):
def run_txn(name, value):
@ndb.tasklet
def txn():
to_put = []
dbkey = ndb.Key(GeneralShardConfig, name)
config = yield dbkey.get_async(use_memcache=False)
if not config:
config = GeneralShardConfig(key=dbkey,name=name)
to_put.append(config)
index = random.randint(0, config.num_shards-1)
shard_name = name + str(index)
dbkey = ndb.Key(GeneralAggregateShard, shard_name)
counter = yield dbkey.get_async()
if not counter:
counter = GeneralAggregateShard(key=dbkey, name=name)
counter.count += value
to_put.append(counter)
yield ndb.put_multi_async(to_put)
return ndb.transaction_async(txn, use_memcache=False, xg=True)
res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
raise ndb.Return(res)
Với việc thực hiện, căn phòng duy nhất cho "tranh chấp" Tôi thấy là nếu 2 hoặc nhiều nhiệm vụ tổng hợp cần phải cập nhật tên tổng hợp tương tự, mà không nên xảy ra quá thường xuyên, và với các quầy bị ngăn trở, tôi cho rằng sự trùng lặp này hiếm khi xảy ra. Tôi cho rằng lỗi BadRequestError(The referenced transaction has expired or is no longer valid.)
xuất hiện khi vòng lặp sự kiện kiểm tra trạng thái của tất cả các tác vụ và truy cập tham chiếu đến giao dịch đã hoàn tất. Vấn đề ở đây là nó lỗi ra do đó có nghĩa là tất cả các giao dịch được cắt sớm hoặc tôi có thể giả định tất cả các giao dịch đã đi qua? Tôi tiếp tục giả định dòng này res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
cần phải được chia thành một thử/ngoại trừ mỗi tasklet để phát hiện các lỗi này.
Trước khi tôi khiến bản thân bị điên về điều này, tôi đánh giá cao bất kỳ hướng dẫn/trợ giúp nào về cách tối ưu hóa quá trình này và thực hiện theo cách đáng tin cậy.
EDIT 1: tôi sửa đổi hành vi nhiệm vụ aggregator như sau:
- Nếu có nhiều hơn 1 nhiệm vụ được thuê của hàng đợi, tổng hợp các nhiệm vụ trong bộ nhớ, sau đó lưu trữ kết quả trong một nhiệm vụ trong kéo-hàng đợi, và ngay lập tức khởi động một "nhiệm vụ aggregator"
- khác, nếu 1 công việc được cho thuê, cố gắng để lưu các kết quả
này đã giúp giảm các lỗi tranh Tôi đã nhìn thấy, nhưng nó vẫn không đáng tin cậy lắm. Gần đây nhất, tôi nhấn BadRequestError: Nested transactions are not supported.
với theo dõi ngăn xếp cho biết RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>
Tôi tin rằng việc sửa đổi này sẽ tối ưu hóa quy trình bằng cách cho phép tất cả các chồng chéo có thể được kết hợp và thử cùng một lúc trong một trường hợp, so với nhiều trường hợp tất cả các giao dịch thực hiện có thể va chạm. Tôi vẫn gặp sự cố khi lưu kết quả một cách đáng tin cậy.
Tôi nhận được kết quả hỗn hợp với mã. Đôi khi các giá trị tổng hợp là chính xác, các thời điểm khác chúng bị giảm sút nghiêm trọng và không phải do nhiều giá trị. Tài liệu AppEngine nói rằng các giao dịch có thể thực hiện nhiều lần hoặc vẫn có thể thành công bất kể lỗi được đưa ra. Trong ý nghĩa này, làm thế nào để chúng ta làm cho giao dịch không có giá trị? Nghĩa là, nếu một giao dịch thực hiện nhiều lần, nó sẽ không bao giờ tăng giá trị của nó nhiều hơn một lần? Điều đó thậm chí có thể? – someone1
Tôi đã thử nó và có một điều kiện lỗi (dòng 117), nơi nó được xử lý bằng: 'logging.error ('Shard% s bị hỏng'% name)' – Metalshark
Tại sao các tập hợp lại có thể bị tắt cho những điều trên tôi không biết. Có vẻ như không làm cho nó thất bại. Bạn có một số dữ liệu mẫu? – Metalshark