import re
import time
import json
import execjs
import requests
from urllib.parse import urljoin, urlparse
class LanZouCloudParser:
def __init__(self):
    """Prepare the JavaScript cookie calculator and the shared HTTP session.

    Stores the JavaScript source in ``self.acw_js_code``, compiles it with
    ``execjs`` into ``self.ctx``, and creates a ``requests.Session`` in
    ``self.session`` whose default headers are updated from
    ``self.headers``.
    """
    # JavaScript that derives the acw_sc__v2 value from ``arg1``:
    #   1. character j (1-based) of arg1 is moved to the index at which j
    #      appears in POS_LIST;
    #   2. the rearranged hex string is XOR-ed, two hex digits at a time,
    #      with the mask obtained by base64-decoding MASK_B64.
    # The string/array initialisers and the POS_LIST / posToIdx spellings
    # were damaged in the previous copy and are restored here.
    self.acw_js_code = """
    var MASK_B64 = "MzAwMDE3NjAwMDg1NjAwNjA2MTUwMTUzMzAwMzY5MDAyNzgwMDM3NQ==";
    var POS_LIST = [15,35,29,24,33,16,1,38,10,9,19,31,40,27,22,23,25,13,6,11,39,18,20,8,14,21,32,26,2,30,7,4,17,5,3,28,34,37,12,36];

    function base64Decode(str) {
        var chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
        var output = "";
        var buffer = 0, bits = 0;
        for (var i = 0; i < str.length; i++) {
            var c = chars.indexOf(str.charAt(i));
            if (c < 0) continue;  // skips characters outside the alphabet, e.g. '=' padding
            buffer = (buffer << 6) | c;
            bits += 6;
            if (bits >= 8) {
                bits -= 8;
                output += String.fromCharCode((buffer >> bits) & 0xFF);
            }
        }
        return output;
    }

    function compute_acw_sc_v2(arg1) {
        var posToIdx = {};
        for (var i = 0; i < POS_LIST.length; i++) posToIdx[POS_LIST[i]] = i;
        var output = [];
        for (var j = 1; j <= arg1.length; j++) {
            var idx = posToIdx[j];
            if (typeof idx === "number") {
                output[idx] = arg1.charAt(j - 1);
            }
        }
        var rearranged = output.join("");
        var mask = base64Decode(MASK_B64);
        var result = "";
        var k = 0;
        while (k < rearranged.length && k < mask.length) {
            var dataChunkText = rearranged.substr(k, 2);
            var maskChunkText = mask.substr(k, 2);
            if (!dataChunkText || !maskChunkText) break;
            var dataChunk = parseInt(dataChunkText, 16);
            var maskChunk = parseInt(maskChunkText, 16);
            var xorVal = dataChunk ^ maskChunk;
            var hex = xorVal.toString(16);
            if (hex.length < 2) hex = "0" + hex;  // keep two hex digits per byte
            result += hex;
            k += 2;
        }
        return result.toLowerCase();
    }
    """
    # Compile the JavaScript once so later calls reuse the same context.
    self.ctx = execjs.compile(self.acw_js_code)
    # Default request headers.
    # NOTE(review): this dict is empty in the copy under review; the header
    # entries appear to have been lost — restore them from the original.
    self.headers = {}
    # A session keeps cookies between requests automatically.
    self.session = requests.Session()
    self.session.headers.update(self.headers)
