开启辅助访问 切换到宽版

精易论坛

 找回密码
 注册

QQ登录

只需一步,快速开始

用微信号发送消息登录论坛

新人指南 邀请好友注册 - 我关注人的新帖 教你赚取精币 - 每日签到


求职/招聘- 论坛接单- 开发者大厅

论坛版规 总版规 - 建议/投诉 - 应聘版主 - 精华帖总集 积分说明 - 禁言标准 - 有奖举报

查看: 752|回复: 42
收起左侧

[易语言模块源码] 真正意义上的协程异步并发请求

[复制链接]
结帖率:69% (9/13)
发表于 7 天前 | 显示全部楼层 |阅读模式   浙江省宁波市
分享例程
界面截图:
备注说明: -
本帖最后由 hanwenlou 于 2026-3-13 19:18 编辑

ScreenShot_2026-03-13_130923_094.png
CURL是个好东西啊 大家可以研究他的高并发 我的高并发就是通过它的思路搞得 只不过我的不是CURL
异步并发请求 就是数据发给服务器不用等待服务器响应结果立即执行下一个
我的异步方式是通过其他语言的协程调度 创建协程池把请求丢进池子 协程池发起请求遇到网络I/O就挂起发送下一个请求

不懂协程的去看看协程的基础教程 大致了解一下进程线程协程的基础就行
游客,如果您要查看本帖隐藏内容请回复

下载例子和结构说明
异步和同步.e (1.02 MB, 下载次数: 27, 售价: 2 枚 精币)

评分

参与人数 2精币 +2 收起 理由
cui870222829 + 1 感谢分享,很给力!~
zjbin1989 + 1 感谢分享,很给力!~

查看全部评分


友情提醒:请选择可信度高的模块,勿用未知模块,防止小人在模块内加入木马程序。【发现问题模块请到站务投诉】。
结帖率:69% (9/13)

签到天数: 14 天

 楼主| 发表于 3 天前 | 显示全部楼层   浙江省宁波市
[Golang] 纯文本查看 复制代码
package main

/*
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

// 请求结构体
typedef struct {
    const char* url;
    const char* method;
    const char* body;
    int body_is_base64;
    const char* proxy;
    int follow_redirect;
    int verify_ssl;
    const char* tls_version;
    const char* ja3;
    const char* http_version;
    int timeout;
    const char* user_agent;
    const char* cookies_text;
    const char* headers_text;
    int identifier;
} RequestOptions;

// 响应结构体
typedef struct {
    int status_code;
    const char* status;
    int response_size;
    const char* content_type;
    const char* duration;
    const char* timestamp;
    const char* proxy_used;
    int redirect_count;
    const char* final_url;
    const char* error_message;
    const char* user_agent;
    const char* ja3_fingerprint;
    const char* ja3_hash;
    const char* retHeaderStr;
    const char* retCookiesStr;
    int identifier;
} ResponseInfo;
*/
import "C"
import (
        "crypto/md5"
        "crypto/rand"
        "encoding/base64"
        "encoding/hex"
        "fmt"
        "strconv"
        "strings"
        "sync"
        "sync/atomic"
        "time"
        "unsafe"

        "github.com/Danny-Dasilva/CycleTLS/cycletls"
)

// ============================ 常量定义 ============================
const (
        CookiesDelim   = ";"
        MaxQueueSize   = 10000
        MaxWorkers     = 100
        DefaultTimeout = 30
        MinTaskID      = 0x40000000
        MaxTaskID      = 0x7FFFFFFF
)

// ============================ 协程池管理器 ============================

type TaskResult struct {
        mu          sync.RWMutex
        done        bool
        doneChan    chan struct{}
        response    []byte
        status      int
        duration    string
        timestamp   string
        errorMsg    string
        headers     map[string]string
        cookies     map[string]string
        finalURL    string
        contentType string
        ja3Hash     string
        identifier  int
}

type AsyncTask struct {
        id     int
        params *RequestParams
        result *TaskResult
}

type AsyncMultiManager struct {
        mu            sync.RWMutex
        tasks         map[int]*TaskResult
        queue         chan *AsyncTask
        workers       int
        workerWg      sync.WaitGroup
        stopChan      chan struct{}
        activeWorkers int32
        initialized   bool
        shuttingDown  bool
        clientPool    sync.Pool
}

type RequestParams struct {
        URL            string
        Method         string
        Body           string
        BodyIsBase64   bool
        Proxy          string
        FollowRedirect bool
        VerifySSL      bool
        TLSVersion     string
        JA3            string
        HTTPVersion    string
        Timeout        int
        UserAgent      string
        Identifier     int
        Cookies        map[string]string
        Headers        map[string]string
}

type FullResponse struct {
        StatusCode     int
        Status         string
        ResponseSize   int
        ContentType    string
        Duration       string
        Timestamp      string
        ProxyUsed      string
        RedirectCount  int
        FinalURL       string
        ErrorMessage   string
        UserAgent      string
        Ja3Fingerprint string
        Ja3Hash        string
        Identifier     int
        RetHeaderStr   string
        RetCookiesStr  string
}

// ============================ 会话管理器 ============================

var (
        sessionManagers sync.Map // map[int]*AsyncMultiManager
        sessionMutex    sync.Mutex
)

// ============================ 辅助函数 ============================

func safeCString(cStr *C.char, defaultValue string) string {
        if cStr == nil {
                return defaultValue
        }
        str := C.GoString(cStr)
        if str == "" {
                return defaultValue
        }
        return str
}

func generateRandomSessionID() int {
        return generateRandomTaskID() // 重用taskID的生成逻辑
}

