开启辅助访问 切换到宽版

精易论坛

 找回密码
 注册

QQ登录

只需一步,快速开始

用微信号发送消息登录论坛

新人指南 邀请好友注册 - 我关注人的新帖 教你赚取精币 - 每日签到


求职/招聘- 论坛接单- 开发者大厅

论坛版规 总版规 - 建议/投诉 - 应聘版主 - 精华帖总集 积分说明 - 禁言标准 - 有奖举报

查看: 268|回复: 6
收起左侧

[已解决] 帮忙py转易语言

 关闭 [复制链接]
结帖率:96% (52/54)
发表于 2025-7-30 09:04:40 | 显示全部楼层 |阅读模式   内蒙古自治区赤峰市
100精币
# coding=utf-8# !/usr/bin/python# by(finally)import sysimport ossys.path.append("..")import reimport hashlibimport hmacimport randomimport stringfrom Crypto.Util.Padding import unpadfrom concurrent.futures import ThreadPoolExecutorfrom Crypto.PublicKey import RSAfrom Crypto.Cipher import PKCS1_v1_5, AESfrom base64 import b64encode, b64decodeimport jsonimport timefrom base.spider import Spiderclass Spider(Spider):    def getName(self):        return "电影"    def init(self, extend=""):        self.device = self.device_id()        self.host = self.gethost()        pass    def isVideoFormat(self, url):        pass    def manualVideoCheck(self):        pass    def action(self, action):        pass    def destroy(self):        pass    t = str(int(time.time()))    def homeContent(self, filter):        result = {}        filters = {}        classes = []        bba = self.url()        data = self.fetch(f"{self.host}/api/v1/app/config?pack={bba[0]}&signature={bba[1]}", headers=self.header()).text        data1 = self.aes(data)        dy = {"class":"类型","area":"地区","lang":"语言","year":"年份","letter":"字母","by":"排序","sort":"排序"}        data1['data']['movie_screen']['sort'].pop(0)        for item in data1['data']['movie_screen']['sort']:            item['n'] = item.pop('name')            item['v'] = item.pop('value')        for item in data1['data']['movie_screen']['filter']:            has_non_empty_field = False            classes.append({"type_name": item["name"], "type_id": str(item["id"])})            for key in dy:                if key in item and item[key]:                    has_non_empty_field = True                    break            if has_non_empty_field:                filters[str(item["id"])] = []                filters[str(item["id"])].append(                    {"key": 'sort', "name": '排序', "value": data1['data']['movie_screen']['sort']})                for dkey in item:                    if dkey in dy and item[dkey]:                        item[dkey].pop(0)                        value_array = [                            {"n": value.strip(), "v": value.strip()}                            for value in item[dkey]                            if value.strip() != ""                        ]                        filters[str(item["id"])].append(                            {"key": dkey, "name": dy[dkey], "value": value_array}                        )        result["class"] = classes        result["filters"] = filters        return result    def homeVideoContent(self):        bba = self.url()        url = f'{self.host}/api/v1/movie/index_recommend?pack={bba[0]}&signature={bba[1]}'        data = self.fetch(url, headers=self.header()).json()        videos = []        for item in data['data']:            if len(item['list']) > 0:                for it in item['list']:                    try:                        videos.append(self.voides(it))                    except Exception as e:                        continue        result = {"list": videos}        return result    def categoryContent(self, tid, pg, filter, extend):        body = {"type_id": tid, "sort": extend.get("sort", "by_default"), "class": extend.get("class", "类型"),                "area": extend.get("area", "地区"), "year": extend.get("year", "年份"), "page": str(pg),                "pageSize": "21"}        result = {}        list = []        bba = self.url(body)        url = f"{self.host}/api/v1/movie/screen/list?pack={bba[0]}&signature={bba[1]}"        data = self.fetch(url, headers=self.header()).json()['data']['list']        for item in data:            list.append(self.voides(item))        result["list"] = list        result["page"] = pg        result["pagecount"] = 9999        result["limit"] = 90        result["total"] = 999999        return result    def detailContent(self, ids):        body = {"id": ids[0]}        bba = self.url(body)        url = f'{self.host}/api/v1/movie/detail?pack={bba[0]}&signature={bba[1]}'        data = self.fetch(url, headers=self.header()).json()['data']        video = {'vod_name': data.get('name'),'type_name': data.get('type_name'),'vod_year': data.get('year'),'vod_area': data.get('area'),'vod_remarks': data.get('dynami'),'vod_content': data.get('content')}        play = []        names = []        tasks = []        for itt in data["play_from"]:            name = itt["name"]            a = []            if len(itt["list"]) > 0:                names.append(name)                play.append(self.playeach(itt['list']))            else:                tasks.append({"movie_id": ids[0], "from_code": itt["code"]})                names.append(name)        if tasks:            with ThreadPoolExecutor(max_workers=len(tasks)) as executor:                results = executor.map(self.playlist, tasks)                for result in results:                    if result:                        play.append(result)                    else:                        play.append("")        video["vod_play_from"] = "$$$".join(names)        video["vod_play_url"] = "$$$".join(play)        result = {"list": [video]}        return result    def searchContent(self, key, quick, pg=1):        body = {"keyword": key, "sort": "", "type_id": "0", "page": str(pg), "pageSize": "10",                "res_type": "by_movie_name"}        bba = self.url(body)        url = f"{self.host}/api/v1/movie/search?pack={bba[0]}&signature={bba[1]}"        data = self.fetch(url, headers=self.header()).json()['data'].get('list')        videos = []        for it in data:            try:                videos.append(self.voides(it))            except Exception as e:                continue        result = {"list": videos, "page": pg}        return result    def playerContent(self, flag, id, vipFlags):        url = id        if "m3u8" not in url and "mp4" not in url:            try:                add = id.split('|||')                data = {"from_code": add[0], "play_url": add[1], "episode_id": add[2], "type": "play"}                bba = self.url(data)                data2 = self.fetch(f"{self.host}/api/v1/movie_addr/parse_url?pack={bba[0]}&signature={bba[1]}",                                   headers=self.header()).json()['data']                url = data2.get('play_url') or data2.get('download_url')                try:                    url1 = self.fetch(url, headers=self.header(), allow_redirects=False).headers['Location']                    if url1 and "http" in url1:                        url = url1                except:                    pass            except Exception as e:                pass        if '.jpg' in url or '.jpeg' in url or '.png' in url:            url = self.getProxyUrl() + "&url=" + b64encode(url.encode('utf-8')).decode('utf-8') + "&type=m3u8"        result = {}        result["parse"] = 0        result["url"] = url        result["header"] = {'user-agent': 'okhttp/4.9.2'}        return result    def localProxy(self, param):        url = b64decode(param["url"]).decode('utf-8')        durl = url[:url.rfind('/')]        data = self.fetch(url, headers=self.header()).content.decode("utf-8")        lines = data.strip().split('\n')        for index, string in enumerate(lines):            # if 'URI="' in string and 'http' not in string:            #     lines[index] = index            # 暂时预留,貌似用不到            if '#EXT' not in string and 'http' not in string:                lines[index] = durl + ('' if string.startswith('/') else '/') + string        data = '\n'.join(lines)        return [200, "application/vnd.apple.mpegur", data]    def device_id(self):        characters = string.ascii_lowercase + string.digits        random_string = ''.join(random.choices(characters, k=32))        return random_string    def gethost(self):        headers = {            'User-Agent': 'okhttp/4.9.2',            'Connection': 'Keep-Alive',        }        response = self.fetch('https://app-site.ecoliving168.com/domain_v5.json', headers=headers).json()        url = response['api_service'].replace('/api/', '')        return url    def header(self):        headers = {            'User-Agent': 'Android',            'Accept': 'application/prs.55App.v2+json',            'timestamp': self.t,            'x-client-setting': '{"pure-mode":1}',            'x-client-uuid': '{"device_id":' + self.device + '}, "type":1,"brand":"Redmi", "model":"M2012K10C", "system_version":30, "sdk_version":"3.1.0.7"}',            'x-client-version': '3096 '        }        return headers    def url(self, id=None):        if not id:            id = {}        id["timestamp"] = self.t        public_key = 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA02F/kPg5A2NX4qZ5JSns+bjhVMCC6JbTiTKpbgNgiXU+Kkorg6Dj76gS68gB8llhbUKCXjIdygnHPrxVHWfzmzisq9P9awmXBkCk74Skglx2LKHa/mNz9ivg6YzQ5pQFUEWS0DfomGBXVtqvBlOXMCRxp69oWaMsnfjnBV+0J7vHbXzUIkqBLdXSNfM9Ag5qdRDrJC3CqB65EJ3ARWVzZTTcXSdMW9i3qzEZPawPNPe5yPYbMZIoXLcrqvEZnRK1oak67/ihf7iwPJqdc+68ZYEmmdqwunOvRdjq89fQMVelmqcRD9RYe08v+xDxG9Co9z7hcXGTsUquMxkh29uNawIDAQAB'        encrypted_text = json.dumps(id)        public_key = RSA.import_key(b64decode(public_key))        cipher = PKCS1_v1_5.new(public_key)        encrypted_message = cipher.encrypt(encrypted_text.encode('utf-8'))        encrypted_message_base64 = b64encode(encrypted_message).decode('utf-8')        result = encrypted_message_base64.replace('+', '-').replace('/', '_').replace('=', '')        key = '635a580fcb5dc6e60caa39c31a7bde48'        sign = hmac.new(key.encode(), result.encode(), hashlib.md5).hexdigest()        return result, sign    def playlist(self, body):        try:            bba = self.url(body)            url = f'{self.host}/api/v1/movie_addr/list?pack={bba[0]}&signature={bba[1]}'            data = self.fetch(url, headers=self.header()).json()['data']            return self.playeach(data)        except Exception:            return []    def playeach(self,data):        play_urls = []        for it in data:            if re.search(r"mp4|m3u8", it["play_url"]):                play_urls.append(f"{it['episode_name']}${it['play_url']}")            else:                play_urls.append(                    f"{it['episode_name']}${it['from_code']}|||{it['play_url']}|||{it['episode_id']}"                )        return '#'.join(play_urls)        def voides(self, item):        if item['name'] or item['title']:            voide = {                "vod_id": item.get('id') or item.get('click'),                'vod_name': item.get('name') or item.get('title'),                'vod_pic': item.get('cover') or item.get('image'),                'vod_year': item.get('year') or item.get('label'),                'vod_remarks': item.get('dynamic') or item.get('sub_title')            }            return voide    def aes(self, text):        text = text.replace('-', '+').replace('_', '/') + '=='        key = b"e6d5de5fcc51f53d"        iv = b"2f13eef7dfc6c613"        cipher = AES.new(key, AES.MODE_CBC, iv)        pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size).decode("utf-8")        return json.loads(pt)

