合法合规前提
遵守规则
- 查看网站的
robots.txt - 遵守网站的使用条款
- 避免过度频繁访问
- 尊重版权和隐私
技术方案选择
静态网页爬取
import requests
from bs4 import BeautifulSoup
import pandas as pd
headers = {
'User-Agent': 'Mozilla/5.0...',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, 'html.parser')
# 数据提取
data = []
for item in soup.select('.item-class'):= item.select_one('.title').text
price = item.select_one('.price').text
data.append({'title': title, 'price': price})
动态网页爬取
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# 使用Selenium处理JavaScript渲染
driver = webdriver.Chrome()
driver.get(url)
# 等待元素加载
element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "target-class"))
)
# 获取渲染后的页面源码
html = driver.page_source
API接口调用
import requests
import json
# 分析网站API
api_url = "https://api.example.com/data"
params = {
'page': 1,
'limit': 20,
'timestamp': '...'
}
response = requests.get(api_url, params=params, headers=headers)
data = response.json()
关键技术要点
请求管理
# 会话管理
session = requests.Session()
session.headers.update(headers)
# 代理设置
proxies = {
'http': 'http://your-proxy:port',
'https': 'https://your-proxy:port'
}
# 延迟设置
import time
time.sleep(2) # 避免过快请求
反爬虫应对
# 随机User-Agent
from fake_useragent import UserAgent
ua = UserAgent()
headers['User-Agent'] = ua.random
# 使用cookies
session = requests.Session()
session.cookies.update({'key': 'value'})
# 处理验证码(需要OCR或第三方服务)
数据存储
# 多种存储方式
import csv
import json
import sqlite3
# CSV存储
with open('data.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fields)
writer.writeheader()
writer.writerows(data)
# JSON存储
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
高级方案
Scrapy框架
import scrapy
class MySpider(scrapy.Spider):
name = 'myspider'
start_urls = ['http://example.com']
def parse(self, response):
# 解析逻辑
yield {
'title': response.css('h1::text').get(),
'content': response.css('.content::text').get()
}
异步爬取
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
最佳实践
项目结构
crawler-project/
├── spiders/
│ ├── base_spider.py
│ └── specific_spider.py
├── utils/
│ ├── proxy_pool.py
│ └── user_agent.py
├── data/
│ ├── raw/
│ └── processed/
├── config.py
└── main.py
错误处理
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
# 重试逻辑
retry_count = 0
while retry_count < 3:
time.sleep(2 ** retry_count)
retry_count += 1
监控和日志
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 记录爬取状态
logging.info(f"已爬取 {count} 条数据")
伦理和法律建议
-
数据使用限制

- 仅用于合法目的
- 不侵犯隐私
- 遵守数据保护法规(如GDPR)
-
技术限制
- 设置合理的爬取频率
- 不绕过安全机制
- 不爬取敏感数据
替代方案
如果网站反爬虫措施严格,可以考虑:
- 官方API:查看是否提供官方接口
- 数据合作:联系网站获取数据合作
- 第三方数据服务:购买合法数据服务
- 公开数据集:使用已有的公开数据集
需要根据具体需求选择合适的爬取方案,如果是商业用途,建议咨询法律专业人士。
版权声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。