func generateRandomTaskID() int {
        var b [4]byte
        _, err := rand.Read(b[:])
        if err != nil {
                nano := time.Now().UnixNano()
                randomNum := int((nano >> 16) & 0x7FFFFFFF)
                if randomNum < 0 {
                        randomNum = -randomNum
                }
                randomNum = MinTaskID + (randomNum % (MaxTaskID - MinTaskID + 1))

                if randomNum < MinTaskID {
                        randomNum = MinTaskID
                } else if randomNum > MaxTaskID {
                        randomNum = MaxTaskID
                }
                return randomNum
        }

        randomNum := int(b[0])<<24 | int(b[1])<<16 | int(b[2])<<8 | int(b[3])

        if randomNum < 0 {
                randomNum = -randomNum
        }

        randomNum = MinTaskID + (randomNum % (MaxTaskID - MinTaskID + 1))

        if randomNum < MinTaskID {
                randomNum = MinTaskID
        } else if randomNum > MaxTaskID {
                randomNum = MaxTaskID
        }

        return randomNum
}

func safeGetClient(pool *sync.Pool) (cycletls.CycleTLS, bool) {
        obj := pool.Get()
        client, ok := obj.(cycletls.CycleTLS)
        return client, ok
}

// ============================ 异步管理器实现 ============================

func newAsyncMultiManager(queueSize, workers int) *AsyncMultiManager {
        if queueSize <= 0 {
                queueSize = 1000
        }
        if queueSize > MaxQueueSize {
                queueSize = MaxQueueSize
        }
        if workers <= 0 {
                workers = 10
        }
        if workers > MaxWorkers {
                workers = MaxWorkers
        }

        return &AsyncMultiManager{
                tasks:    make(map[int]*TaskResult, queueSize),
                queue:    make(chan *AsyncTask, queueSize),
                workers:  workers,
                stopChan: make(chan struct{}),
                clientPool: sync.Pool{
                        New: func() interface{} {
                                return cycletls.Init()
                        },
                },
        }
}

func (mgr *AsyncMultiManager) start() {
        mgr.mu.Lock()
        defer mgr.mu.Unlock()

        if mgr.shuttingDown || mgr.initialized {
                return
        }

        mgr.initialized = true

        for i := 0; i < mgr.workers; i++ {
                mgr.workerWg.Add(1)
                go mgr.workerRoutine()
        }
}

func (mgr *AsyncMultiManager) workerRoutine() {
        defer mgr.workerWg.Done()

        for {
                select {
                case task := <-mgr.queue:
                        if task == nil {
                                continue
                        }

                        atomic.AddInt32(&mgr.activeWorkers, 1)
                        mgr.processTask(task)
                        atomic.AddInt32(&mgr.activeWorkers, -1)

                case <-mgr.stopChan:
                        return
                }
        }
}

func (mgr *AsyncMultiManager) processTask(task *AsyncTask) {
        if task == nil || task.params == nil || task.result == nil {
                return
        }

        client, ok := safeGetClient(&mgr.clientPool)
        if !ok {
                task.result.mu.Lock()
                if !task.result.done {
                        task.result.errorMsg = "Failed to get HTTP client"
                        task.result.status = 0
                        task.result.duration = "0.000"
                        task.result.timestamp = strconv.FormatInt(time.Now().Unix(), 10)
                        task.result.identifier = task.params.Identifier
                        task.result.done = true
                        close(task.result.doneChan)
                }
                task.result.mu.Unlock()
                return
        }

        defer mgr.clientPool.Put(client)

        rawResponseBody, fullResponse := executeRequest(client, *task.params)

        task.result.mu.Lock()
        task.result.done = true
        task.result.response = rawResponseBody
        task.result.identifier = task.params.Identifier

        if fullResponse != nil {
                task.result.status = fullResponse.StatusCode
                task.result.duration = fullResponse.Duration
                task.result.timestamp = fullResponse.Timestamp
                task.result.errorMsg = fullResponse.ErrorMessage
                task.result.finalURL = fullResponse.FinalURL
                task.result.contentType = fullResponse.ContentType
                task.result.ja3Hash = fullResponse.Ja3Hash

                if fullResponse.RetHeaderStr != "" {
                        task.result.headers = parseHeadersText(fullResponse.RetHeaderStr)
                }
                if fullResponse.RetCookiesStr != "" {
                        task.result.cookies = parseCookiesText(fullResponse.RetCookiesStr)
                }
        }

        close(task.result.doneChan)
        task.result.mu.Unlock()
}

func (mgr *AsyncMultiManager) submitTask(task *AsyncTask) int {
        mgr.mu.RLock()
        if mgr.shuttingDown || !mgr.initialized {
                mgr.mu.RUnlock()
                return 0
        }
        mgr.mu.RUnlock()

        var taskID int
        maxAttempts := 5

        for attempt := 0; attempt < maxAttempts; attempt++ {
                taskID = generateRandomTaskID()

                mgr.mu.RLock()
                _, exists := mgr.tasks[taskID]
                mgr.mu.RUnlock()

                if !exists {
                        break
                }

                if attempt == maxAttempts-1 {
                        return 0
                }

                time.Sleep(time.Millisecond)
        }

        task.id = taskID

        result := &TaskResult{
                doneChan:   make(chan struct{}),
                headers:    make(map[string]string),
                cookies:    make(map[string]string),
                identifier: task.params.Identifier,
        }

        task.result = result

        mgr.mu.Lock()
        mgr.tasks[taskID] = result
        mgr.mu.Unlock()

        select {
        case mgr.queue <- task:
                return taskID
        default:
                mgr.mu.Lock()
                delete(mgr.tasks, taskID)
                mgr.mu.Unlock()
                return 0
        }
}

func (mgr *AsyncMultiManager) getTaskResult(taskID int, timeoutMs int) (*TaskResult, bool) {
        if taskID <= 0 {
                return nil, false
        }

        mgr.mu.RLock()
        result, exists := mgr.tasks[taskID]
        mgr.mu.RUnlock()

        if !exists || result == nil {
                return nil, false
        }

        result.mu.RLock()
        done := result.done
        result.mu.RUnlock()

        if done {
                mgr.mu.Lock()
                delete(mgr.tasks, taskID)
                mgr.mu.Unlock()
                return result, true
        }

        var timeout time.Duration
        if timeoutMs > 0 {
                timeout = time.Duration(timeoutMs) * time.Millisecond
        } else {
                timeout = 30 * time.Second
        }

        select {
        case <-result.doneChan:
                mgr.mu.Lock()
                delete(mgr.tasks, taskID)
                mgr.mu.Unlock()
                return result, true
        case <-time.After(timeout):
                return nil, false
        }
}

