Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dynamodb-simple.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ library
semigroups, bytestring >= 0.10.8.0, containers,
template-haskell, transformers, exceptions,
amazonka, monad-loops, conduit, hashable,
amazonka-core, aeson, vector, scientific,
tagged, uuid-types, mtl
aeson, vector, scientific,
tagged, uuid-types, mtl, resourcet
-- hspec, safe-exceptions
hs-source-dirs: src
default-language: Haskell2010
Expand All @@ -47,6 +47,6 @@ test-suite spec
build-depends: base, dynamodb-simple, hspec, text, lens, transformers,
safe-exceptions, amazonka-dynamodb >= 1.6.0, amazonka, conduit,
semigroups, hashable, containers, unordered-containers,
tagged
tagged, resourcet
default-language: Haskell2010
hs-source-dirs: test
2 changes: 1 addition & 1 deletion src/Control/Monad/Supply.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ instance (Monoid w, MonadSupply s m) => MonadSupply s (WriterT w m) where
peek = lift peek
exhausted = lift exhausted

instance Semigroup a => Semigroup (Supply s a) where
instance Data.Semigroup.Semigroup a => Semigroup (Supply s a) where
m1 <> m2 = (<>) <$> m1 <*> m2

instance (Semigroup a, Monoid a) => Monoid (Supply s a) where
Expand Down
100 changes: 51 additions & 49 deletions src/Database/DynamoDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ module Database.DynamoDB (
, TableScan
) where

import Control.Lens ((%~), (.~), (^.))
import Control.Lens ((%~), (.~), (?~), (^.))
import Control.Monad (void)
import Control.Monad.Catch (throwM)
import Control.Monad.Trans.Resource
import Data.Bool (bool)
import Data.Function ((&))
import Data.Maybe (fromMaybe)
import Data.Proxy
import Data.Semigroup ((<>))
import Network.AWS
Expand All @@ -113,54 +115,54 @@ import Database.DynamoDB.QueryRequest


dDeleteItem :: DynamoTable a r => Proxy a -> PrimaryKey a r -> D.DeleteItem
dDeleteItem p pkey = D.deleteItem (tableName p) & D.diKey .~ dKeyToAttr p pkey
dDeleteItem p pkey = D.newDeleteItem (tableName p) & D.deleteItem_key .~ dKeyToAttr p pkey

dGetItem :: DynamoTable a r => Proxy a -> PrimaryKey a r -> D.GetItem
dGetItem p pkey = D.getItem (tableName p) & D.giKey .~ dKeyToAttr p pkey
dGetItem p pkey = D.newGetItem (tableName p) & D.getItem_key .~ dKeyToAttr p pkey

-- | Write item into the database; overwrite any previously existing item with the same primary key.
putItem :: (MonadAWS m, DynamoTable a r) => a -> m ()
putItem item = void $ send (dPutItem item)
putItem :: (DynamoTable a r, MonadResource m, MonadThrow m) => Env -> a -> m ()
putItem env item = void $ send env (dPutItem item)

-- | Write item into the database only if it doesn't already exist.
insertItem :: forall a r m. (MonadAWS m, DynamoTable a r) => a -> m ()
insertItem item = do
insertItem :: forall a r m. (DynamoTable a r, MonadResource m) => Env -> a -> m ()
insertItem env item = do
let keyfields = primaryFields (Proxy :: Proxy a)
-- Create condition attribute_not_exist(hash_key)
pkeyMissing = (AttrMissing . nameGenPath . pure . IntraName) $ head keyfields
(expr, attnames, attvals) = dumpCondition pkeyMissing
cmd = dPutItem item & D.piExpressionAttributeNames .~ attnames
& D.piConditionExpression .~ Just expr
& bool (D.piExpressionAttributeValues .~ attvals) id (null attvals) -- HACK; https://github.com/brendanhay/amazonka/issues/332
void $ send cmd
cmd = dPutItem item & D.putItem_expressionAttributeNames ?~ attnames
& D.putItem_conditionExpression ?~ expr
& bool (D.putItem_expressionAttributeValues ?~ attvals) id (null attvals) -- HACK; https://github.com/brendanhay/amazonka/issues/332
void $ send env cmd


