13

Tôi đang gặp phải một nút cổ chai trên đơn đăng ký của mình và gặp khó khăn trong việc tìm kiếm giải pháp xung quanh nó. Một chút nền:Cách đáng tin cậy để thực thi hàng nghìn giao dịch độc lập?

  • ping Ứng dụng của tôi một API để thu thập thông tin về hàng trăm ngàn sản phẩm và lưu trữ chúng vào kho dữ liệu
  • Chúng tôi cần phải thực hiện quy tụ đơn giản trên một kết hợp của kích thước của các mặt hàng này, mà chúng tôi cố gắng và tính toán trong thời gian chúng tôi lưu trữ các mục

thực hiện hiện tại:

  • Chúng tôi bắt đầu tải xuống các mục này theo cách thủ công khi cần, tạo các tác vụ trên chương trình phụ trợ dành riêng để tải xuống các mục này. Mỗi tác vụ sẽ khởi chạy nhiều tác vụ hơn tùy thuộc vào số lượng cuộc gọi API được yêu cầu để phân trang và thu thập mọi mục.
  • Mỗi tác vụ sẽ tải xuống, phân tích cú pháp và lưu trữ hàng loạt các mục, trong khi vẫn giữ tập hợp chúng tôi muốn trong bộ nhớ bằng cách sử dụng từ điển.
  • Khi kết thúc mỗi lần thực hiện tác vụ, chúng tôi viết từ điển tổng hợp vào hàng đợi kéo.
  • Khi chúng tôi phát hiện chúng tôi sắp kết thúc cuộc gọi API, chúng tôi sẽ khởi chạy tác vụ tổng hợp thành cấu hình phụ trợ thứ hai
  • "Nhiệm vụ tổng hợp" này kéo từ hàng đợi kéo (20 lần) và hợp nhất từ ​​điển được tìm thấy trong mỗi tác vụ (làm việc trong bộ nhớ tổng hợp hơn), trước khi cố gắng lưu trữ mỗi tập hợp. Nhiệm vụ này cũng sẽ tung ra các nhiệm vụ khác để thực hiện quy tụ cho các nhiệm vụ còn lại trong hàng đợi kéo (hàng trăm)
  • Chúng tôi sử dụng cách tiếp cận sharded counter để giúp làm giảm bớt bất kỳ tranh chấp khi lưu trữ vào kho dữ liệu
  • Mỗi nhiệm vụ tập hợp có thể thử và lưu trữ 500 1500 tập hợp, tất cả nên được độc lập với nhau,

Có thêm kiểm tra và như vậy để đảm bảo tất cả các nhiệm vụ xếp hàng được xử lý đúng và tất cả các mục được tải xuống.

Vấn đề:

Chúng tôi muốn tải về và lưu trữ tất cả các mục và tập hợp càng nhanh càng tốt. Tôi có 20 trường hợp được kích hoạt cho mỗi cấu hình phụ trợ được mô tả (tôi sẽ gọi chúng là phụ trợ "aggregator" và phần phụ trợ "downloader"). Chương trình phụ trợ tải xuống dường như nhận được thông qua các cuộc gọi API khá nhanh. Tôi sử dụng nhiều thư viện NDB và các URL không đồng bộ Tìm nạp/Lưu trữ dữ liệu để có được điều này. Tôi cũng đã kích hoạt threadsafe: true để không có cá thể nào chờ đợi các cuộc gọi RPC kết thúc trước khi bắt đầu nhiệm vụ tiếp theo (tất cả các tác vụ có thể hoạt động độc lập với nhau và không có giá trị).

Phần phụ trợ tổng hợp là nơi có thời gian chờ đợi lớn. Lưu trữ 500-1500 của các tập hợp không đồng bộ thông qua các giao dịch mất 40 giây hoặc hơn (và tôi thậm chí không nghĩ rằng tất cả các giao dịch đang được cam kết đúng). Tôi giữ backend này với threadsafe: false vì tôi sử dụng thời hạn hết hạn hàng đợi kéo dài 300 giây, nhưng nếu tôi cho phép nhiều hơn một nhiệm vụ thực thi trên một cá thể, chúng có thể hạ xuống và đẩy một số nhiệm vụ trong 300 giây đánh dấu, do đó cho phép tác vụ khác kéo cùng một tác vụ lần thứ hai và có thể đếm hai lần.

Nhật ký hiển thị BadRequestError: Nested transactions are not supported. có lỗi trước đó (trong ngăn xếp ngăn xếp) của TransactionFailedError: too much contention on these datastore entities. please try again.. Một lỗi khác mà tôi thường thấy là BadRequestError(The referenced transaction has expired or is no longer valid.)