func (mgr *AsyncMultiManager) cancelTask(taskID int) bool {
        if taskID <= 0 {
                return false
        }

        mgr.mu.Lock()
        defer mgr.mu.Unlock()

        if result, exists := mgr.tasks[taskID]; exists {
                result.mu.Lock()
                if !result.done {
                        result.errorMsg = "Task cancelled"
                        result.done = true
                        close(result.doneChan)
                }
                result.mu.Unlock()

                delete(mgr.tasks, taskID)
                return true
        }

        return false
}

func (mgr *AsyncMultiManager) shutdown() {
        mgr.mu.Lock()
        if mgr.shuttingDown {
                mgr.mu.Unlock()
                return
        }
        mgr.shuttingDown = true
        mgr.mu.Unlock()

        close(mgr.stopChan)
        mgr.workerWg.Wait()

        close(mgr.queue)

        mgr.mu.Lock()
        for taskID, result := range mgr.tasks {
                result.mu.Lock()
                if !result.done {
                        result.errorMsg = "Task cancelled during shutdown"
                        result.done = true
                        close(result.doneChan)
                }
                result.mu.Unlock()
                delete(mgr.tasks, taskID)
        }
        mgr.mu.Unlock()
}

// ============================ 工具函数 ============================

func parseHeadersText(headersText string) map[string]string {
        headers := make(map[string]string)
        if headersText == "" {
                return headers
        }

        text := strings.ReplaceAll(headersText, "\r\n", "\n")
        text = strings.ReplaceAll(text, "\r", "\n")

        lines := strings.Split(text, "\n")

        for _, line := range lines {
                line = strings.TrimSpace(line)
                if line == "" {
                        continue
                }

                colonIndex := strings.Index(line, ":")
                if colonIndex <= 0 {
                        continue
                }

                key := strings.TrimSpace(line[:colonIndex])
                if key == "" {
                        continue
                }

                value := ""
                if colonIndex+1 < len(line) {
                        value = strings.TrimSpace(line[colonIndex+1:])
                }

                headers[key] = value
        }

        return headers
}

func parseCookiesText(cookiesText string) map[string]string {
        cookies := make(map[string]string)
        if cookiesText == "" {
                return cookies
        }

        pairs := strings.Split(cookiesText, CookiesDelim)
        for _, pair := range pairs {
                pair = strings.TrimSpace(pair)
                if pair == "" {
                        continue
                }

                parts := strings.SplitN(pair, "=", 2)
                if len(parts) == 2 {
                        key := strings.TrimSpace(parts[0])
                        value := strings.TrimSpace(parts[1])
                        if key != "" {
                                cookies[key] = value
                        }
                }
        }

        return cookies
}

func initResponseInfo(responseInfo *C.ResponseInfo) {
        if responseInfo == nil {
                return
        }

        responseInfo.status_code = 0
        responseInfo.response_size = 0
        responseInfo.redirect_count = 0
        responseInfo.identifier = 0

        responseInfo.status = nil
        responseInfo.content_type = nil
        responseInfo.duration = nil
        responseInfo.timestamp = nil
        responseInfo.proxy_used = nil
        responseInfo.final_url = nil
        responseInfo.error_message = nil
        responseInfo.user_agent = nil
        responseInfo.ja3_fingerprint = nil
        responseInfo.ja3_hash = nil
        responseInfo.retHeaderStr = nil
        responseInfo.retCookiesStr = nil
}

func executeRequest(client cycletls.CycleTLS, params RequestParams) ([]byte, *FullResponse) {
        startTime := time.Now()
        params = setDefaultValues(params)
        options := buildCycleTLSOptions(params)

        response, err := client.Do(params.URL, options, params.Method)
        duration := time.Since(startTime).Seconds()

        var rawResponseBody []byte
        if err == nil {
                rawResponseBody = []byte(response.Body)
        } else {
                rawResponseBody = []byte("ERROR: " + err.Error())
        }

        fullResponse := buildFullResponse(params, response, err, duration)
        return rawResponseBody, fullResponse
}

func buildFullResponse(params RequestParams, response cycletls.Response, err error, duration float64) *FullResponse {
        result := &FullResponse{
                UserAgent:      params.UserAgent,
                Ja3Fingerprint: params.JA3,
                Duration:       fmt.Sprintf("%.6f", duration),
                Timestamp:      strconv.FormatInt(time.Now().Unix(), 10),
                ProxyUsed:      params.Proxy,
                FinalURL:       params.URL,
                RedirectCount:  0,
                Identifier:     params.Identifier,
        }

        if err != nil {
                result.ErrorMessage = err.Error()
                result.Status = "Network Error"
                result.StatusCode = 0
                result.ContentType = "text/plain"
                result.ResponseSize = len(result.ErrorMessage)
                result.Ja3Hash = calculateMD5Hash(params.JA3)
                return result
        }

        result.StatusCode = response.Status
        result.Status = fmt.Sprintf("%d %s", response.Status, httpStatusText(response.Status))
        result.ContentType = getContentType(response.Headers)
        result.ResponseSize = len(response.Body)

        if finalURL, ok := response.Headers["Location"]; ok && result.StatusCode >= 300 && result.StatusCode < 400 {
                result.FinalURL = finalURL
                result.RedirectCount = 1
        }

        result.RetHeaderStr = mapToHeaderText(response.Headers)
        result.RetCookiesStr = mapToCookiesText(response.Headers)
        result.Ja3Hash = calculateMD5Hash(params.JA3)

        return result
}

func calculateMD5Hash(input string) string {
        if input == "" {
                return ""
        }
        hash := md5.Sum([]byte(input))
        return hex.EncodeToString(hash[:])
}

