Files

211 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
拦截抖音创作者中心视频发布的网络请求。
用 Playwright 走一次完整的上传→发布流程,记录所有 API 调用。
"""
import asyncio
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from playwright.async_api import async_playwright, Request
SCRIPT_DIR = Path(__file__).parent
COOKIE_FILE = SCRIPT_DIR / "douyin_storage_state.json"
STEALTH_JS = Path("/Users/karuo/Documents/开发/3、自营项目/万推/backend/utils/stealth.min.js")
VIDEO = "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/广点通能投Soul了1000曝光6到10块.mp4"
LOG_FILE = SCRIPT_DIR / "intercepted_requests.json"
CHROME = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
captured = []
def on_request(request: Request):
url = request.url
if "creator.douyin.com" not in url and "bytedanceapi" not in url and "snssdk" not in url and "bytedancevod" not in url:
return
if any(skip in url for skip in ["monitor_browser", "mcs.snssdk", "mon.zijie", "mssdk.bytedance", ".css", ".js", ".png", ".jpg", ".woff"]):
return
entry = {
"timestamp": datetime.now().isoformat(),
"method": request.method,
"url": url[:500],
"headers": dict(request.headers),
"post_data": None,
}
try:
pd = request.post_data
if pd:
entry["post_data"] = pd[:2000] if len(pd) > 2000 else pd
except Exception:
pass
captured.append(entry)
short = url.split("?")[0]
print(f" [NET] {request.method} {short}")
async def main():
if not COOKIE_FILE.exists():
print("[!] Cookie 不存在,请先运行 douyin_login.py")
return 1
print("[i] 启动浏览器,拦截网络请求...")
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=False,
executable_path=CHROME if os.path.exists(CHROME) else None,
)
context = await browser.new_context(storage_state=str(COOKIE_FILE))
if STEALTH_JS.exists():
await context.add_init_script(path=str(STEALTH_JS))
page = await context.new_page()
page.on("request", on_request)
# 1. 打开上传页
print("[1] 打开上传页...")
await page.goto("https://creator.douyin.com/creator-micro/content/upload", wait_until="domcontentloaded", timeout=60000)
await page.wait_for_url("**/upload**", timeout=60000)
await page.wait_for_load_state("load", timeout=20000)
if await page.get_by_text("手机号登录").count() or await page.get_by_text("扫码登录").count():
print("[!] Cookie 失效")
await browser.close()
return 1
print("[1] 上传页就绪")
try:
await page.get_by_text("上传视频", exact=False).first.wait_for(state="visible", timeout=15000)
except Exception:
pass
await asyncio.sleep(3)
# 2. 上传文件
print("[2] 上传视频...")
for sel in ["input[type='file']", "[class^='upload-btn-input']"]:
try:
loc = page.locator(sel).first
await loc.wait_for(state="attached", timeout=5000)
await loc.set_input_files(VIDEO, timeout=60000)
print(f"[2] 上传成功: {sel}")
break
except Exception:
continue
# 3. 等待发布页
print("[3] 等待发布页...")
for _ in range(120):
try:
await page.wait_for_url("**/publish*", timeout=2000)
break
except Exception:
try:
await page.wait_for_url("**/post/video*", timeout=2000)
break
except Exception:
pass
print(f"[3] 当前URL: {page.url}")
await asyncio.sleep(3)
# 4. 填标题
print("[4] 填标题...")
try:
tc = page.get_by_text("作品标题").locator("..").locator("xpath=following-sibling::div[1]").locator("input")
if await tc.count():
await tc.fill("广点通能投Soul了 #Soul派对 #测试")
else:
nl = page.locator(".notranslate").first
await nl.click(timeout=5000)
await page.keyboard.press("Meta+KeyA")
await page.keyboard.press("Delete")
await page.keyboard.type("广点通能投Soul了 #Soul派对 #测试", delay=20)
await page.keyboard.press("Enter")
except Exception as e:
print(f"[4] 标题异常: {e}")
print("[4] 标题已填")
# 5. 等待上传完成
print("[5] 等待上传完成...")
for _ in range(120):
try:
n = await page.locator('[class^="long-card"] div:has-text("重新上传")').count()
if n > 0:
break
except Exception:
pass
await asyncio.sleep(2)
print("[5] 上传完毕")
await asyncio.sleep(2)
# 清空之前的记录,只保留发布相关
print("\n" + "="*60)
print(" 即将点击发布,开始重点监控网络请求")
print("="*60 + "\n")
captured.clear()
# 6. 点击发布
print("[6] 点击发布...")
for attempt in range(20):
try:
pub = page.get_by_role("button", name="发布", exact=True)
if await pub.count():
await pub.click()
print(f"[6] 已点击发布 (attempt {attempt+1})")
await page.wait_for_url("**/content/manage**", timeout=8000)
print("[6] 发布成功!页面已跳转到管理页")
break
except Exception:
# 处理封面
try:
if await page.get_by_text("请设置封面后再发布").first.is_visible():
print("[6] 需要封面...")
cover = page.locator('[class^="recommendCover-"]').first
if await cover.count():
await cover.click()
await asyncio.sleep(1)
if await page.get_by_text("是否确认应用此封面?").first.is_visible():
await page.get_by_role("button", name="确定").click()
await asyncio.sleep(1)
except Exception:
pass
await asyncio.sleep(3)
await asyncio.sleep(3)
# 保存 Cookie
await context.storage_state(path=str(COOKIE_FILE))
# 保存拦截到的请求
print(f"\n[i] 共捕获 {len(captured)} 个相关请求")
with open(LOG_FILE, "w", encoding="utf-8") as f:
json.dump(captured, f, ensure_ascii=False, indent=2)
print(f"[i] 已保存到: {LOG_FILE}")
# 打印关键请求
print("\n" + "="*60)
print(" 关键 POST 请求(发布相关)")
print("="*60)
for req in captured:
if req["method"] == "POST":
url_short = req["url"].split("?")[0]
print(f"\n POST {url_short}")
if req.get("post_data"):
print(f" Body: {req['post_data'][:300]}")
ct = req["headers"].get("content-type", "")
print(f" Content-Type: {ct}")
await context.close()
await browser.close()
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(main()))