Scrapy 2.6 Items 数据项定义、加载、传输使用指南

爬取的主要目标就是从非结构性的数据源提取结构性数据,使用 Item 容易可以将采集来的数据进行不同的操作。使用的 Items 数据项操作分3种:

  • Items 数据项: 数据爬取过程中从非结构化源(通常是网页)中提取结构化数据,需要实现定义抓取的数据字段信息就要使用到 Item。
  • Item Loaders 数据项加载: 数据采集过程中 Item 为抓取的数据提供的容器,使用 Item Loader 可以非常方便的将数据输入填充到容器中。
  • Item Pipeline 数据项管道: 数据采集过程中用于处理通过 Scrapy 抓取来的数据的传输通道。

Scrapy 版本:2.6+

Scrapy 2.6 Items 数据项定义、加载、传输使用指南

在这里插入图片描述

Items 数据项定义

Items 提供了一个可以读取、写入、修改的数据的字典供使用。

  • dictionaries:数据类型是字典。
  • Item objects:拥有与字典相同的操作。
from scrapy.item import Item, Field

class PeopleItem(Item):
    name_field = Field()
    age_field = Field()
    ......

dataclass objects 支持序列化定义项目数据中的数据类型。

from dataclasses import dataclass

@dataclass
class PeopleItem:
    name_field: str
    age_field: int

attrs objects 支持序列化转换数属性。

import attr

@attr.s
class PeopleItem:
    name_field = attr.ib(str)
    age_field = attr.ib(convert=float)

Items 基本使用


Scrapy 2.6 Items 数据项定义、加载、传输使用指南

在这里插入图片描述


声明字段

定义采集内容的列表字段名方法,采集后的数据会按照列联表的方式填充到数据保存的位置。

import scrapy

class People(scrapy.Item):
    name = scrapy.Field()
    age= scrapy.Field()
    wuli= scrapy.Field()
    zhili = scrapy.Field()
    zhengzhi = scrapy.Field()
    tongshuai = scrapy.Field()
    updated_time = scrapy.Field(serializer=str)

字段数据

创建 Items。

people= People(name='曹操', age=60)
print(people)
people(name='曹操', age=60)

获取Items的值。

people['name']
'曹操'

people.get('name')
'曹操'

people['price']
60

未查找到定义字段提示报错,同字典。

# 一般错误提示,同字典报错
people['data'] # 获取未定义的字段值

Traceback (most recent call last):
    ...
KeyError: 'data'

设置Items的值。

people['updated_time'] = 'today'
people['updated_time']
today

字典操作Items。

people.keys()
['name', 'age','updated_time']

people.items()
[('曹操', 60,'today')]

复制 Items。

people2 = people.copy()

people2 = People(people)

字典创建Items。

People({'name': '曹操', 'age': 60})
people(name='曹操', age=60)

直接定义数据类型扩展。

class People_(People):
    new_field_1 = scrapy.Field(serializer=str)
    new_field_2 = scrapy.Field()

使用序列化的方式进行定义数据类型。

class People_(People):
    name = scrapy.Field(People.fields['name'])

Spider 中应用

from xxxx.items import People

def parse(self, response):
    item = People()
    item["name"]= response.xpath('//div[@class="xxx"]/text()').extract()
    item["age"]= response.xpath('//div[@class="xxx"]/text()').extract()
    item["wuli"]= response.xpath('//div[@class="xxx"]/text()').extract()
    item["zhili"]= response.xpath('//div[@class="xxx"]/text()').extract()
    item["zhengzhi"]= response.xpath('//div[@class="xxx"]/text()').extract()
    item["tongshuai"]= response.xpath('//div[@class="xxx"]/text()').extract()
    item["updated_time"]= response.xpath('//div[@class="xxx"]/text()').extract()
    yield item


Scrapy 2.6 Items 数据项定义、加载、传输使用指南

在这里插入图片描述