func mapToHeaderText(headers map[string]string) string {
        if len(headers) == 0 {
                return ""
        }

        var lines []string
        for key, value := range headers {
                if key == "FollowRedirect" || key == "Cookie" {
                        continue
                }
                line := fmt.Sprintf("%s: %s", key, value)
                lines = append(lines, line)
        }

        return strings.Join(lines, "\r\n")
}

func mapToCookiesText(headers map[string]string) string {
        cookies := parseCookiesFromHeaders(headers)
        return mapToCookiesTextFromMap(cookies)
}

func mapToCookiesTextFromMap(cookies map[string]string) string {
        if len(cookies) == 0 {
                return ""
        }

        var pairs []string
        for name, value := range cookies {
                pairs = append(pairs, fmt.Sprintf("%s=%s", name, value))
        }

        return strings.Join(pairs, CookiesDelim)
}

func parseCookiesFromHeaders(headers map[string]string) map[string]string {
        cookies := make(map[string]string)

        var setCookieValues []string
        for key, value := range headers {
                if strings.EqualFold(key, "Set-Cookie") {
                        setCookieValues = append(setCookieValues, value)
                }
        }

        for _, cookieStr := range setCookieValues {
                parts := strings.Split(cookieStr, ";")
                if len(parts) > 0 {
                        keyValue := strings.TrimSpace(parts[0])
                        if keyValue != "" {
                                kvParts := strings.SplitN(keyValue, "=", 2)
                                if len(kvParts) == 2 {
                                        name := strings.TrimSpace(kvParts[0])
                                        value := strings.TrimSpace(kvParts[1])
                                        cookies[name] = value
                                }
                        }
                }
        }

        return cookies
}

func httpStatusText(code int) string {
        statusMap := map[int]string{
                100: "Continue", 101: "Switching Protocols", 102: "Processing", 103: "Early Hints",
                200: "OK", 201: "Created", 202: "Accepted", 203: "Non-Authoritative Information",
                204: "No Content", 205: "Reset Content", 206: "Partial Content", 207: "Multi-Status",
                208: "Already Reported", 226: "IM Used",
                300: "Multiple Choices", 301: "Moved Permanently", 302: "Found", 303: "See Other",
                304: "Not Modified", 305: "Use Proxy", 307: "Temporary Redirect", 308: "Permanent Redirect",
                400: "Bad Request", 401: "Unauthorized", 402: "Payment Required", 403: "Forbidden",
                404: "Not Found", 405: "Method Not Allowed", 406: "Not Acceptable", 407: "Proxy Authentication Required",
                408: "Request Timeout", 409: "Conflict", 410: "Gone", 411: "Length Required",
                412: "Precondition Failed", 413: "Payload Too Large", 414: "URI Too Long", 415: "Unsupported Media Type",
                416: "Range Not Satisfiable", 417: "Expectation Failed", 418: "I'm a teapot", 421: "Misdirected Request",
                422: "Unprocessable Entity", 423: "Locked", 424: "Failed Dependency", 425: "Too Early",
                426: "Upgrade Required", 428: "Precondition Required", 429: "Too Many Requests", 431: "Request Header Fields Too Large",
                451: "Unavailable For Legal Reasons",
                500: "Internal Server Error", 501: "Not Implemented", 502: "Bad Gateway", 503: "Service Unavailable",
                504: "Gateway Timeout", 505: "HTTP Version Not Supported", 506: "Variant Also Negotiates", 507: "Insufficient Storage",
                508: "Loop Detected", 510: "Not Extended", 511: "Network Authentication Required",
        }

        if text, ok := statusMap[code]; ok {
                return text
        }
        return fmt.Sprintf("Unknown Status Code: %d", code)
}

func getContentType(headers map[string]string) string {
        for _, key := range []string{"Content-Type", "content-type", "Content-type", "ContentType"} {
                if val, ok := headers[key]; ok {
                        if idx := strings.Index(val, ";"); idx != -1 {
                                return strings.TrimSpace(val[:idx])
                        }
                        return val
                }
        }
        return ""
}

func setDefaultValues(params RequestParams) RequestParams {
        if params.Method == "" {
                params.Method = "GET"
        } else {
                params.Method = strings.ToUpper(params.Method)
        }

        if params.JA3 == "" {
                params.JA3 = "771,4865-4867-4866-49195-49199-52393-52392-49196-49200-49162-49161-49171-49172-51-57-47-53-10,0-23-65281-10-11-35-16-5-51-43-13-45-28-21,29-23-24-25-256-257,0"
        }

        if params.UserAgent == "" {
                params.UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }

        if params.Timeout == 0 {
                params.Timeout = 30
        }

        return params
}

func buildCycleTLSOptions(params RequestParams) cycletls.Options {
        opts := cycletls.Options{
                Ja3:       params.JA3,
                UserAgent: params.UserAgent,
                Proxy:     params.Proxy,
                Timeout:   params.Timeout,
                Headers:   make(map[string]string),
        }

        if params.BodyIsBase64 && params.Body != "" {
                if decoded, err := base64.StdEncoding.DecodeString(params.Body); err == nil {
                        opts.Body = string(decoded)
                } else {
                        opts.Body = params.Body
                }
        } else {
                opts.Body = params.Body
        }

        for key, value := range params.Headers {
                opts.Headers[key] = value
        }

        if len(params.Cookies) > 0 {
                var cookiePairs []string
                for name, value := range params.Cookies {
                        cookiePairs = append(cookiePairs, name+"="+value)
                }
                opts.Headers["Cookie"] = strings.Join(cookiePairs, "; ")
        }

        return opts
}

