Tôi có khoảng 60 GB tệp JSON mà tôi đang phân tích bằng Python và sau đó chèn vào cơ sở dữ liệu MySQL bằng Trình kết nối Python-MySQL. Mỗi tệp JSON là khoảng 500MBLow InnoDB Viết mỗi giây - AWS EC2 tới MySQL RDS bằng cách sử dụng Python
Tôi đã sử dụng phiên bản AWS r3.xlớn EC2 với ổ đĩa thứ cấp chứa 60 GB dữ liệu JSON.
Tôi sau đó sử dụng phiên bản AWS RDS r3.xlớn MySQL. Những trường hợp này là tất cả trong cùng một khu vực và vùng sẵn có. Ví dụ EC2 đang sử dụng tập lệnh Python sau để tải JSON, phân tích cú pháp và sau đó chèn nó vào trong RDS của MySQL. python của tôi:
import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os
os.chdir("./json_data")
for file in glob.glob("*.json"):
with open(file, 'rU') as data_file:
results = json.load(data_file)
print('working on file:', file)
cnx = mysql.connector.connect(user='', password='',
host='')
cursor = cnx.cursor(buffered=True)
DB_NAME = 'DB'
def create_database(cursor):
try:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
except mysql.connector.Error as err:
print("Failed creating database: {}".format(err))
exit(1)
try:
cnx.database = DB_NAME
except mysql.connector.Error as err:
if err.errno == errorcode.ER_BAD_DB_ERROR:
create_database(cursor)
cnx.database = DB_NAME
else:
print(err)
exit(1)
add_overall_data = ("INSERT INTO master"
"(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
"VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")
add_polyline = ("INSERT INTO polyline"
"(Overview_polyline, request_no)"
"VALUES (%(Overview_polyline)s, %(request_no)s)")
add_summary = ("INSERT INTO summary"
"(summary, request_no)"
"VALUES (%(summary)s, %(request_no)s)")
add_warnings = ("INSERT INTO warnings"
"(warnings, request_no)"
"VALUES (%(warnings)s, %(request_no)s)")
add_waypoint_order = ("INSERT INTO waypoint_order"
"(waypoint_order, request_no)"
"VALUES (%(waypoint_order)s, %(request_no)s)")
add_leg_data = ("INSERT INTO leg_data"
"(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)"
"VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
error_messages = []
for result in results:
if result["status"] == "OK":
for leg in result['routes'][0]['legs']:
try:
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": leg['dtf']['value'],
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": leg['start_address'],
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": leg['end_address']
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
except KeyError, e:
error_messages.append(e)
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": "000",
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": 'unknown',
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": 'unknown'
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for overview_polyline in result['routes']:
params = {
"request_no": request_no,
"Overview_polyline": overview_polyline['overview_polyline']['points']
}
cursor.execute(add_polyline, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for summary in result['routes']:
params = {
"request_no": request_no,
"summary": summary['summary']
}
cursor.execute(add_summary, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for warnings in result['routes']:
params = {
"request_no": request_no,
"warnings": str(warnings['warnings'])
}
cursor.execute(add_warnings, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for waypoint_order in result['routes']:
params = {
"request_no": request_no,
"waypoint_order": str(waypoint_order['waypoint_order'])
}
cursor.execute(add_waypoint_order, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for steps in result['routes'][0]['legs'][0]['steps']:
params = {
"request_no": request_no,
"leg_dt": steps['dt']['value'],
"leg_ds": steps['ds']['value'],
"leg_O_l": steps['start_location']['lat'],
"leg_O_ln": steps['start_location']['lng'],
"leg_D_l": steps['end_location']['lat'],
"leg_D_ln": steps['end_location']['lng'],
"leg_html_inst": steps['html_instructions'],
"leg_polyline": steps['polyline']['points'],
"leg_travel_mode": steps['travel_mode']
}
cursor.execute(add_leg_data, params)
cnx.commit()
print('error messages:', error_messages)
cursor.close()
cnx.close()
print('finished' + file)
Sử dụng htop trên Linux Instance tôi có thể thấy như sau:
Về cơ sở dữ liệu MySQL, sử dụng MySQL Workbench Tôi có thể thấy rằng:
Kịch bản python này đã được chugging đi trong nhiều ngày nhưng tôi chỉ chèn khoảng 20% dữ liệu vào MySQL.
Câu hỏi của tôi - làm cách nào tôi có thể xác định nút cổ chai? Nó là kịch bản Python? Dường như nó đang sử dụng lượng bộ nhớ thấp - tôi có thể tăng điều này không? Tôi đã kiểm tra kích thước InnoDB đệm hồ bơi theo (How to improve the speed of InnoDB writes per second of MySQL DB) và thấy nó là lớn:
SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
| 11674845184 |
+---------------------------+
Kể từ khi tôi đang sử dụng một ví dụ RDS và EC2 ở cùng vùng với tôi không tin rằng có một nút cổ chai mạng . Con trỏ về nơi tôi nên tìm kiếm các khoản tiết kiệm lớn nhất sẽ rất hoan nghênh!
CHỈNH SỬA
Tôi nghĩ rằng tôi có thể gặp phải vấn đề. Đối với hiệu quả trong quá trình phân tích cú pháp, tôi viết từng cấp JSON riêng biệt. Tuy nhiên, sau đó tôi phải thực hiện một truy vấn để khớp một phần lồng nhau của JSON với cấp cao hơn của nó. Truy vấn này có chi phí thấp khi sử dụng cơ sở dữ liệu nhỏ. Ive nhận thấy rằng tốc độ của chèn đã giảm đáng kể trên db này. Điều này là do nó phải tìm kiếm một db lớn hơn và đang phát triển để kết nối đúng dữ liệu JSON.
Tôi không chắc chắn làm thế nào để giải quyết này khác hơn là chờ đợi nó ra ....
Bạn đã đề cập đến EC2 và RDS nằm trong cùng một khu vực; họ cũng có trong cùng một khu vực có sẵn không? Nếu không, đó có thể là một cách khá dễ dàng để xem các cải tiến hơn nữa. –
Có - xem xét điều đó. Cả hai đều trong cùng một vùng sẵn có – LearningSlowly
Bạn đã thử sử dụng IOP được cấp phép trên cá thể RDS chưa? – mickzer