diff --git a/docs/User/Project/Changelog.md b/docs/User/Project/Changelog.md index f87c8c2215..68e9dc9bb9 100644 --- a/docs/User/Project/Changelog.md +++ b/docs/User/Project/Changelog.md @@ -4,6 +4,13 @@ ## Unreleased +### Bug Fixes + +* Fix `parDemuxScan` deadlock when a worker throws an exception while the + driver is blocked on a full input buffer. + +### Breaking Changes + * The following deprecated modules have been removed: * Streamly.Data.Array.Foreign * Streamly.Data.Fold.Tee diff --git a/src/Streamly/Internal/Data/Fold/Channel/Type.hs b/src/Streamly/Internal/Data/Fold/Channel/Type.hs index 70fa41cb6d..e04146a09b 100644 --- a/src/Streamly/Internal/Data/Fold/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Fold/Channel/Type.hs @@ -313,6 +313,8 @@ sendEOFToDriver sv = liftIO $ do sendExceptionToDriver :: Channel m a b -> SomeException -> IO () sendExceptionToDriver sv e = do tid <- myThreadId + writeIORef (closedForInput sv) True + void $ tryPutMVar (inputSpaceDoorBell sv) () void $ sendToDriver sv (FoldException tid e) data FromSVarState m a b = diff --git a/test/Streamly/Test/Data/Scanl/Concurrent.hs b/test/Streamly/Test/Data/Scanl/Concurrent.hs index 8b3507231b..6141d9e534 100644 --- a/test/Streamly/Test/Data/Scanl/Concurrent.hs +++ b/test/Streamly/Test/Data/Scanl/Concurrent.hs @@ -10,6 +10,7 @@ module Streamly.Test.Data.Scanl.Concurrent (main) where import Control.Concurrent (threadDelay) +import Control.Exception (ErrorCall(..), try) import Data.Function ( (&) ) import Data.IORef (newIORef, atomicModifyIORef') import Data.List (sort) @@ -103,6 +104,37 @@ parDemuxScan_StreamEnd concOpts = do map snd (filter fst res) `shouldBe` filter even [1..streamLen] map snd (filter (not . fst) res) `shouldBe` filter odd [1..streamLen] +parDemuxScan_WorkerException :: (Scanl.Config -> Scanl.Config) -> IO () +parDemuxScan_WorkerException concOpts = do + let throwAfter = 3 + -- All items go to the same key so the driver stays in sendToWorker_ + -- for a single worker channel, maximizing the chance of blocking on a + -- full buffer. + demuxer _ = (0 :: Int) + gen _ = pure + $ Scanl.lmapM + (\x -> do + -- Slow the worker down so the buffer fills up and the + -- driver blocks in sendToWorker_ waiting for space. + threadDelay 50000 + if (x :: Int) > throwAfter + then error "worker exception" + else pure x) + $ Scanl.mkScanl1 (\_ x -> x) + -- Send enough items to fill the buffer (maxBuffer 1) and block + inpList = [1..100] + inpStream = Stream.fromList inpList + res <- try + $ Scanl.parDemuxScanM concOpts demuxer gen inpStream + & Stream.concatMap Stream.fromList + & fmap (\x -> (fst x,) <$> snd x) + & Stream.catMaybes + & Stream.fold Fold.toList + case res of + Left (ErrorCall msg) -> msg `shouldBe` "worker exception" + Right _ -> expectationFailure + "Expected ErrorCall exception but stream completed successfully" + main :: IO () main = hspec $ H.parallel @@ -118,3 +150,5 @@ main = hspec $ parDemuxScan_StreamEnd (Scanl.maxBuffer 1) it "parDemuxScanM (scan end) (maxBuffer 1)" $ parDemuxScan_ScanEnd (Scanl.maxBuffer 1) + it "parDemuxScanM (worker exception) (maxBuffer 1)" + $ parDemuxScan_WorkerException (Scanl.maxBuffer 1)