技巧
下图展示了VS.NET在调试模式时运行这个多线程监听应用的情形。要注意的是下拉的窗口显示了每个线程的名字。选择线程后,代码窗口就会移动到该线程正在执行的地方。要注意的是一个线程的名字只可以设置一次。因此,当工作项目使用同一个线程时,如果设置了Name属性,代码就会抛出一个例外。
QueueListener是真正由多个线程上取回MSMQ队列的类,它还包含有一个构造器,该构造器接收机器名字,并且以队列的形式将名字送至监视器。public Listen方法由队列中接收信息,而public Monitor方法初始化处理并且创建线程池。private ProcessMsg方法则是用来处理接收信息的。最后是public Finish方法,它可以接收一个超时参数,可让QueueListener类使用的线程在一个指定的时间内完成工作。
首先,要注意到Listen方法接收一个状态对象作为参数。该对象将包含有一个EventState的实例,该实例将被Listen用来检查该方法是否正在处理信息还是已经完成处理。通过这样做可确保Finish方法阻塞直到所有的线程完成它们当前的处理。在设置ThreadPriority和Name,以及接收EventState后,你将会注意到该方法仅包含有一个放在Try块中的While循环。该循环反复调用MessageQueue类的Receive方法,方法将返回在指定的超时时间内的第一个得到的信息。如果没有信息,在返回前,就会使用一个TimeSpan对象来通知Receive方法阻塞一秒。如果没有信息接收,将会抛出一个MessageQueueException对象。要注意的是如果有信息到达,该方法将会继续运行并调用Reset方法,Reset方法属于EventState对象内的ResetEvent字段。无论是哪种情况,Finally块都会调用ResetEvent字段的Set方法,表示线程已经完成这个循环处理。
前面已经提及,EventState的ResetEvent字段包含有一个ManualResetEvent的实例,该实例是一个事件对象,它的signaled和non-signaled状态都是可以通过Reset和Set方法手工修改的。在调用Reset方法时,状态就会变为non-signaled,它表明该线程正忙。当状态通过Set事件设置为signaled时,则表明该线程已经完成处理,因此可以安全地破坏。
其中有意思的部分是由Monitor方法完成的。在这个方法中,会创建一个类级别的ManualResetEvent数组,该数组的大小和池将要服务的工作项目的数目一样。
注意 要记住的是,在这篇文章中,工作项目和线程并不是一件事情。工作项目是由线程完成的,但是在应用中,工作项目的数目可以比线程更多。当前runtime支持的线程池大小是30,因此如果提交超过30个工作项目到池中将自动令一些工作项目必须等待其它的工作项目完成。在这个例子中,如果有超过30个工作项目的话,那将永远不会运行,因为每个工作项目调用Listen,它将一直控制线程,直到Finish方法被调用。因此,为了确保runtime还有其它的线程作其它用途,WorkItems(工作项目)不要超过15个。
工作项目的数目可以通过QueueListener类的WorkItems属性设置,它的默认值是7。接着就会通过一个For循环来创建每个ManualResetEvent对象,并且将它们和一个新的EventState相联系。然后结果对象objState就会作为第二个参数传送到ThreadPool类的共享方法QueueUserWorkItem中。就象它的名字隐含的意思一样,该方法令工作项目以队列的形式送给runtime管理的线程池,以等待下一个工作线程完成它。第一个参数是用来指定在工作项目开始执行的时候需要回调的方法,在这里是Listen。通过传送EventState作为第二个参数,Listen方法可以接收该对象,并且如我们前面讨论的一样,使用里面的状态信息。在这里,状态包含有用来调试的线程名字和一个用来同步线程的ManualResetEvent对象。在循环完成后,指定数目的工作项目将会以队列的形式被线程池执行。此时线程将会不断地检查指定的队列以得到新信息。
在客户端最终调用Finish方法来完成执行时,首先会设置其private mFinished变量为True。Listen方法在每次循环时都会检查该变量,如果设置为True时,就会退出循环,释放线程并且返回到池中。接着Finish方法将会使用WaitHandle类的共享WaitAll方法阻塞,直到mEvs数组中的所有ManualReset事件对象都被设置为signaled状态(True)。如果超时值被传送给该方法时,就会使用可选的第二个参数,在反阻塞现有的线程前,会等待指定的时间。使用这种方法,就可以确保Finish方法将一直阻塞,直到每个工作线程已经完成Listen方法中的当前循环。值得一提的是,线程确实被返回到池中而没有被破坏。这样在下一次调用Monitor时,将会重新使用现有的线程,不会重新创建它们而带来系统开销。
对于使用QueueListener的客户来说,其实现如下面的代码所示:
Dim objQ As New QueueListener("ssosa", "tester")
objQ.WorkItems = 10 objQ.Monitor()
' Do other work here
objQ.Finish() |
在初始化一个新的对象,并且传送它监听的机器名和队列,工作项目的数字就被设置好,并且调用Monitor方法。其后,客户端可以调用Finish方法来清除工作线程(可带超时参数)。
这个例子向你解释了如何使用ThreadPool类,不过它当然不是创建线程池以执行监视消息队列的唯一方法。例如,可以很容易地修改QueueListener来创建和跟踪类中的Thread对象数组,以实现线程池。接着Finish方法在设置mFinished标志后,就可以执行一个循环来监视IsAlive属性,以决定线程池何时耗尽,这时就无需使用ManualResetEvent对象了。此外,上面提到TLS技巧可以用来传送状态信息给线程。这个体系可让你更好地控制线程,实际上,当runtinme管理的线程已经很繁重或者需要更多的工作项目时,这个方法将是更好的。
查看本文来源