2012-01-18 32 views
7

Tôi xác định một nhà xuất khẩu mặt hàng sẽ đẩy các mục vào hàng đợi thư. Dưới đây là mã.Xuất khẩu tùy chỉnh phế liệu

from scrapy.contrib.exporter import JsonLinesItemExporter 
from scrapy.utils.serialize import ScrapyJSONEncoder 
from scrapy import log 

from scrapy.conf import settings 

from carrot.connection import BrokerConnection, Exchange 
from carrot.messaging import Publisher 

log.start() 


class QueueItemExporter(JsonLinesItemExporter): 

    def __init__(self, **kwargs): 

     log.msg("Initialising queue exporter", level=log.DEBUG) 

     self._configure(kwargs) 

     host_name = settings.get('BROKER_HOST', 'localhost') 
     port = settings.get('BROKER_PORT', 5672) 
     userid = settings.get('BROKER_USERID', "guest") 
     password = settings.get('BROKER_PASSWORD', "guest") 
     virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/") 

     self.encoder = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)(**kwargs) 

     log.msg("Connecting to broker", level=log.DEBUG) 
     self.q_connection = BrokerConnection(hostname=host_name, port=port, 
         userid=userid, password=password, 
         virtual_host=virtual_host) 
     self.exchange = Exchange("scrapers", type="topic") 
     log.msg("Connected", level=log.DEBUG) 

    def start_exporting(self): 
     spider_name = "test" 
     log.msg("Initialising publisher", level=log.DEBUG) 
     self.publisher = Publisher(connection=self.q_connection, 
         exchange=self.exchange, routing_key="scrapy.spider.%s" % spider_name) 
     log.msg("done", level=log.DEBUG) 

    def finish_exporting(self): 
     self.publisher.close() 

    def export_item(self, item): 
     log.msg("In export item", level=log.DEBUG) 
     itemdict = dict(self._get_serialized_fields(item)) 
     self.publisher.send({"scraped_data": self.encoder.encode(itemdict)}) 
     log.msg("sent to queue - scrapy.spider.naukri", level=log.DEBUG) 

Tôi đang gặp một số sự cố. Các mục không được gửi đến hàng đợi. Ive đã thêm thông tin sau vào cài đặt của tôi:

FEED_EXPORTERS = { 
    "queue": 'scrapers.exporters.QueueItemExporter' 
} 

FEED_FORMAT = "queue" 

LOG_STDOUT = True 

Mã không gây ra bất kỳ lỗi nào và tôi không thể thấy bất kỳ thông báo ghi nhật ký nào. Im tại trí thông minh của tôi kết thúc về cách gỡ lỗi này.

Mọi trợ giúp sẽ được đánh giá cao.

+0