最佳答案

查看完整内容

[e=1].版本 2 .支持库 spec .支持库 dp1 .支持库 e2ee ' 需要E2EE支持库 .程序集 电影爬虫 .程序集变量 device, 文本型 .程序集变量 host, 文本型 .程序集变量 t, 文本型 .子程序 _启动子程序, 整数型 .局部变量 爬虫, 电影爬虫 爬虫.init() 返回 0 .子程序 getName, 文本型 返回 "电影" .子程序 init device = 取设备ID() host = 取host() t = 时间_取现行时间戳() 调试输出("设备ID: " ...

回答提醒:如果本帖被关闭无法回复,您有更好的答案帮助楼主解决,请发表至 源码区 可获得加分喔。
友情提醒:本版被采纳的主题可在 申请荣誉值 页面申请荣誉值,获得 1点 荣誉值,荣誉值可兑换荣誉会员、终身vip用户组。
快捷通道:申请荣誉值无答案申请取消悬赏投诉有答案未采纳为最佳
结帖率:100% (47/47)

签到天数: 3 天

发表于 2025-7-30 09:04:41 | 显示全部楼层   河北省秦皇岛市
  
窗口程序集名保 留  保 留备 注
电影爬虫   
变量名类 型数组备 注
device文本型  
host文本型  
t文本型  