Từ hiểu biết của tôi, đôi khi các lỗi này có nghĩa là giao dịch vẫn có thể được cam kết mà không cần tương tác thêm. Làm thế nào để tôi biết nếu điều này đã được cam kết đúng? Tôi đang làm điều này một cách hợp lý/hiệu quả hoặc có nhiều phòng cho đồng thời mà không có nguy cơ rối tung mọi thứ lên?

Mã liên quan:

class GeneralShardConfig(ndb.Model): 
    """Tracks the number of shards for each named counter.""" 
    name = ndb.StringProperty(required=True) 
    num_shards = ndb.IntegerProperty(default=4) 

class GeneralAggregateShard(ndb.Model): 
    """Shards for each named counter""" 
    name = ndb.StringProperty(name='n', required=True) 
    count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now 

@ndb.tasklet 
def increment_batch(data_set): 
    def run_txn(name, value): 
     @ndb.tasklet 
     def txn(): 
      to_put = [] 
      dbkey = ndb.Key(GeneralShardConfig, name) 
      config = yield dbkey.get_async(use_memcache=False) 
      if not config: 
       config = GeneralShardConfig(key=dbkey,name=name) 
       to_put.append(config) 
      index = random.randint(0, config.num_shards-1) 
      shard_name = name + str(index) 
      dbkey = ndb.Key(GeneralAggregateShard, shard_name) 
      counter = yield dbkey.get_async() 
      if not counter: 
       counter = GeneralAggregateShard(key=dbkey, name=name) 
      counter.count += value 
      to_put.append(counter) 
      yield ndb.put_multi_async(to_put) 
     return ndb.transaction_async(txn, use_memcache=False, xg=True) 
    res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00] 
    raise ndb.Return(res) 

Với việc thực hiện, căn phòng duy nhất cho "tranh chấp" Tôi thấy là nếu 2 hoặc nhiều nhiệm vụ tổng hợp cần phải cập nhật tên tổng hợp tương tự, mà không nên xảy ra quá thường xuyên, và với các quầy bị ngăn trở, tôi cho rằng sự trùng lặp này hiếm khi xảy ra. Tôi cho rằng lỗi BadRequestError(The referenced transaction has expired or is no longer valid.) xuất hiện khi vòng lặp sự kiện kiểm tra trạng thái của tất cả các tác vụ và truy cập tham chiếu đến giao dịch đã hoàn tất. Vấn đề ở đây là nó lỗi ra do đó có nghĩa là tất cả các giao dịch được cắt sớm hoặc tôi có thể giả định tất cả các giao dịch đã đi qua? Tôi tiếp tục giả định dòng này res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00] cần phải được chia thành một thử/ngoại trừ mỗi tasklet để phát hiện các lỗi này.

Trước khi tôi khiến bản thân bị điên về điều này, tôi đánh giá cao bất kỳ hướng dẫn/trợ giúp nào về cách tối ưu hóa quá trình này và thực hiện theo cách đáng tin cậy.

EDIT 1: tôi sửa đổi hành vi nhiệm vụ aggregator như sau:

  • Nếu có nhiều hơn 1 nhiệm vụ được thuê của hàng đợi, tổng hợp các nhiệm vụ trong bộ nhớ, sau đó lưu trữ kết quả trong một nhiệm vụ trong kéo-hàng đợi, và ngay lập tức khởi động một "nhiệm vụ aggregator"
  • khác, nếu 1 công việc được cho thuê, cố gắng để lưu các kết quả

này đã giúp giảm các lỗi tranh Tôi đã nhìn thấy, nhưng nó vẫn không đáng tin cậy lắm. Gần đây nhất, tôi nhấn BadRequestError: Nested transactions are not supported. với theo dõi ngăn xếp cho biết RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>

Tôi tin rằng việc sửa đổi này sẽ tối ưu hóa quy trình bằng cách cho phép tất cả các chồng chéo có thể được kết hợp và thử cùng một lúc trong một trường hợp, so với nhiều trường hợp tất cả các giao dịch thực hiện có thể va chạm. Tôi vẫn gặp sự cố khi lưu kết quả một cách đáng tin cậy.

Trả lời

5

Bằng cách giảm lưu lượng dữ liệu I/O (để làm việc với bộ tự động và tắt chỉ mục), bạn có thể chắc chắn hơn rằng kho dữ liệu viết hoàn toàn (ít tranh chấp) và nó sẽ nhanh hơn.

Cấu hình (bộ đếm được đổi tên) nằm ngoài giao dịch và có thể chạy đồng thời trong khi lặp qua các giao dịch.

Phương thức và tổng số thuộc tính đã được thêm vào Số lượt truy cập (hy vọng) giúp dễ dàng sửa đổi trong tương lai.

Tạo thuộc tính ndb mới để hỗ trợ thập phân (giả sử đó là lý do bạn chỉ định 0,00 thay vì 0,0).