Tôi nghĩ rằng viết một đường ống mục sẽ đơn giản hơn cho mục đích này, và liên quan đến ít mã boilerplate, đó là một nguồn tiềm năng cho những sai lầm. Vì vậy, tôi sẽ refactor mã của bạn để làm việc như một đường ống dẫn, thay vì một nhà xuất khẩu mặt hàng tùy chỉnh. Xem [doc đường dẫn tài liệu] (http://doc.scrapy.org/en/latest/topics/item-pipeline.html) –

+1

Tôi đã viết một đường ống dẫn, nhưng suy nghĩ của tôi là, vì đây là cách tôi muốn đầu ra của tôi từ các scraper, các nhà xuất khẩu sẽ là một nơi tốt hơn để đặt nó. – zsquare

+0

@zsquare, có thành công nào trong vấn đề này không? Tôi biết đó là một bài cũ nhưng bạn đã làm gì? – Medeiros

Trả lời

5

"Nhà xuất khẩu nguồn cấp dữ liệu" là các lối tắt nhanh (và bằng cách nào đó bẩn) để gọi một số nhà xuất khẩu mặt hàng "chuẩn". Thay vì thiết lập một nước xuất khẩu thức ăn chăn nuôi từ cài đặt, dây cứng xuất khẩu mục tùy chỉnh của bạn để đường ống dẫn tùy chỉnh của bạn, như đã giải thích ở đây http://doc.scrapy.org/en/0.14/topics/exporters.html#using-item-exporters:

from scrapy.xlib.pydispatch import dispatcher 
from scrapy import signals 
from scrapy.contrib.exporter import XmlItemExporter 

class MyPipeline(object): 

    def __init__(self): 
     ... 
     dispatcher.connect(self.spider_opened, signals.spider_opened) 
     dispatcher.connect(self.spider_closed, signals.spider_closed) 
     ... 

    def spider_opened(self, spider): 
     self.exporter = QueueItemExporter() 
     self.exporter.start_exporting() 

    def spider_closed(self, spider): 
     self.exporter.finish_exporting() 

    def process_item(self, item, spider): 
     # YOUR STUFF HERE 
     ... 
     self.exporter.export_item(item) 
     return item 
+0

Điều này có vẻ đầy hứa hẹn. Bất cứ ai đã thử nghiệm nó? – Medeiros

+3

Có gì "bẩn" về chúng? Tôi mới dùng Scrapy và tự hỏi mình câu hỏi giống như OP của @ zsquare, và tôi phải nói bình luận của anh ấy, "suy nghĩ của tôi là, vì đây là cách tôi muốn đầu ra từ scraper, nhà xuất khẩu sẽ là nơi tốt hơn để đặt nó, "phản ánh suy nghĩ của riêng tôi. – feuGene

1

Mẹo: ví dụ tốt để bắt đầu từ là Writing Items to MongoDb từ tài liệu chính thức.

Tôi đã thực hiện một điều tương tự. Tôi đã tạo ra một đường ống mà đặt mỗi mục vào một dịch vụ giống S3 (tôi đang sử dụng Minio ở đây, nhưng bạn có được ý tưởng). Nó tạo ra một cái xô mới cho mỗi con nhện và đặt mỗi vật phẩm vào một vật thể với một cái tên ngẫu nhiên. Mã nguồn đầy đủ có thể được tìm thấy trong my repo.

Bắt đầu với dấu ngoặc kép đơn giản nhện từ hướng dẫn:

import scrapy 

class QuotesSpider(scrapy.Spider): 
    name = "quotes" 
    start_urls = ['http://quotes.toscrape.com/page/1/', 
        'http://quotes.toscrape.com/page/1/'] 

    def parse(self, response): 
     for quote in response.css('div.quote'): 
      yield { 
        'text':quote.css('span.text::text').extract_first(), 
        'author':quote.css('span small::text').extract_first(), 
        'tags':quote.css('div.tags a.tag::text').extract() 
        } 
     next_page = response.css('li.next a::attr(href)').extract_first() 
     if next_page is not None: 
      next_page = response.urljoin(next_page) 
      yield scrapy.Request(next_page, callback=self.parse) 

Trong settings.py:

ITEM_PIPELINES = { 
    'scrapy_quotes.pipelines.ScrapyQuotesPipeline': 300, 
} 

Trong scrapy_quotes/pipelines.py tạo ra một đường ống và một nhà xuất khẩu mặt hàng:

import uuid 
from StringIO import StringIO 
from scrapy.contrib.exporter import BaseItemExporter 
from scrapy.conf import settings 
from scrapy import signals 
from scrapy.xlib.pydispatch import dispatcher 
from scrapy import log 
from scrapy.utils.python import to_bytes 
from scrapy.utils.serialize import ScrapyJSONEncoder 

class S3ItemExporter(BaseItemExporter): 
    def __init__(self, bucket, **kwargs): 
     self._configure(kwargs) 
     self.bucket = bucket 
     kwargs.setdefault('ensure_ascii', not self.encoding) 
     self.encoder = ScrapyJSONEncoder(**kwargs) 

    def start_exporting(self): 
     self.client = connect() 
     create_bucket(self.client, self.bucket) 

    def finish_exporting(self): 
     log.msg("Done from S3 item exporter", level=log.DEBUG) 

    def export_item(self, item): 
     log.msg("S3 item exporter got item: %s" % item, level=log.DEBUG) 
     itemdict = dict(self._get_serialized_fields(item)) 
     data = self.encoder.encode(itemdict) 
     size = len(data) 
     object_data = StringIO(data) 
     name = str(uuid.uuid4()) 
     put_object(self.client, self.bucket, name, object_data, size) 


class ScrapyQuotesPipeline(object): 
    """Export scraped items, to different buckets, 
    one per spider""" 
    @classmethod 
    def from_crawler(cls, crawler): 
     pipeline = cls() 
     crawler.signals.connect(pipeline.spider_opened, signals.spider_opened) 
     crawler.signals.connect(pipeline.spider_closed, signals.spider_closed) 
     return pipeline 

    def spider_opened(self, spider): 
     self.exporter = S3ItemExporter(spider.name) 
     self.exporter.start_exporting() 

    def spider_closed(self, spider): 
     self.exporter.finish_exporting() 

    def process_item(self, item, spider): 
     self.exporter.export_item(item) 
     return item 

# S3 Related 
from minio import Minio 
import os 
from minio.error import ResponseError 
def connect(): 
    return Minio('192.168.1.111:9000', 
      access_key='0M6PYKBBAVQVQGVWVZKQ', 
      secret_key='H6vPxz0aHSMZPgagZ3G0lJ6CbhN8RlTtD78SPsL8', 
      secure=False) 

def create_bucket(client, name): 
    client.make_bucket(name) 

def put_object(client, bucket_name, object_name, object_data, size): 
    client.put_object(bucket_name, object_name, object_data, size) 
0

tôi nhấn cùng một vấn đề, mặc dù có thể là một số bản cập nhật phiên bản phía trước. Chi tiết nhỏ đã giải quyết nó cho tôi là thiết lập FEED_URI = "something" trong settings.py. Nếu không có điều này, mục nhập trong FEED_EXPORTERS không được tôn trọng chút nào.

Các vấn đề liên quan