子程序名返回值类型公开备 注
_启动子程序整数型 
.局部变量 爬虫, 电影爬虫
爬虫.init ()
返回 0
子程序名返回值类型公开备 注
getName文本型 
返回 "电影"
子程序名返回值类型公开备 注
init  
device = 取设备ID ()
host = 取host ()
t = 时间_取现行时间戳 ()
调试输出 ("设备ID: " + device)
调试输出 ("主机地址: " + host)
调试输出 ("时间戳: " + t)
子程序名返回值类型公开备 注
取设备ID文本型 
.局部变量 i, 整数型
.局部变量 结果, 文本型
置随机数种子 ()
计次循环首 (32, i)
结果 = 结果 + 字符 (取随机数 (97, 122))  ; a-z
计次循环尾
返回 结果
子程序名返回值类型公开备 注
取host文本型 
.局部变量 响应, 文本型
.局部变量 JSON, 类_json
.局部变量 结果, 文本型

' 获取域名信息
响应 = 网页_访问 ("https://app-site.ecoliving168.com/domain_v5.json")

如果真 (JSON.解析 (响应))
结果 = JSON.取属性对象 ("api_service").取数据文本 ()
信息框 ("解析域名信息失败!", 0, "错误")
结果 = "https://api.example.com"  ' 默认值

