It is All about Traffic, and Programming

Multithreading

Note on OmniThreadLibrary (OTL): Communication Channel

To facilitate communication between two task workers, OmniThreadLibrary provides a very handy mechanism: the communication channel. Each of the two task workers registers one endpoint of the channel, and the two then communicate through that channel like this:

Taken from demo 8 RegisterComm,

In the main thread, create the communication channel and the task controls, and associate one endpoint of the channel with each task worker:


procedure TfrmTestRegisterComm.FormCreate(Sender: TObject);
begin
  FCommChannel := CreateTwoWayChannel(1024);
  FClient1              := CreateTask(TCommTester.Create(FCommChannel.Endpoint1))
                                  .MonitorWith(OmniTED)
                                  .Run;
  FClient2              := CreateTask(TCommTester.Create(FCommChannel.Endpoint2))
                                  .MonitorWith(OmniTED)
                                  .Run;
end;

In the task worker’s constructor, keep a local reference to the end point:

constructor TCommTester.Create(commEndpoint: IOmniCommunicationEndpoint);
begin
  inherited Create;
  ctComm := commEndpoint;
end;

In the task worker’s initializer, register the end point with its task:

function TCommTester.Initialize: boolean;
begin
  Task.RegisterComm(ctComm);
  Result := true;
end;

From the above, you can see, the steps are:

  1. Create the two way channel using “CreateTwoWayChannel”,  the number 1024 indicates the length of the message queue;
  2. Associate the two end points of the channel with two task workers;
  3. Inside the task worker’s Initialize( ),  register the communication end point with the worker’s IOmniTask.

After all this is done, when a task worker wants to talk to the other task worker, it simply sends a message through its endpoint, and the other task worker’s MSG_FORWARDING message handler will be invoked. Isn’t that nice?

Therefore, if a task worker wants to talk to its task control, it keeps using Task.Comm.Send; if it wants to talk to the other task worker, it uses

ctComm.Send(MSG_FORWARDING, msg.MsgData);
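
For completeness, here is a rough sketch of what the receiving side might look like. It is not copied from demo 8: the handler name OMForwarding, the message ID values, and MSG_NOTIFY_RECEPTION are assumptions made for illustration, and the unit only compiles with OmniThreadLibrary on the search path.

```pascal
unit CommTesterSketch;

interface

uses
  Winapi.Messages,                              // WM_USER
  OtlCommon, OtlComm, OtlTask, OtlTaskControl;  // OTL units

const
  // Message IDs; the values are assumptions, not taken from demo 8.
  MSG_FORWARDING       = WM_USER + 1;
  MSG_NOTIFY_RECEPTION = WM_USER + 2;  // hypothetical notification ID

type
  TCommTester = class(TOmniWorker)
  strict private
    ctComm: IOmniCommunicationEndpoint;
  public
    constructor Create(commEndpoint: IOmniCommunicationEndpoint);
    function Initialize: boolean; override;
    // Called in this worker's thread when the peer sends MSG_FORWARDING
    // through the registered endpoint (handler name is an assumption).
    procedure OMForwarding(var msg: TOmniMessage); message MSG_FORWARDING;
  end;

implementation

constructor TCommTester.Create(commEndpoint: IOmniCommunicationEndpoint);
begin
  inherited Create;
  ctComm := commEndpoint;
end;

function TCommTester.Initialize: boolean;
begin
  // Without this registration the endpoint's messages are never dispatched.
  Task.RegisterComm(ctComm);
  Result := true;
end;

procedure TCommTester.OMForwarding(var msg: TOmniMessage);
begin
  // Pass the received payload on to the task control (the owner thread).
  Task.Comm.Send(MSG_NOTIFY_RECEPTION, msg.MsgData);
end;

end.
```

The handler is dispatched by message ID, in the same way as handlers for messages sent by the task control, so the worker does not need a receive loop of its own.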

In case you want a task worker to send messages to ITSELF,   you can register the same communication channel “twice” with the same task worker like the following:


function TCommTester.Initialize: boolean;
begin
  Task.RegisterComm(ctComm);
  Task.RegisterComm(ctComm.OtherEndpoint);
  Result := true;
end;

Cool!

Note on OmniThreadLibrary (OTL): ChainTo/MsgWait

  • ChainTo is a very handy decorator of IOmniTaskControl interface – the use case is

[we have a list of tasks, and we want to assign them to a list of worker threads, one task per thread, and have these threads execute one after another, sequentially]

For example (Sample 16),

procedure TfrmTestChainTo.btnStartTasksClick(Sender: TObject);
var
  task1: IOmniTaskControl;
  task2: IOmniTaskControl;
  task3: IOmniTaskControl;
  taskA: IOmniTaskControl;
begin
  task3 := CreateTask(BgTask, '3').MonitorWith(OmniTED);
  task2 := CreateTask(BgTask, '2').MonitorWith(OmniTED).ChainTo(task3);
  task1 := CreateTask(BgTask, '1').MonitorWith(OmniTED).ChainTo(task2);
  task1.Run;
  taskA := CreateTask(BgTask, 'A').MonitorWith(OmniTED).ChainTo(
    CreateTask(BgTask, 'B').MonitorWith(OmniTED).ChainTo(
    CreateTask(BgTask, 'C').MonitorWith(OmniTED)));
  taskA.Run;
end;

When task1.Run is called, task1 runs first; after it finishes, task2 starts, and then task3.

  • MsgWait

MsgWait is needed if the worker thread uses something that depends on a Windows message loop, for example a message-based timer.

FHelloWorld := CreateTask(THelloWorld.Create(), 'Hello, World!')
  .MonitorWith(oeMonitor)
  .SetTimer(500, MSG_INTERNAL_TIMER)
  //.MsgWait
  .Run;

MsgWait is not needed for OTL’s own SetTimer, but it is needed when the worker creates a message-based timer itself, as in the following:

function THelloWorld.Initialize: boolean;
begin
  FTimer := TDSiTimer.Create(true, 1000, DoTimer);
  Result := true;
end;

The following is excerpted from The Delphi Geek Blog:

[http://www.thedelphigeek.com/2008/09/processing-windows-messages-in.html

MsgWait

In case you want to learn more about OTL internals, read ahead …

Let’s take a short look at the .MsgWait implementation. The function itself just sets two internal fields and returns the object itself so that we can chain another method to it.

function TOmniTaskControl.MsgWait(wakeMask: DWORD): IOmniTaskControl;
begin
  Options := Options + [tcoMessageWait];
  otcExecutor.WakeMask := wakeMask;
  Result := Self;
end; { TOmniTaskControl.MsgWait }

The hard work is done in TOmniTaskExecutor.Asy_DispatchMessages. If the tcoMessageWait option is set, the MsgWaitForMultipleObjectsEx will also wait for Windows messages (in addition to everything else it does) because it will receive non-null waitWakeMask. When a message is detected, the code will call ProcessThreadMessages method which simply peeks and dispatches all Windows messages (and Delphi’s internal message dispatch mechanism takes care of all the rest).

if tcoMessageWait in Options then
  waitWakeMask := WakeMask
else
  waitWakeMask := 0;
//...
awaited := MsgWaitForMultipleObjectsEx(numWaitHandles, waitHandles,
  cardinal(timeout_ms), waitWakeMask, flags);
//...
else if awaited = (WAIT_OBJECT_0 + numWaitHandles) then //message
  ProcessThreadMessages

procedure TOmniTaskExecutor.ProcessThreadMessages;
var
  msg: TMsg;
begin
  while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) and (Msg.Message <> WM_QUIT) do begin
    TranslateMessage(Msg);
    DispatchMessage(Msg);
  end;
end; { TOmniTaskControl.ProcessThreadMessages }

The Asy_DispatchMessages is probably the most complicated part of the OTL (once you understand the lock-free structures inside the OtlContainers 😉). Once you understand how it works you’ll be fully prepared to write custom thread loops in highly specialized threaded code. But don’t worry, you can use OTL even if you don’t understand the magic hidden inside.

Note: The above is from the Delphi Geek Blog by Primoz Gabrijelcic, the Delphi Guru and developer of the great OmniThreadLibrary.

]

Note on OmniThreadLibrary (OTL): Task Group

As noted in a previous post, a task group can be used to control a group of tasks collectively.

The interface is declared as follows:

IOmniTaskGroup = interface ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
  function GetTasks: IOmniTaskControlList;
  function Add(const taskControl: IOmniTaskControl): IOmniTaskGroup;
  function GetEnumerator: IOmniTaskControlListEnumerator;
  function RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup;
  function Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup;
  function RunAll: IOmniTaskGroup;
  procedure SendToAll(const msg: TOmniMessage);
  function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
  function UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup;
  function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
  property Tasks: IOmniTaskControlList read GetTasks;
end; { IOmniTaskGroup }

The most interesting interface functions are listed below –

function RunAll: IOmniTaskGroup;
procedure SendToAll(const msg: TOmniMessage);
function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;

For example (sample 15)

procedure TfrmTestTaskGroup.btnStartTasksClick(Sender: TObject);
var
  i: integer;
begin
  FTaskGroup := CreateTaskGroup;
  for i := 1 to 10 do
    CreateTask(TMyWorker.Create()).MonitorWith(OmniTED).Join(FTaskGroup);
  Log('Starting all tasks');
  FTaskGroup.RunAll;
end;

and

procedure TfrmTestTaskGroup.btnStopTasksClick(Sender: TObject);
begin
  if assigned(FTaskGroup) then begin
    FTaskGroup.TerminateAll;
    FTaskGroup := nil;
    Log('All stopped');
  end
  else
    Log('Nothing to stop');
end;

Note on OmniThreadLibrary (OTL): TerminateWhen

Once a worker thread has been launched, there are a few ways to terminate it:

  • IOmniTaskControl.Terminate

This sets the termination signal. If called without a parameter, the caller waits indefinitely until the thread finishes normally. If a wait-out time is specified, the caller waits up to that time and then, if the thread has not finished, kills it forcefully.

If a thread is killed forcefully, there might be a resource leak (Cleanup and the destructor may not get invoked).

  • IOmniTaskControl.TerminateWhen

Use an event, or an IOmniCancellationToken, for example (sample 14),

procedure TfrmTestTerminateWhen.btnStartTasksClick(Sender: TObject);
var
  i: integer;
begin
  if assigned(FCounter) and (FCounter.Value > 0) then
    btnStopTasksClick(Sender);
  FCounter := CreateCounter(10);
  FTerminate := CreateOmniCancellationToken;
  for i := 1 to FCounter.Value do begin
    Log(Format('Task started: %d',
      [CreateTask(TMyWorker.Create()).TerminateWhen(FTerminate).WithCounter(FCounter).
        MonitorWith(OmniTED).Run.UniqueID]));
  end;
end;

Then, somewhere in the code, call

FTerminate.Signal

For the tasks, this has the same effect as calling IOmniTaskControl.Terminate without a wait-out time: the termination signal is set and each thread is left to finish on its own, with no forced kill. Once the termination signal is set, IOmniTask.Terminated returns True.

If the thread is currently inside a tight loop, we can use any of the following methods to jump out of the loop immediately:

[method1]
while True do
begin
  if WaitForSingleObject(task.TerminateEvent, 0) = WAIT_OBJECT_0 then
    Break;
end;

or

[method2]
while True do
begin
  if task.Terminated then
    Break;
end;

or

[method3]
while True do
begin
  if task.CancellationToken.IsSignalled then
    Break;
end;

To use the last one, we need to associate a cancellation token with the task as follows:

CreateTask(TMyWorker.Create())
  .TerminateWhen(FTerminate)
  .CancelWith(FTerminate)
  .WithCounter(FCounter)
  .MonitorWith(OmniTED)
  .Run

Note on OmniThreadLibrary (OTL): Exception

When an exception is raised in a worker thread, OTL can pass the exception information to the calling thread (i.e., the one where the relevant IOmniTaskControl was created). (See OTL sample 13)

The information is saved in IOmniTaskControl.FatalException. It is a normal Exception subclass, so if it is not nil, the exception information can be obtained from its class name and message. For example,

if Assigned(aTaskControl.FatalException) then
  lvExInfo := aTaskControl.FatalException.ClassName + ': ' + aTaskControl.FatalException.Message;

In sample 13, the decorator Enforced(True) is used, i.e.,

CreateTask(TExceptionTest.Create(Sender = btnInitException)).Enforced(True)

According to the author –

“…In short – OTL always tries to execute your task. If you call taskControl.Terminate before the task has even started, OTL will set the termination signal and start executing task. This is not a good idea if the task was waiting in the thread pool queue and threadPool.CancelAll or threadPool.Terminate was executed. To bypass this auto-execute behaviour, you can call .Enforced(false).”

Given the above statement, Enforced(True) is not required, since by default it is already set to true. Therefore, sample 13 runs the same if we remove Enforced(True).

Some special remarks for exceptions that happen with TOmniWorker instances –

  • When an exception occurs, be it inside a worker object’s Initialize( ), Cleanup( ), or an event handler for an internal timer, the worker thread is immediately terminated (by its task control). Only after the termination is the exception information assigned to the task control’s FatalException property and available from there.

All in all, I am not quite sure about the rationale for this behavior. I guess the author’s intention is primarily to pass the exception information to the task control, on the understanding that any necessary handling has already been performed (at the user’s responsibility) in the worker thread where the exception was raised. Otherwise, I would expect a high chance of resource leaks, since after the exception the worker thread is terminated unconditionally and, depending on where the exception happens, Cleanup( ) and/or the worker object’s destructor may not even get invoked.

Note on OmniThreadLibrary (OTL): Lock

This series of posts on OmniThreadLibrary (OTL) only serves as my personal mnemonic notes on the usage of OTL. OTL’s internal workings are quite sophisticated, as documented in several scattered posts by the original author: http://otl.17slon.com/tutorials.htm

For thread synchronization, OTL implements a spin lock and a ticket spin lock.
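
To make the ticket idea concrete, here is a minimal sketch of a ticket lock written against the Delphi RTL only. It is not OTL’s TTicketSpinLock; the names TTicketLockSketch and TCounterThread are made up for illustration, and a reasonably recent Delphi (one that ships System.SyncObjs.TInterlocked and TThread.Yield) is assumed.

```pascal
program TicketLockSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

type
  // Hypothetical illustration type; NOT OTL's TTicketSpinLock.
  TTicketLockSketch = class
  strict private
    FNextTicket: Integer;  // next ticket number to hand out
    FNowServing: Integer;  // ticket number currently allowed to enter
  public
    procedure Acquire;
    procedure Release;
  end;

  TCounterThread = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Lock: TTicketLockSketch;
  SharedCounter: Integer;

procedure TTicketLockSketch.Acquire;
var
  myTicket: Integer;
begin
  // Increment returns the new value, so subtract 1 to get our own ticket.
  myTicket := TInterlocked.Increment(FNextTicket) - 1;
  // CompareExchange(x, 0, 0) never changes x; it is used as an atomic read.
  while TInterlocked.CompareExchange(FNowServing, 0, 0) <> myTicket do
    TThread.Yield;  // spin politely until our number comes up
end;

procedure TTicketLockSketch.Release;
begin
  // Admit the holder of the next ticket, in strict arrival order.
  TInterlocked.Increment(FNowServing);
end;

procedure TCounterThread.Execute;
var
  i: Integer;
begin
  for i := 1 to 10000 do begin
    Lock.Acquire;
    try
      Inc(SharedCounter);  // protected by the lock
    finally
      Lock.Release;
    end;
  end;
end;

var
  threads: array[1..4] of TCounterThread;
  i: Integer;
begin
  Lock := TTicketLockSketch.Create;
  try
    for i := 1 to 4 do
      threads[i] := TCounterThread.Create(False);
    for i := 1 to 4 do begin
      threads[i].WaitFor;
      threads[i].Free;
    end;
    Writeln(SharedCounter);  // 4 threads x 10000 increments = 40000
  finally
    Lock.Free;
  end;
end.
```

Compared with a plain spin lock, where whichever thread wins the atomic exchange gets in, the ticket variant serves waiters in arrival order, which is why it is usually described as the fair version.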

Under the OTL framework, a lock can be associated with an IOmniTaskControl by

function WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskControl; overload;

function WithLock(const lock: IOmniCriticalSection): IOmniTaskControl; overload;

For example (excerpted from OTL sample 12)

procedure TfrmTestLock.btnTestLockClick(Sender: TObject);
var
  task   : IOmniTaskControl;
  iRepeat: integer;
begin
  task := CreateTask(TestArray);
  if sender = btnTestLock then
    task.WithLock(TTicketSpinLock.Create);
  task.Run;
  for iRepeat := 1 to CTestRepetitions do
    if not OneTest(task.Lock) then
    begin
      Log(Format('Test failed at repetition %d', [iRepeat]));
      task.Terminate;
      break; //for iRepeat
    end;
  task.WaitFor(INFINITE);
  Log('Completed');
end;

autoDestroyLock is true by default, meaning the lock object is destroyed automatically when the task control is destroyed.

Note on OmniThreadLibrary (OTL): Thread Pool

OmniThreadLibrary (OTL) is a Delphi threading library developed by Delphi guru Primoz Gabrijelcic. It is a great gift from Primoz to the Delphi community (free, with source code: http://otl.17slon.com/index.htm). The library is very handy, but it currently lacks systematic documentation and tutorials, so a user has to work through the supplied samples.

Here is a little mnemonic on OTL’s thread pool interface, IOmniThreadPool, based on Primoz’s sample 11.

1. To create a thread pool, use the utility function

  • function CreateThreadPool(const threadPoolName: string): IOmniThreadPool;

var
  FThreadPool: IOmniThreadPool;

FThreadPool := CreateThreadPool('SignalOptimizationThreadPool');

A meaningful name can be set for the thread pool via the argument.

2. After creating the thread pool, some properties can be configured (if not, default values are used automatically).

  • MaxExecuting

Maximum number of working threads that are allowed to be executing concurrently. It can be any number up to 60. If it is set to 0, NO tasks will be executed.

  • MaxQueued

Maximum number of tasks that are allowed to wait in the queue.

  • IdleWorkerThreadTimeout_sec

The “time out” for idling threads, i.e., the idling time before the thread in question is removed from the thread pool.

  • UniqueID

Unique ID of the thread pool. For example,

Log(Format(‘Thread %d destroyed in thread pool %d’, [threadID, pool.UniqueID]));

  • WaitOnTerminate_sec

The waiting time after termination has been requested. If the thread cannot finish its task within this time, it is killed forcefully.
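
As a rough sketch, the properties above might be set right after the pool is created. The function name CreateConfiguredPool and all the numeric values are illustrative assumptions, not recommendations, and the unit needs OmniThreadLibrary on the search path.

```pascal
unit PoolConfigSketch;

interface

uses
  OtlThreadPool;  // IOmniThreadPool, CreateThreadPool

// Hypothetical helper: creates a pool and sets the properties listed above.
function CreateConfiguredPool: IOmniThreadPool;

implementation

function CreateConfiguredPool: IOmniThreadPool;
begin
  Result := CreateThreadPool('SignalOptimizationThreadPool');
  Result.MaxExecuting := 4;                  // at most 4 tasks run at once
  Result.MaxQueued := 100;                   // at most 100 tasks wait in the queue
  Result.IdleWorkerThreadTimeout_sec := 10;  // idle threads leave the pool after 10 s
  Result.WaitOnTerminate_sec := 30;          // grace period before a forced kill
end;

end.
```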

3. To associate a thread pool with tasks, use IOmniTaskControl.Schedule



CreateTask(THelloWorker.Create(Handle,delay_ms)).MonitorWith(OmniTED).Schedule(FThreadPool);

or

CreateTask(THelloWorker.Create(Handle,delay_ms)).MonitorWith(OmniTED).Schedule;

The second case, having no thread pool explicitly specified, will use a default GlobalOmniThreadPool automatically.

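For tasks that have no task monitor attached, the task control must still be kept alive until the task finishes. One way is to call Unobserved (as in item 6 below); another is to store the task control in a field and release it in an OnTerminated handler: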

FObservableTask := CreateTask(THelloWorker.Create(Handle), ‘observed task’).OnTerminated(ReleaseObservableTask).Schedule(FThreadPool);

where, ReleaseObservableTask is defined as:

procedure TfrmTestOtlThreadPool.ReleaseObservableTask(const task: IOmniTaskControl);
begin
  FObservableTask := nil;
end;

To learn more about Unobserved, see Primoz Gabrijelcic‘s original post:

http://www.thedelphigeek.com/2009/11/omnithreadlibrary-patterns-task.html

4. To terminate an individual task, we can call either IOmniTaskControl.Terminate or IOmniThreadPool.Cancel(taskID). The latter waits for WaitOnTerminate_sec; after that, the thread is forcefully killed if it has not finished normally. With the former, you can specify a custom (non-zero) wait time, leave it parameter-less (meaning wait indefinitely), or terminate immediately (by specifying a wait time of 0).

5. IOmniThreadPool.CancelAll can be used to cancel/terminate all tasks in the thread pool. For example,

GlobalOmniThreadPool.CancelAll,

or


GlobalOmniThreadPool.CancelAll(1000).

In the latter example a wait out time is specified.

6. Sometimes we wish to wait for all tasks to finish before moving on to the next action. In this case, we can join the pooled tasks to a task group,

for example,



waitGroup := CreateTaskGroup;
for iTask := 1 to 6 do
  CreateTask(THelloWorker.Create(Handle, 1000)).Unobserved.Join(waitGroup).Schedule;
while not waitGroup.WaitForAll(50) do
begin
  Application.ProcessMessages;
end;



7. Events

The following events can be associated with an IOmniEventMonitor.

  • OnPoolThreadCreated
  • OnPoolThreadDestroying
  • OnPoolThreadKilled
  • OnPoolWorkItemCompleted

For example, if we have the following worker,

THelloWorker = class(TOmniWorker)
strict private
  FDelayTIme : Integer;
  FFormHandle : THandle;
  FTaskID : Int64;
public
  constructor Create(aFormHandle:THandle;aDelay_ms:Integer = 1000);
  destructor Destroy; override;
  function Initialize: Boolean; override;
  procedure SleepTask(var aMsg: TMessage); message MSG_SLEEP;
  procedure Cleanup; override;
end;

If we start the task by

CreateTask(THelloWorker.Create(Handle,delay_ms)).MonitorWith(OmniTED).Schedule;



Then the global GlobalOmniThreadPool is associated with the task. Through the life cycle of the THelloWorker, the sequence of events is as follows:

THelloWorker.Create -> OnPoolThreadCreated -> THelloWorker.Initialize -> … (if Terminate is called with a generous wait-out time) -> THelloWorker.Cleanup -> THelloWorker.Destroy -> OnTaskTerminated -> OnPoolWorkItemCompleted -> after IdleWorkerThreadTimeout_sec, OnPoolThreadDestroying

If Terminate (or Cancel) is called with less than sufficient wait-out time, the thread is killed forcefully. In this case, the flow is as follows:

THelloWorker.Create -> OnPoolThreadCreated -> THelloWorker.Initialize -> … (if Terminate is called with a stingy wait-out time) -> OnPoolWorkItemCompleted -> OnPoolThreadKilled