2015-12-01 16 views
15

Tôi có khoảng 60 GB tệp JSON mà tôi đang phân tích bằng Python và sau đó chèn vào cơ sở dữ liệu MySQL bằng Trình kết nối Python-MySQL. Mỗi tệp JSON là khoảng 500MBLow InnoDB Viết mỗi giây - AWS EC2 tới MySQL RDS bằng cách sử dụng Python

Tôi đã sử dụng phiên bản AWS r3.xlớn EC2 với ổ đĩa thứ cấp chứa 60 GB dữ liệu JSON.

Tôi sau đó sử dụng phiên bản AWS RDS r3.xlớn MySQL. Những trường hợp này là tất cả trong cùng một khu vực và vùng sẵn có. Ví dụ EC2 đang sử dụng tập lệnh Python sau để tải JSON, phân tích cú pháp và sau đó chèn nó vào trong RDS của MySQL. python của tôi:

import json 
import mysql.connector 
from mysql.connector import errorcode 
from pprint import pprint 
import glob 
import os 

os.chdir("./json_data") 

for file in glob.glob("*.json"): 
    with open(file, 'rU') as data_file: 
     results = json.load(data_file) 
     print('working on file:', file) 

    cnx = mysql.connector.connect(user='', password='', 
     host='') 

    cursor = cnx.cursor(buffered=True) 

    DB_NAME = 'DB' 

    def create_database(cursor): 
     try: 
      cursor.execute(
       "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME)) 
     except mysql.connector.Error as err: 
      print("Failed creating database: {}".format(err)) 
      exit(1) 

    try: 
     cnx.database = DB_NAME  
    except mysql.connector.Error as err: 
     if err.errno == errorcode.ER_BAD_DB_ERROR: 
      create_database(cursor) 
      cnx.database = DB_NAME 
     else: 
      print(err) 
      exit(1) 

    add_overall_data = ("INSERT INTO master" 
     "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)" 
     "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)") 

    add_polyline = ("INSERT INTO polyline" 
     "(Overview_polyline, request_no)" 
     "VALUES (%(Overview_polyline)s, %(request_no)s)") 

    add_summary = ("INSERT INTO summary" 
     "(summary, request_no)" 
     "VALUES (%(summary)s, %(request_no)s)") 

    add_warnings = ("INSERT INTO warnings" 
     "(warnings, request_no)" 
     "VALUES (%(warnings)s, %(request_no)s)") 

    add_waypoint_order = ("INSERT INTO waypoint_order" 
     "(waypoint_order, request_no)" 
     "VALUES (%(waypoint_order)s, %(request_no)s)") 

    add_leg_data = ("INSERT INTO leg_data" 
     "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)" 
     "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)") 
    error_messages = [] 
    for result in results: 
     if result["status"] == "OK": 
      for leg in result['routes'][0]['legs']: 
       try: 
        params = { 
        "_sent_time_stamp": leg['_sent_time_stamp'], 
        "dt": leg['dt']['value'], 
        "ds": leg['ds']['value'], 
        "dtf": leg['dtf']['value'], 
        "O_l": leg['start_location']['lat'], 
        "O_ln": leg['start_location']['lng'], 
        "O_Ls": leg['O_Ls'], 
        "O_a": leg['start_address'], 
        "D_l": leg['end_location']['lat'], 
        "D_ln": leg['end_location']['lng'], 
        "d_a": leg['end_address'] 
        } 
        cursor.execute(add_overall_data, params) 
        query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
        O_l = leg['start_location']['lat'] 
        O_ln = leg['start_location']['lng'] 
        D_l = leg['end_location']['lat'] 
        D_ln = leg['end_location']['lng'] 
        _sent_time_stamp = leg['_sent_time_stamp'] 
        cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
        request_no = cursor.fetchone()[0] 
       except KeyError, e: 
        error_messages.append(e) 
        params = { 
        "_sent_time_stamp": leg['_sent_time_stamp'], 
        "dt": leg['dt']['value'], 
        "ds": leg['ds']['value'], 
        "dtf": "000", 
        "O_l": leg['start_location']['lat'], 
        "O_ln": leg['start_location']['lng'], 
        "O_Ls": leg['O_Ls'], 
        "O_a": 'unknown', 
        "D_l": leg['end_location']['lat'], 
        "D_ln": leg['end_location']['lng'], 
        "d_a": 'unknown' 
        } 
        cursor.execute(add_overall_data, params) 
        query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
        O_l = leg['start_location']['lat'] 
        O_ln = leg['start_location']['lng'] 
        D_l = leg['end_location']['lat'] 
        D_ln = leg['end_location']['lng'] 
        _sent_time_stamp = leg['_sent_time_stamp'] 
        cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
        request_no = cursor.fetchone()[0] 
      for overview_polyline in result['routes']: 
       params = { 
       "request_no": request_no, 
       "Overview_polyline": overview_polyline['overview_polyline']['points'] 
       } 
       cursor.execute(add_polyline, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for summary in result['routes']: 
       params = { 
       "request_no": request_no, 
       "summary": summary['summary'] 
       } 
       cursor.execute(add_summary, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for warnings in result['routes']: 
       params = { 
       "request_no": request_no, 
       "warnings": str(warnings['warnings']) 
       } 
       cursor.execute(add_warnings, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for waypoint_order in result['routes']: 
       params = { 
       "request_no": request_no, 
       "waypoint_order": str(waypoint_order['waypoint_order']) 
       } 
       cursor.execute(add_waypoint_order, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for steps in result['routes'][0]['legs'][0]['steps']: 
       params = { 
       "request_no": request_no, 
       "leg_dt": steps['dt']['value'], 
       "leg_ds": steps['ds']['value'], 
       "leg_O_l": steps['start_location']['lat'], 
       "leg_O_ln": steps['start_location']['lng'], 
       "leg_D_l": steps['end_location']['lat'], 
       "leg_D_ln": steps['end_location']['lng'], 
       "leg_html_inst": steps['html_instructions'], 
       "leg_polyline": steps['polyline']['points'], 
       "leg_travel_mode": steps['travel_mode'] 
       } 
       cursor.execute(add_leg_data, params) 
     cnx.commit() 
    print('error messages:', error_messages) 
    cursor.close() 
    cnx.close() 
    print('finished' + file) 

Sử dụng htop trên Linux Instance tôi có thể thấy như sau: htop of python process

Về cơ sở dữ liệu MySQL, sử dụng MySQL Workbench Tôi có thể thấy rằng:

MySQL WorkBench Output

Kịch bản python này đã được chugging đi trong nhiều ngày nhưng tôi chỉ chèn khoảng 20% ​​dữ liệu vào MySQL.

Câu hỏi của tôi - làm cách nào tôi có thể xác định nút cổ chai? Nó là kịch bản Python? Dường như nó đang sử dụng lượng bộ nhớ thấp - tôi có thể tăng điều này không? Tôi đã kiểm tra kích thước InnoDB đệm hồ bơi theo (How to improve the speed of InnoDB writes per second of MySQL DB) và thấy nó là lớn:

SELECT @@innodb_buffer_pool_size; 
+---------------------------+ 
| @@innodb_buffer_pool_size | 
+---------------------------+ 
|    11674845184 | 
+---------------------------+ 

Kể từ khi tôi đang sử dụng một ví dụ RDS và EC2 ở cùng vùng với tôi không tin rằng có một nút cổ chai mạng . Con trỏ về nơi tôi nên tìm kiếm các khoản tiết kiệm lớn nhất sẽ rất hoan nghênh!

CHỈNH SỬA

Tôi nghĩ rằng tôi có thể gặp phải vấn đề. Đối với hiệu quả trong quá trình phân tích cú pháp, tôi viết từng cấp JSON riêng biệt. Tuy nhiên, sau đó tôi phải thực hiện một truy vấn để khớp một phần lồng nhau của JSON với cấp cao hơn của nó. Truy vấn này có chi phí thấp khi sử dụng cơ sở dữ liệu nhỏ. Ive nhận thấy rằng tốc độ của chèn đã giảm đáng kể trên db này. Điều này là do nó phải tìm kiếm một db lớn hơn và đang phát triển để kết nối đúng dữ liệu JSON.

Tôi không chắc chắn làm thế nào để giải quyết này khác hơn là chờ đợi nó ra ....

+1

Bạn đã đề cập đến EC2 và RDS nằm trong cùng một khu vực; họ cũng có trong cùng một khu vực có sẵn không? Nếu không, đó có thể là một cách khá dễ dàng để xem các cải tiến hơn nữa. –

+0

Có - xem xét điều đó. Cả hai đều trong cùng một vùng sẵn có – LearningSlowly

+0

Bạn đã thử sử dụng IOP được cấp phép trên cá thể RDS chưa? – mickzer

Trả lời

1

Tôi không thể nhìn thấy bất kỳ định nghĩa bảng trong kịch bản Python .... Nhưng khi chúng ta cố gắng và làm Operations dữ liệu lớn - chúng tôi sẽ luôn vô hiệu hóa bất kỳ chỉ mục cơ sở dữ liệu nào khi tải xuống MySQL - cũng nếu bạn có bất kỳ ràng buộc/thực thi khóa ngoài nào - điều này sẽ bị vô hiệu hóa khi bạn đang tải.

Tự động tắt được tắt theo mặc định khi kết nối qua Trình kết nối/Python.

Nhưng tôi không thể nhìn thấy bất kỳ cam kết - tùy chọn trong mã bạn hiện

Để Tóm tắt

Disable/Remove (Để tải)

- Chỉ số
- Constraints - Phím nước ngoài - Kích hoạt

Trong chương trình tải của bạn

- Disable autocommit - cam kết bao giờ n hồ sơ (N sẽ phụ thuộc vào kích thước bộ đệm của bạn có sẵn)

1

englist tôi là người nghèo

nếu tôi làm công việc này, tôi sẽ

  1. sử dụng python convert json để txt

  2. sử dụng công cụ mysq imp, nhập khẩu txt để mysql

nếu bạn phải làm python + mysql allinone, tôi đề nghị sử dụng

insert table values(1),value(2)...value(xxx) 

tại sao 'SELECT request_no TỪ xảy ra master'multiple, cần được đọc từ json

englist của tôi là rất poor.so ..

0

Với thông tin này, có vẻ như cả tập lệnh DB hầu như không hoạt động. Tinh chỉnh bất cứ điều gì ở cấp độ MySQL sẽ là quá sớm.

Bạn cần hiển thị nhiều hơn về những gì chương trình của bạn đang làm.

Bắt đầu bằng cách ghi nhật ký truy vấn của bạn mất bao nhiêu thời gian, bạn nhận được bao nhiêu lỗi và v.v.

Những SELECTs có thể cần thêm chỉ mục để hoạt động tốt, nếu đó là vấn đề.

Các vấn đề liên quan