A Detailed Look at Implementing a Thread Pool with ACE's ACE_Task and Related Classes (C/C++)

This code should be one of the examples that ships with ACE, but I think it is very good, so I am sharing it here. The comments are extremely detailed.

Header file

The code is as follows:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H
/* In order to implement a thread pool, we have to have an object that
   can create a thread.  The ACE_Task<> is the basis for doing just
   such a thing.  */
#include "ace/Task.h"
//add by ychen 20070714 below
#include "ace/Mutex.h"
//add by ychen 20070714 above
/* ACE_Atomic_Op<> is used for the counter typedef below.  On newer
   ACE versions it lives in its own header, so include it explicitly
   here in case Task.h does not pull it in.  */
#include "ace/Atomic_Op.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
/* We need a forward reference for ACE_Event_Handler so that our
   enqueue() method can accept a pointer to one.  */

class ACE_Event_Handler;
/* Although we modified the rest of our program to make use of the
   thread pool implementation, if you look closely you'll see that the
   changes were rather minor.  The "ACE way" is generally to create a
   helper object that abstracts away the details not relevant to your
   application.  That's what I'm trying to do here by creating the
   Thread_Pool object.  */
class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  typedef ACE_Task<ACE_MT_SYNCH> inherited;
  /* Provide an enumeration for the default pool size.  By doing this,
    other objects can use the value when they want a default.  */
  enum size_t
  {
    default_pool_size_ = 1
  };
  // Basic constructor
  Thread_Pool (void);
  /* Opening the thread pool causes one or more threads to be
    activated.  When activated, they all execute the svc() method
    declared below.  */
  int open (int pool_size = default_pool_size_);
  /* Some compilers will complain that our open() above attempts to
    override a virtual function in the baseclass.  We have no
    intention of overriding that method but in order to keep the
    compiler quiet we have to add this method as a pass-thru to the
    baseclass method.  */
  virtual int open (void *void_data)
  {
    return inherited::open (void_data);
  }
  /* Shut the pool down.  As implemented below, close() enqueues one
    "null" message block per active thread and then waits for all of
    the svc() threads to exit.  */
  virtual int close (u_long flags = 0);
  /* To use the thread pool, you have to put some unit of work into
    it.  Since we're dealing with event handlers (or at least their
    derivatives), I've chosen to provide an enqueue() method that
    takes a pointer to an ACE_Event_Handler.  The handler's
    handle_input() method will be called, so your object has to know
    when it is being called by the thread pool.  */
  int enqueue (ACE_Event_Handler *handler);
  /* Another handy ACE template is ACE_Atomic_Op<>.  When
    parameterized, this allows us to have a thread-safe counting
    object.  The typical arithmetic operators are all internally
    thread-safe so that you can share it across threads without
    worrying about any contention issues.  (A small standalone
    sketch of ACE_Atomic_Op<> follows this header file.)  */
  typedef ACE_Atomic_Op<ACE_Mutex, int> counter_t;

protected:
  /* Our svc() method will dequeue the enqueued event handler objects
    and invoke the handle_input() method on each.  Since we're likely
    running in more than one thread, idle threads can take work from
    the queue while other threads are busy executing handle_input() on
    some object.  */
  int svc (void);
  /* We use the atomic op to keep a count of the number of threads in
    which our svc() method is running.  This is particularly important
    when we want to close() it down!  */
  counter_t active_threads_;
};
#endif /* THREAD_POOL_H */
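
As a small aside, the ACE_Atomic_Op<> counter that Thread_Pool relies on can be exercised on its own. The following is a minimal standalone sketch, not part of the tutorial; the file name, thread count and iteration count are arbitrary, and on older ACE versions the ACE_Atomic_Op<> template may come from ace/Synch.h rather than ace/Atomic_Op.h.

// atomic_op_example.cpp -- minimal sketch, not part of the tutorial.
#include "ace/Atomic_Op.h"
#include "ace/Mutex.h"
#include "ace/Thread_Manager.h"
#include "ace/Log_Msg.h"

typedef ACE_Atomic_Op<ACE_Mutex, int> counter_t;

static counter_t shared_counter (0);

/* Each worker bumps the shared counter many times.  The mutex inside
   ACE_Atomic_Op serializes the increments, so no explicit locking is
   needed here.  */
static ACE_THR_FUNC_RETURN worker (void *)
{
  for (int i = 0; i < 100000; ++i)
    ++shared_counter;
  return 0;
}

int main (int, char *[])
{
  // Spawn a handful of threads; the count is arbitrary for the example.
  ACE_Thread_Manager::instance ()->spawn_n (4, worker);
  ACE_Thread_Manager::instance ()->wait ();
  // With four threads the final value should be 400000.
  ACE_DEBUG ((LM_DEBUG, "final count: %d\n", shared_counter.value ()));
  return 0;
}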

Implementation file

The code is as follows:

// thread_pool.cpp,v 1.9 1999/09/22 03:13:42 jcej Exp
#include "thread_pool.h"
/* We need this header so that we can invoke handle_input() on the
   objects we dequeue.  */
#include "ace/Event_Handler.h"
/* All we do here is initialize our active thread counter.  */
Thread_Pool::Thread_Pool (void)
  : active_threads_ (0)
{
}
/* Our open() method is a thin disguise around the ACE_Task<>
   activate() method.  By hiding activate() in this way, the users of
   Thread_Pool don't have to worry about the thread configuration
   flags.  */
int
Thread_Pool::open (int pool_size)
{
  return this->activate (THR_NEW_LWP|THR_DETACHED, pool_size);
}
/* Closing the thread pool can be a tricky exercise.  I've decided to
   take an easy approach and simply enqueue a secret message for each
   thread we have active.  */
int
Thread_Pool::close (u_long flags)
{
  ACE_UNUSED_ARG(flags);
  /* Find out how many threads are currently active */
  int counter = active_threads_.value ();
  /* For each one of the active threads, enqueue a "null" event
    handler.  Below, we'll teach our svc() method that "null" means
    "shutdown".  */
  while (counter--)
    this->enqueue (0);
  /* As each svc() method exits, it will decrement the active thread
    counter.  We just wait here for it to reach zero.  Since we don't
    know how long it will take, we sleep for a quarter of a second
    between tries.  */
  while (active_threads_.value ())
    ACE_OS::sleep (ACE_Time_Value (0, 250000));
  return(0);
}
/* When an object wants to do work in the pool, it should call the
   enqueue() method.  We introduce the ACE_Message_Block here but,
   unfortunately, we seriously misuse it.  */
int
Thread_Pool::enqueue (ACE_Event_Handler *handler)
{
  /* An ACE_Message_Block is a chunk of data.  You put them into an
    ACE_Message_Queue.  ACE_Task<> has an ACE_Message_Queue built in.
    In fact, the parameter to ACE_Task<> is passed directly to
    ACE_Message_Queue.  If you look back at our header file you'll see
    that we used ACE_MT_SYNCH as the parameter indicating that we want
    MultiThread Synch safety.  This allows us to safely put
    ACE_Message_Block objects into the message queue in one thread and
    take them out in another.  */
  /* An ACE_Message_Block wants to have char* data.  We don't have
    that.  We could cast our ACE_Event_Handler* directly to a char*
    but I wanted to be more explicit.  Since casting pointers around
    is a dangerous thing, I've gone out of my way here to be very
    clear about what we're doing.
    First: Cast the handler pointer to a void pointer.  You can't do
    any useful work on a void pointer, so this is a clear message that
    we're making the pointer unusable.
    Next: Cast the void pointer to a char pointer that the ACE_Message_Block will accept.  */
  void *v_data = (void *) handler;
  char *c_data = (char *) v_data;
  ACE_Message_Block *mb;
  /* Construct a new ACE_Message_Block.  For efficiency, you might
    want to preallocate a stack of these and reuse them.  For
    simplicity, I'll just create what I need as I need it.  */
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (c_data),
                  -1);
  /* Our putq() method is a wrapper around one of the enqueue methods
    of the ACE_Message_Queue that we own.  Like all good methods, it
    returns -1 if it fails for some reason.  */
  if (this->putq (mb) == -1)
    {
      /* Another trait of the ACE_Message_Block objects is that they
        are reference counted.  Since they're designed to be passed
        around between various objects in several threads we can't
        just delete them whenever we feel like it.  The release()
        method is similar to the destroy() method we've used
        elsewhere.  It watches the reference count and will delete the
        object when possible.  */
      mb->release ();
      return -1;
    }
  return 0;
}
/* The "guard" concept is very powerful and used throughout
   multi-threaded applications.  A guard normally does some operation
   on an object at construction and the "opposite" operation at
   destruction.  For instance, when you guard a mutex (lock) object,
   the guard will acquire the lock on construction and release it on
   destruction.  In this way, your method can simply let the guard go
   out of scope and know that the lock is released.
   Guards aren't only useful for locks however.  In this application
   I've created two guard objects for quite a different purpose.  */
/* The Counter_Guard is constructed with a reference to the thread
   pool's active thread counter.  The guard increments the counter
   when it is created and decrements it at destruction.  By creating
   one of these in svc(), I know that the counter will be decremented
   no matter how or where svc() returns.  */
class Counter_Guard
{
public:
  Counter_Guard (Thread_Pool::counter_t &counter)
    : counter_ (counter)
  {
    ++counter_;
  }
  ~Counter_Guard (void)
  {
    --counter_;
  }
protected:
  Thread_Pool::counter_t &counter_;
};
/* My Message_Block_Guard is also a little non-traditional.  It
   doesn't do anything in the constructor but its destructor ensures
   that the message block's release() method is called.  This is a
   cheap way to prevent a memory leak if I need an additional exit
   point in svc().  */
class Message_Block_Guard
{
public:
  Message_Block_Guard (ACE_Message_Block *&mb)
    : mb_ (mb)
  {
  }
  ~Message_Block_Guard (void)
  {
    mb_->release ();
  }
protected:
  ACE_Message_Block *&mb_;
};
/* Now we come to the svc() method.  As I said, this is being executed
   in each thread of the Thread_Pool.  Here, we pull messages off of
   our built-in ACE_Message_Queue and cause them to do work.  */
int
Thread_Pool::svc (void)
{
  /* The getq() method takes a reference to a pointer.  So... we need
    a pointer to give it a reference to.  */
  ACE_Message_Block *mb;
  /* Create the guard for our active thread counter object.  No matter
    where we choose to return() from svc(), we now know that the
    counter will be decremented.  */
  Counter_Guard counter_guard (active_threads_);
  /* Get messages from the queue until we have a failure.  There's no
    real good reason for failure so if it happens, we leave
    immediately.  */
  while (this->getq (mb) != -1)
    {
      /* A successful getq() will cause "mb" to point to a valid
        reference-counted ACE_Message_Block.  We use our guard object
        here so that we're sure to call the release() method of that
        message block and reduce its reference count.  Once the count
        reaches zero, it will be deleted.  */
      Message_Block_Guard message_block_guard (mb);
      /* As noted before, the ACE_Message_Block stores its data as a
        char*.  We pull that out here and later turn it into an
        ACE_Event_Handler* */
      char *c_data = mb->base ();
      /* We've chosen to use a "null" value as an indication to leave.
        If the data we got from the queue is not null then we have
        some work to do.  */
      if (c_data)
        {
          /* Once again, we go to great lengths to emphasize the fact
            that we're casting pointers around in rather impolite
            ways.  We could have cast the char* directly to an
            ACE_Event_Handler* but then folks might think that's an OK
            thing to do.
            (Note: The correct way to use an ACE_Message_Block is to
            write data into it.  What I should have done was create a
            message block big enough to hold an event handler pointer
            and then written the pointer value into the block.  When
            we got here, I would have to read that data back into a
            pointer.  While politically correct, it is also a lot of
            work.  If you're careful you can get away with casting
            pointers around.  A sketch of the copy-based approach
            appears just after this listing.)  */
          void *v_data = (void *) c_data;
          ACE_Event_Handler *handler = (ACE_Event_Handler *) v_data;
          /* Now that we finally have an event handler pointer, invoke
            its handle_input() method.  Since we don't know its
            handle, we just give it a default.  That's OK because we
            know that we're not using the handle in the method anyway.  */
          if (handler->handle_input (ACE_INVALID_HANDLE) == -1)
            {
              /* Tell the handler that it's time to go home.  The
                "normal" method for shutting down a handler whose
                handler failed is to invoke handle_close().  This will
                take care of cleaning it up for us.  Notice how we use
                the handler's get_handle() method to populate its
                "handle" parameter.  Convenient, isn't it?  */
              handler->handle_close (handler->get_handle (), 0);
              /* Also notice that we don't exit the svc() method here!
                The first time I did this, I was exiting.  After a few
                clients disconnect you have an empty thread pool.
                Hard to do any more work after that...  */
            }
        }
      else
        /* If we get here, we were given a message block with "null"
           data.  That is our signal to leave, so we return(0) to
           leave gracefully.  */
        return 0;  // Ok, shutdown request
      // message_block_guard goes out of scope here and releases the
      // message_block instance.
    }
  return 0;
}
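
The svc() comments above mention that the "correct" way to use an ACE_Message_Block is to copy data into it rather than casting a pointer into base(). The following is only a rough sketch of what such a copy-based enqueue could look like; it is not part of the tutorial, the enqueue_by_copy function is made up for illustration, and the matching read on the dequeue side is shown only as a comment.

/* Sketch only (not part of the tutorial): enqueue by copying the
   pointer value into a block that is big enough to hold it, instead of
   casting the pointer into the block's base().  */
#include "thread_pool.h"
#include "ace/Message_Block.h"
#include "ace/Event_Handler.h"

int enqueue_by_copy (Thread_Pool &pool, ACE_Event_Handler *handler)
{
  ACE_Message_Block *mb = 0;
  // Allocate a block with room for exactly one pointer.
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (sizeof (handler)),
                  -1);
  /* copy() writes the bytes of the pointer value into the block and
     advances the write pointer.  If either the copy or the enqueue
     fails, we still own the block and must release it ourselves.  */
  if (mb->copy ((const char *) &handler, sizeof (handler)) == -1
      || pool.putq (mb) == -1)
    {
      mb->release ();
      return -1;
    }
  return 0;
}

/* The matching read in svc() would then be something like:
     ACE_Event_Handler *handler = 0;
     ACE_OS::memcpy (&handler, mb->rd_ptr (), sizeof (handler));  */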

Two of the resources in the Thread_Pool implementation are handled with the guard (RAII) idiom: the Counter_Guard class manages the active-thread counter, and the Message_Block_Guard class manages each dequeued message block. Because ACE_Task wraps every unit of work in an ACE_Message_Block, the guard guarantees that the block is released no matter where svc() returns, so no memory is leaked.
ACE_Event_Handler is the event-handler base class: each piece of work is handed to the pool as a pointer to such a handler, and a worker thread processes it by calling its handle_input() method.
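
To round things out, here is a small, hypothetical driver, not part of the original tutorial, showing how the pool could be used: a trivial ACE_Event_Handler subclass does its "work" in handle_input(), and main() opens the pool, enqueues a few handlers, and then closes it. The Work_Handler name, the thread count, and the number of jobs are all made up for this sketch.

// pool_driver.cpp -- hypothetical usage sketch, not part of the tutorial.
#include "thread_pool.h"
#include "ace/Event_Handler.h"
#include "ace/Log_Msg.h"

/* A trivial handler.  The pool calls handle_input() on it from one of
   its worker threads.  */
class Work_Handler : public ACE_Event_Handler
{
public:
  Work_Handler (int id) : id_ (id) {}

  virtual int handle_input (ACE_HANDLE)
  {
    ACE_DEBUG ((LM_DEBUG, "(%t) handling job %d\n", this->id_));
    return 0;   // returning -1 would make svc() call handle_close()
  }

private:
  int id_;
};

int main (int, char *[])
{
  Thread_Pool pool;

  // Spin up a few worker threads; the count is arbitrary here.
  if (pool.open (4) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "Thread_Pool::open() failed\n"), 1);

  /* Hand some work to the pool.  The handlers must stay alive until
     the pool is done with them; here they simply live on the stack
     until close() returns.  */
  Work_Handler h0 (0), h1 (1), h2 (2);
  pool.enqueue (&h0);
  pool.enqueue (&h1);
  pool.enqueue (&h2);

  /* close() enqueues one "null" block per active thread and then waits
     for every svc() thread to drain the queue and exit.  */
  pool.close ();
  return 0;
}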
