2017-01-24 18 views
5

Tôi đang cố gắng tự động tải một csv vào bảng MySQL khi nó được nhận vào thùng S3.Đọc csv từ S3 và chèn vào bảng MySQL với AWS Lambda

Chiến lược của tôi là S3 khởi chạy một sự kiện khi nó nhận tệp vào một nhóm được chỉ định (hãy gọi nó là 'nhóm-tệp'). Đây là sự kiện được thông báo đến một hàm AWS Lambda sẽ tải xuống và xử lý tệp chèn mỗi hàng vào một bảng MySql (hãy gọi nó là 'target_table').

Chúng tôi phải xem xét rằng RDS nằm trong VPC.

Cấu hình cho phép hiện tại của xô là:

{ 
    "Version": "2008-10-17", 
    "Statement": [ 
     { 
      "Sid": "PublicReadForGetBucketObjects", 
      "Effect": "Allow", 
      "Principal": { 
       "AWS": "*" 
      }, 
      "Action": "s3:GetObject", 
      "Resource": "arn:aws:s3:::bucket-file/*" 
     } 
    ] 
} 

tôi đã tạo ra một vai trò với các chính sách sau, AmazonS3FullAccess và AWSLambdaVPCAccessExecutionRole gắn liền với chức năng AWS Lambda.

Mã lambda là:

from __future__ import print_function 
import boto3 
import logging 
import os 
import sys 
import uuid 
import pymysql 
import csv 
import rds_config 


rds_host = rds_config.rds_host 
name = rds_config.db_username 
password = rds_config.db_password 
db_name = rds_config.db_name 


logger = logging.getLogger() 
logger.setLevel(logging.INFO) 

try: 
    conn = pymysql.connect(rds_host, user=name, passwd=password, db=db_name, connect_timeout=5) 
except Exception as e: 
    logger.error("ERROR: Unexpected error: Could not connect to MySql instance.") 
    logger.error(e) 
    sys.exit() 

logger.info("SUCCESS: Connection to RDS mysql instance succeeded") 

s3_client = boto3.client('s3') 

def handler(event, context): 

    bucket = event['Records'][0]['s3']['bucket']['name'] 
    key = event['Records'][0]['s3']['object']['key'] 
    download_path = '/tmp/{}{}'.format(uuid.uuid4(), key) 

    s3_client.download_file(bucket, key,download_path) 

    csv_data = csv.reader(file(download_path)) 

    with conn.cursor() as cur: 
     for idx, row in enumerate(csv_data): 

      logger.info(row) 
      try: 
       cur.execute('INSERT INTO target_table(column1, column2, column3)' \ 
           'VALUES("%s", "%s", "%s")' 
           , row) 
      except Exception as e: 
       logger.error(e) 

      if idx % 100 == 0: 
       conn.commit() 

     conn.commit() 

    return 'File loaded into RDS:' + str(download_path) 

Tôi đã thử nghiệm chức năng và S3 sẽ gửi sự kiện khi một tập tin được tải lên, Lambda kết nối với các trường hợp RDS và nhận được thông báo. Tôi đã kiểm tra rằng tên nhóm là 'bucket-file' và tên tệp cũng đúng. Vấn đề là khi hàm đạt đến dòng s3_client.download_file(bucket, key,download_path) nơi nó bị kẹt cho đến khi đạt đến thời gian hết hạn lamdba.

Xem các bản ghi nó nói:

[INFO] 2017-01-24T14:36:52.102Z SUCCESS: Connection to RDS mysql instance succeeded 
[INFO] 2017-01-24T14:36:53.282Z Starting new HTTPS connection (1): bucket-files.s3.amazonaws.com 
[INFO] 2017-01-24T14:37:23.223Z Starting new HTTPS connection (2): bucket-files.s3.amazonaws.com 
2017-01-24T14:37:48.684Z Task timed out after 60.00 seconds 

Tôi cũng đã đọc rằng nếu bạn đang làm việc trong một VPC, để truy cập vào xô S3 bạn phải tạo một Endpoint VPC đó cấp quyền truy cập để S3 cho mạng con này. Tôi cũng đã thử giải pháp này nhưng kết quả là như nhau.

Tôi đánh giá cao một số ý tưởng.

Cảm ơn trước!

Trả lời

2

Cuối cùng tôi đã nhận được nó!

Vấn đề là vấn đề VPC. Như tôi đã nói, tôi đã tạo một VPC Endpoint để làm cho dịch vụ S3 có thể truy cập vào biểu mẫu VPC của tôi, nhưng tôi đã định cấu hình sai bảng định tuyến của mình. Vì vậy, trong kết luận, nếu bạn đang làm việc trong một VPC với lambda và bạn muốn truy cập vào S3, bạn cần tạo một VPC Endpoint (điểm cuối VPC). Bên cạnh đó, nếu bạn muốn truy cập bất kỳ dịch vụ internet nào khác ngoài VPC, bạn cần cấu hình một NAT Gateway.

Các vấn đề liên quan