7

Tôi đang cố gắng tìm nạp dữ liệu meta khoảng 10k + torrents mỗi ngày bằng cách sử dụng python libtorrent.tạo daemon bằng cách sử dụng Python libtorrent để tìm nạp dữ liệu meta của 100k + torrents

Đây là dòng chảy hiện tại của mã

  1. Bắt đầu libtorrent Session.
  2. Nhận tổng số lượng torrent mà chúng tôi cần siêu dữ liệu để tải lên trong vòng 1 ngày qua.
  3. lấy băm torrent từ DB theo các khối
  4. tạo liên kết nam châm sử dụng các băm đó và thêm các URI của nam châm đó vào phiên bằng cách tạo tay cầm cho mỗi URI nam châm.
  5. ngủ trong một giây trong khi Siêu dữ liệu được tìm nạp và tiếp tục kiểm tra xem liệu siêu dữ liệu có được tìm thấy hay không.
  6. Nếu nhận được dữ liệu meta, hãy thêm dữ liệu vào trong DB khác nếu chúng tôi tìm kiếm dữ liệu meta trong khoảng 10 phút, nếu có thì hãy xóa xử lý tức là không tìm siêu dữ liệu nữa.
  7. làm trên vô thời hạn. và lưu trạng thái phiên cho tương lai.

cho đến nay tôi đã thử cách này.

#!/usr/bin/env python 
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri 

import libtorrent as lt # libtorrent library 
import tempfile # for settings parameters while fetching metadata as temp dir 
import sys #getting arguiments from shell or exit script 
from time import sleep #sleep 
import shutil # removing directory tree from temp directory 
import os.path # for getting pwd and other things 
from pprint import pprint # for debugging, showing object data 
import MySQLdb # DB connectivity 
import os 
from datetime import date, timedelta 

session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0) 
session.listen_on(6881, 6891) 
session.add_extension('ut_metadata') 
session.add_extension('ut_pex') 
session.add_extension('smart_ban') 
session.add_extension('metadata_transfer') 

session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state" 

if(os.path.isfile(session_save_filename)): 

    fileread = open(session_save_filename, 'rb') 
    session.load_state(lt.bdecode(fileread.read())) 
    fileread.close() 
    print('session loaded from file') 
else: 
    print('new session started') 

session.add_dht_router("router.utorrent.com", 6881) 
session.add_dht_router("router.bittorrent.com", 6881) 
session.add_dht_router("dht.transmissionbt.com", 6881) 
session.add_dht_router("dht.aelitis.com", 6881) 

session.start_dht() 
session.start_lsd() 
session.start_upnp() 
session.start_natpmp() 