-- | Read item from the database; primary key is either a hash key or (hash,range) tuple depending on the table.
getItem :: forall m a r. (MonadAWS m, DynamoTable a r) => Consistency -> Proxy a -> PrimaryKey a r -> m (Maybe a)
getItem consistency p key = do
let cmd = dGetItem p key & D.giConsistentRead . consistencyL .~ consistency
rs <- send cmd
let result = rs ^. D.girsItem
getItem :: forall m a r. (DynamoTable a r, MonadResource m, MonadThrow m) => Env -> Consistency -> Proxy a -> PrimaryKey a r -> m (Maybe a)
getItem env consistency p key = do
let cmd = dGetItem p key & D.getItem_consistentRead . consistencyL .~ consistency
rs <- send env cmd
let result = fromMaybe mempty (rs ^. D.getItemResponse_item)
if | null result -> return Nothing
| otherwise ->
case dGsDecode result of
Right res -> return (Just res)
Left err -> throwM (DynamoException $ "Cannot decode item: " <> err)
Left err -> Control.Monad.Catch.throwM (DynamoException $ "Cannot decode item: " Data.Semigroup.<> err)

-- | Delete item from the database by specifying the primary key.
deleteItemByKey :: forall m a r. (MonadAWS m, DynamoTable a r) => Proxy a -> PrimaryKey a r -> m ()
deleteItemByKey p pkey = void $ send (dDeleteItem p pkey)
deleteItemByKey :: forall m a r. (DynamoTable a r, MonadResource m) => Env -> Proxy a -> PrimaryKey a r -> m ()
deleteItemByKey env p pkey = void $ send env (dDeleteItem p pkey)

-- | Delete item from the database by specifying the primary key and a condition.
-- Throws AWS exception if the condition does not succeed.
deleteItemCondByKey :: forall m a r.
(MonadAWS m, DynamoTable a r) => Proxy a -> PrimaryKey a r -> FilterCondition a -> m ()
deleteItemCondByKey p pkey cond =
(DynamoTable a r, MonadResource m) => Env -> Proxy a -> PrimaryKey a r -> FilterCondition a -> m ()
deleteItemCondByKey env p pkey cond =
let (expr, attnames, attvals) = dumpCondition cond
cmd = dDeleteItem p pkey & D.diExpressionAttributeNames .~ attnames
& bool (D.diExpressionAttributeValues .~ attvals) id (null attvals) -- HACK; https://github.com/brendanhay/amazonka/issues/332
& D.diConditionExpression .~ Just expr
in void (send cmd)
cmd = dDeleteItem p pkey & D.deleteItem_expressionAttributeNames ?~ attnames
& bool (D.deleteItem_expressionAttributeValues ?~ attvals) id (null attvals) -- HACK; https://github.com/brendanhay/amazonka/issues/332
& D.deleteItem_conditionExpression ?~ expr
in void (send env cmd)

-- | Generate update item object; automatically adds condition for existence of primary
-- key, so that only existing objects are modified
Expand All @@ -174,56 +176,56 @@ dUpdateItem p pkey actions mcond =
pkeyExists = (AttrExists . nameGenPath . pure . IntraName) (head keyfields)

genAction actparams =
D.updateItem (tableName p) & D.uiKey .~ dKeyToAttr p pkey
& addActions actparams
& addCondition (Just pkeyExists <> mcond)
D.newUpdateItem (tableName p) & D.updateItem_key .~ dKeyToAttr p pkey
& addActions actparams
& addCondition (Just pkeyExists <> mcond)