ItemLoader 数据加载器的定义

  • item: 加载器解析的Item对象。
  • context: ItemLoader的数据详细内容。
  • default_item_class: 实例化方法。
  • default_input_processor: 指定字段的默认输入处理器。
  • default_output_processor: 指定字段的默认输出处理器。
  • default_selector_class: 用于构造 selector 中的 ItemLoader。
  • selecto: 在 Selector 对象中提取数据。
  • add_css (field_name, CSS, *processors, kw):** 用于提取数据,通过 processors 和 kwargs 的处理后,存储到 field_name。
  • add_value(field_name, value, *processors, kw):** 处理添加给定的 value 对于给定的字段。
  • add_xpath(field_name, value, *processors, kw):** 用于与此相关的选择器中提取字符串列表。
  • get_collected_values(field_name):** 返回字段的收集值。
  • get_css (CSS, *processors, kw):** 用于从与此相关的选择器中提取 Unicode 字符串列表。
  • get_output_value(field_name): 返回使用输出处理器为给定字段解析的收集值。
  • get_value(value, *processors, kw):** 处理给定的 value 被给予processors 关键字参数。
  • get_xpath(XPath, *processors, kw):** 用于与此相关的选择器中提取 Unicode 字符串的列表。
  • load_item(): 收集的数据填充并返回。
  • nested_css(CSS, context):** 使用 CSS 选择器创建嵌套加载器。
  • nested_xpath (XPath, context):** 使用XPath选择器创建嵌套加载器。
  • replace_css(field_name, CSS, *processors, kw):** 类似于add_css()但取代收集的数据而不是添加数据。
  • replace_value(field_name, 价值, *processors, kw):** 类似于 add_value() 但是将收集到的数据替换为新值,而不是添加它。
  • replace_xpath(field_name, XPath, *processors, kw):** 类似于 add_xpath() 但取代收集的数据而不是添加数据。

ItemLoader 基本使用

定义 Items

import scrapy

class People(scrapy.Item):
    name = scrapy.Field()
    age= scrapy.Field()
    wuli= scrapy.Field()
    zhili = scrapy.Field()
    zhengzhi = scrapy.Field()
    tongshuai = scrapy.Field()
    updated_time = scrapy.Field(serializer=str)

Spider 中加载填充数据

from scrapy.loader import ItemLoader
from xxxx.items import People

def parse(self, response):
    l = ItemLoader(item=People(), response=response)
    l.add_xpath('name', '//div[@class="name"]')
    l.add_xpath('age', '//div[@class="age"]')
    l.add_xpath('wuli', '//p[@class="wuli"]')
    l.add_xpath('zhili', '//p[@class="zhili"]')
    l.add_xpath('zhengzhi', '//p[@class="zhengzhi"]')
    l.add_xpath('tongshuai', '//p[@class="tongshuai"]')
    l.add_xpath('updated_time', '//p[@class="updated_time"]')

    return l.load_item()

处理数据集项

定义数据的字段类型和默认值。

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class InventoryItem:
    name: Optional[str] = field(default=None)
    age: Optional[int] = field(default=None)
    wuli: Optional[int] = field(default=None)
    zhili: Optional[int] = field(default=None)
    zhengzhi: Optional[int] = field(default=None)
    tongshuai: Optional[int] = field(default=None)
    updated_time: Optional[str] = field(default=None)

输入/输出处理器

每个 Item Loader 对每个 Field 都有一个输入处理器和一个输出处理器。 输入处理器在数据被接受到时执行,当数据收集完后调用 ItemLoader.load_item() 时再执行输出处理器,返回最终结果。

l = ItemLoader(People(), some_selector)
l.add_xpath('name', xpath1) 
l.add_xpath('age', xpath2) 
......
return l.load_item()

Item Loader 声明

通过 _in 和 _out 后缀来定义输入和输出处理器,并且还可以定义默认的ItemLoader.default_input_processor 和 ItemLoader.default_input_processor。

from scrapy.loader import ItemLoader
from scrapy.loader.processors import TakeFirst, MapCompose, Join

class PeopleLoader(ItemLoader):
    default_output_processor = TakeFirst()

    name_in = MapCompose(unicode.title)
    name_out = Join()

    age_in = MapCompose(unicode.strip)
    ......

