C#——I/O完成端口的类定义和测试实例

发表于:2007-06-21来源:作者:点击数: 标签:
我们采用的是I/O Complete Port(以下简称IOCP)处理机制。 简单的讲,当服务应用程序初始化时,它应该先创建一个I/O CP。我们在请求到来后,将得到的数据打包用PostQueuedCompletionStatus发送到IOCP中。这时需要创建一些个线程(7个线程/每CPU,再多就没有

   
  我们采用的是I/O Complete Port(以下简称IOCP)处理机制。
  

  简单的讲,当服务应用程序初始化时,它应该先创建一个I/O CP。我们在请求到来后,将得到的数据打包用PostQueuedCompletionStatus发送到IOCP中。这时需要创建一些个线程(7个线程/每CPU,再多就没有意义了)来处理发送到IOCP端口的消息。实现步骤大致如下:
  
  1先在主线程中调用CreateIoCompletionPort创建IOCP。
  
  CreateIoCompletionPort的前三个参数只在把设备同Complete Port相关联时才有用。
  
  此时我们只需传递INVALID_HANDLE_VALUE,NULL和0即可。
  
  第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的CPU数目。
  
  2我们的ThreadFun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。
  
  在循环中,调用GetQueuedCompletionStatus,这样就把当前线程的ID放入一个等待线程队列中,I/O CP内核对象就总能知道哪个线程在等待处理完成的I/O请求。
  
  如果在IDLE_THREAD_TIMEOUT规定的时间内I/O CP上还没有出现一个Completion Packet,则转入下一次循环。在这里我们设置的IDLE_THREAD_TIMEOUT为1秒。
  
  当端口的I/O完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的I/O项中的信息:    传输的字节数、完成键和OVERLAPPED结构的地址。
  
  在我们的程序中可以用智能指针或者BSTR或者int来接受这个OVERLAPPED结构的地址的值,从而得到消息;然后在这个线程中处理消息。
  
  GetQueuedCompletionStatus的第一个参数hCompletionPort指出了要监视哪一个端口,这里我们传送先前从CreateIoCompletionPort返回的端口句柄。
  
  需要注意的是:
  
  第一,  线程池的数目是有限制的,和CPU数目有关系。
  
  第二,  IOCP是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用CPU资源,直到被内核唤醒;
  
  第三,  最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。
  
  测试代码:
  
  using System;
  using System.Threading; // Included for the Thread.Sleep call
  using Continuum.Threading;
  using System.Runtime.InteropServices;
  
  namespace IOCPDemo
  {
    //=============================================================================
    /**//// Sample class for the threading class
    public class UtilThreadingSample
    {
      //*****************************************************************************
      /**//// Test Method
      static void Main()
      {
        // Create the MSSQL IOCP Thread Pool
        IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 10, 20, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));
  
        //for(int i =1;i<10000;i++)
        {
          pThreadPool.PostEvent(1234);
        }
  
        Thread.Sleep(100);
  
        pThreadPool.Dispose();
      }
  
      //********************************************************************
      /**//// Function to be called by the IOCP thread pool. Called when
      ///      a command is posted for processing by the SocketManager

      /// The value provided by the thread posting the event
      static public void IOCPThreadFunction(int iValue)
      {
        try
        {
          Console.WriteLine("Value: {0}", iValue.ToString());
          Thread.Sleep(3000);
        }
  
        catch (Exception pException)
        {
          Console.WriteLine(pException.Message);
        }
      }
    }
  
  }
  
  类代码:
  using System;
  using System.Threading;
  using System.Runtime.InteropServices;
  
  namespace IOCPThreading
  {
    [StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]
  
    public sealed class IOCPThreadPool
    {
      [DllImport("Kernel32", CharSet=CharSet.Auto)]
      private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);
  
      [DllImport("Kernel32", CharSet=CharSet.Auto)]
      private unsafe static extern Boolean CloseHandle(UInt32 hObject);
  
      [DllImport("Kernel32", CharSet=CharSet.Auto)]
      private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped);
  
      [DllImport("Kernel32", CharSet=CharSet.Auto)]
      private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);
  
      private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;
      private const UInt32 INIFINITE = 0xffffffff;
      private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;
      public delegate void USER_FUNCTION(int iValue);
      private UInt32 m_hHandle;
      private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }
  
      private Int32 m_uiMaxConcurrency;
  
      private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }
  
  
      private Int32 m_iMinThreadsInPool;
  
      private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }
  
      private Int32 m_iMaxThreadsInPool;
  
      private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }
  
  
      private Object m_pCriticalSection;
  
      private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }
  
  
      private USER_FUNCTION m_pfnUserFunction;
  
      private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }
  
  
      private Boolean m_bDisposeFlag;
  
      /**//// SimType: Flag to indicate if the class is disposing
  
      private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }
  
      private Int32 m_iCurThreadsInPool;
  
      /**//// SimType: The current number of threads in the thread pool
  
      public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }
  
      /**//// SimType: Increment current number of threads in the thread pool
  
      private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
  
      /**//// SimType: Decrement current number of threads in the thread pool
  
      private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }
  
  
      private Int32 m_iActThreadsInPool;
  
      /**//// SimType: The current number of active threads in the thread pool
  
      public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }
  
      /**//// SimType: Increment current number of active threads in the thread pool

原文转自:http://www.ltesting.net