Hello! 欢迎来到小浪云!


【Linux】进程间通信——进程池


进程池(process pool)是一种管理进程的技术,通过预先创建一定数量的进程来减少频繁创建和销毁进程的开销。这种方法特别适用于需要并发执行大量任务的场景,尤其是在处理cpu密集型任务时。

【Linux】进程间通信——进程池

上图展示了进程池模型,其中父进程(master进程)通过创建多个子进程(worker或slaver进程)并通过管道连接,向各个子进程派发任务。

进程池的主要作用包括:

  • 提高性能:通过预先创建进程,减少了频繁创建和销毁进程的开销,提升整体执行效率。
  • 减少系统资源消耗:避免了频繁创建和销毁进程导致的资源浪费,进程池通过复用已有进程,节省了资源。
  • 提升任务响应速度:由于进程池中的进程是预先创建的,可以快速分配空闲进程处理任务。
  • 更好的资源管理:进程池可以限制系统中的最大并发进程数,保护系统资源,避免过载。
  • 并行处理:对于CPU密集型任务,进程池通过并行化处理多个任务,显著提升处理效率。

进程池适用于大规模并发任务的处理,如Web爬虫、数据处理和大规模计算等场景。

代码模拟进程池管道信息

首先,我们通过控制创建子进程的数量来实现进程池。可以通过main函数的参数控制子进程的创建个数。

enum{     OK = 0,     UsageError,     PipeError,     ForkError, };  void Usage(string proc){     cout << "Usage: " << proc << " <number_of_processes>" << endl;     cout << "Example: " << proc << " 4" << endl;     exit(UsageError); }  // main函数的第一个参数是数组元素个数,第二个元素是创建子进程个数 int main(int argc, char *argv[]){     if(argc != 2){         Usage(argv[0]);         return UsageError;     }     int num = atoi(argv[1]);     // 接下来创建管道和子进程 }

我们封装一个类来管理进程池:

using work_t = function<void>;  class ProcessPool{ public:     ProcessPool(int n, work_t w) : num(n), work(w) {}     ~ProcessPool() {}     int InitProcesspool();     void DisPatchTasks();     void CleanProcessPool(); private:     vector<channel> channels; // 管道     int num; // 进程的个数     work_t work; // 工作类型(void()) };

Channel类用于管理管道:

class Channel{ public:     Channel(int wfd, pid_t who) : _wfd(wfd), _who(who) {         _name = "Channel-" + to_string(wfd) + '-' + to_string(who);     }     string Name() { return _name; }     void Send(int cmd) { write(_wfd, &cmd, sizeof(cmd)); }     void Close() { close(_wfd); }     pid_t Id() { return _who; }     ~Channel() {} private:     int _wfd; // master的写端     string _name; // 对应worker的名字     pid_t _who; // 记录哪个子进程的管道 };

任务类模拟三种任务:

void Download() { cout << "Downloading..." << endl; } void Log() { cout << "Logging..." << endl; } void Sql() { cout << "Executing SQL..." << endl; }  using task_t = function<void>;  class TaskManager{ public:     TaskManager(){         srand(time(nullptr));         InsertTask(Download);         InsertTask(Log);         InsertTask(Sql);     }     ~TaskManager() {}     void InsertTask(task_t t) { tasks[number++] = t; }     int SelectTask() { return rand() % number; }     void Excute(int number) {         if(tasks.find(number) == tasks.end()) return;         tasks[number]();     } private:     unordered_map<int, task_t> tasks; };  static int number = 0; TaskManager tmp;  void Usage(string proc){     cout << "Usage: " << proc << " <number_of_processes>" << endl;     cout << "Example: " << proc << " 4" << endl;     exit(UsageError); }

接下来实现InitProcesspool()、DisPatchTasks()和CleanProcessPool()函数:

int InitProcesspool(){     for (int i = 0; i < num; i++) {         int pipefd[2];         if (pipe(pipefd) < 0) {             perror("pipe");             return PipeError;         }         pid_t pid = fork();         if (pid < 0) {             perror("fork");             return ForkError;         } else if (pid == 0) {             close(pipefd[1]);             dup2(pipefd[0], 0);             close(pipefd[0]);             work();             exit(OK);         } else {             close(pipefd[0]);             channels.push_back(Channel(pipefd[1], pid));         }     }     return OK; }  void DisPatchTasks(){     int who = 0;     int num = 20;     while (num--) {         int task = tmp.SelectTask();         Channel &curr = channels[who];         who++;         who %= channels.size();         cout << "Dispatch task " << task << " to " << curr.Name() << endl;         curr.Send(task);     } }  void CleanProcessPool() {     for (auto &c : channels) {         c.Close();     }     for (auto &e : channels) {         pid_t rid = waitpid(e.Id(), nullptr, 0);         if (rid > 0) {             cout << "子进程 " << rid << " 已回收" << endl;         }     } }

最后,封装main.cc文件:

#include "ProcessPool.hpp" #include "Task.hpp"  int main(int argc, char *argv[]){     if(argc != 2) {         Usage(argv[0]);         return UsageError;     }     int num = atoi(argv[1]);     ProcessPool *pp = new ProcessPool(num, Worker);     pp->InitProcesspool();     pp->DisPatchTasks();     pp->CleanProcessPool();     delete pp;     return 0; }

相关头文件和Makefile:

// Channel.hpp #ifndef __CHANNEL_HPP__ #define __CHANNEL_HPP__ #include <iostream> #include <string> #include <unistd.h> using namespace std; // ... (Channel类定义) #endif  // ProcessPool.hpp #include <string> #include <unistd.h> #include <cstdlib> #include <vector> #include <sys> #include <functional> #include <sys> #include "Channel.hpp" #include "Task.hpp" // ... (ProcessPool类定义)  // Task.hpp #pragma once #include <iostream> #include <unordered_map> #include <functional> #include <ctime> using namespace std; // ... (TaskManager类定义)  // Makefile BIN=processpool cc=g++ FLAGS=-c -Wall -std=c++11 LDFLAGS=-o SRC=$(shell ls *.cc) OBJ=$(SRC:.cc=.o) $(BIN):$(OBJ)     $(cc) $(LDFLAGS) $@ $^ %.o:%.cc     $(cc) $(FLAGS) $< clean:     rm -f $(OBJ) $(BIN)

【Linux】进程间通信——进程池

总结:

本文详细介绍了进程池的概念及其在实际应用中的作用。通过代码模拟,我们展示了如何初始化进程池、分发任务、执行任务逻辑以及清理进程池。文章还涵盖了相关的封装类和文件结构,如main.cc、Channel.hpp、ProcessPool.hpp、Task.hpp和Makefile,这些内容为理解和实现进程池提供了全面的指导。

进程池是一种有效的资源管理技术,能够提高多任务处理的效率和系统性能。通过合理的设计和实现,进程池可以在复杂的系统中发挥重要作用,减少资源浪费并提升任务执行的稳定性。希望本文的内容能为读者在实际项目中应用进程池提供有价值的参考。

【Linux】进程间通信——进程池

相关阅读