EDIT:

Removed nhu cầu giao dịch và thay đổi hệ thống sharding cho độ tin cậy.

import webapp2 

import copy 
import decimal 
import logging 
import random 
import string 

from google.appengine.api import datastore_errors 
from google.appengine.datastore import entity_pb 
from google.appengine.ext import deferred 
from google.appengine.ext import ndb 


TEST_BATCH_SIZE = 250 
TEST_NAME_LEN = 12 


class DecimalProperty(ndb.Property): 
    """A Property whose value is a decimal.Decimal object.""" 

    def _datastore_type(self, value): 
     return str(value) 

    def _validate(self, value): 
     if not isinstance(value, decimal.Decimal): 
     raise datastore_errors.BadValueError('Expected decimal.Decimal, got %r' 
              % (value,)) 
     return value 

    def _db_set_value(self, v, p, value): 
     value = str(value) 
     v.set_stringvalue(value) 
     if not self._indexed: 
      p.set_meaning(entity_pb.Property.TEXT) 

    def _db_get_value(self, v, _): 
     if not v.has_stringvalue(): 
      return None 
     value = v.stringvalue() 
     return decimal.Decimal(value) 

class BatchInProgress(ndb.Model): 
    """Use a scheduler to delete batches in progress after a certain time""" 

    started = ndb.DateTimeProperty(auto_now=True) 

    def clean_up(self): 
     qry = Shard.query().filter(Shard.batch_key == self.key) 
     keys = qry.fetch(keys_only=True) 
     while keys: 
      ndb.delete_multi(keys) 
      keys = qry.fetch(keys_only=True) 

def cleanup_failed_batch(batch_key): 
    batch = batch_key.get() 

    if batch: 
     batch.clean_up() 
     batch.delete() 

class Shard(ndb.Model): 
    """Shards for each named counter""" 

    counter_key = ndb.KeyProperty(name='c') 
    batch_key = ndb.KeyProperty(name='b') 
    count = DecimalProperty(name='v', default=decimal.Decimal('0.00'), 
          indexed=False) 

class Counter(ndb.Model): 
    """Tracks the number of shards for each named counter""" 

    @property 
    def shards(self): 
     qry = Shard.query().filter(Shard.counter_key == self.key) 
     results = qry.fetch(use_cache=False, use_memcache=False) 
     return filter(None, results) 

    @property 
    def total(self): 
     count = decimal.Decimal('0.00') # Use initial value if no shards 

     for shard in self.shards: 
      count += shard.count 

     return count 

    @ndb.tasklet 
    def incr_async(self, value, batch_key): 
     index = batch_key.id() 
     name = self.key.id() + str(index) 

     shard = Shard(id=name, count=value, 
         counter_key=self.key, batch_key=batch_key) 

     yield shard.put_async(use_cache=False, use_memcache=False) 

    def incr(self, *args, **kwargs): 
     return self.incr_async(*args, **kwargs).get_result() 

@ndb.tasklet 
def increment_batch(data_set): 
    batch_key = yield BatchInProgress().put_async() 
    deferred.defer(cleanup_failed_batch, batch_key, _countdown=3600) 

    # NOTE: mapping is modified in place, hence copying 
    mapping = copy.copy(data_set) 

    # (1/3) filter and fire off counter gets 
    #  so the futures can autobatch 
    counters = {} 
    ctr_futs = {} 
    ctr_put_futs = [] 
    zero_values = set() 
    for name, value in mapping.iteritems(): 
     if value != decimal.Decimal('0.00'): 
      ctr_fut = Counter.get_by_id_async(name) # Use cache(s) 
      ctr_futs[name] = ctr_fut 
     else: 
      # Skip zero values because... 
      zero_values.add(name) 
      continue 

    for name in zero_values: 
     del mapping[name] # Remove all zero values from the mapping 
    del zero_values 

    while mapping: # Repeat until all transactions succeed 

     # (2/3) wait on counter gets and fire off increment transactions 
     #  this way autobatchers should fill time 
     incr_futs = {} 
     for name, value in mapping.iteritems(): 
      counter = counters.get(name) 
      if not counter: 
       counter = counters[name] = yield ctr_futs.pop(name) 
      if not counter: 
       logging.info('Creating new counter %s', name) 
       counter = counters[name] = Counter(id=name) 
       ctr_put_futs.append(counter.put_async()) 
      else: 
       logging.debug('Reusing counter %s', name) 
      incr_fut = counter.incr_async(value, batch_key) 
      incr_futs[(name, value)] = incr_fut 

     # (3/3) wait on increments and handle errors 
     #  by using a tuple key for variable access 
     for (name, value), incr_fut in incr_futs.iteritems(): 
      counter = counters[name] 
      try: 
       yield incr_fut 
      except: 
       pass 
      else: 
       del mapping[name] 

     if mapping: 
      logging.warning('%i increments failed this batch.' % len(mapping)) 

    yield batch_key.delete_async(), ctr_put_futs 

    raise ndb.Return(counters.values()) 

