我用 Scrapy 写了一个爬虫作为生产者,把爬取到的数据发送到 Kafka,但程序运行结束时一直卡在日志 "INFO: Blocking until all messages are sent" 这一步,无法退出,这是为什么呢?

freedom 发表于: 2018-08-17   最后更新时间: 2018-08-17 23:15:44   5,812 浏览
# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import json
import logging
import time

import pandas as pd  # used to read from MySQL
import pykafka
import pymongo
# import mysql.connector
# import redis
from scrapy.conf import settings
from scrapy.exceptions import DropItem

import KafkaAPI
from news import items

# Module-level registry shared by the pipelines below: maps a Kafka topic
# name to the KafkaAPI wrapper object created for that topic.
kafka = dict()
# client = pykafka.KafkaClient(hosts="172.16.0.82:9092")
# print(client.topics)
class NewsPipeline(object):
    """Item pipeline that stores every scraped item in a MongoDB collection.

    Connection parameters come from the Scrapy settings keys
    ``MONGODB_HOST``, ``MONGODB_PORT``, ``MONGODB_DBNAME`` and
    ``MONGODB_DOCNAME``.
    """

    def __init__(self):
        """Connect to MongoDB and select the target collection."""
        port = settings['MONGODB_PORT']
        host = settings['MONGODB_HOST']
        db_name = settings['MONGODB_DBNAME']
        # Keep a reference to the client so close_spider() can release it.
        self.client = pymongo.MongoClient(host=host, port=port)
        db = self.client[db_name]
        self.col = db[settings['MONGODB_DOCNAME']]

    def process_item(self, item, spider):
        """Insert a dict copy of ``item`` into MongoDB and pass ``item`` on.

        Storage is best-effort: a failed insert is logged and the item still
        continues to the next pipeline stage.

        Args:
            item: The scraped item; it is converted with ``dict(item)``.
            spider: The spider that produced the item (unused).

        Returns:
            The same ``item`` object that was passed in.
        """
        document = dict(item)
        try:
            # insert_one() replaces the deprecated Collection.insert().
            self.col.insert_one(document)
        except Exception as exc:
            # Do not abort the crawl on a storage error, but make it visible
            # instead of silently discarding it.
            logging.getLogger(__name__).warning(
                'Failed to store item in MongoDB (url=%s): %r',
                document.get('url'), exc)
        return item

    def close_spider(self, spider):
        """Close the MongoDB client when the spider finishes."""
        # getattr() guards instances that were created without __init__.
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()



class SendToKafka(object):
    """Item pipeline that publishes ``NewsItem`` objects to Kafka as JSON.

    One KafkaAPI wrapper per topic is created lazily and cached in the
    module-level ``kafka`` registry; ``close_spider`` stops their producers.
    """

    def process_item(self, item, spider):
        """Send ``item`` to the ``NewsCrawler3`` topic if it is a ``NewsItem``.

        Items of any other type are passed through untouched. When the
        message is rejected as too large, the item is flagged with
        ``msg_to_large`` and a warning is logged instead of failing.

        Args:
            item: The scraped item.
            spider: The spider that produced the item (unused).

        Returns:
            The same ``item`` object that was passed in.
        """
        if not isinstance(item, items.NewsItem):
            return item

        dst_topic = 'NewsCrawler3'
        if dst_topic not in kafka:
            # First item for this topic: create and initialise its producer.
            kafka[dst_topic] = KafkaAPI.KafkaAPI(dst_topic)
            kafka[dst_topic].init_producer()

        # Build the payload outside the try block so that only the send
        # itself is covered by the size-error handler.
        payload = json.dumps({
            'title': item['title'],
            'publish_time': '',
            'web': item['web'],
            'url': item['url'],
            'contents': item['contents'],
            'last_update_time': item['last_update_time']
        }, ensure_ascii=False)
        try:
            kafka[dst_topic].send(payload)
        except pykafka.producer.MessageSizeTooLarge:
            item['msg_to_large'] = True
            logging.getLogger(__name__).warning(
                'Kafka message too large, url=%s', item['url'])
        return item

    def close_spider(self, spider):
        """Stop the producer of every cached KafkaAPI wrapper.

        NOTE(review): ``producer.stop()`` is presumably pykafka's
        ``Producer.stop``, which waits for queued messages to be sent; if the
        run blocks here, check broker reachability and how the KafkaAPI
        wrapper configures the producer -- TODO confirm.
        """
        # values() exists on both Python 2 and 3 dicts, unlike iteritems();
        # list() snapshots the registry before iterating.
        for api in list(kafka.values()):
            api.producer.stop()
发表于 2018-08-17
添加评论

这条日志的意思是:阻塞,直到所有消息发送完成。
Kafka 客户端发送消息时,是先把消息放入本地缓存队列,再由后台线程发送到 Kafka 的。程序卡在这里,说明队列里还有消息没有发送成功。

你的答案

查看kafka相关的其他问题或提一个您自己的问题