Often, threading problems are based on the need for tasks to
be serialized that is, to take care of things in order. ToastOMatic.cpp
must not only take care of things in order, it must be able to work on one
piece of toast without worrying that toast is falling on the floor in the
meantime. You can solve many threading problems by using a queue that
synchronizes access to the elements within:
This relatively small amount of code can solve a remarkable
number of problems.
//: C11:TestTQueue.cpp {RunByHand}
//{L} ZThread
#include <string>
#include <iostream>
#include "TQueue.h"
#include "zthread/Thread.h"
#include "LiftOff.h"
using namespace ZThread;
using namespace std;
class LiftOffRunner : public Runnable {
TQueue<LiftOff*> rockets;
public:
void add(LiftOff* lo) { rockets.put(lo); }
void run() {
try {
while(!Thread::interrupted()) {
LiftOff* rocket = rockets.get();
rocket->run();
}
} catch(Interrupted_Exception&) { /* Exit */ }
cout << "Exiting LiftOffRunner"
<< endl;
}
};
int main() {
try {
LiftOffRunner* lor = new LiftOffRunner;
Thread t(lor);
for(int i = 0; i < 5; i++)
lor->add(new LiftOff(10, i));
cin.get();
lor->add(new LiftOff(10, 99));
cin.get();
t.interrupt();
} catch(Synchronization_Exception& e) {
cerr << e.what() << endl;
}
} ///:~
//: C11:ToastOMaticMarkII.cpp {RunByHand}
// Solving the problems using TQueues.
//{L} ZThread
#include <iostream>
#include <string>
#include <cstdlib>
#include <ctime>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/Condition.h"
#include "zthread/ThreadedExecutor.h"
#include "TQueue.h"
using namespace ZThread;
using namespace std;
class Toast {
enum Status { DRY, BUTTERED, JAMMED };
Status status;
int id;
public:
Toast(int idn) : status(DRY), id(idn) {}
#ifdef __DMC__ // Incorrectly requires default
Toast() { assert(0); } // Should never be called
#endif
void butter() { status = BUTTERED; }
void jam() { status = JAMMED; }
string getStatus() const {
switch(status) {
case DRY: return "dry";
case BUTTERED: return "buttered";
case JAMMED: return "jammed";
default: return "error";
}
}
int getId() { return id; }
friend ostream& operator<<(ostream& os,
const Toast& t) {
return os << "Toast " << t.id
<< ": " << t.getStatus();
}
};
typedef CountedPtr< TQueue<Toast> >
ToastQueue;
class Toaster : public Runnable {
ToastQueue toastQueue;
int count;
public:
Toaster(ToastQueue& tq) : toastQueue(tq),
count(0) {}
void run() {
try {
while(!Thread::interrupted()) {
int delay = rand()/(RAND_MAX/5)*100;
Thread::sleep(delay);
// Make toast
Toast t(count++);
cout << t << endl;
// Insert into queue
toastQueue->put(t);
}
} catch(Interrupted_Exception&) { /* Exit */ }
cout << "Toaster off" <<
endl;
}
};
// Apply butter to toast:
class Butterer : public Runnable {
ToastQueue dryQueue, butteredQueue;
public:
Butterer(ToastQueue& dry, ToastQueue&
buttered)
: dryQueue(dry), butteredQueue(buttered) {}
void run() {
try {
while(!Thread::interrupted()) {
// Blocks until next piece of toast is
available:
Toast t = dryQueue->get();
t.butter();
cout << t << endl;
butteredQueue->put(t);
}
} catch(Interrupted_Exception&) { /* Exit */ }
cout << "Butterer off" <<
endl;
}
};
// Apply jam to buttered toast:
class Jammer : public Runnable {
ToastQueue butteredQueue, finishedQueue;
public:
Jammer(ToastQueue& buttered, ToastQueue&
finished)
: butteredQueue(buttered), finishedQueue(finished) {}
void run() {
try {
while(!Thread::interrupted()) {
// Blocks until next piece of toast is
available:
Toast t = butteredQueue->get();
t.jam();
cout << t << endl;
finishedQueue->put(t);
}
} catch(Interrupted_Exception&) { /* Exit */ }
cout << "Jammer off" << endl;
}
};
// Consume the toast:
class Eater : public Runnable {
ToastQueue finishedQueue;
int counter;
public:
Eater(ToastQueue& finished)
: finishedQueue(finished), counter(0) {}
void run() {
try {
while(!Thread::interrupted()) {
// Blocks until next piece of toast is
available:
Toast t = finishedQueue->get();
// Verify that the toast is coming in order,
// and that all pieces are getting jammed:
if(t.getId() != counter++ ||
t.getStatus() != "jammed") {
cout << ">>>> Error:
" << t << endl;
exit(1);
} else
cout << "Chomp! " << t
<< endl;
}
} catch(Interrupted_Exception&) { /* Exit */ }
cout << "Eater off" << endl;
}
};
int main() {
srand(time(0)); // Seed the random number generator
try {
ToastQueue dryQueue(new TQueue<Toast>),
butteredQueue(new TQueue<Toast>),
finishedQueue(new TQueue<Toast>);
cout << "Press <Return> to
quit" << endl;
ThreadedExecutor executor;
executor.execute(new Toaster(dryQueue));
executor.execute(new
Butterer(dryQueue,butteredQueue));
executor.execute(
new Jammer(butteredQueue, finishedQueue));
executor.execute(new Eater(finishedQueue));
cin.get();
executor.interrupt();
} catch(Synchronization_Exception& e) {
cerr << e.what() << endl;
}
} ///:~
Two things are immediately apparent in this solution: first,
the amount and complexity of code within each Runnable class is
dramatically reduced by the use of the TQueue because the guarding,
communication, and wait( )/signal( ) operations are now taken
care of by the TQueue. The Runnable classes don t have Mutexes
or Condition objects anymore. Second, the coupling between the classes
is eliminated because each class communicates only with its TQueues.
Notice that the definition order of the classes is now independent. Less code
and less coupling are always good things, which suggests that the use of the TQueue
has a positive effect here, as it does on most problems.