
14.5 Threaded Program Architecture

A threaded program should always arrange for a single thread to deal with any given object or subsystem that is external to the program (such as a file, a database, a GUI, or a network connection). When several threads deal with the same external object, their operations can interleave in ways that cause unpredictable problems.

Whenever your threaded program must deal with some external object, devote a thread to such dealings, using a Queue object from which the external-interfacing thread gets work requests that other threads post. The external-interfacing thread can return results by putting them on one or more other Queue objects. The following example shows how to package this architecture into a general, reusable class, assuming that each unit of work on the external subsystem can be represented by a callable object:

import threading, Queue
class ExternalInterfacing(threading.Thread):
    def __init__(self, externalCallable, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)    # do not keep the program alive just for this thread
        self.externalCallable = externalCallable
        self.workRequestQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.start()
    def request(self, *args, **kwds):
        "called by other threads as externalCallable would be"
        self.workRequestQueue.put((args, kwds))
        return self.resultQueue.get()    # blocks until a result is available
    def run(self):
        while 1:
            # only this thread ever calls externalCallable
            args, kwds = self.workRequestQueue.get()
            self.resultQueue.put(self.externalCallable(*args, **kwds))

Once some ExternalInterfacing object ei is instantiated (e.g., ei = ExternalInterfacing(someExternalCallable)), any other thread may call ei.request just as it would call someExternalCallable without such a mechanism (with or without arguments as appropriate). The advantage of the ExternalInterfacing mechanism is that all calls upon someExternalCallable are now serialized. This means they are performed by just one thread (the thread object bound to ei) in some defined sequential order, without overlap, race conditions (hard-to-debug errors that depend on which thread happens to get there first), or other anomalies that might otherwise result.
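As a rough illustration of this serialization, the following sketch has several caller threads invoke ei.request concurrently and then checks which thread actually ran the callable. The names appendToLog, log, and the thread name 'logger' are hypothetical, introduced only for this illustration. The block repeats the class so that it is self-contained, and imports the queue module under either its Python 2 or Python 3 name (an assumption made so the sketch runs on both). The callers deliberately ignore the value that request returns.

```python
import threading
try:
    import queue              # Python 3 module name
except ImportError:
    import Queue as queue     # Python 2 module name

class ExternalInterfacing(threading.Thread):
    def __init__(self, externalCallable, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.externalCallable = externalCallable
        self.workRequestQueue = queue.Queue()
        self.resultQueue = queue.Queue()
        self.start()
    def request(self, *args, **kwds):
        self.workRequestQueue.put((args, kwds))
        return self.resultQueue.get()
    def run(self):
        while True:
            args, kwds = self.workRequestQueue.get()
            self.resultQueue.put(self.externalCallable(*args, **kwds))

log = []    # hypothetical stand-in for an external resource

def appendToLog(message):
    # record which thread performs the "external" operation
    log.append((threading.current_thread().name, message))
    return len(log)

ei = ExternalInterfacing(appendToLog, name='logger')

# five caller threads, each posting one request through ei
callers = [threading.Thread(target=ei.request, args=('message %d' % i,))
           for i in range(5)]
for t in callers:
    t.start()
for t in callers:
    t.join()

threadsUsed = set(name for name, message in log)
print(sorted(threadsUsed))    # ['logger']
```

Although five different threads posted requests, every call to appendToLog ran on the single thread bound to ei.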

If several callables need to be serialized together, you can pass the callable as part of the work request, rather than passing it at the initialization of class ExternalInterfacing, for greater generality. The following example shows this more general approach:

import threading, Queue
class Serializer(threading.Thread):
    def __init__(self, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)    # do not keep the program alive just for this thread
        self.workRequestQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.start()
    def apply(self, callable, *args, **kwds):
        "called by other threads as callable would be"
        self.workRequestQueue.put((callable, args, kwds))
        return self.resultQueue.get()    # blocks until a result is available
    def run(self):
        while 1:
            # the callable travels with each work request
            callable, args, kwds = self.workRequestQueue.get()
            self.resultQueue.put(callable(*args, **kwds))

Once a Serializer object ser has been instantiated, other threads may call ser.apply(someExternalCallable) just as they would call someExternalCallable without such a mechanism (with or without further arguments as appropriate). The Serializer mechanism has the same advantages as ExternalInterfacing, with the difference that all calls made through a single ser instance are serialized, whether they are to the same callable or to different ones.
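One point deserves care when several threads call apply at the same time: since all callers wait on one shared resultQueue, a caller may pick up the result of another caller's request. The following is a minimal sketch of one possible variant, not the approach shown above, in which each request carries its own reply queue so that each result returns to the thread that asked for it. The name replyQueue and the helper caller are hypothetical, and the queue module is imported under either its Python 2 or Python 3 name so that the sketch runs on both.

```python
import threading
try:
    import queue              # Python 3 module name
except ImportError:
    import Queue as queue     # Python 2 module name

class Serializer(threading.Thread):
    def __init__(self, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.workRequestQueue = queue.Queue()
        self.start()
    def apply(self, callable, *args, **kwds):
        "called by other threads as callable would be"
        replyQueue = queue.Queue(1)    # private to this one request
        self.workRequestQueue.put((replyQueue, callable, args, kwds))
        return replyQueue.get()
    def run(self):
        while True:
            replyQueue, callable, args, kwds = self.workRequestQueue.get()
            # the result goes back only to the thread that posted the request
            replyQueue.put(callable(*args, **kwds))

ser = Serializer()
results = {}

def caller(n):
    # each thread stores the answer it received under its own key
    results[n] = ser.apply(pow, n, 2)

threads = [threading.Thread(target=caller, args=(n,)) for n in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results.items()))
```

The design trades one small Queue object per request for the guarantee that results cannot be delivered to the wrong caller.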

The user interface of the whole program is an external subsystem and thus should be dealt with by a single thread, specifically the main thread of the program (this is mandatory for some user interface toolkits and advisable even when not mandatory). A Serializer thread is therefore inappropriate. Rather, the program's main thread should deal only with user interface issues, and farm out actual work to worker threads that accept work requests on a Queue object and return results on another. A set of worker threads is also known as a thread pool. As shown in the following example, all worker threads should share a single queue of requests and a single queue of results, since the main thread will be the only one posting work requests and harvesting results:

import threading
class Worker(threading.Thread):
    requestID = 0    # class-level counter, incremented only by the main thread
    def __init__(self, requestsQueue, resultsQueue, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)    # do not keep the program alive just for this thread
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        self.start()
    def performWork(self, callable, *args, **kwds):
        "called by the main thread as callable would be, but w/o return"
        Worker.requestID += 1
        self.workRequestQueue.put((Worker.requestID, callable, args, kwds))
        return Worker.requestID
    def run(self):
        while 1:
            # tag each result with the id of the request that produced it
            requestID, callable, args, kwds = self.workRequestQueue.get()
            self.resultQueue.put((requestID, callable(*args, **kwds)))

The main thread creates the two queues, then instantiates worker threads as follows:

import Queue
requestsQueue = Queue.Queue()
resultsQueue = Queue.Queue()
# numberOfWorkers is the size of the pool, chosen by the application
for i in range(numberOfWorkers):
    worker = Worker(requestsQueue, resultsQueue)

Now, whenever the main thread needs to farm out work (execute some callable object that may take substantial time to produce results), the main thread calls worker.performWork(callable), much as it would call callable without such a mechanism (with or without further arguments as appropriate). Any Worker instance serves for this purpose, since all of them share the same two queues; the loop above leaves worker bound to the last instance created. However, performWork does not return the result of the call. Instead, the main thread gets an id that identifies the work request. If the main thread needs the results, it can keep track of that id, since the request's results are tagged with that id when they appear. The advantage of this mechanism is that the main thread does not block waiting for the callable's lengthy execution to complete, but rather becomes ready again at once and can immediately return to its main business of dealing with the user interface.

The main thread must arrange to check the resultsQueue, since the result of each work request eventually appears there, tagged with the request's id, when the worker thread that took that request from the queue finishes computing the result. How the main thread arranges to check for both user interface events and the results coming back from worker threads onto the results queue depends on what user interface toolkit is used, or, if the user interface is text-based, on the platform on which the program runs.
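To make the id tagging concrete, here is a small deterministic sketch that submits three requests and matches each result back to its request by id. The function slowSquare and the dictionaries pending and answers are hypothetical names used only for illustration. For brevity the main thread here simply blocks on resultsQueue.get; a real user interface thread would check the queue without blocking. The block repeats the Worker class to stay self-contained and imports the queue module under either its Python 2 or Python 3 name.

```python
import threading, time
try:
    import queue              # Python 3 module name
except ImportError:
    import Queue as queue     # Python 2 module name

class Worker(threading.Thread):
    requestID = 0
    def __init__(self, requestsQueue, resultsQueue, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        self.start()
    def performWork(self, callable, *args, **kwds):
        Worker.requestID += 1
        self.workRequestQueue.put((Worker.requestID, callable, args, kwds))
        return Worker.requestID
    def run(self):
        while True:
            requestID, callable, args, kwds = self.workRequestQueue.get()
            self.resultQueue.put((requestID, callable(*args, **kwds)))

requestsQueue = queue.Queue()
resultsQueue = queue.Queue()
for i in range(3):
    worker = Worker(requestsQueue, resultsQueue)

def slowSquare(n):
    time.sleep(0.1)    # simulate a lengthy computation
    return n * n

# performWork returns at once with an id; remember which input it stands for
pending = {}
for n in (3, 4, 5):
    pending[worker.performWork(slowSquare, n)] = n

# results may arrive in any order, so use the id tag to pair them up
answers = {}
while pending:
    requestID, result = resultsQueue.get()
    answers[pending.pop(requestID)] = result

print(sorted(answers.items()))    # [(3, 9), (4, 16), (5, 25)]
```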

A widely applicable general strategy is for the main thread to poll (i.e., check the state of the results queue periodically). On most Unix-like platforms, function alarm of module signal allows polling. The Tkinter GUI toolkit supplies method after, usable for polling. Some toolkits and platforms afford more effective strategies, letting a worker thread alert the main thread when it places some result on the results queue, but there is no generally available, cross-platform, and cross-toolkit way to arrange for this. Therefore, the following artificial example ignores user interface events, and just simulates work by evaluating random expressions, with random delays, on several worker threads, thus completing the previous example:

import random, time
def makeWork():
    # build a random arithmetic expression such as "7 * 3"
    return "%d %s %d" % (random.randrange(2, 10),
        random.choice(('+', '-', '*', '/', '%', '**')),
        random.randrange(2, 10))
def slowEvaluate(expressionString):
    time.sleep(random.randrange(1, 5))    # simulate a lengthy computation
    return eval(expressionString)
workRequests = {}    # maps each pending request id to its expression
def showResults():
    # harvest whatever results are ready, without blocking
    while 1:
        try:
            id, results = resultsQueue.get_nowait()
        except Queue.Empty:
            return
        print 'Result %d: %s -> %s' % (id, workRequests[id], results)
        del workRequests[id]
for i in range(10):
    expressionString = makeWork()
    id = worker.performWork(slowEvaluate, expressionString)
    workRequests[id] = expressionString
    print 'Submitted request %d: %s' % (id, expressionString)
    time.sleep(1)
    showResults()
# keep polling until every submitted request has been answered
while workRequests:
    time.sleep(1)
    showResults()
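The polling strategy mentioned earlier can also be sketched in a toolkit-neutral way. The hypothetical helper makePoller below builds a function that drains the results queue without blocking and then asks a scheduler to call it again later. The names makePoller, handleResult, scheduleLater, intervalMs, and recordSchedule are all assumptions made for this illustration, and a stand-in scheduler replaces the GUI so that the sketch runs anywhere. The queue module is imported under either its Python 2 or Python 3 name.

```python
try:
    import queue              # Python 3 module name
except ImportError:
    import Queue as queue     # Python 2 module name

def makePoller(resultsQueue, handleResult, scheduleLater, intervalMs=100):
    "return a function that drains resultsQueue, then reschedules itself"
    def poll():
        while True:
            try:
                requestID, result = resultsQueue.get_nowait()
            except queue.Empty:
                break                      # nothing more to harvest for now
            handleResult(requestID, result)
        scheduleLater(intervalMs, poll)    # ask to be called again later
    return poll

# Stand-in scheduler: it only records the request instead of running it later.
scheduled = []
def recordSchedule(delayMs, function):
    scheduled.append((delayMs, function))

harvested = []
def collect(requestID, result):
    harvested.append((requestID, result))

resultsQueue = queue.Queue()
resultsQueue.put((1, 'first'))
resultsQueue.put((2, 'second'))

poll = makePoller(resultsQueue, collect, recordSchedule)
poll()
print(harvested)    # [(1, 'first'), (2, 'second')]
```

With Tkinter, and assuming a root window bound to the name root, the after method of that window could be passed as scheduleLater, since it accepts a delay in milliseconds followed by the function to call; poll would then be called once before entering the main loop.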