[特殊字符] 从零搭一个淘宝商品价格监控系统:TOP API + 定时任务 + 微信推送(附Python源码)

[特殊字符] 从零搭一个淘宝商品价格监控系统:TOP API + 定时任务 + 微信推送(附Python源码)
从零搭一个淘宝商品价格监控系统TOP API 定时任务 微信推送附Python源码用淘宝官方taobao.item.get定时拉商品价格 → 降价/涨价超阈值 →推企业微信/飞书/钉钉 Webhook。全程合法、无爬虫、可7×24运行。一、监控架构┌──────────────┐ 每N分钟 ┌────────────────────┐ │ APScheduler │ ──────────────▶ │ TOP API │ │ 定时任务 │ │ taobao.item.get │ └──────┬───────┘ └─────────┬──────────┘ │ 价格变化 Δ threshold │ ▼ │ ┌──────────────────┐ Webhook │ │ 企业微信/钉钉Bot │ ◀────────────────────┘ └──────────────────┘ (Markdown消息)二、完整可运行代码# taobao_price_monitor.py 淘宝商品价格监控系统 - 跟踪关注商品(num_iid)价格 - 变化幅度超 threshold 推企业微信机器人 - SQLite 本地存快照首次建表自动 依赖: requests (pip install requests) import hashlib import time import requests import sqlite3 import logging from typing import Dict, Optional from datetime import datetime # 封装好API供应商demo urlhttps://console.open.onebound.cn/console/?iLex # ── 配置区 ────────────────────────────────────── APP_KEY YOUR_TOP_APP_KEY APP_SECRET YOUR_TOP_APP_SECRET SANDBOX False # 生产False CHECK_INTERVAL_SEC 1800 # 30分钟 PRICE_CHANGE_THRESHOLD 0.05 # 涨跌超5%触发告警 WECOM_BOT_WEBHOOK ( # 企业微信群机器人Webhook https://qyapi.weixin.qq.com/cgi-bin/webhook/send?keyYOUR_KEY ) WATCH_LIST [ # 监控商品列表 {num_iid: 654321098765, alias: 304不锈钢保温杯 500ml}, # {num_iid: 612345678901, alias: 其它监控商品}, ] # ──────────────────────────────────────────────── # TOP Client (内联最小化版) class TopClient: GW_PROD https://gw.api.taobao.com/router/rest GW_SBOX https://gw.api.tbsandbox.com/router/rest def __init__(self, ak, ask, sandboxFalse): self.ak, self.ask ak, ask self.gw self.GW_SBOX if sandbox else self.GW_PROD def _sign(self, p: Dict) - str: filt sorted((k, v) for k, v in p.items() if v is not None and str(v).strip() ! and k ! sign) qs .join(f{k}{v} for k, v in filt) return hashlib.md5(f{self.ask}{qs}{self.ask}.encode()).hexdigest().upper() def call(self, method: str, biz: Dict, sessionNone) - Dict: p {method: method, app_key: self.ak, timestamp: str(int(time.time() * 1000)), format: json, v: 2.0, sign_method: md5} if session: p[session] session p.update(biz) p[sign] self._sign(p) r requests.post(self.gw, datap, timeout15) r.raise_for_status() d r.json() if error_response in d: err d[error_response] raise Exception(fTOP[{err.get(code)}]: {err.get(msg)} {err.get(sub_msg,)}) return d.get(list(d.keys() - {error_response})[0], {}) def get_price(self, num_iid: str) - float: resp self.call(taobao.item.get, { num_iid: num_iid, fields: num_iid,title,price,approve_status }) item resp.get(item, {}) if item.get(approve_status) ! onsale: raise ValueError(f商品{num_iid} 已下架/仓库中) return float(item.get(price) or 0) # # SQLite 价格快照 def init_db(db_pathprice_monitor.db): conn sqlite3.connect(db_path) conn.execute( CREATE TABLE IF NOT EXISTS price_snapshot( num_iid TEXT PRIMARY KEY, last_price REAL, updated_at TEXT ) ) conn.commit() return conn def load_last_price(conn, num_iid: str) - Optional[float]: cur conn.execute(SELECT last_price FROM price_snapshot WHERE num_iid?, (num_iid,)) row cur.fetchone() return row[0] if row else None def save_price(conn, num_iid: str, price: float): conn.execute( INSERT INTO price_snapshot(num_iid,last_price,updated_at) VALUES(?,?,?) ON CONFLICT(num_iid) DO UPDATE SET last_priceexcluded.last_price, updated_atexcluded.updated_at , (num_iid, price, datetime.now().strftime(%Y-%m-%d %H:%M:%S))) conn.commit() # # 企业微信推送 def push_wecom(title: str, old: float, new: float, alias: str, num_iid: str): if not WECOM_BOT_WEBHOOK: return direction 降价 if new old else 涨价 pct abs(new - old) / old * 100 if old else 0 content ( f## 淘宝商品价格变动通知\n f**商品**{alias}\n f**ID**{num_iid}\n f**{direction}**¥{old:.2f} → **¥{new:.2f}** (±{pct:.1f}%)\n f 监控时间{datetime.now().strftime(%Y-%m-%d %H:%M:%S)} ) try: requests.post(WECOM_BOT_WEBHOOK, json{ msgtype: markdown, markdown: {content: content} }, timeout10) except Exception as e: logging.warning(f微信推送失败: {e}) # 核心监控逻辑 def monitor_once(client: TopClient, conn): for w in WATCH_LIST: num_iid str(w[num_iid]) alias w.get(alias) or num_iid try: cur_price client.get_price(num_iid) last_price load_last_price(conn, num_iid) if last_price is None: # 首次记录不告警 save_price(conn, num_iid, cur_price) print(f[INIT] {alias} 初始价格 ¥{cur_price:.2f}) continue diff_ratio abs(cur_price - last_price) / last_price if last_price else 0 if diff_ratio PRICE_CHANGE_THRESHOLD: print(f[ALERT] {alias} ¥{last_price:.2f}→¥{cur_price:.2f} f(Δ{diff_ratio*100:.1f}%)) push_wecom(alias, last_price, cur_price, alias, num_iid) else: print(f[OK] {alias} 价格稳定 ¥{cur_price:.2f}) save_price(conn, num_iid, cur_price) except ValueError as ve: print(f[WARN] {alias}: {ve}) except Exception as e: print(f[ERR] {alias}: {e}) # 入口 if __name__ __main__: logging.basicConfig(levellogging.INFO, format%(asctime)s %(levelname)s %(message)s) client TopClient(APP_KEY, APP_SECRET, sandboxSANDBOX) conn init_db() print(f▶ 淘宝价格监控启动间隔{CHECK_INTERVAL_SEC}s阈值{PRICE_CHANGE_THRESHOLD*100}%) monitor_once(client, conn) # 立即执行一次 # ---- 常驻定时取消注释启用---- # import apscheduler # from apscheduler.schedulers.blocking import BlockingScheduler # sched BlockingScheduler() # sched.add_job(lambda: monitor_once(client, conn), # interval, secondsCHECK_INTERVAL_SEC, idprice_monitor) # try: # sched.start() # except (KeyboardInterrupt, SystemExit): # sched.shutdown()三、配置步骤安装依赖pip install requests apscheduler # apscheduler 仅常驻时使用填写配置APP_KEY/APP_SECRET淘宝开放平台应用WECOM_BOT_WEBHOOK企业微信群 → 添加机器人 → 复制 Webhook URLWATCH_LIST填入要监控的num_iid从商品URLid提取运行python taobao_price_monitor.py首次运行记录基准价不发告警后续价格变动 ±5%可改PRICE_CHANGE_THRESHOLD推企微。常驻取消代码尾部注释nohup python taobao_price_monitor.py monitor.log 21 四、避坑坑现象解决取不到 price商品下架approve_status!onsale代码已捕获跳过Invalid Signature时间戳秒级确保int(time.time()*1000)企推送无反应Webhook key错 / IP受限企业微信后台查机器人日志QPS触发限流监控商品过多令牌桶限速 或 加大CHECK_INTERVAL_SECnum_iid 错复制完整URL未提取ID只保留数字部分五、面试/方案一句话淘宝价格监控 定时任务调taobao.item.get(fieldsprice)取一口价 → SQLite存上次快照 → 差价超阈值调企业微信Markdown Webhook推送全程用官方TOP API合法获取不爬页面。需要我补钉钉/飞书 Webhook 消息格式​ 或多SKU规格价监控查 skus[].price​ 吗