Skip to content

Commit 83a9f8a

Browse files
Made burst and blocking thresholds configurable, fixed deadlock issue.
1 parent 5309a67 commit 83a9f8a

File tree

5 files changed

+85
-23
lines changed

5 files changed

+85
-23
lines changed

VSoft.Messaging.dspec

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"metadata": {
33
"id": "VSoft.Messaging",
4-
"version": "0.1.0",
4+
"version": "0.2.0",
55
"description": "VSoft.Messaging is a libary that provides an internal synchronous/asynchronous publish/subscribe messaging system for Delphi applications.",
66
"authors": "Vincent Parrett",
77
"projectUrl": "https://github.com/VSoftTechnologies/VSoft.SemanticVersion",
@@ -104,6 +104,13 @@
104104
"id": "Runtime",
105105
"project": ".\\packages\\Rad Studio $compilerWithCodeName$\\VSoft.MessagingR.dproj"
106106
}
107+
],
108+
"runtime" : [
109+
{
110+
"buildId" : "Runtime",
111+
"src" : "bin\\VSoft.MessagingR$LibSuffix$.bpl",
112+
"copyLocal" : true
113+
}
107114
]
108115
}
109116
]

demos/Vcl/MsgDemo.dproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -986,8 +986,8 @@
986986
</BorlandProject>
987987
<ProjectFileVersion>12</ProjectFileVersion>
988988
<DPM>
989-
<PackageReference id="VSoft.WeakReference" platform="Win32" version="0.0.1"/>
990-
<PackageReference id="VSoft.WeakReference" platform="Win64" version="0.0.1"/>
989+
<PackageReference id="VSoft.WeakReference" platform="Win64" version="0.1.0"/>
990+
<PackageReference id="VSoft.WeakReference" platform="Win32" version="0.1.0"/>
991991
</DPM>
992992
</ProjectExtensions>
993993
<Import Project="$(BDS)\Bin\CodeGear.Delphi.Targets" Condition="Exists('$(BDS)\Bin\CodeGear.Delphi.Targets')"/>

src/VSoft.Messaging.Dispatchers.pas

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ procedure TMessageDispatcherThread.Execute;
202202
end;
203203
while (not Self.Terminated) and (FDispatcher.FTarget <> nil) and (FDispatcher.FQueue.Count > 0) and FDispatcher.Enabled do
204204
begin
205-
msgs := FDispatcher.DequeueAtMost(cMaxBurst);
205+
msgs := FDispatcher.DequeueAtMost(TMessagingOptions.MaxBurst);
206206
for i := 0 to Length(msgs) -1 do
207207
begin
208208
try
@@ -427,12 +427,11 @@ procedure TMessageDispatcherUIThread.Execute;
427427
i : integer;
428428
msgs : TArray<IMessage>;
429429
begin
430-
msgs := FDispatcher.DequeueAtMost(cMaxBurst);
430+
msgs := FDispatcher.DequeueAtMost(TMessagingOptions.MaxBurst);
431431
if Length(msgs) > 0 then
432432
begin
433433
for i := 0 to Length(msgs) -1 do
434434
begin
435-
436435
if not FDispatcher.Enabled or Self.Terminated then
437436
break;
438437
try
@@ -441,8 +440,12 @@ procedure TMessageDispatcherUIThread.Execute;
441440
//not much we can do here!
442441
end;
443442
msgs[i] := nil;
444-
//potential tight loop.
445-
TThread.Yield;
443+
//potential tight loop, yield occasionally
444+
if TMessagingOptions.MaxBurst > 10 then
445+
begin
446+
if (i mod 10) = 0 then
447+
TThread.Yield;
448+
end;
446449
end;
447450
end;
448451
end;
@@ -458,8 +461,10 @@ procedure TMessageDispatcherUIThread.Execute;
458461
end;
459462
while (not Self.Terminated) and (FDispatcher.FTarget <> nil) and (FDispatcher.FQueue.Count > 0) and FDispatcher.Enabled do
460463
begin
461-
TThread.Queue(nil,threadProc);
462-
//potential tight loop, so allow other threads to run.
464+
//since we are queuing messages and want them to complete processing before the next one is dispatched, we cannot use TThread.Queue here.
465+
TThread.Synchronize(nil,threadProc);
466+
//calls SwitchToThread - Causes the calling thread to yield execution to another thread that is ready to run on the current processor.
467+
//The operating system selects the next thread to be executed.
463468
TThread.Yield;
464469
end;
465470
end;

src/VSoft.Messaging.Internal.pas

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,12 @@ interface
3030
uses
3131
System.SyncObjs;
3232

33-
const
34-
cMaxBurst = 20; // Maxium number of message to dequeue and process in one go.
35-
cBlockedThreshold = 100; // Start waiting for queues to unblock at this global queue depth
36-
cUnblockedThreshold = 50; // Stop waiting at this queue depth (must be at least one less than block threshold)
37-
3833
type
3934
//used to control pushback etc
4035
TMessagingControl = class
4136
private
4237
class var
43-
FGlobalQueueDepth : Integer;
38+
FGlobalQueueDepth : integer;
4439
FUnblockedSignal : TEvent;
4540
public
4641
class constructor Create;
@@ -78,7 +73,8 @@ TVSMessageWrapper<T> = class(TInterfacedObject,IMessage)
7873
implementation
7974