class ShardTestHandler(webapp2.RequestHandler): 

    @ndb.synctasklet 
    def get(self): 
     if self.request.GET.get('delete'): 
      ndb.delete_multi_async(Shard.query().fetch(keys_only=True)) 
      ndb.delete_multi_async(Counter.query().fetch(keys_only=True)) 
      ndb.delete_multi_async(BatchInProgress.query().fetch(keys_only=True)) 
     else: 
      data_set_test = {} 
      for _ in xrange(TEST_BATCH_SIZE): 
       name = '' 
       for _ in xrange(TEST_NAME_LEN): 
        name += random.choice(string.letters) 
       value = decimal.Decimal('{0:.2f}'.format(random.random() * 100)) 
       data_set_test[name] = value 
      yield increment_batch(data_set_test) 
     self.response.out.write("Done!") 

app = webapp2.WSGIApplication([('/shard_test/', ShardTestHandler)], debug=True) 
app = ndb.toplevel(app.__call__) 
+0

Tôi nhận được kết quả hỗn hợp với mã. Đôi khi các giá trị tổng hợp là chính xác, các thời điểm khác chúng bị giảm sút nghiêm trọng và không phải do nhiều giá trị. Tài liệu AppEngine nói rằng các giao dịch có thể thực hiện nhiều lần hoặc vẫn có thể thành công bất kể lỗi được đưa ra. Trong ý nghĩa này, làm thế nào để chúng ta làm cho giao dịch không có giá trị? Nghĩa là, nếu một giao dịch thực hiện nhiều lần, nó sẽ không bao giờ tăng giá trị của nó nhiều hơn một lần? Điều đó thậm chí có thể? – someone1

+0

Tôi đã thử nó và có một điều kiện lỗi (dòng 117), nơi nó được xử lý bằng: 'logging.error ('Shard% s bị hỏng'% name)' – Metalshark

+0

Tại sao các tập hợp lại có thể bị tắt cho những điều trên tôi không biết. Có vẻ như không làm cho nó thất bại. Bạn có một số dữ liệu mẫu? – Metalshark

5

Cụ thể về chủ đề của "Giao dịch tham chiếu đã hết hạn hoặc không còn giá trị" BadRequestError, đó là một thực tế ít được quảng cáo rằng các giao dịch sẽ hết hiệu sớm hơn nhiều so với yêu cầu sẽ. Từ sáng tạo, bạn nhận được 15 giây miễn phí, và sau đó giao dịch sẽ bị giết nếu nó mất 15 giây liên tục nhàn rỗi (tuổi thọ tối thiểu là 30 giây), và bị giết cứng bất kể sau 60 giây. Điều này làm cho nó khó khăn để chạy số lượng lớn các giao dịch song song kể từ khi tranh chấp CPU và một thuật toán lập kế hoạch nhiệm vụ không công bằng có thể âm mưu để giữ cho một số giao dịch nhàn rỗi quá lâu.

Ví dụ sau đây cho phương thức giao dịch của ndb giúp bit bằng cách thử lại các giao dịch đã hết hạn, nhưng cuối cùng bạn phải điều chỉnh đợt của mình để giảm ganh đua đến mức có thể quản lý.

_ndb_context_transaction = ndb.Context.transaction 

@ndb.tasklet 
def _patched_transaction(self, callback, **ctx_options): 
    if (self.in_transaction() and 
     ctx_options.get('propagation') != ndb.TransactionOptions.INDEPENDENT): 
    raise ndb.Return((yield _ndb_context_transaction(self, callback, **ctx_options))) 

    attempts = 1 
    start_time = time.time() 
    me = random.getrandbits(16) 
    logging.debug('Transaction started <%04x>', me) 
    while True: 
    try: 
     result = yield _ndb_context_transaction(self, callback, **ctx_options) 
    except datastore_errors.BadRequestError as e: 
     if not ('expired' in str(e) and 
       attempts < _MAX_BAD_REQUEST_RECOVERY_ATTEMPTS): 
     raise 
     logging.warning(
      'Transaction retrying <%04x> (attempt #%d, %.1f seconds) on BadRequestError: %s', 
      me, attempts, time.time() - start_time, e) 
     attempts += 1 
    else: 
     logging.debug(
      'Transaction finished <%04x> (attempt #%d, %.1f seconds)', 
      me, attempts, time.time() - start_time) 
     raise ndb.Return(result) 

ndb.Context.transaction = _patched_transaction 
Các vấn đề liên quan