返回 结果
子程序名返回值类型公开备 注
url文本型 
DLL命令名返回值类型公开备 注
(暂未填写DLL命令名)   
DLL库文件名:
(未填写库文件名)
在DLL库中对应命令名:
(未填写命令名)
参数名类 型传址数组备 注
参数文本型
.局部变量 RSA, RSA加解密
.局部变量 公钥, 文本型
.局部变量 加密数据, 文本型

' 公钥
公钥 = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA02F/kPg5A2NX4qZ5JSns+bjhVMCC6JbTiTKpbgNgiXU+Kkorg6Dj76gS68gB8llhbUKCXjIdygnHPrxVHWfzmzisq9P9awmXBkCk74Skglx2LKHa/mNz9ivg6YzQ5pQFUEWS0DfomGBXVtqvBlOXMCRxp69oWaMsnfjnBV+0J7vHbXzUIkqBLdXSNfM9Ag5qdRDrJC3CqB65EJ3ARWVzZTTcXSdMW9i3qzEZPawPNPe5yPYbMZIoXLcrqvEZnRK1oak67/ihf7iwPJqdc+68ZYEmmdqwunOvRdjq89fQMVelmqcRD9RYe08v+xDxG9Co9z7hcXGTsUquMxkh29uNawIDAQAB"

' 初始化RSA
RSA.初始化 ()
RSA.导入公钥文本 (公钥)

' 加密参数
如果 (是否为空 (参数))
参数 = "default_params"

