The main goal of scraping is to extract structured data from unstructured sources, and Items make it easy to apply different operations to the collected data. Items can be declared in three ways:
Scrapy version: 2.6+
Item objects provide a dict-like API: the data can be read, written, and modified like a dictionary.
from scrapy.item import Item, Field

class PeopleItem(Item):
    name_field = Field()
    age_field = Field()
    # ...
dataclass objects let you declare the data type of each field in the item.
from dataclasses import dataclass

@dataclass
class PeopleItem:
    name_field: str
    age_field: int
attrs objects let you declare field types and conversion of attribute values.
import attr

@attr.s
class PeopleItem:
    name_field = attr.ib(type=str)
    age_field = attr.ib(converter=float)
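Whichever declaration style is used, the itemadapter library (which Scrapy itself relies on) offers a uniform dict-like interface over Item, dataclass, and attrs objects. A minimal sketch, assuming the dataclass version of PeopleItem defined above:

from itemadapter import ItemAdapter

adapter = ItemAdapter(PeopleItem(name_field='曹操', age_field=60))
print(adapter['name_field'])  # 曹操
print(adapter.asdict())       # {'name_field': '曹操', 'age_field': 60}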
Define the field names for the content you want to collect; the scraped data is then written to the storage destination field by field, like a table.
import scrapy

class People(scrapy.Item):
    name = scrapy.Field()
    age = scrapy.Field()
    wuli = scrapy.Field()
    zhili = scrapy.Field()
    zhengzhi = scrapy.Field()
    tongshuai = scrapy.Field()
    updated_time = scrapy.Field(serializer=str)
Create an item.
people = People(name='曹操', age=60)
print(people)
People(name='曹操', age=60)
Get item values.
people['name']
'曹操'
people.get('name')
'曹操'
people['age']
60
Accessing a field that has not been populated or defined raises an error, just like a dict.
# Standard KeyError, same as a dict
people['data']  # access an undefined field
Traceback (most recent call last):
...
KeyError: 'data'
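Like a dict, get() accepts a default value, and membership tests tell populated values apart from merely declared fields:

people.get('updated_time', 'not set')  # declared but not yet populated
'not set'
'name' in people          # is the name field populated?
True
'updated_time' in people  # populated?
False
'updated_time' in people.fields  # declared on the class?
True
'data' in people.fields   # not declared
False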
Set item values.
people['updated_time'] = 'today'
people['updated_time']
'today'
Use the item like a dict.
people.keys()
['name', 'age', 'updated_time']
people.items()
[('name', '曹操'), ('age', 60), ('updated_time', 'today')]
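A plain dict can also be created directly from an item, which is convenient for exporting or debugging:

dict(people)
{'name': '曹操', 'age': 60, 'updated_time': 'today'}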
Copy items.
people2 = people.copy()
people2 = People(people)
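Both forms above produce shallow copies, so mutable values such as lists stay shared with the original. Scrapy items also provide deepcopy() when a fully independent copy is needed:

people3 = people.deepcopy()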
Create an item from a dict.
People({'name': '曹操', 'age': 60})
People(name='曹操', age=60)
Extend an item type by declaring new fields directly.
class People_(People):
    new_field_1 = scrapy.Field(serializer=str)
    new_field_2 = scrapy.Field()
Or redefine an existing field, keeping its metadata and adding a serializer.
class People_(People):
    # keep the existing field metadata, add/override the serializer
    name = scrapy.Field(People.fields['name'], serializer=str)
# Inside your spider class (xxxx is your project package):
from xxxx.items import People

def parse(self, response):
    item = People()
    item['name'] = response.xpath('//div[@class="xxx"]/text()').extract()
    item['age'] = response.xpath('//div[@class="xxx"]/text()').extract()
    item['wuli'] = response.xpath('//div[@class="xxx"]/text()').extract()
    item['zhili'] = response.xpath('//div[@class="xxx"]/text()').extract()
    item['zhengzhi'] = response.xpath('//div[@class="xxx"]/text()').extract()
    item['tongshuai'] = response.xpath('//div[@class="xxx"]/text()').extract()
    item['updated_time'] = response.xpath('//div[@class="xxx"]/text()').extract()
    yield item
import scrapy

class People(scrapy.Item):
    name = scrapy.Field()
    age = scrapy.Field()
    wuli = scrapy.Field()
    zhili = scrapy.Field()
    zhengzhi = scrapy.Field()
    tongshuai = scrapy.Field()
    updated_time = scrapy.Field(serializer=str)
from scrapy.loader import ItemLoader
from xxxx.items import People

def parse(self, response):
    l = ItemLoader(item=People(), response=response)
    l.add_xpath('name', '//div[@class="name"]')
    l.add_xpath('age', '//div[@class="age"]')
    l.add_xpath('wuli', '//p[@class="wuli"]')
    l.add_xpath('zhili', '//p[@class="zhili"]')
    l.add_xpath('zhengzhi', '//p[@class="zhengzhi"]')
    l.add_xpath('tongshuai', '//p[@class="tongshuai"]')
    l.add_xpath('updated_time', '//p[@class="updated_time"]')
    return l.load_item()
Define the field types and default values of the data.
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class InventoryItem:
    name: Optional[str] = field(default=None)
    age: Optional[int] = field(default=None)
    wuli: Optional[int] = field(default=None)
    zhili: Optional[int] = field(default=None)
    zhengzhi: Optional[int] = field(default=None)
    tongshuai: Optional[int] = field(default=None)
    updated_time: Optional[str] = field(default=None)
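Default values matter here because an Item Loader first creates an empty item and then fills the fields one at a time; without defaults, InventoryItem() could not be instantiated with no arguments. A minimal sketch reusing the class above:

from scrapy.loader import ItemLoader

def parse(self, response):
    l = ItemLoader(item=InventoryItem(), response=response)
    l.add_xpath('name', '//div[@class="name"]')
    return l.load_item()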
Each Item Loader has one input processor and one output processor per field. The input processor runs as soon as data is received; when all data has been collected and ItemLoader.load_item() is called, the output processor runs and produces the final value.
l = ItemLoader(People(), some_selector)
l.add_xpath('name', xpath1)
l.add_xpath('age', xpath2)
# ...
return l.load_item()
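The built-in processors are plain callables that take an iterable of values, so their behavior is easy to check in isolation (they live in itemloaders.processors; scrapy.loader.processors is the older alias used below):

from itemloaders.processors import TakeFirst, Join, MapCompose

TakeFirst()(['', '曹操'])     # '曹操' – the first non-empty value
Join()(['60', '岁'])          # '60 岁' – values joined with a space
MapCompose(str.strip, str.title)(['  cao cao  '])  # ['Cao Cao'] – each value passed through the functions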
Input and output processors are declared on the loader with the _in and _out suffixes; you can also define the fallbacks ItemLoader.default_input_processor and ItemLoader.default_output_processor.
from scrapy.loader import ItemLoader
from scrapy.loader.processors import TakeFirst, MapCompose, Join

class PeopleLoader(ItemLoader):
    default_output_processor = TakeFirst()
    name_in = MapCompose(str.title)
    name_out = Join()
    age_in = MapCompose(str.strip)
    # ...
import scrapy
from scrapy.loader.processors import Join, MapCompose, TakeFirst
from w3lib.html import remove_tags

# age filter: keep only numeric values
def filter_age(value):
    if value.isdigit():
        return value

class People(scrapy.Item):
    name = scrapy.Field(
        input_processor=MapCompose(remove_tags),
        output_processor=Join(),
    )
    age = scrapy.Field(
        input_processor=MapCompose(remove_tags, filter_age),
        output_processor=TakeFirst(),
    )
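Processors declared in the Field metadata are picked up by the Item Loader automatically; a sketch of loading the People item above (the XPath is a placeholder):

from scrapy.loader import ItemLoader

def parse(self, response):
    l = ItemLoader(item=People(), response=response)
    l.add_xpath('age', '//p[@class="age"]/text()')
    # remove_tags and filter_age run on input; TakeFirst() runs when load_item() is called
    return l.load_item()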
The Item Loader context is shared by all input and output processors; a processor that declares a loader_context argument receives the current context automatically.
def parse_age(text, loader_context):
    unit = loader_context.get('unit', '岁')
    # ... parse the age here, taking the unit into account ...
    return parsed_age

# Initialize or modify the context value
loader = ItemLoader(people)
loader.context['unit'] = '岁'
loader = ItemLoader(people, unit='岁')

class PeopleLoader(ItemLoader):
    age_out = MapCompose(parse_age, unit='岁')
You can extend a shared loader to strip specific characters, though it is usually better to handle this in the later data-cleaning stage.
from itemloaders.processors import MapCompose
from xxxx.ItemLoaders import PeopleLoader

def strip_dashes(x):
    return x.strip('-')

class SiteSpecificLoader(PeopleLoader):
    name_in = MapCompose(strip_dashes, PeopleLoader.name_in)
Each item pipeline component is a Python class that implements a process_item(self, item, spider) method.
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class AgePipeline:
    vat_factor = 1.1

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if adapter.get('age'):
            adapter['age'] = adapter['age'] * self.vat_factor
            return item
        else:
            raise DropItem(f"Missing age in {item}")
import json

from itemadapter import ItemAdapter

class JsonWriterPipeline:
    def open_spider(self, spider):
        self.file = open('items.json', 'w')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(ItemAdapter(item).asdict()) + "\n"
        self.file.write(line)
        return item
import pymongo
from itemadapter import ItemAdapter

class MongoPipeline:
    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
        return item
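The pipeline reads its connection parameters from the crawler settings; the values below are example placeholders for settings.py:

# settings.py (example values)
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'people_db'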
import hashlib
from urllib.parse import quote

import scrapy
from itemadapter import ItemAdapter

class ScreenshotPipeline:
    """Pipeline that renders a screenshot of each item's URL with Splash."""

    SPLASH_URL = "http://localhost:8050/render.png?url={}"

    async def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        encoded_item_url = quote(adapter["url"])
        screenshot_url = self.SPLASH_URL.format(encoded_item_url)
        request = scrapy.Request(screenshot_url)
        response = await spider.crawler.engine.download(request, spider)

        if response.status != 200:
            # Error happened, return item.
            return item

        # Save screenshot to file, filename will be hash of url.
        url = adapter["url"]
        url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
        filename = f"{url_hash}.png"
        with open(filename, "wb") as f:
            f.write(response.body)

        # Store filename in item.
        adapter["screenshot_filename"] = filename
        return item
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class DuplicatesPipeline:
    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if adapter['id'] in self.ids_seen:
            raise DropItem(f"Duplicate item found: {item!r}")
        else:
            self.ids_seen.add(adapter['id'])
            return item
Pipelines must be enabled in settings.py, otherwise scraped items will not pass through them. The integer assigned to each class determines the order in which pipelines run: lower values run first (values are conventionally in the 0-1000 range).
ITEM_PIPELINES = {
    'myproject.pipelines.AgePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}