在Field定义中声明输入/输出处理器

  • 在 Item Loader 中定义的 field_in 和 field_out 。
  • Filed元数据( input_processor 和 output_processor 关键字)
import scrapy
from scrapy.loader.processors import Join, MapCompose, TakeFirst
from w3lib.html import remove_tags

# age 过滤方法
def filter_age(value):
    if value.isdigit():
        return value

class People(scrapy.Item):
    name = scrapy.Field(
        input_processor=MapCompose(remove_tags),
        output_processor=Join(),
    )
    age= scrapy.Field(
        input_processor=MapCompose(remove_tags, filter_age),
        output_processor=TakeFirst(),
    )

Item Loader 上下文

Item Loader 上下文被所有输入/输出处理器共享。

def parse_length(text, loader_context):
    age = loader_context.get('unit', '岁')
    # ... 这里写入长度解析代码 ...
    return parsed_length

# 初始化和修改上下文的值
loader = ItemLoader(people)
loader.context['unit'] = '岁'

loader = ItemLoader(people, age='岁')

class ProductLoader(ItemLoader):
    length_out = MapCompose(parse_length, age='岁')

Item Loaders 重写和拓展

基于公共的设置进行操作删除指定符号,建议在后期的数据清洗部分处理。

from itemloaders.processors import MapCompose
from xxxx.ItemLoaders import PepleLoader

def strip_dashes(x):
    return x.strip('-')

class SiteSpecificLoader(PepleLoader):
    name_in = MapCompose(strip_dashes, PepleLoader.name_in)

pipeline 基础方法

每个项目管道组件都是一个Python类。

  1. process_item(self, item, spider): 用于处理 Items 定义的内容。
  2. open_spider(self, spider): 打开 Spider 时调用此方法。
  3. close_spider(self, spider): 关闭 Spider 时调用此方法。
  4. from_crawler(cls, crawler): 当创建一个 pipline 实例的时候该方法会被调用,必须返回一个pipline实例对象,一般在项目 setting 中配置。

pipeline 基本使用

用于调整age数据示例

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class AgePipeline:
    vat_factor = 1.1
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if adapter.get('age'):
            if adapter.get('age'):
                adapter['age'] = adapter['age'] * self.vat_factor
            return item
        else:
            raise DropItem(f"Missing age in {item}")

数据写入JSON文件

import json

from itemadapter import ItemAdapter

class JsonWriterPipeline:

    def open_spider(self, spider):
        self.file = open('items.json', 'w')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(ItemAdapter(item).asdict()) + "
"
        self.file.write(line)
        return item

数据写入写入MongoDB

import pymongo
from itemadapter import ItemAdapter

class MongoPipeline:

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
        return item

页面截图

import hashlib
from urllib.parse import quote

import scrapy
from itemadapter import ItemAdapter

class ScreenshotPipeline:
    """
    每个Scrapy项目使用Splash渲染屏幕截图的管道
    """

    SPLASH_URL = "http://localhost:8050/render.png?url={}"

    async def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        encoded_item_url = quote(adapter["url"])
        screenshot_url = self.SPLASH_URL.format(encoded_item_url)
        request = scrapy.Request(screenshot_url)
        response = await spider.crawler.engine.download(request, spider)

        if response.status != 200:
            return item

        # Save screenshot to file, filename will be hash of url.
        url = adapter["url"]
        url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
        filename = f"{url_hash}.png"
        with open(filename, "wb") as f:
            f.write(response.body)

        # Store filename in item.
        adapter["screenshot_filename"] = filename
        return item

数据重复过滤

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class DuplicatesPipeline:

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if adapter['id'] in self.ids_seen:
            raise DropItem(f"重复 item found: {item!r}")
        else:
            self.ids_seen.add(adapter['id'])
            return item

pipeline激活方法

必须在 settings.py 中设置,否则抓取数据无法处理。

ITEM_PIPELINES = {
    'myproject.pipelines.PricePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}


Scrapy 2.6 Items 数据项定义、加载、传输使用指南



发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章