DEV Community

海前 王
海前 王

Posted on

THREAD POOL


CPP
#include <windows.h>

#include <climits>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
// Thread pool: a group of worker threads that execute queued
// std::function<void()> tasks.
class ThreadPool {
public:
    // Creates the pool with numThreads worker threads.
    // `explicit` keeps a bare integer from silently converting into a pool.
    explicit ThreadPool(size_t numThreads);
    ~ThreadPool();

    // The pool owns threads and synchronization primitives, so copying it
    // is not meaningful; state that in the interface.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Adds a task (any callable invocable as void()) to the pool.
    template<class F>
    void enqueue(F&& f);

private:
    // Function executed by each worker thread.
    void worker();

    // Pending tasks; accessed only while holding queueMutex.
    std::queue<std::function<void()>> tasks;

    // The pool's worker threads.
    std::vector<std::thread> workers;

    // Mutex that synchronizes access to the task queue and the stop flag.
    std::mutex queueMutex;

    // Condition variable the workers block on while waiting for work.
    std::condition_variable condition;

    // Set to true when the pool is shutting down.
    bool stop;
};

// Constructor: starts numThreads worker threads, each running worker().
// If starting a thread fails, the workers that were already started are
// stopped and joined before the exception is rethrown; without this the
// joinable std::thread objects would be destroyed during unwinding, which
// calls std::terminate.
ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    try {
        // One allocation up front instead of repeated vector growth.
        workers.reserve(numThreads);
        for (size_t i = 0; i < numThreads; ++i) {
            workers.emplace_back(&ThreadPool::worker, this);
        }
    } catch (...) {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& started : workers) {
            started.join();
        }
        throw;
    }
}

// Destructor: raises the stop flag, wakes every worker, and waits for all
// worker threads to finish.
ThreadPool::~ThreadPool() {
    {
        // The flag is written under the mutex because workers read it
        // under the same mutex.
        std::lock_guard<std::mutex> guard(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (auto it = workers.begin(); it != workers.end(); ++it) {
        it->join();
    }
}

// Adds a task to the task queue and wakes one waiting worker.
// Throws std::runtime_error if the pool has already been stopped.
template<class F>
void ThreadPool::enqueue(F&& f) {
    std::unique_lock<std::mutex> guard(queueMutex);
    if (stop) {
        throw std::runtime_error("enqueue on stopped ThreadPool");
    }
    tasks.emplace(std::forward<F>(f));
    // Release the mutex before notifying, as the queue update is complete.
    guard.unlock();
    condition.notify_one();
}

// 工作线程执行的函数
void ThreadPool::worker() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this] {
                return stop || !tasks.empty();
                });
            if (stop && tasks.empty()) {
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}

// File-processing function: writes `size` bytes starting at `start` to
// standard output, unformatted.
void process_file_chunk(const char* start, size_t size) {
    const std::streamsize byteCount = static_cast<std::streamsize>(size);
    std::cout.write(start, byteCount);
}

// Converts an ANSI string (system code page, CP_ACP) to a wide-character
// string using MultiByteToWideChar.
// Returns an empty string when the input is empty, when it is longer than
// the API's int length parameter can express, or when the conversion fails.
std::wstring to_wstring(const std::string& str) {
    // MultiByteToWideChar rejects a zero source length, and its length
    // parameter is an int, so handle both cases before calling it.
    if (str.empty() || str.size() > static_cast<size_t>(INT_MAX)) {
        return std::wstring();
    }
    const int source_len = static_cast<int>(str.size());

    // First call: ask how many wide characters the result needs.
    const int size_needed = MultiByteToWideChar(CP_ACP, 0, str.data(), source_len, nullptr, 0);
    if (size_needed <= 0) {
        return std::wstring();
    }

    // Second call: perform the conversion into the sized buffer.
    std::wstring wide_str(static_cast<size_t>(size_needed), L'\0');
    const int written = MultiByteToWideChar(CP_ACP, 0, str.data(), source_len, &wide_str[0], size_needed);
    // Keep only what was actually written (0 when the call failed).
    wide_str.resize(written > 0 ? static_cast<size_t>(written) : 0);
    return wide_str;
}

// 主程序
int main() {
    const std::string filePath = "D:\\TODO\\village\\villa_test.v5.21.scproj"; // 替换为实际的大文件路径
    std::wstring wideFilePath = to_wstring(filePath);

    // 打开文件
    HANDLE hFile = CreateFileW(wideFilePath.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr);
    if (hFile == INVALID_HANDLE_VALUE) {
        std::cerr << "CreateFileW failed with error " << GetLastError() << std::endl;
        return 1;
    }

    // 获取文件大小
    LARGE_INTEGER fileSize;
    if (!GetFileSizeEx(hFile, &fileSize)) {
        std::cerr << "GetFileSizeEx failed with error " << GetLastError() << std::endl;
        CloseHandle(hFile);
        return 1;
    }

    // 创建文件映射
    HANDLE hFileMapping = CreateFileMappingW(hFile, nullptr, PAGE_READONLY, 0, fileSize.QuadPart, nullptr);
    if (hFileMapping == nullptr) {
        std::cerr << "CreateFileMappingW failed with error " << GetLastError() << std::endl;
        CloseHandle(hFile);
        return 1;
    }

    // 映射视图到进程地址空间
    const char* fileData = static_cast<const char*>(MapViewOfFile(hFileMapping, FILE_MAP_READ, 0, 0, fileSize.QuadPart));
    if (fileData == nullptr) {
        std::cerr << "MapViewOfFile failed with error " << GetLastError() << std::endl;
        CloseHandle(hFileMapping);
        CloseHandle(hFile);
        return 1;
    }

    // 创建线程池
    ThreadPool pool(4);

    // 文件分块
    const size_t chunkSize = fileSize.QuadPart / 4; // 分块大小,假设有4个线程

    // 将文件数据分块并添加到线程池
    for (size_t i = 0; i < 4; ++i) {
        size_t offset = i * chunkSize;
        size_t size = (i == 3) ? fileSize.QuadPart - offset : chunkSize; // 最后一块处理剩余部分
        pool.enqueue([fileData, offset, size] {
            process_file_chunk(fileData + offset, size);
            });
    }

    // 等待线程池完成所有任务
    std::this_thread::sleep_for(std::chrono::seconds(5));

    // 清理资源
    UnmapViewOfFile(fileData);
    CloseHandle(hFileMapping);
    CloseHandle(hFile);

    return 0;
}
Enter fullscreen mode Exit fullscreen mode

Top comments (0)