# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
# import redis
from scrapy.utils.project import get_project_settings
settings = get_project_settings()  # scrapy.conf was removed; this reads the same project settings
import pymongo
import pandas as pd  # used for reading from MySQL
# import mysql.connector
from scrapy.exceptions import DropItem
from news import items
import KafkaAPI
import pykafka
from pykafka.exceptions import MessageSizeTooLarge
import json
import time
kafka = {}  # cache of KafkaAPI producer wrappers, keyed by topic name
# client = pykafka.KafkaClient(hosts="172.16.0.82:9092")
# print(client.topics)
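# The pipelines below read their MongoDB connection info from the project
# settings. A minimal sketch of the matching entries in settings.py (the module
# path "news.pipelines" and every concrete value here are assumptions, adjust
# them to your project):
#
#   ITEM_PIPELINES = {
#       'news.pipelines.NewsPipeline': 300,
#       'news.pipelines.SendToKafka': 400,
#   }
#   MONGODB_HOST = '127.0.0.1'
#   MONGODB_PORT = 27017
#   MONGODB_DBNAME = 'news'
#   MONGODB_DOCNAME = 'news_items'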
class NewsPipeline(object):
# def process_item(self, item, spider):
# return item
def __init__(self):
## self.urls = db[settings['MONGODB_DOCNAME']].find({},{'url':1})
## print(self.urls)
# redis_db.flushdb()
        # redis_db = redis.Redis(host='127.0.0.1', port=6379, db=4)  # connect to redis (the equivalent of a MySQL conn)
        # redis_data_dict = "url_news"  # name of the redis hash holding seen urls; any string works, it names the "dict", it is not a key inside it
# redis_data_now_dict = 'url_now'
port = settings['MONGODB_PORT']
host = settings['MONGODB_HOST']
db_name = settings['MONGODB_DBNAME']
client = pymongo.MongoClient(host=host, port=port)
db = client[db_name]
self.col = db[settings['MONGODB_DOCNAME']]
# pass
def process_item(self, item, spider):
# class NewsPipeline(object):
# if redis_db.hexists(redis_data_now_dict,item['url']):
# # print('same_url_now: ',item['url'])
        # raise DropItem("Duplicate item found: %s" % item)
dictItem = dict(item)
# print('web1:',item['web'])
        try:
            # insert_one() is the current pymongo API (Collection.insert() is deprecated)
            self.col.insert_one(dictItem)
        except Exception as e:
            # log instead of silently swallowing duplicate-key / connection errors
            spider.logger.warning('MongoDB insert failed: %s', e)
        # (an earlier version pushed the item to Kafka here; that logic now
        #  lives in the SendToKafka pipeline below)
return item
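# --- Optional: Redis-based URL de-duplication --------------------------------
# A minimal sketch of the de-duplication that the commented-out redis lines in
# NewsPipeline hint at. The class name, hash name and connection details are
# assumptions; it only runs if added to ITEM_PIPELINES, and redis is imported
# lazily so this file still loads when redis is not installed.
class DuplicateUrlPipeline(object):
    def __init__(self):
        import redis  # lazy import: only needed when this pipeline is enabled
        self.redis_db = redis.Redis(host='127.0.0.1', port=6379, db=4)
        self.url_hash = 'url_news'  # redis hash that remembers already-seen urls

    def process_item(self, item, spider):
        if self.redis_db.hexists(self.url_hash, item['url']):
            raise DropItem('Duplicate item found: %s' % item['url'])
        self.redis_db.hset(self.url_hash, item['url'], 1)
        return item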
class SendToKafka(object):
def process_item(self, item, spider):
if isinstance(item, items.NewsItem):
# print('web2:',item['web'])
dst_topic = 'NewsCrawler3'
if dst_topic not in kafka:
kafka[dst_topic] = KafkaAPI.KafkaAPI(dst_topic)
kafka[dst_topic].init_producer()
try:
kafka[dst_topic].send(json.dumps({
'title': item['title'],
'publish_time': '',
'web': item['web'],
'url': item['url'],
'contents': item['contents'],
'last_update_time': item['last_update_time']
}, ensure_ascii=False))
            except MessageSizeTooLarge:
                # flag over-sized messages instead of failing the whole pipeline
                item['msg_to_large'] = True
                # print('msg too large', item['url'])
return item
    def close_spider(self, spider):
        # stop() blocks until pykafka's internal message queue has been
        # flushed, so buffered messages are not lost when the spider closes
        for _, k in kafka.items():
            k.producer.stop()
Block until all messages have been sent. The Kafka client first puts messages into an internal buffer queue and only then ships them to Kafka, which means some of your messages simply have not been delivered yet.
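If messages are getting lost on exit, the fix is either to block until the buffer is flushed or to produce synchronously. A minimal sketch with a plain pykafka producer (broker address and topic are taken from the code above; the KafkaAPI wrapper would need the equivalent change inside its own send/stop logic, which is an assumption since that module is not shown):

import pykafka

client = pykafka.KafkaClient(hosts="172.16.0.82:9092")
topic = client.topics[b'NewsCrawler3']

# Option 1: asynchronous producer; stop() blocks until the internal queue
# has been flushed, so call it before the process exits (this is what
# close_spider() does above via k.producer.stop()).
producer = topic.get_producer()
producer.produce(b'{"title": "test"}')
producer.stop()

# Option 2: synchronous producer; every produce() blocks until the message
# has actually been delivered, so nothing can be lost on exit (but slower).
sync_producer = topic.get_sync_producer()
sync_producer.produce(b'{"title": "test"}')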