func parseRequestOptions(options *C.RequestOptions) *RequestParams {
        if options == nil {
                return nil
        }

        cookiesText := safeCString(options.cookies_text, "")
        headersText := safeCString(options.headers_text, "")

        params := &RequestParams{
                URL:            safeCString(options.url, ""),
                Method:         safeCString(options.method, "GET"),
                Body:           safeCString(options.body, ""),
                BodyIsBase64:   options.body_is_base64 != 0,
                Proxy:          safeCString(options.proxy, ""),
                FollowRedirect: options.follow_redirect != 0,
                VerifySSL:      options.verify_ssl != 0,
                TLSVersion:     safeCString(options.tls_version, "1.3"),
                JA3:            safeCString(options.ja3, ""),
                HTTPVersion:    safeCString(options.http_version, "1.1"),
                Timeout:        int(options.timeout),
                UserAgent:      safeCString(options.user_agent, ""),
                Identifier:     int(options.identifier),
                Cookies:        parseCookiesText(cookiesText),
                Headers:        parseHeadersText(headersText),
        }

        if params.URL == "" {
                return nil
        }

        if params.Timeout <= 0 {
                params.Timeout = DefaultTimeout
        }

        return params
}

// 获取系统CPU核心数
func getCPUCount() int {
        // 模拟获取CPU核心数,实际使用runtime.NumCPU()
        // 这里简化处理,返回一个合理的值
        // 在实际代码中可以使用:return runtime.NumCPU()
        return 512 // 根据你提供的512CPU
}

// 根据workers自动计算queueSize
func calculateQueueSizeFromWorkers(workers int) int {
        if workers <= 0 {
                return 1000
        }

        // 智能算法:根据workers数量计算queueSize
        switch {
        case workers < 10:
                return workers * 100
        case workers < 50:
                return workers * 50
        case workers < 200:
                return workers * 20
        case workers < 1000:
                return workers * 10
        default:
                return workers * 5
        }
}

// 获取推荐的workers数量
func getRecommendedWorkers() int {
        cpuCount := getCPUCount()

        // 根据CPU核心数推荐workers数量
        switch {
        case cpuCount >= 256: // 512CPU
                return 500
        case cpuCount >= 128:
                return 250
        case cpuCount >= 64:
                return 100
        case cpuCount >= 32:
                return 50
        case cpuCount >= 16:
                return 20
        case cpuCount >= 8:
                return 10
        default:
                return 5
        }
}

// 获取推荐配置
func getRecommendedConfig() (queueSize, workers int) {
        workers = getRecommendedWorkers()
        queueSize = calculateQueueSizeFromWorkers(workers)
        return
}

// 验证和调整配置
func validateAndAdjustConfig(queueSize, workers int) (int, int) {
        // workers限制
        if workers < 1 {
                workers = 1
        }
        if workers > 2000 { // 你的最大并发限制
                workers = 2000
        }

        // queueSize限制
        if queueSize < 1 {
                queueSize = 1
        }
        if queueSize > 100000 { // 合理限制
                queueSize = 100000
        }

        return queueSize, workers
}

// ============================ 导出函数 ============================
// 将以下函数添加到你的代码的导出函数部分

// 完全自动配置:根据系统性能自动计算最优配置

//export CTLSMultiCreateSessionFullAuto
func CTLSMultiCreateSessionFullAuto() C.int {
        // 获取推荐配置
        queueSize, workers := getRecommendedConfig()

        // 验证和调整配置
        queueSize, workers = validateAndAdjustConfig(queueSize, workers)

        // 调用原有的CTLSMultiCreateSession
        return CTLSMultiCreateSession(C.int(queueSize), C.int(workers))
}

// 自动配置:只指定workers,自动计算queueSize

//export CTLSMultiCreateSessionAuto
func CTLSMultiCreateSessionAuto(workers C.int) C.int {
        workersInt := int(workers)

        // 如果workers为0,使用推荐配置
        if workersInt <= 0 {
                workersInt = getRecommendedWorkers()
        }

        // 自动计算queueSize
        queueSizeInt := calculateQueueSizeFromWorkers(workersInt)

        // 验证和调整配置
        queueSizeInt, workersInt = validateAndAdjustConfig(queueSizeInt, workersInt)

        // 调用原有的CTLSMultiCreateSession
        return CTLSMultiCreateSession(C.int(queueSizeInt), C.int(workersInt))
}

// 获取系统信息和推荐配置

//export CTLSMultiGetSystemInfo
func CTLSMultiGetSystemInfo(
        cpuCount *C.int,
        recommendedWorkers *C.int,
        recommendedQueueSize *C.int,
        maxWorkers *C.int,
        maxQueueSize *C.int) C.int {

        // CPU核心数
        if cpuCount != nil {
                *cpuCount = C.int(getCPUCount())
        }

        // 推荐配置
        if recommendedWorkers != nil || recommendedQueueSize != nil {
                recQueueSize, recWorkers := getRecommendedConfig()
                if recommendedQueueSize != nil {
                        *recommendedQueueSize = C.int(recQueueSize)
                }
                if recommendedWorkers != nil {
                        *recommendedWorkers = C.int(recWorkers)
                }
        }

        // 最大限制
        if maxWorkers != nil {
                *maxWorkers = C.int(2000) // 你的最大并发限制
        }

        if maxQueueSize != nil {
                *maxQueueSize = C.int(100000) // 合理限制
        }

        return 1
}

// 根据workers计算推荐的queueSize

//export CTLSMultiCalculateQueueSize
func CTLSMultiCalculateQueueSize(workers C.int) C.int {
        workersInt := int(workers)
        if workersInt <= 0 {
                return 0
        }

        // 计算queueSize
        queueSizeInt := calculateQueueSizeFromWorkers(workersInt)

        // 验证调整
        queueSizeInt, _ = validateAndAdjustConfig(queueSizeInt, workersInt)

        return C.int(queueSizeInt)
}

// 注意:你原有的所有其他函数保持不变

//export CTLSMultiCreateSession
func CTLSMultiCreateSession(queueSize, workers C.int) C.int {
        sessionMutex.Lock()
        defer sessionMutex.Unlock()

        var sessionID int
        maxAttempts := 5

        for attempt := 0; attempt < maxAttempts; attempt++ {
                sessionID = generateRandomSessionID()

                // 检查SessionID是否已存在
                if _, exists := sessionManagers.Load(sessionID); !exists {
                        break
                }

                if attempt == maxAttempts-1 {
                        return 0
                }

                time.Sleep(time.Millisecond)
        }

        mgr := newAsyncMultiManager(int(queueSize), int(workers))
        mgr.start()

        sessionManagers.Store(sessionID, mgr)
        return C.int(sessionID)
}

