-
Notifications
You must be signed in to change notification settings - Fork 3
feat: thread pool #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,7 +5,7 @@ | |||||||
| when defined(linux): | ||||||||
| {.passl: "-Wl,-soname,libsds.so".} | ||||||||
|
|
||||||||
| import std/[typetraits, tables, atomics], chronos, chronicles | ||||||||
| import std/[typetraits, tables, atomics, locks], chronos, chronicles | ||||||||
| import | ||||||||
| ./sds_thread/sds_thread, | ||||||||
| ./alloc, | ||||||||
|
|
@@ -57,6 +57,29 @@ template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped | |||||||
| RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData | ||||||||
| ) | ||||||||
|
|
||||||||
| var | ||||||||
| ctxPool: seq[ptr SdsContext] | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
something like this - assuming you don't want more than 32 concurrent contexts
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it's okay for now to allow as many as needed.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't know how many are needed. And eventually we'll need a proper fix anyway. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, I mean using a `seq` here is undefined behavior — if it grows more than once, it'll crash. `seq`s are tied to a particular thread; arrays are not. For a cross-thread `seq`, one needs to use
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We tested this to work for now in Go tests and Status App. |
||||||||
| ctxPoolLock: Lock | ||||||||
|
|
||||||||
| proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext = | ||||||||
| ctxPoolLock.acquire() | ||||||||
| defer: ctxPoolLock.release() | ||||||||
| if ctxPool.len > 0: | ||||||||
| result = ctxPool.pop() | ||||||||
| else: | ||||||||
| result = sds_thread.createSdsThread().valueOr: | ||||||||
| let msg = "Error in createSdsThread: " & $error | ||||||||
| callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) | ||||||||
| return nil | ||||||||
|
|
||||||||
| proc releaseCtx(ctx: ptr SdsContext) = | ||||||||
| ctxPoolLock.acquire() | ||||||||
| defer: ctxPoolLock.release() | ||||||||
| ctx.userData = nil | ||||||||
| ctx.eventCallback = nil | ||||||||
| ctx.eventUserData = nil | ||||||||
| ctxPool.add(ctx) | ||||||||
|
|
||||||||
| proc handleRequest( | ||||||||
| ctx: ptr SdsContext, | ||||||||
| requestType: RequestType, | ||||||||
|
|
@@ -140,10 +163,9 @@ proc SdsNewReliabilityManager( | |||||||
| echo "error: missing callback in NewReliabilityManager" | ||||||||
| return nil | ||||||||
|
|
||||||||
| ## Create the SDS thread that will keep waiting for req from the main thread. | ||||||||
| var ctx = sds_thread.createSdsThread().valueOr: | ||||||||
| let msg = "Error in createSdsThread: " & $error | ||||||||
| callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) | ||||||||
| ## Create or reuse the SDS thread that will keep waiting for req from the main thread. | ||||||||
| var ctx = acquireCtx(callback, userData) | ||||||||
| if ctx.isNil(): | ||||||||
| return nil | ||||||||
|
|
||||||||
| ctx.userData = userData | ||||||||
|
|
@@ -183,14 +205,20 @@ proc SdsCleanupReliabilityManager( | |||||||
| initializeLibrary() | ||||||||
| checkLibsdsParams(ctx, callback, userData) | ||||||||
|
|
||||||||
| sds_thread.destroySdsThread(ctx).isOkOr: | ||||||||
| let msg = "libsds error: " & $error | ||||||||
| callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) | ||||||||
| let resetRes = handleRequest( | ||||||||
| ctx, | ||||||||
| RequestType.LIFECYCLE, | ||||||||
| SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER), | ||||||||
| callback, | ||||||||
| userData, | ||||||||
| ) | ||||||||
|
|
||||||||
| if resetRes == RET_ERR: | ||||||||
| return RET_ERR | ||||||||
|
|
||||||||
| ## always need to invoke the callback although we don't retrieve value to the caller | ||||||||
| callback(RET_OK, nil, 0, userData) | ||||||||
| releaseCtx(ctx) | ||||||||
|
|
||||||||
| # handleRequest already invoked the callback; nothing else to signal here. | ||||||||
| return RET_OK | ||||||||
|
|
||||||||
| proc SdsResetReliabilityManager( | ||||||||
|
|
||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`seq` cannot be used cross-thread until we migrate to the ORC memory manager, meaning that this list needs to be managed manually — the absolutely easiest way to do that is to use an `array` + counter and limit the number of SDS contexts that can be created.