addActions (expr, attnames, attvals) =
(D.uiUpdateExpression .~ Just expr)
. (D.uiExpressionAttributeNames %~ (<> attnames))
. bool (D.uiExpressionAttributeValues %~ (<> attvals)) id (null attvals)
(D.updateItem_updateExpression ?~ expr)
. (D.updateItem_expressionAttributeNames %~ Just . (<> attnames) . fromMaybe mempty)
. bool (D.updateItem_expressionAttributeValues %~ Just . (<> attvals) . fromMaybe mempty) id (null attvals)
addCondition (Just cond) =
let (expr, attnames, attvals) = dumpCondition cond
in (D.uiConditionExpression .~ Just expr)
. (D.uiExpressionAttributeNames %~ (<> attnames))
. bool (D.uiExpressionAttributeValues %~ (<> attvals)) id (null attvals) -- HACK; https://github.com/brendanhay/amazonka/issues/332
in (D.updateItem_conditionExpression ?~ expr)
. (D.updateItem_expressionAttributeNames %~ Just . (<> attnames) . fromMaybe mempty)
. bool (D.updateItem_expressionAttributeValues %~ Just . (<> attvals) . fromMaybe mempty) id (null attvals) -- HACK; https://github.com/brendanhay/amazonka/issues/332
addCondition Nothing = id -- Cannot happen anyway


-- | Update item in a table.
--
-- > updateItem (Proxy :: Proxy Test) (12, "2") (colCount +=. 100)
updateItemByKey_ :: forall a m r.
(MonadAWS m, DynamoTable a r) => Proxy a -> PrimaryKey a r -> Action a -> m ()
updateItemByKey_ p pkey actions
| Just cmd <- dUpdateItem p pkey actions Nothing = void $ send cmd
(DynamoTable a r, MonadResource m) => Env -> Proxy a -> PrimaryKey a r -> Action a -> m ()
updateItemByKey_ env p pkey actions
| Just cmd <- dUpdateItem p pkey actions Nothing = void $ send env cmd
| otherwise = return ()

-- | Update item in a database, return an updated version of the item.
updateItemByKey :: forall a m r.
(MonadAWS m, DynamoTable a r) => Proxy a -> PrimaryKey a r -> Action a -> m a
updateItemByKey p pkey actions
(DynamoTable a r, MonadResource m, MonadThrow m) => Env -> Proxy a -> PrimaryKey a r -> Action a -> m a
updateItemByKey env p pkey actions
| Just cmd <- dUpdateItem p pkey actions Nothing = do
rs <- send (cmd & D.uiReturnValues .~ Just D.AllNew)
case dGsDecode (rs ^. D.uirsAttributes) of
rs <- send env (cmd & D.updateItem_returnValues ?~ D.ReturnValue_ALL_NEW)
case dGsDecode (fromMaybe mempty (rs ^. D.updateItemResponse_attributes)) of
Right res -> return res
Left err -> throwM (DynamoException $ "Cannot decode item: " <> err)
| otherwise = do
rs <- getItem Strongly p pkey
rs <- getItem env Strongly p pkey
case rs of
Just res -> return res
Nothing -> throwM (DynamoException "Cannot decode item.")

-- | Update item in a table while specifying a condition.
updateItemCond_ :: forall a m r. (MonadAWS m, DynamoTable a r)
=> Proxy a -> PrimaryKey a r -> FilterCondition a -> Action a -> m ()
updateItemCond_ p pkey cond actions
| Just cmd <- dUpdateItem p pkey actions (Just cond) = void $ send cmd
updateItemCond_ :: forall a m r. (DynamoTable a r, MonadResource m)
=> Env ->Proxy a -> PrimaryKey a r -> FilterCondition a -> Action a -> m ()
updateItemCond_ env p pkey cond actions
| Just cmd <- dUpdateItem p pkey actions (Just cond) = void $ send env cmd
| otherwise = return ()

-- | Delete a table from DynamoDB.
deleteTable :: (MonadAWS m, DynamoTable a r) => Proxy a -> m ()
deleteTable p = void $ send (D.deleteTable (tableName p))
deleteTable :: (DynamoTable a r, MonadResource m) => Env -> Proxy a -> m ()
deleteTable env p = void $ send env (D.newDeleteTable (tableName p))