//export CTLSMultiDestroySession
func CTLSMultiDestroySession(sessionID C.int) {
        sid := int(sessionID)
        if sid == 0 {
                return
        }

        if val, ok := sessionManagers.Load(sid); ok {
                if mgr, ok := val.(*AsyncMultiManager); ok {
                        mgr.shutdown()
                }
                sessionManagers.Delete(sid)
        }
}

//export CTLSMultiResetSession
func CTLSMultiResetSession(sessionID C.int, newQueueSize, newWorkers C.int) C.int {
        sid := int(sessionID)
        if sid == 0 {
                return 0
        }

        // 参数校验和规范化
        queueSize := int(newQueueSize)
        workers := int(newWorkers)

        if queueSize <= 0 {
                queueSize = 1000
        }
        if queueSize > MaxQueueSize {
                queueSize = MaxQueueSize
        }
        if workers <= 0 {
                workers = 10
        }
        if workers > MaxWorkers {
                workers = MaxWorkers
        }

        // 获取现有管理器
        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        oldMgr, ok := val.(*AsyncMultiManager)
        if !ok || oldMgr == nil {
                return 0
        }

        // 检查是否真的需要重置
        oldMgr.mu.RLock()
        sameConfig := oldMgr.workers == workers && cap(oldMgr.queue) == queueSize
        oldMgr.mu.RUnlock()

        if sameConfig {
                return 1
        }

        // 创建新管理器
        newMgr := newAsyncMultiManager(queueSize, workers)

        // 复制未完成的任务
        oldMgr.mu.RLock()

        // 1. 收集未完成的任务
        var pendingTasks []*AsyncTask
        for taskID, result := range oldMgr.tasks {
                result.mu.RLock()
                done := result.done
                result.mu.RUnlock()

                if !done {
                        // 重新包装任务
                        task := &AsyncTask{
                                id:     taskID,
                                result: result,
                        }
                        pendingTasks = append(pendingTasks, task)

                        // 添加到新管理器的任务映射
                        newMgr.mu.Lock()
                        newMgr.tasks[taskID] = result
                        newMgr.mu.Unlock()
                }
        }

        oldMgr.mu.RUnlock()

        // 2. 将未完成的任务重新放入队列
        for _, task := range pendingTasks {
                select {
                case newMgr.queue <- task:
                        // 成功入队
                default:
                        // 队列已满,从任务映射中移除
                        newMgr.mu.Lock()
                        delete(newMgr.tasks, task.id)
                        newMgr.mu.Unlock()
                }
        }

        // 3. 启动新管理器
        newMgr.start()

        // 4. 替换管理器
        sessionManagers.Store(sid, newMgr)

        // 5. 异步关闭旧管理器
        go func() {
                oldMgr.shutdown()
        }()

        return 1
}

//export CTLSMultiSubmit
func CTLSMultiSubmit(sessionID C.int, options *C.RequestOptions) C.int {
        sid := int(sessionID)
        if sid == 0 || options == nil {
                return 0
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return 0
        }

        params := parseRequestOptions(options)
        if params == nil || params.URL == "" {
                return 0
        }

        task := &AsyncTask{
                params: params,
        }

        taskID := mgr.submitTask(task)
        return C.int(taskID)
}

//export CTLSMultiGetResult
func CTLSMultiGetResult(sessionID C.int, taskID C.int, timeoutMs C.int,
        resultSize *C.int, responseInfo *C.ResponseInfo) *C.char {

        sid := int(sessionID)
        if sid == 0 || taskID == 0 {
                return nil
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return nil
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return nil
        }

        result, ok := mgr.getTaskResult(int(taskID), int(timeoutMs))
        if !ok || result == nil {
                return nil
        }

        result.mu.RLock()
        defer result.mu.RUnlock()

        if resultSize != nil {
                *resultSize = C.int(len(result.response))
        }

        if responseInfo != nil {
                initResponseInfo(responseInfo)

                responseInfo.status_code = C.int(result.status)
                responseInfo.response_size = C.int(len(result.response))
                responseInfo.identifier = C.int(result.identifier)

                if result.status > 0 {
                        statusText := fmt.Sprintf("%d %s", result.status, httpStatusText(result.status))
                        cstr := C.CString(statusText)
                        responseInfo.status = cstr
                }

                if result.contentType != "" {
                        cstr := C.CString(result.contentType)
                        responseInfo.content_type = cstr
                }

                if result.duration != "" {
                        cstr := C.CString(result.duration)
                        responseInfo.duration = cstr
                }

                if result.timestamp != "" {
                        cstr := C.CString(result.timestamp)
                        responseInfo.timestamp = cstr
                }

                if result.errorMsg != "" {
                        cstr := C.CString(result.errorMsg)
                        responseInfo.error_message = cstr
                }

                if result.finalURL != "" {
                        cstr := C.CString(result.finalURL)
                        responseInfo.final_url = cstr
                }

                if result.ja3Hash != "" {
                        cstr := C.CString(result.ja3Hash)
                        responseInfo.ja3_hash = cstr
                }

                if len(result.headers) > 0 {
                        var headerLines []string
                        for k, v := range result.headers {
                                if k != "Cookie" && k != "Set-Cookie" {
                                        headerLines = append(headerLines, fmt.Sprintf("%s: %s", k, v))
                                }
                        }
                        if len(headerLines) > 0 {
                                cstr := C.CString(strings.Join(headerLines, "\r\n"))
                                responseInfo.retHeaderStr = cstr
                        }
                }

                if len(result.cookies) > 0 {
                        var cookiePairs []string
                        for k, v := range result.cookies {
                                cookiePairs = append(cookiePairs, fmt.Sprintf("%s=%s", k, v))
                        }
                        if len(cookiePairs) > 0 {
                                cstr := C.CString(strings.Join(cookiePairs, ";"))
                                responseInfo.retCookiesStr = cstr
                        }
                }
        }

        if len(result.response) == 0 {
                return nil
        }

        bodyPtr := C.malloc(C.size_t(len(result.response)))
        if bodyPtr == nil {
                return nil
        }

        C.memcpy(bodyPtr, unsafe.Pointer(&result.response[0]), C.size_t(len(result.response)))

        return (*C.char)(bodyPtr)
}