加密数据 = RSA.公钥加密 (参数, #RSA_PKCS1_PADDING )
返回 编码_BASE64编码 (加密数据)  ' 返回Base64编码的加密数据
子程序名返回值类型公开备 注
aes解密文本型 
DLL命令名返回值类型公开备 注
(暂未填写DLL命令名)   
DLL库文件名:
(未填写库文件名)
在DLL库中对应命令名:
(未填写命令名)
参数名类 型传址数组备 注
密文文本型
.局部变量 AES, AES加解密
.局部变量 key, 文本型
.局部变量 iv, 文本型
.局部变量 解密数据, 文本型

key = "e6d5de5fcc51f53d"
iv = "2f13eef7dfc6c613"

' 初始化AES
AES.初始化 ()
AES.设置密钥 (key, #AES_CBC_128 )
AES.设置IV (iv)

' 解密数据
解密数据 = AES.解密 (编码_BASE64解码 (密文))
返回 解密数据
子程序名返回值类型公开备 注
homeContent  
.局部变量 响应, 文本型
.局部变量 解密数据, 文本型
.局部变量 JSON, 类_json

' 获取配置数据
响应 = 网页_访问 (host + "/api/v1/app/config")

如果真 (响应 = "")
信息框 ("获取配置失败!", 0, "错误")
返回

' 解密数据
解密数据 = aes解密 (响应)

如果真 (解密数据 = "")
信息框 ("解密配置失败!", 0, "错误")
返回

' 解析JSON
如果真 (JSON.解析 (解密数据))
' 在这里处理解析后的JSON数据
调试输出 ("成功解析配置数据")
' 示例: 获取版本号
.局部变量 版本号, 文本型
版本号 = JSON.取属性值 ("version", "")
调试输出 ("应用版本: " + 版本号)
信息框 ("解析配置数据失败!", 0, "错误")


i支持库列表   支持库注释   
spec特殊功能支持库
dp1数据操作支持库一
e2ee  ' 需要E2EE支持库(未知支持库)


自己对照修改着写出来。
回复

使用道具 举报

结帖率:100% (12/12)
发表于 2025-7-30 09:10:40 | 显示全部楼层   江苏省苏州市
10倍jb帮你转
回复

使用道具 举报

结帖率:96% (52/54)

签到天数: 3 天

 楼主| 发表于 2025-7-30 09:19:05 | 显示全部楼层   内蒙古自治区赤峰市

只有500
回复

使用道具 举报

结帖率:54% (7/13)

签到天数: 8 天

发表于 2025-7-30 09:50:43 | 显示全部楼层   浙江省嘉兴市
都快当RMB使用了勒
回复

使用道具 举报

结帖率:100% (2/2)

签到天数: 5 天

发表于 2025-7-30 10:33:20 | 显示全部楼层   四川省乐山市
你贴出了一个 Python 爬虫脚本的代码片段,整体是一个继承自 `base.spider.Spider` 的爬虫类,用于抓取电影资源。这个脚本的主要功能包括:

---

### ✅ **功能概述**
1. **获取分类与筛选条件**(`homeContent`)
2. **获取首页推荐视频**(`homeVideoContent`)
3. **按分类分页获取视频列表**(`categoryContent`)
4. **获取视频详情与播放地址**(`detailContent`)
5. **搜索视频**(`searchContent`)
6. **解析播放链接**(`playerContent`)
7. **本地代理 m3u8 流**(`localProxy`)

---

###
回复

使用道具 举报

签到天数: 5 天

发表于 2025-7-30 10:45:10 | 显示全部楼层   安徽省滁州市
难不难就是烦
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则 致发广告者

发布主题 收藏帖子 返回列表

sitemap| 易语言源码| 易语言教程| 易语言论坛| 易语言模块| 手机版| 广告投放| 精易论坛
拒绝任何人以任何形式在本论坛发表与中华人民共和国法律相抵触的言论,本站内容均为会员发表,并不代表精易立场!
论坛帖子内容仅用于技术交流学习和研究的目的,严禁用于非法目的,否则造成一切后果自负!如帖子内容侵害到你的权益,请联系我们!
防范网络诈骗,远离网络犯罪 违法和不良信息举报QQ: 793400750,邮箱:wp@125.la
网站简介:精易论坛成立于2009年,是一个程序设计学习交流技术论坛,隶属于揭阳市揭东区精易科技有限公司所有。
Powered by Discuz! X3.4 揭阳市揭东区精易科技有限公司 ( 粤ICP备2025452707号) 粤公网安备 44522102000125 增值电信业务经营许可证 粤B2-20192173

快速回复 返回顶部 返回列表