Files

373 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
抖音批量发布 - 登录+发布一体化
Cookie 过期时自动弹窗重新扫码,登录后立刻发布。
"""
import asyncio
import base64
import datetime
import hashlib
import hmac
import json
import os
import random
import string
import sys
import time
import zlib
from pathlib import Path
from urllib.parse import urlencode
import httpx
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from playwright.async_api import async_playwright
SCRIPT_DIR = Path(__file__).parent
COOKIE_FILE = SCRIPT_DIR / "douyin_storage_state.json"
VIDEO_DIR = Path("/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片")
BASE = "https://creator.douyin.com"
VOD_HOST = "https://vod.bytedanceapi.com"
CHUNK_SIZE = 3 * 1024 * 1024
UA = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
)
USER_ID = ""
TITLES = {
"早起不是为了开派对,是不吵老婆睡觉.mp4":
"每天6点起床不是因为自律是因为老婆还在睡。创业人最真实的起床理由你猜到了吗#Soul派对 #创业日记 #晨间直播 #真实创业",
"懒人的活法 动作简单有利可图正反馈.mp4":
"懒人也能赚钱关键就三个词动作简单、有利可图、正反馈。90%的人输在太勤快了 #Soul派对 #副业思维 #私域变现 #认知升级",
"初期团队先找两个IS比钱好使 ENFJ链接人ENTJ指挥.mp4":
"创业初期别急着找钱先找两个IS型人格。ENFJ负责链接ENTJ负责指挥比融资好使十倍 #MBTI创业 #团队搭建 #Soul派对 #合伙人",
"ICU出来一年多 活着要在互联网上留下东西.mp4":
"ICU出来一年多了。那之后我想明白一件事活着就要在互联网上留下点东西 #人生感悟 #创业觉醒 #Soul派对 #向死而生",
"MBTI疗愈SOUL 年轻人测MBTI40到60岁走五行八卦.mp4":
"20岁测MBTI40岁以后该学五行八卦了。年轻人用性格分类中年人靠命理运营自己 #MBTI #五行 #Soul派对 #认知觉醒",
"Soul业务模型 派对+切片+小程序全链路.mp4":
"一个人怎么跑通一条商业链路派对获客→AI切片→小程序变现全链路拆给你看 #Soul派对 #商业模式 #全链路 #一人公司",
"Soul切片30秒到8分钟 AI半小时能剪10到30个.mp4":
"AI剪辑有多快30秒到8分钟的切片半小时出10到30条。内容工厂的效率密码 #AI剪辑 #Soul派对 #内容效率 #批量生产",
"刷牙听业务逻辑 Soul切片变现怎么跑.mp4":
"刷牙3分钟刚好听完一套变现逻辑。Soul切片怎么从0到日产30条碎片时间才是生产力 #Soul派对 #碎片创业 #副业逻辑 #效率",
"国学易经怎么学 两小时七七八八,召唤作者对话.mp4":
"易经其实不难,两小时就能学个七七八八。关键是找到作者的思维频率,跟古人对话 #国学 #易经入门 #Soul派对 #终身学习",
"广点通能投Soul了1000曝光6到10块.mp4":
"广点通终于能投Soul了1000次曝光只要6到10块这个获客成本你敢信#Soul派对 #广点通投放 #低成本获客 #流量红利",
"建立信任不是求来的 卖外挂发邮件三个月拿下德国总代.mp4":
"信任不是求来的。一个卖外挂的小伙子,发了三个月邮件,拿下德国总代理。死磕比社交有用 #销售思维 #信任建立 #Soul派对 #死磕精神",
"核心就两个字 筛选。能开派对坚持7天的人再谈.mp4":
"别跟所有人合作核心就两个字筛选。能坚持开7天派对的人才值得深聊 #筛选思维 #Soul派对 #创业认知 #人性",
"睡眠不好?每天放下一件事,做减法.mp4":
"睡不好不是因为太累,是因为脑子里装太多。每天放下一件事,做减法,睡眠自然好 #睡眠 #做减法 #Soul派对 #心理健康",
"这套体系花了170万但前端几十块就能参与.mp4":
"后端花了170万搭的体系前端几十块就能参与。真正的商业模式是让别人低成本上车 #商业认知 #Soul派对 #低门槛创业 #体系思维",
"金融AI获客体系 后端30人沉淀12年前端丢手机.mp4":
"后端30人沉淀了12年前端操作就是丢个手机号。金融AI获客体系把复杂留给自己 #AI获客 #金融科技 #Soul派对 #系统思维",
}
class SecurityKeys:
def __init__(self, state_path: Path):
with open(state_path, "r", encoding="utf-8") as f:
state = json.load(f)
self.cookies = {c["name"]: c["value"] for c in state.get("cookies", [])
if "douyin.com" in c.get("domain", "")}
self.cookie_str = "; ".join(f"{k}={v}" for k, v in self.cookies.items())
self.ms_token = ""
self.ec_private_key = None
self.ec_public_key_bytes = b""
self.server_public_key = None
self.ticket = ""
self.ts_sign_raw = ""
self.csrf_token = self.cookies.get("passport_csrf_token", "")
for origin in state.get("origins", []):
if "creator.douyin.com" not in origin.get("origin", ""):
continue
for item in origin.get("localStorage", []):
name, val = item["name"], item["value"]
if "s_sdk_crypt_sdk" in name:
d = json.loads(json.loads(val)["data"])
pem = d["ec_privateKey"].replace("\\r\\n", "\n")
self.ec_private_key = serialization.load_pem_private_key(pem.encode(), password=None)
self.ec_public_key_bytes = self.ec_private_key.public_key().public_bytes(
serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint)
elif "s_sdk_server_cert_key" in name:
cert_pem = json.loads(val)["cert"]
cert = x509.load_pem_x509_certificate(cert_pem.encode())
self.server_public_key = cert.public_key()
elif "s_sdk_sign_data_key" in name and "web_protect" in name:
d = json.loads(json.loads(val)["data"])
self.ticket = d["ticket"]
self.ts_sign_raw = d["ts_sign"]
elif name == "xmst":
self.ms_token = val
def compute_ticket_guard(self, path: str) -> dict:
if not self.ec_private_key or not self.server_public_key:
return {}
eph_priv = ec.generate_private_key(ec.SECP256R1())
eph_pub_bytes = eph_priv.public_key().public_bytes(
serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint)
shared_secret = eph_priv.exchange(ec.ECDH(), self.server_public_key)
ts = int(time.time())
ts_hex = self.ts_sign_raw.replace("ts.2.", "")
ts_bytes = bytes.fromhex(ts_hex)
new_first = bytes(a ^ b for a, b in zip(ts_bytes[:32], shared_secret[:32]))
new_ts_sign = "ts.2." + (new_first + ts_bytes[32:]).hex()
msg = f"{self.ticket},{path},{ts}"
req_sign = hmac.new(shared_secret, msg.encode(), hashlib.sha256).digest()
client_data = {"ts_sign": new_ts_sign, "req_content": "ticket,path,timestamp",
"req_sign": base64.b64encode(req_sign).decode(), "timestamp": ts}
return {
"bd-ticket-guard-client-data": base64.b64encode(json.dumps(client_data).encode()).decode(),
"bd-ticket-guard-ree-public-key": base64.b64encode(eph_pub_bytes).decode(),
"bd-ticket-guard-version": "2",
"bd-ticket-guard-web-version": "2",
"bd-ticket-guard-web-sign-type": "1",
}
def _hmac_sha256(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode(), hashlib.sha256).digest()
def _rand(n=11):
return "".join(random.choices(string.ascii_lowercase + string.digits, k=n))
def aws4_sign(ak, sk, token, qs, method="GET", body=b""):
now = datetime.datetime.now(datetime.timezone.utc)
amz_date = now.strftime("%Y%m%dT%H%M%SZ")
ds = now.strftime("%Y%m%d")
region, service = "cn-north-1", "vod"
body_hash = hashlib.sha256(body).hexdigest()
if method == "POST":
signed_headers = "content-type;x-amz-date;x-amz-security-token"
header_str = f"content-type:text/plain;charset=UTF-8\nx-amz-date:{amz_date}\nx-amz-security-token:{token}\n"
else:
signed_headers = "x-amz-date;x-amz-security-token"
header_str = f"x-amz-date:{amz_date}\nx-amz-security-token:{token}\n"
canonical = f"{method}\n/\n{qs}\n{header_str}\n{signed_headers}\n{body_hash}"
scope = f"{ds}/{region}/{service}/aws4_request"
sts = f"AWS4-HMAC-SHA256\n{amz_date}\n{scope}\n{hashlib.sha256(canonical.encode()).hexdigest()}"
k = _hmac_sha256(f"AWS4{sk}".encode(), ds)
k = _hmac_sha256(k, region); k = _hmac_sha256(k, service); k = _hmac_sha256(k, "aws4_request")
sig = hmac.new(k, sts.encode(), hashlib.sha256).hexdigest()
return f"AWS4-HMAC-SHA256 Credential={ak}/{scope}, SignedHeaders={signed_headers}, Signature={sig}", amz_date, token
async def do_login():
"""弹窗扫码登录,返回刷新后的 SecurityKeys"""
print("\n >>> 需要扫码登录,浏览器即将弹出 <<<")
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=False)
ctx = await browser.new_context(
user_agent=UA, viewport={"width": 1280, "height": 720})
await ctx.add_init_script("Object.defineProperty(navigator,'webdriver',{get:()=>undefined});")
page = await ctx.new_page()
await page.goto("https://creator.douyin.com/", timeout=60000)
await page.pause()
await ctx.storage_state(path=str(COOKIE_FILE))
await ctx.close()
await browser.close()
print(" >>> Cookie 已刷新 <<<\n")
return SecurityKeys(COOKIE_FILE)
async def check_cookie(keys):
"""检查 Cookie 是否有效,返回 (valid, user_id, nickname)"""
async with httpx.AsyncClient(timeout=10.0) as c:
resp = await c.get(f"{BASE}/web/api/media/user/info/",
headers={"Cookie": keys.cookie_str, "User-Agent": UA})
data = resp.json()
if data.get("status_code") != 0:
return False, "", ""
user = data.get("user") or data.get("user_info") or {}
uid = str(user.get("uid", "") or user.get("user_id", ""))
return True, uid, user.get("nickname", "unknown")
async def publish_one_video(keys, client, video_path, title, timing_ts):
"""上传+提交+发布 单条视频(全在一个 client 内快速完成)"""
global USER_ID
fsize = Path(video_path).stat().st_size
# 1) Auth
resp = await client.get(f"{BASE}/web/api/media/upload/auth/v5/",
headers={"Cookie": keys.cookie_str, "User-Agent": UA})
data = resp.json()
if data.get("status_code") != 0:
return False, "auth失败"
auth = json.loads(data["auth"])
# 2) Apply
params = {"Action": "ApplyUploadInner", "FileSize": str(fsize), "FileType": "video",
"IsInner": "1", "SpaceName": "aweme", "Version": "2020-11-19", "app_id": "2906", "s": _rand()}
qs = "&".join(f"{k}={v}" for k, v in sorted(params.items()))
authorization, amz_date, token = aws4_sign(auth["AccessKeyID"], auth["SecretAccessKey"], auth["SessionToken"], qs)
resp = await client.get(f"{VOD_HOST}/?{qs}",
headers={"authorization": authorization, "x-amz-date": amz_date,
"x-amz-security-token": token, "User-Agent": UA})
result = resp.json().get("Result", {})
nodes = (result.get("InnerUploadAddress") or {}).get("UploadNodes", [])
if not nodes:
return False, "无UploadNodes"
node = nodes[0]
store = node["StoreInfos"][0]
host, upload_id = node["UploadHost"], store["UploadID"]
base_url = f"https://{host}/upload/v1/{store['StoreUri']}"
auth_h = {"Authorization": store["Auth"], "User-Agent": UA}
# 3) Upload
raw = Path(video_path).read_bytes()
n_chunks = (len(raw) + CHUNK_SIZE - 1) // CHUNK_SIZE
crc_parts = []
for i in range(n_chunks):
chunk = raw[i*CHUNK_SIZE : (i+1)*CHUNK_SIZE]
crc = "%08x" % (zlib.crc32(chunk) & 0xFFFFFFFF)
resp = await client.post(f"{base_url}?uploadid={upload_id}&part_number={i+1}&phase=transfer",
content=chunk, headers={**auth_h, "Content-CRC32": crc, "Content-Type": "application/octet-stream"}, timeout=120.0)
rd = resp.json() if resp.status_code == 200 else {}
if rd.get("code") != 2000:
return False, f"chunk {i+1} 失败"
sv_crc = rd.get("data", {}).get("crc32", crc)
crc_parts.append(f"{i+1}:{sv_crc}")
print(f" chunk {i+1}/{n_chunks}")
finish_resp = await client.post(f"{base_url}?uploadid={upload_id}&phase=finish",
content=",".join(crc_parts).encode(), headers={**auth_h, "Content-Type": "text/plain"}, timeout=60.0)
fd = finish_resp.json() if finish_resp.status_code == 200 else {}
if fd.get("code") != 2000:
return False, f"finish: {fd.get('message','')}"
# 4) Commit
qs2_params = {"Action": "CommitUploadInner", "SpaceName": "aweme",
"Version": "2020-11-19", "app_id": "2906", "user_id": USER_ID}
qs2 = "&".join(f"{k}={v}" for k, v in sorted(qs2_params.items()))
body = json.dumps({"SessionKey": node["SessionKey"], "Functions": [{"Name": "GetMeta"}]}).encode("utf-8")
auth2, amz2, tok2 = aws4_sign(auth["AccessKeyID"], auth["SecretAccessKey"], auth["SessionToken"], qs2, method="POST", body=body)
resp = await client.post(f"{VOD_HOST}/?{qs2}", content=body,
headers={"authorization": auth2, "x-amz-date": amz2, "x-amz-security-token": tok2,
"content-type": "text/plain;charset=UTF-8", "User-Agent": UA}, timeout=30.0)
cd = resp.json()
results = cd.get("Result", {}).get("Results", [])
video_id = results[0].get("Vid", "") if results else ""
if not video_id:
return False, f"commit失败: {cd.get('ResponseMetadata',{}).get('Error',{})}"
print(f" video_id={video_id}")
# 5) create_v2
path = "/web/api/media/aweme/create_v2/"
creation_id = f"{_rand(8)}{int(time.time()*1000)}"
body_json = {"item": {"common": {
"text": title, "caption": title, "visibility_type": 0, "download": 1,
"timing": timing_ts if timing_ts > 0 else 0, "creation_id": creation_id,
"media_type": 4, "video_id": video_id, "music_source": 0, "music_id": None,
}, "cover": {"poster": "", "poster_delay": 0}}}
guard = keys.compute_ticket_guard(path)
qp = {"read_aid": "2906", "cookie_enabled": "true", "aid": "1128"}
if keys.ms_token:
qp["msToken"] = keys.ms_token
headers = {"Cookie": keys.cookie_str, "User-Agent": UA, "Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"Referer": "https://creator.douyin.com/creator-micro/content/post/video",
"Origin": "https://creator.douyin.com"}
if keys.csrf_token:
headers["x-secsdk-csrf-token"] = f"000100000001{keys.csrf_token[:32]}"
headers.update(guard)
resp = await client.post(f"{BASE}{path}?" + urlencode(qp), headers=headers, json=body_json, timeout=30.0)
if not resp.text:
return False, "create_v2 空响应(403)"
r = resp.json()
if r.get("status_code") == 0:
return True, r.get("item_id", "")
return False, f"status={r.get('status_code')}: {r.get('status_msg','')}"
async def main():
videos = sorted(VIDEO_DIR.glob("*.mp4"))
if not videos:
print("[✗] 未找到视频")
return 1
print(f"[i] 共 {len(videos)} 条视频待发布\n")
keys = None
if COOKIE_FILE.exists():
keys = SecurityKeys(COOKIE_FILE)
valid, uid, name = await check_cookie(keys)
if valid:
global USER_ID
USER_ID = uid
print(f"[✓] 已有有效 Cookie: {name} (uid={uid})")
else:
keys = None
if not keys:
keys = await do_login()
valid, uid, name = await check_cookie(keys)
if not valid:
print("[✗] 登录失败")
return 1
USER_ID = uid
print(f"[✓] 登录成功: {name} (uid={uid})")
now_ts = int(time.time())
base_ts = ((now_ts + 3600) // 3600 + 1) * 3600
results = []
for i, vp in enumerate(videos):
ts = base_ts + i * 3600
title = TITLES.get(vp.name, f"{vp.stem} #Soul派对 #创业日记")
fname = vp.name
fsize = vp.stat().st_size
dt_str = datetime.datetime.fromtimestamp(ts).strftime("%m-%d %H:%M")
print(f"\n{'='*60}")
print(f" [{i+1}/{len(videos)}] {fname}")
print(f" 大小: {fsize/1024/1024:.1f}MB | 定时: {dt_str}")
print(f" 标题: {title[:60]}")
print(f"{'='*60}")
valid, _, _ = await check_cookie(keys)
if not valid:
print(" Cookie 过期,重新登录...")
keys = await do_login()
valid, uid, _ = await check_cookie(keys)
if not valid:
print(" [✗] 登录失败,跳过")
results.append((fname, False, ts))
continue
USER_ID = uid
try:
async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
ok, msg = await publish_one_video(keys, client, str(vp), title, ts)
if ok:
print(f" [✓] 发布成功! item_id={msg}")
else:
print(f" [✗] 失败: {msg}")
results.append((fname, ok, ts))
except Exception as e:
print(f" [✗] 异常: {e}")
results.append((fname, False, ts))
if i < len(videos) - 1:
await asyncio.sleep(2)
print(f"\n{'='*60}")
print(" 发布汇总")
print(f"{'='*60}")
for name, ok, ts in results:
s = "" if ok else ""
t = datetime.datetime.fromtimestamp(ts).strftime("%m-%d %H:%M")
print(f" [{s}] {t} | {name}")
success = sum(1 for _, ok, _ in results if ok)
print(f"\n 成功: {success}/{len(results)}")
return 0 if success == len(results) else 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))