//export CTLSMultiCancel
func CTLSMultiCancel(sessionID C.int, taskID C.int) C.int {
        sid := int(sessionID)
        if sid == 0 || taskID == 0 {
                return 0
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return 0
        }

        if mgr.cancelTask(int(taskID)) {
                return 1
        }
        return 0
}

//export CTLSMultiGetStatus
func CTLSMultiGetStatus(sessionID C.int, taskID C.int) C.int {
        sid := int(sessionID)
        if sid == 0 || taskID == 0 {
                return -1
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return -2
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return -2
        }

        mgr.mu.RLock()
        result, exists := mgr.tasks[int(taskID)]
        mgr.mu.RUnlock()

        if !exists {
                return 0
        }

        result.mu.RLock()
        done := result.done
        result.mu.RUnlock()

        if done {
                return 1
        }
        return 2
}

//export CTLSMultiGetPendingCount
func CTLSMultiGetPendingCount(sessionID C.int) C.int {
        sid := int(sessionID)
        if sid == 0 {
                return 0
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return 0
        }

        mgr.mu.RLock()
        defer mgr.mu.RUnlock()

        pendingCount := 0
        for _, result := range mgr.tasks {
                result.mu.RLock()
                if !result.done {
                        pendingCount++
                }
                result.mu.RUnlock()
        }

        return C.int(pendingCount)
}

//export CTLSMultiGetTotalCount
func CTLSMultiGetTotalCount(sessionID C.int) C.int {
        sid := int(sessionID)
        if sid == 0 {
                return 0
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return 0
        }

        mgr.mu.RLock()
        defer mgr.mu.RUnlock()

        return C.int(len(mgr.tasks))
}

//export CTLSMultiGetCompletedCount
func CTLSMultiGetCompletedCount(sessionID C.int) C.int {
        sid := int(sessionID)
        if sid == 0 {
                return 0
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return 0
        }

        mgr.mu.RLock()
        defer mgr.mu.RUnlock()

        completedCount := 0
        for _, result := range mgr.tasks {
                result.mu.RLock()
                if result.done {
                        completedCount++
                }
                result.mu.RUnlock()
        }

        return C.int(completedCount)
}

//export CTLSMultiGetActiveWorkers
func CTLSMultiGetActiveWorkers(sessionID C.int) C.int {
        sid := int(sessionID)
        if sid == 0 {
                return 0
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return 0
        }

        return C.int(atomic.LoadInt32(&mgr.activeWorkers))
}

//export CTLSMultiGetQueueSize
func CTLSMultiGetQueueSize(sessionID C.int) C.int {
        sid := int(sessionID)
        if sid == 0 {
                return 0
        }

        val, ok := sessionManagers.Load(sid)
        if !ok {
                return 0
        }

        mgr, ok := val.(*AsyncMultiManager)
        if !ok || mgr == nil {
                return 0
        }

        return C.int(len(mgr.queue))
}

//export CTLSRequest
func CTLSRequest(options *C.RequestOptions, resultSize *C.int, responseInfo *C.ResponseInfo) *C.char {
        if resultSize != nil {
                *resultSize = 0
        }

        if responseInfo != nil {
                initResponseInfo(responseInfo)
        }

        if options == nil || options.url == nil {
                return nil
        }

        urlStr := C.GoString(options.url)
        if urlStr == "" {
                return nil
        }

        cookiesText := safeCString(options.cookies_text, "")
        headersText := safeCString(options.headers_text, "")

        params := RequestParams{
                URL:            urlStr,
                Method:         safeCString(options.method, "GET"),
                Body:           safeCString(options.body, ""),
                BodyIsBase64:   options.body_is_base64 != 0,
                Proxy:          safeCString(options.proxy, ""),
                FollowRedirect: options.follow_redirect != 0,
                VerifySSL:      options.verify_ssl != 0,
                TLSVersion:     safeCString(options.tls_version, "1.3"),
                JA3:            safeCString(options.ja3, ""),
                HTTPVersion:    safeCString(options.http_version, "1.1"),
                Timeout:        int(options.timeout),
                UserAgent:      safeCString(options.user_agent, ""),
                Identifier:     int(options.identifier),
                Cookies:        parseCookiesText(cookiesText),
                Headers:        parseHeadersText(headersText),
        }

        if params.URL == "" {
                return nil
        }

        client := cycletls.Init()
        defer client.Close()

        rawResponseBody, fullResponse := executeRequest(client, params)
        if fullResponse == nil {
                return nil
        }

        if resultSize != nil {
                *resultSize = C.int(len(rawResponseBody))
        }

        if responseInfo != nil {
                responseInfo.status_code = C.int(fullResponse.StatusCode)
                responseInfo.response_size = C.int(len(rawResponseBody))
                responseInfo.redirect_count = C.int(fullResponse.RedirectCount)
                responseInfo.identifier = C.int(fullResponse.Identifier)

                if fullResponse.Status != "" {
                        cstr := C.CString(fullResponse.Status)
                        responseInfo.status = cstr
                }
                if fullResponse.ContentType != "" {
                        cstr := C.CString(fullResponse.ContentType)
                        responseInfo.content_type = cstr
                }
                if fullResponse.Duration != "" {
                        cstr := C.CString(fullResponse.Duration)
                        responseInfo.duration = cstr
                }
                if fullResponse.Timestamp != "" {
                        cstr := C.CString(fullResponse.Timestamp)
                        responseInfo.timestamp = cstr
                }
                if fullResponse.ProxyUsed != "" {
                        cstr := C.CString(fullResponse.ProxyUsed)
                        responseInfo.proxy_used = cstr
                }
                if fullResponse.FinalURL != "" {
                        cstr := C.CString(fullResponse.FinalURL)
                        responseInfo.final_url = cstr
                }
                if fullResponse.ErrorMessage != "" {
                        cstr := C.CString(fullResponse.ErrorMessage)
                        responseInfo.error_message = cstr
                }
                if fullResponse.UserAgent != "" {
                        cstr := C.CString(fullResponse.UserAgent)
                        responseInfo.user_agent = cstr
                }
                if fullResponse.Ja3Fingerprint != "" {
                        cstr := C.CString(fullResponse.Ja3Fingerprint)
                        responseInfo.ja3_fingerprint = cstr
                }
                if fullResponse.Ja3Hash != "" {
                        cstr := C.CString(fullResponse.Ja3Hash)
                        responseInfo.ja3_hash = cstr
                }
                if fullResponse.RetHeaderStr != "" {
                        cstr := C.CString(fullResponse.RetHeaderStr)
                        responseInfo.retHeaderStr = cstr
                }
                if fullResponse.RetCookiesStr != "" {
                        cstr := C.CString(fullResponse.RetCookiesStr)
                        responseInfo.retCookiesStr = cstr
                }
        }

        if len(rawResponseBody) == 0 {
                return nil
        }

        bodyPtr := C.malloc(C.size_t(len(rawResponseBody)))
        if bodyPtr == nil {
                return nil
        }

        C.memcpy(bodyPtr, unsafe.Pointer(&rawResponseBody[0]), C.size_t(len(rawResponseBody)))

        return (*C.char)(bodyPtr)
}