-- | Extract primary key from a record.
--
Expand Down Expand Up @@ -281,7 +283,7 @@ tableKey = dTableKey
-- -- Save data to database
-- putItem (Test "news" "1-2-3-4" "New subject")
-- -- Fetch data given primary key
-- item <- getItem Eventually tTest ("news", "1-2-3-4")
-- item <- getItem env Eventually tTest ("news", "1-2-3-4")
-- liftIO $ print item -- (item :: Maybe Test)
-- -- Scan data using filter condition, return 10 results
-- items <- scanCond tTest (subject' ==. "New subejct") 10
Expand Down
60 changes: 31 additions & 29 deletions src/Database/DynamoDB/BatchRequest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ module Database.DynamoDB.BatchRequest (
) where

import Control.Concurrent (threadDelay)
import Control.Lens (at, ix, (.~), (^.), (^..))
import Control.Lens (at, ix, (.~), (?~), (^.), (^..))
import Control.Monad (unless)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource
import Data.Function ((&))
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HMap
import Data.List.NonEmpty (NonEmpty(..))
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import Data.Proxy
import qualified Data.Text as T
Expand All @@ -39,28 +41,28 @@ import Database.DynamoDB.Types
-- | Retry batch operation, until unprocessedItems is empty.
--
-- TODO: we should use exponential backoff; currently we use a simple 1-sec threadDelay
retryWriteBatch :: MonadAWS m => D.BatchWriteItem -> m ()
retryWriteBatch cmd = do
rs <- send cmd
let unprocessed = rs ^. D.bwirsUnprocessedItems
retryWriteBatch :: MonadResource m => Env -> D.BatchWriteItem -> m ()
retryWriteBatch env cmd = do
rs <- send env cmd
let unprocessed = fromMaybe mempty (rs ^. D.batchWriteItemResponse_unprocessedItems)
unless (null unprocessed) $ do
liftIO $ threadDelay 1000000
retryWriteBatch (cmd & D.bwiRequestItems .~ unprocessed)
retryWriteBatch env (cmd & D.batchWriteItem_requestItems .~ unprocessed)

-- | Retry batch operation, until unprocessedItems is empty.
--
-- TODO: we should use exponential backoff; currently we use a simple 1-sec threadDelay
retryReadBatch :: MonadAWS m => D.BatchGetItem -> m (HashMap T.Text [HashMap T.Text D.AttributeValue])
retryReadBatch = go mempty
retryReadBatch :: MonadResource m => Env -> D.BatchGetItem -> m (HashMap T.Text [HashMap T.Text D.AttributeValue])
retryReadBatch env = go mempty
where
go previous cmd = do
rs <- send cmd
let unprocessed = rs ^. D.bgirsUnprocessedKeys
result = HMap.unionWith (++) previous (rs ^. D.bgirsResponses)
rs <- send env cmd
let unprocessed = fromMaybe mempty (rs ^. D.batchGetItemResponse_unprocessedKeys)
result = HMap.unionWith (++) previous (fromMaybe mempty (rs ^. D.batchGetItemResponse_responses))
if | null unprocessed -> return result
| otherwise -> do
liftIO $ threadDelay 1000000
go result (cmd & D.bgiRequestItems .~ unprocessed)
go result (cmd & D.batchGetItem_requestItems .~ unprocessed)

-- | Chunk list according to batch operation limit
chunkBatch :: Int -> [a] -> [NonEmpty a]
Expand All @@ -73,47 +75,47 @@ chunkBatch _ _ = []
-- If a batch fails on dynamodb exception, it is raised.
--
-- Note: On exception, the information about which items were saved is unavailable
putItemBatch :: forall m a r. (MonadAWS m, DynamoTable a r) => [a] -> m ()
putItemBatch lst = mapM_ go (chunkBatch 25 lst)
putItemBatch :: forall m a r. (MonadResource m, DynamoTable a r) => Env -> [a] -> m ()
putItemBatch env lst = mapM_ go (chunkBatch 25 lst)
where
go items = do
let tblname = tableName (Proxy :: Proxy a)
wrequests = fmap mkrequest items
mkrequest item = D.writeRequest & D.wrPutRequest .~ Just (D.putRequest & D.prItem .~ gsEncode item)
cmd = D.batchWriteItem & D.bwiRequestItems . at tblname .~ Just wrequests
retryWriteBatch cmd
mkrequest item = D.newWriteRequest & D.writeRequest_putRequest ?~ (D.newPutRequest & D.putRequest_item .~ gsEncode item)
cmd = D.newBatchWriteItem & D.batchWriteItem_requestItems . at tblname ?~ wrequests
retryWriteBatch env cmd


-- | Get batch of items.
getItemBatch :: forall m a r. (MonadAWS m, DynamoTable a r) => Consistency -> [PrimaryKey a r] -> m [a]
getItemBatch consistency lst = concat <$> mapM go (chunkBatch 100 lst)
getItemBatch :: forall m a r. (MonadResource m, MonadThrow m, DynamoTable a r) => Env -> Consistency -> [PrimaryKey a r] -> m [a]
getItemBatch env consistency lst = concat <$> mapM go (chunkBatch 100 lst)
where
go keys = do
let tblname = tableName (Proxy :: Proxy a)
wkaas = fmap (dKeyToAttr (Proxy :: Proxy a)) keys
kaas = D.keysAndAttributes wkaas & D.kaaConsistentRead . consistencyL .~ consistency
cmd = D.batchGetItem & D.bgiRequestItems . at tblname .~ Just kaas
kaas = D.newKeysAndAttributes wkaas & D.keysAndAttributes_consistentRead . consistencyL .~ consistency
cmd = D.newBatchGetItem & D.batchGetItem_requestItems . at tblname ?~ kaas

tbls <- retryReadBatch cmd
tbls <- retryReadBatch env cmd
mapM decoder (tbls ^.. ix tblname . traverse)
decoder item =
case dGsDecode item of
Right res -> return res
Left err -> throwM (DynamoException $ "Error decoding item: " <> err )
Left err -> Control.Monad.Catch.throwM (DynamoException $ "Error decoding item: " Data.Monoid.<> err )

dDeleteRequest :: DynamoTable a r => Proxy a -> PrimaryKey a r -> D.DeleteRequest
dDeleteRequest p pkey = D.deleteRequest & D.drKey .~ dKeyToAttr p pkey
dDeleteRequest p pkey = D.newDeleteRequest & D.deleteRequest_key .~ dKeyToAttr p pkey

-- | Batch version of 'deleteItemByKey'.
--
-- Note: Because the requests are chunked, the information about which items
-- were deleted in case of exception is unavailable.
deleteItemBatchByKey :: forall m a r. (MonadAWS m, DynamoTable a r) => Proxy a -> [PrimaryKey a r] -> m ()
deleteItemBatchByKey p lst = mapM_ go (chunkBatch 25 lst)
deleteItemBatchByKey :: forall m a r. (MonadResource m, DynamoTable a r) => Env -> Proxy a -> [PrimaryKey a r] -> m ()
deleteItemBatchByKey env p lst = mapM_ go (chunkBatch 25 lst)
where
go keys = do
let tblname = tableName p
wrequests = fmap mkrequest keys
mkrequest key = D.writeRequest & D.wrDeleteRequest .~ Just (dDeleteRequest p key)
cmd = D.batchWriteItem & D.bwiRequestItems . at tblname .~ Just wrequests
retryWriteBatch cmd
mkrequest key = D.newWriteRequest & D.writeRequest_deleteRequest ?~ dDeleteRequest p key
cmd = D.newBatchWriteItem & D.batchWriteItem_requestItems . at tblname ?~ wrequests
retryWriteBatch env cmd
Loading