8075
uses
81-
System.Classes;
76+
System.Classes,
77+
VSoft.Messaging;
8278

8379
{ TMessagingControl }
8480

@@ -89,7 +85,7 @@ implementation
8985

9086
class procedure TMessagingControl.DecrementGlobalQueueDepth;
9187
begin
92-
if TInterlocked.Decrement(FGlobalQueueDepth) = cUnblockedThreshold then
88+
if TInterlocked.Decrement(FGlobalQueueDepth) = TMessagingOptions.UnblockThreshold then
9389
FUnblockedSignal.SetEvent; // Unblock "PushbackIfNeeded"
9490
end;
9591

@@ -105,15 +101,15 @@ class function TMessagingControl.GetGlobalQueueDepth: integer;
105101

106102
class procedure TMessagingControl.IncrementGlobalQueueDepth;
107103
begin
108-
if TInterlocked.Increment(FGlobalQueueDepth) = cBlockedThreshold then
104+
if TInterlocked.Increment(FGlobalQueueDepth) = TMessagingOptions.BlockedThreshold then
109105
FUnblockedSignal.ResetEvent; // Block "PushbackIfNeeded"
110106
end;
111107

112108
class procedure TMessagingControl.PushbackIfNeeded;
113109
var
114110
res : TWaitResult;
115111
begin
116-
if FGlobalQueueDepth < cBlockedThreshold then
112+
if FGlobalQueueDepth < TMessagingOptions.BlockedThreshold then
117113
exit;
118114

119115
if MainThreadID = TThread.CurrentThread.ThreadID then

src/VSoft.Messaging.pas

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ TChannelHelper = record
4444
FPostMessageProc : TMessageProc;
4545
FSendMessageProc : TMessageProc;
4646
public
47-
//10.4 update 2 broke record constraints, commenting out for now.
4847
//async
49-
procedure PostMessage<T {: record}>(const message : T);
48+
procedure PostMessage<T : record>(const message : T);
5049
//sync
51-
procedure SendMessage<T {: record}>(const message : T);
50+
procedure SendMessage<T : record>(const message : T);
5251
constructor Create(const postMessageProc : TMessageProc; const sendMessageProc : TMessageProc);
5352
end;
5453

@@ -140,13 +139,35 @@ TMessageDispatcherFactory = class
140139
class function CreateUIDispatcher(const target : TObject = nil) : IMessageDispatcher;
141140
end;
142141

142+
TMessagingOptions = class
143+
private
144+
class var
145+
FMaxBurst : integer; // Maxium number of message to dequeue and process in one go.
146+
FBlockedThreshold : integer; // Start waiting for queues to unblock at this global queue depth
147+
FUnblockedThreshold : integer;
148+
protected
149+
class procedure SetMaxBurst(const Value: integer); static;
150+
class procedure SetBlockedThreshold(const Value: integer); static;
151+
class procedure SetUnblockThreashold(const Value: integer); static;
152+
class constructor Create;
153+
public
154+
class property MaxBurst : integer read FMaxBurst write SetMaxBurst; // Maxium number of message to dequeue and process in one go.
155+
class property BlockedThreshold : integer read FBlockedThreshold write SetBlockedThreshold; // Start waiting for queues to unblock at this global queue depth
156+
class property UnblockThreshold : integer read FUnblockedThreshold write SetUnblockThreashold; //Stop waiting at this queue depth (must be at least one less than block threshold)
157+
end;
158+
143159
implementation
144160

145161
uses
146162
System.Classes,
147163
VSoft.Messaging.Channel,
148164
VSoft.Messaging.Dispatchers;
149165

166+
const
167+
cDefaultMaxBurst = 20; // Maxium number of message to dequeue and process in one go.
168+
cDefaultBlockedThreshold = 100; // Start waiting for queues to unblock at this global queue depth
169+
cDefaultUnblockedThreshold = 50; // Stop waiting at this queue depth (must be at least one less than block threshold)
170+
150171

151172

152173
{ TChannelHelper }
@@ -194,4 +215,37 @@ class function TMessageDispatcherFactory.CreateUIDispatcher(const target : TObje
194215
result := TUIMessageDispatcher.Create(target);
195216
end;
196217

218+
{ TMessagingOptions }
219+
220+
class constructor TMessagingOptions.Create;
221+
begin
222+
FMaxBurst := cDefaultMaxBurst;
223+
FBlockedThreshold := cDefaultBlockedThreshold;
224+
FUnblockedThreshold := cDefaultUnblockedThreshold;
225+
end;
226+
227+
class procedure TMessagingOptions.SetBlockedThreshold(const Value: integer);
228+
begin
229+
if value > 1 then
230+
FBlockedThreshold := Value
231+
else
232+
FBlockedThreshold := 2;
233+
end;
234+
235+
class procedure TMessagingOptions.SetMaxBurst(const Value: integer);
236+
begin
237+
if value > 0 then
238+
FMaxBurst := Value
239+
else
240+
FMaxBurst := 1;
241+
end;
242+
243+
class procedure TMessagingOptions.SetUnblockThreashold(const Value: integer);
244+
begin
245+
if value < FBlockedThreshold -1 then
246+
FUnblockedThreshold := Value
247+
else
248+
FUnblockedThreshold := FBlockedThreshold -1;
249+
end;
250+
197251
end.

0 commit comments

Comments
 (0)