2011-09-26 29 views
18

Tương tự như bài đăng khác tôi đã thực hiện, câu trả lời này cho bài đăng và tạo câu hỏi mới.Tạo kết nối DB và duy trì trên nhiều quy trình (đa xử lý)

Tóm tắt: Tôi cần phải cập nhật mọi bản ghi trong cơ sở dữ liệu không gian trong đó tôi có tập hợp dữ liệu các điểm phủ lớp dữ liệu đa giác. Đối với mỗi tính năng điểm tôi muốn gán một khóa để liên kết nó với đối tượng địa lý đa giác nằm trong đó. Vì vậy, nếu điểm của tôi 'Thành phố New York' nằm trong đa giác Hoa Kỳ và cho đa giác Hoa Kỳ 'GID = 1' tôi sẽ chỉ định 'gid_fkey = 1' cho điểm của tôi Thành phố New York.

Được rồi, điều này đã đạt được bằng cách sử dụng đa xử lý. Tôi đã nhận thấy tốc độ tăng 150% khi sử dụng tính năng này để nó hoạt động. Nhưng tôi nghĩ rằng có một loạt các chi phí không cần thiết như một kết nối DB là cần thiết cho mỗi bản ghi.

Vì vậy, đây là mã:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 

    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task() 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self):   
     pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     pyConn.set_isolation_level(0) 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)] 
    for w in consumers: 
     w.start() 

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
    pyConnX.set_isolation_level(0) 
    pyCursorX = pyConnX.cursor() 

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')  
    temp = pyCursorX.fetchall()  
    num_job = temp[0] 
    num_jobs = num_job[0] 

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')  
    cityIdListTuple = pyCursorX.fetchall()  

    cityIdListList = [] 

    for x in cityIdListTuple: 
     cityIdList.append(x[0]) 


    for i in xrange(num_jobs): 
     tasks.put(Task(cityIdList[i - 1])) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    while num_jobs: 
     result = results.get() 
     print result 
     num_jobs -= 1 

Có vẻ là giữa 0,3 và 1,5 giây cho mỗi kết nối như tôi có đo nó với 'thời gian' module.

Có cách nào để tạo kết nối DB cho mỗi quy trình và sau đó chỉ sử dụng thông tin city_id làm biến mà tôi có thể đưa vào truy vấn cho con trỏ trong phần mở này không? Bằng cách này tôi nói rằng bốn tiến trình mỗi với một kết nối DB và sau đó thả tôi city_id bằng cách nào đó để xử lý.

Trả lời

31

Cố gắng cô lập việc tạo ra các kết nối của bạn trong constructor tiêu dùng, sau đó đưa nó cho công tác thực hiện:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 
     self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     self.pyConn.set_isolation_level(0) 


    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task(connection=self.pyConn) 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self, connection=None):   
     pyConn = connection 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 
+1

Mate mà làm việc một điều trị. Không có kudo để cung cấp cho bạn dấu kiểm duyệt nhưng mã đó hoàn toàn là ma thuật. Loại bỏ các kết nối DB không đổi có thể dễ dàng tăng tốc độ thêm 50% nữa. Có thể gần hơn đến 100% trong một số trường hợp. Cảm ơn một lần nữa. –

+0

@EnE_: Tôi rất vui vì nó đã giúp bạn :). Bạn nên chấp nhận câu trả lời, bạn có quyền làm điều đó bởi vì bạn là chủ sở hữu của câu hỏi. –

+0

Được rồi, tôi phải thừa nhận rằng tôi nghĩ rằng tôi nên bấm mũi tên lên chứ không phải là đánh dấu. 'Đánh dấu phê duyệt' là một cách tự nhiên không may lên đến lượt của cụm từ = D –

Các vấn đề liên quan