Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/User/Project/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@

## Unreleased

### Bug Fixes

* Fix `parDemuxScan` deadlock when a worker throws an exception while the
driver is blocked on a full input buffer.

### Breaking Changes

* The following deprecated modules have been removed:
* Streamly.Data.Array.Foreign
* Streamly.Data.Fold.Tee
Expand Down
2 changes: 2 additions & 0 deletions src/Streamly/Internal/Data/Fold/Channel/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ sendEOFToDriver sv = liftIO $ do
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver sv e = do
tid <- myThreadId
writeIORef (closedForInput sv) True
void $ tryPutMVar (inputSpaceDoorBell sv) ()
void $ sendToDriver sv (FoldException tid e)

data FromSVarState m a b =
Expand Down
34 changes: 34 additions & 0 deletions test/Streamly/Test/Data/Scanl/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
module Streamly.Test.Data.Scanl.Concurrent (main) where

import Control.Concurrent (threadDelay)
import Control.Exception (ErrorCall(..), try)
import Data.Function ( (&) )
import Data.IORef (newIORef, atomicModifyIORef')
import Data.List (sort)
Expand Down Expand Up @@ -103,6 +104,37 @@ parDemuxScan_StreamEnd concOpts = do
map snd (filter fst res) `shouldBe` filter even [1..streamLen]
map snd (filter (not . fst) res) `shouldBe` filter odd [1..streamLen]

parDemuxScan_WorkerException :: (Scanl.Config -> Scanl.Config) -> IO ()
parDemuxScan_WorkerException concOpts = do
let throwAfter = 3
-- All items go to the same key so the driver stays in sendToWorker_
-- for a single worker channel, maximizing the chance of blocking on a
-- full buffer.
demuxer _ = (0 :: Int)
gen _ = pure
$ Scanl.lmapM
(\x -> do
-- Slow the worker down so the buffer fills up and the
-- driver blocks in sendToWorker_ waiting for space.
threadDelay 50000
if (x :: Int) > throwAfter
then error "worker exception"
else pure x)
$ Scanl.mkScanl1 (\_ x -> x)
-- Send enough items to fill the buffer (maxBuffer 1) and block
inpList = [1..100]
inpStream = Stream.fromList inpList
res <- try
$ Scanl.parDemuxScanM concOpts demuxer gen inpStream
& Stream.concatMap Stream.fromList
& fmap (\x -> (fst x,) <$> snd x)
& Stream.catMaybes
& Stream.fold Fold.toList
case res of
Left (ErrorCall msg) -> msg `shouldBe` "worker exception"
Right _ -> expectationFailure
"Expected ErrorCall exception but stream completed successfully"

main :: IO ()
main = hspec
$ H.parallel
Expand All @@ -118,3 +150,5 @@ main = hspec
$ parDemuxScan_StreamEnd (Scanl.maxBuffer 1)
it "parDemuxScanM (scan end) (maxBuffer 1)"
$ parDemuxScan_ScanEnd (Scanl.maxBuffer 1)
it "parDemuxScanM (worker exception) (maxBuffer 1)"
$ parDemuxScan_WorkerException (Scanl.maxBuffer 1)
Loading