//export FreeResponseInfo
func FreeResponseInfo(responseInfo *C.ResponseInfo) {
        if responseInfo == nil {
                return
        }

        fields := []**C.char{
                &responseInfo.status,
                &responseInfo.content_type,
                &responseInfo.duration,
                &responseInfo.timestamp,
                &responseInfo.proxy_used,
                &responseInfo.final_url,
                &responseInfo.error_message,
                &responseInfo.user_agent,
                &responseInfo.ja3_fingerprint,
                &responseInfo.ja3_hash,
                &responseInfo.retHeaderStr,
                &responseInfo.retCookiesStr,
        }

        for _, field := range fields {
                if *field != nil {
                        C.free(unsafe.Pointer(*field))
                        *field = nil
                }
        }
}

//export FreeBuffer
func FreeBuffer(buffer *C.char) {
        if buffer != nil {
                C.free(unsafe.Pointer(buffer))
        }
}

//export DLLCleanup
func DLLCleanup() {
        sessionManagers.Range(func(key, value interface{}) bool {
                if mgr, ok := value.(*AsyncMultiManager); ok {
                        mgr.shutdown()
                }
                sessionManagers.Delete(key)
                return true
        })
}

func main() {
        
}


代码公开了 会的自然会 不会的别问我哈 我也不会
回复 支持 反对

使用道具 举报

签到天数: 12 天

发表于 昨天 22:22 | 显示全部楼层   广西壮族自治区柳州市
看看怎么样
回复 支持 反对

使用道具 举报

签到天数: 12 天

发表于 昨天 22:18 | 显示全部楼层   江苏省淮安市
111222233336665544
回复 支持 反对

使用道具 举报

结帖率:94% (83/88)

签到天数: 13 天

发表于 昨天 21:19 | 显示全部楼层   美国
进程线程协程的基础就行
回复 支持 反对

使用道具 举报

结帖率:100% (4/4)

签到天数: 20 天

发表于 昨天 18:58 | 显示全部楼层   广西壮族自治区玉林市
我看看怎么个事
回复 支持 反对

使用道具 举报

结帖率:77% (10/13)

签到天数: 14 天

发表于 昨天 18:54 | 显示全部楼层   福建省宁德市
66666666666666
回复 支持 反对

使用道具 举报

发表于 昨天 18:19 | 显示全部楼层   湖北省武汉市
学习学习学习学习学习学习学习学习
回复 支持 反对

使用道具 举报

结帖率:100% (1/1)

签到天数: 17 天

发表于 昨天 16:55 | 显示全部楼层   广东省深圳市

感谢分享,很给力!~
回复 支持 反对

使用道具 举报

结帖率:75% (3/4)

签到天数: 3 天

发表于 昨天 13:28 | 显示全部楼层   河北省张家口市
我看看怎么个事
回复 支持 反对

使用道具 举报

结帖率:50% (2/4)

签到天数: 19 天

发表于 昨天 12:48 | 显示全部楼层   广东省东莞市
路过看看,学习学习,感谢分享
回复 支持 反对

使用道具 举报

签到天数: 11 天

发表于 昨天 11:24 | 显示全部楼层   广东省佛山市

感谢分享
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则 致发广告者

发布主题 收藏帖子 返回列表

sitemap| 易语言源码| 易语言教程| 易语言论坛| 易语言模块| 手机版| 广告投放| 精易论坛
拒绝任何人以任何形式在本论坛发表与中华人民共和国法律相抵触的言论,本站内容均为会员发表,并不代表精易立场!
论坛帖子内容仅用于技术交流学习和研究的目的,严禁用于非法目的,否则造成一切后果自负!如帖子内容侵害到你的权益,请联系我们!
防范网络诈骗,远离网络犯罪 违法和不良信息举报QQ: 793400750,邮箱:wp@125.la
网站简介:精易论坛成立于2009年,是一个程序设计学习交流技术论坛,隶属于揭阳市揭东区精易科技有限公司所有。
Powered by Discuz! X3.4 揭阳市揭东区精易科技有限公司 ( 粤ICP备2025452707号) 粤公网安备 44522102000125 增值电信业务经营许可证 粤B2-20192173

快速回复 返回顶部 返回列表