alive = True 
while alive: 

    db_conn = MySQLdb.connect( host = '', user = '', passwd = '', db = '', unix_socket='/mysql/mysql.sock') # Open database connection 
    #print('reconnecting') 
    #get all records where enabled = 0 and uploaded within yesterday 
    subset_count = 100 ; 

    yesterday = date.today() - timedelta(1) 
    yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S') 
    #print(yesterday) 

    total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ") 
    #print(total_count_query) 
    try: 
     total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method 
     total_count_cursor.execute(total_count_query) # Execute the SQL command 
     total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists. 
     total_count = total_count_results[0] 
     print(total_count) 
    except: 
      print "Error: unable to select data" 

    total_pages = total_count/subset_count 
    #print(total_pages) 

    current_page = 1 
    while(current_page <= total_pages): 
     from_count = (current_page * subset_count) - subset_count 

     #print(current_page) 
     #print(from_count) 

     hashes = [] 

     get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ") 
     #print(get_mysql_data_query) 
     try: 
      get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method 
      get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command 
      get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists. 
      for row in get_mysql_data_results: 
       hashes.append(row[0].upper()) 
     except: 
      print "Error: unable to select data" 

     #print(hashes) 

     handles = [] 

     for hash in hashes: 
      tempdir = tempfile.mkdtemp() 
      add_magnet_uri_params = { 
       'save_path': tempdir, 
       'duplicate_is_error': True, 
       'storage_mode': lt.storage_mode_t(2), 
       'paused': False, 
       'auto_managed': True, 
       'duplicate_is_error': True 
      } 
      magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80" 
      #print(magnet_uri) 
      handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params) 
      handles.append(handle) #push handle in handles list 

     #print("handles length is :") 
     #print(len(handles)) 

     while(len(handles) != 0): 
      for h in handles: 
       #print("inside handles for each loop") 
       if h.has_metadata(): 
        torinfo = h.get_torrent_info() 
        final_info_hash = str(torinfo.info_hash()) 
        final_info_hash = final_info_hash.upper() 
        torfile = lt.create_torrent(torinfo) 
        torcontent = lt.bencode(torfile.generate()) 
        tfile_size = len(torcontent) 
        try: 
         insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method 
         insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""", [final_info_hash , torcontent]) 
         db_conn.commit() 
         #print "data inserted in DB" 
        except MySQLdb.Error, e: 
         try: 
          print "MySQL Error [%d]: %s" % (e.args[0], e.args[1]) 
         except IndexError: 
          print "MySQL Error: %s" % str(e)  


        shutil.rmtree(h.save_path()) # remove temp data directory 
        session.remove_torrent(h) # remove torrnt handle from session 
        handles.remove(h) #remove handle from list 

       else: 
        if(h.status().active_time > 600): # check if handle is more than 10 minutes old i.e. 600 seconds 
         #print('remove_torrent') 
         shutil.rmtree(h.save_path()) # remove temp data directory 
         session.remove_torrent(h) # remove torrnt handle from session 
         handles.remove(h) #remove handle from list 
       sleep(1)   
       #print('sleep1') 

     #print('sleep10') 
     #sleep(10) 
     current_page = current_page + 1 

     #save session state 
     filewrite = open(session_save_filename, "wb") 
     filewrite.write(lt.bencode(session.save_state())) 
     filewrite.close() 


    print('sleep60') 
    sleep(60) 

    #save session state 
    filewrite = open(session_save_filename, "wb") 
    filewrite.write(lt.bencode(session.save_state())) 
    filewrite.close() 

Tôi đã cố gắng giữ ở trên tập lệnh chạy qua đêm và chỉ tìm thấy khoảng 1200 dữ liệu meta của torrent được tìm thấy trong phiên qua đêm. vì vậy tôi đang tìm kiếm để cải thiện hiệu suất của tập lệnh.

Tôi thậm chí đã thử Giải mã tệp save_state và nhận thấy có 700+ DHT nodes Tôi đang kết nối với. vì vậy nó không giống như DHT không hoạt động,

Điều tôi định làm là, keep the handles active trong phiên vô thời hạn trong khi dữ liệu meta không được tìm nạp. và sẽ không tháo các tay cầm sau 10 phút nếu không có dữ liệu meta nào được tìm nạp trong 10 phút, như tôi hiện đang làm.

Tôi có một vài câu hỏi về liên kết python lib-torrent.

  1. Tôi có thể tiếp tục chạy bao nhiêu tay cầm? là có bất kỳ giới hạn cho chạy xử lý?
  2. sẽ chạy xử lý 10k + hoặc 100k làm chậm hệ thống của tôi? hoặc ăn tài nguyên? nếu có thì nguồn nào? Tôi có nghĩa là RAM, NETWORK?
  3. Tôi đứng sau tường lửa, có thể là cổng bị chặn khiến tốc độ tìm nạp siêu dữ liệu chậm?
  4. có thể sử dụng máy chủ DHT như router.bittorrent.com hoặc bất kỳ địa chỉ IP BAN nào khác của tôi để gửi quá nhiều yêu cầu không?
  5. Các đồng nghiệp khác có thể BAN địa chỉ IP của tôi nếu họ phát hiện ra tôi đang thực hiện quá nhiều yêu cầu, chỉ tìm nạp dữ liệu meta không?
  6. Tôi có thể chạy nhiều phiên bản của tập lệnh này không? hoặc có thể là đa luồng? nó sẽ cho hiệu suất tốt hơn?
  7. nếu sử dụng nhiều phiên bản của cùng một tập lệnh, mỗi tập lệnh sẽ nhận được id nút duy nhất tùy thuộc vào ip và cổng tôi đang sử dụng, đây có phải là giải pháp khả thi không?

Có cách tiếp cận nào tốt hơn không? để đạt được những gì tôi đang cố gắng?

Trả lời

3

Tôi không thể trả lời các câu hỏi cụ thể cho API của libtorrent, nhưng một số câu hỏi của bạn áp dụng cho bittorrent nói chung.

sẽ chạy xử lý 10k + hoặc 100k làm chậm hệ thống của tôi? hoặc ăn tài nguyên? nếu có thì nguồn nào? tôi có nghĩa là RAM, NETWORK?

Tải xuống siêu dữ liệu không được sử dụng nhiều tài nguyên vì chúng chưa tải xuống đầy đủ, tức là chúng không thể phân bổ tệp thực hoặc bất kỳ thứ gì tương tự. Nhưng họ sẽ cần một số ram/không gian đĩa cho chính siêu dữ liệu khi họ lấy đoạn đầu tiên của những người.

Tôi đứng sau tường lửa, có thể là cổng bị chặn khiến tốc độ tìm nạp siêu dữ liệu chậm?

có, bằng cách giảm số lượng đồng nghiệp có thể thiết lập kết nối, việc tìm nạp siêu dữ liệu (hoặc thiết lập bất kỳ kết nối nào) trở nên khó khăn hơn với số lượng ngang hàng thấp.

NAT có thể gây ra cùng một vấn đề.

có thể máy chủ DHT như router.bittorrent.com hoặc bất kỳ địa chỉ IP BAN nào khác của tôi gửi quá nhiều yêu cầu không?

router.bittorrent.com là nút khởi động, không phải máy chủ. Tra cứu không truy vấn một nút duy nhất, chúng truy vấn nhiều khác nhau (trong số hàng triệu). Nhưng có, các nút riêng lẻ có thể cấm, hoặc nhiều khả năng tỷ lệ giới hạn, bạn.

Điều này có thể được giảm nhẹ bằng cách tìm các ID được phân phối ngẫu nhiên để truyền tải trên toàn bộ không gian khóa DHT.

tôi có thể chạy nhiều phiên bản của tập lệnh này không? hoặc có thể là đa luồng? nó sẽ cho hiệu suất tốt hơn?

AIUI libtorrent là không chặn hoặc đa luồng mà bạn có thể lên lịch nhiều torrent cùng một lúc.

Tôi không biết liệu libtorrent có giới hạn tốc độ cho các yêu cầu DHT gửi đi hay không.

nếu sử dụng nhiều phiên bản của cùng một tập lệnh, mỗi tập lệnh sẽ nhận được id nút duy nhất tùy thuộc vào ip và cổng tôi đang sử dụng, là giải pháp khả thi này?

Nếu bạn ngụ ý ID nút DHT, thì chúng bắt nguồn từ IP (theo BEP 42), không phải cổng. Mặc dù một số phần tử ngẫu nhiên được bao gồm, do đó, một số lượng giới hạn các ID có thể thu được trên mỗi IP.

Và một số điều này cũng có thể có thể được áp dụng cho trường hợp của bạn: http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/

Và tùy chọn khác là my own DHT implementation trong đó bao gồm một số lượng lớn CLI để nạp torrents.

+0

tôi thực sự đánh giá cao ý kiến ​​của bạn, cảm ơn bạn. – AMB

Các vấn đề liên quan