def compute_acw_sc_v2 (self, arg1):
"""计算acw_sc__v2值"""
try:
result = self.ctx.call ( return result
except Exception as e:
print (f"JavaScript计算错误: {e}")
return ""
def extract_file_id_from_page(self, html_content):
    """Extract the numeric file ID from a page's HTML.

    Args:
        html_content: HTML text of the page to search.

    Returns:
        The file ID as a string of digits. When no pattern matches, the
        hard-coded fallback ``"271513649"`` is returned.
    """
    # Most common source: the report link, e.g. /q/jb/?f=271513649&report=1
    # NOTE(review): the original pattern was lost from this copy; this one
    # is rebuilt from the documented link format — confirm against the
    # original source.
    report_match = re.search(r"/q/jb/\?f=(\d+)", html_content)
    if report_match:
        return report_match.group(1)
    # Fallback: an ajaxm.php?file=<id> reference, the same URL shape this
    # class builds when requesting the download link.
    # NOTE(review): the original fallback pattern was also lost; confirm.
    file_match = re.search(r"ajaxm\.php\?file=(\d+)", html_content)
    if file_match:
        return file_match.group(1)
    # NOTE(review): this default is the ID of one specific file; returning
    # it for an unrelated page is likely to request the wrong file. Kept
    # unchanged so existing callers still receive a string.
    return "271513649"
def parse_fn_page (self, fn_url, original_page_html):
"""Fetch the fn page and resolve the final download link.

Extracts the file ID from ``original_page_html``, GETs ``fn_url`` through
the shared session, reads ``wp_sign`` and ``ajaxdata`` from the returned
HTML, POSTs to ``/ajaxm.php?file=<file_id>`` on the same scheme and host
as ``fn_url``, and builds the link from the ``dom`` and ``url`` values
taken from the JSON reply.

Returns the download URL string, or None when a step fails (failures are
printed, not raised).

NOTE(review): several statements in this method are truncated in this
copy of the file (string literals and the line break after them are
missing; the POST body and AJAX header dicts are empty), so the method
does not parse as written. Each affected statement is flagged below and
must be restored from the original source.
"""
try:
print (f"访问fn页面: {fn_url}")
# Extract the file ID first (from the original page)
file_id = self.extract_file_id_from_page (original_page_html)
print (f"使用的文件ID: {file_id}")
# Request the fn page
response = self.session.get (fn_url)
response.raise_for_status ()
html_content = response.text
print (f"fn页面长度: {len (html_content)}")
# Extract the key parameters
# 1. Extract wp_sign
# NOTE(review): the regex and the rest of this call are truncated; the
# following `if wp_sign_match:` has been fused onto the same line.
wp_sign_match = re.search (r"var wp_sign = if wp_sign_match:
wp_sign = wp_sign_match.group (1)
print (f"提取到wp_sign: {wp_sign}")
else:
print ("未找到wp_sign")
return None
# 2. Extract ajaxdata (usually VJvp)
# NOTE(review): truncated in the same way as the wp_sign search above.
ajaxdata_match = re.search (r"var ajaxdata = if ajaxdata_match:
ajaxdata = ajaxdata_match.group (1)
print (f"提取到ajaxdata: {ajaxdata}")
else:
# Fall back to the default value
ajaxdata = "VJvp"
print (f"使用默认ajaxdata: {ajaxdata}")
# 3. Build the ajax_url from fn_url's scheme/host and the extracted file ID
base_url = f"{urlparse (fn_url).scheme}://{urlparse (fn_url).netloc}"
ajax_url = f"{base_url}/ajaxm.php?file={file_id}"
print (f"构建的ajax_url: {ajax_url}")
# Build the POST data
# NOTE(review): the dict is empty here, so wp_sign and ajaxdata are never
# sent; the entries appear to have been lost — restore them.
post_data = {
}
print (f"POST数据: {post_data}")
# Set the AJAX request headers
# NOTE(review): empty in this copy; entries appear to have been lost.
ajax_headers = {
}
# Send the AJAX request immediately
print (f"发送POST请求到: {ajax_url}")
ajax_response = self.session.post (ajax_url, data=post_data, headers=ajax_headers)
print (f"AJAX响应状态码: {ajax_response.status_code}")
# Inspect the response body (only the first 200 characters are printed)
response_text = ajax_response.text
print (f"AJAX响应内容: {response_text[:200]}...")
# Try to parse the JSON
try:
json_data = ajax_response.json ()
# NOTE(review): there are no braces around json_data, so this prints the
# literal text "json_data" rather than the parsed value.
print (f"解析到的JSON: json_data")
# Fix: zt=1 means success
# NOTE(review): the next line fuses four truncated statements (the status
# check, the dom and url lookups, and `if url and dom:`); the key
# arguments to .get() are missing.
if json_data.get ( dom = json_data.get ( url = json_data.get ( if url and dom:
# Handle escaped characters
# NOTE(review): the two lines below are truncated/fused: the replace()
# arguments are missing, and the `if url.startswith (...)` statement has
# been absorbed into the comment line, leaving the `else:` unmatched.
dom = dom.replace ( url = url.replace ( # 构建最终下载链接
# 注意:url 以 if url.startswith ( download_url = f"{dom}/file{url}"
else:
download_url = f"{dom}/file/{url}"
print (f"最终下载链接: {download_url}")
return download_url
else:
print (f"缺少必要字段: dom={dom}, url={url}")
else:
# NOTE(review): two truncated print calls and the `except` clause of the
# inner try are fused on the next line.
print (f"状态不为1: {json_data.get ( print (f"错误信息: {json_data.get ( except json.JSONDecodeError as e:
print (f"JSON解析错误: {e}")
print (f"完整响应: {response_text}")
return None
except requests.exceptions.RequestException as e:
print (f"请求错误: {e}")
return None
except Exception as e:
print (f"解析错误: {e}")
return None
def parse_lanzou_url (self, url):
"""解析蓝奏云链接"""
try:
print (f"正在解析链接: {url}")
# 第一次请求
print ("第一次请求...")
response = self.session.get (url)
response.raise_for_status ()
# 打印cookies信息
print (f"第一次响应Cookies: {self.session.cookies.get_dict ()}")
# 提取arg1
html_content = response.text
arg1_match = re.search (r"arg1= if not arg1_match:
arg1_match = re.search (r if not arg1_match:
print ("未找到arg1参数")
return None
arg1 = arg1_match.group (1)
print (f"提取到的arg1: {arg1}")
# 计算acw_sc__v2值
acw_sc_v2_value = self.compute_acw_sc_v2 (arg1)
print (f"计算出的acw_sc__v2值: {acw_sc_v2_value}")
if not acw_sc_v2_value:
print ("acw_sc__v2计算失败")
return None
# 添加acw_sc__v2到cookies
self.session.cookies.set ( # 立即进行第二次请求
print ("第二次请求...")
self.session.headers.update ({ response2 = self.session.get (url)
response2.raise_for_status ()
second_page_html = response2.text
print (f"第二次响应状态码: {response2.status_code}")
print (f"第二次响应长度: {len (second_page_html)}")
# 检查是否包含下载链接(iframe)
iframe_match = re.search (r if not iframe_match:
print ("未找到iframe下载链接")
return None
iframe_src = iframe_match.group (1)
print (f"找到iframe链接: {iframe_src}")
# 转换为完整URL
if iframe_src.startswith ( base_url = f"{urlparse (url).scheme}://{urlparse (url).netloc}"
fn_url = urljoin (base_url, iframe_src)
else:
fn_url = iframe_src
# 提取文件信息
filename_match = re.search (r if filename_match:
filename = filename_match.group (1)
print (f"文件名: {filename}")
else:
filename = "未知文件名"
filesize_match = re.search (r if filesize_match:
filesize = filesize_match.group (1)
print (f"文件大小: {filesize}")
else:
filesize = "未知大小"
# 解析fn页面获取最终下载链接
download_url = self.parse_fn_page (fn_url, second_page_html)
if download_url:
return {
}
else:
print ("未能获取到下载链接")
return {
}
except requests.exceptions.RequestException as e:
print (f"请求错误: {e}")
return None
except
</rearranged.length&&k<mask.length){