Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Storage reimplemented in RON format #101

Merged
merged 6 commits into from
Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 95 additions & 13 deletions ron-storage/RON/Storage.hs
Original file line number Diff line number Diff line change
@@ -1,33 +1,115 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}

module RON.Storage (Collection (..), DocId (..), MonadStorage (..)) where
module RON.Storage
( Collection (..)
, CollectionName
, DocId (..)
, Document (..)
, MonadStorage (..)
, load
, modify
, rawDocId
) where

import RON.Data (Replicated)
import RON.Event (Clock)
import RON.Types (UUID)
import Control.Monad (when)
import Control.Monad.Except (MonadError, catchError, liftEither,
throwError)
import Control.Monad.State.Strict (StateT, execStateT)
import Data.ByteString.Lazy (ByteString)
import Data.Foldable (for_)
import Data.List.NonEmpty (NonEmpty ((:|)))
import Data.Traversable (for)

import RON.Data (ReplicatedAsObject, reduceObject')
import RON.Event (Clock (..))
import RON.Types (Object, UUID)
import qualified RON.UUID as UUID

type Version = UUID

newtype DocId doc = DocId UUID
newtype DocId a = DocId UUID
deriving Show

rawDocId :: DocId doc -> UUID
rawDocId (DocId uuid) = uuid

type CollectionName = FilePath

class Replicated doc => Collection doc where
class ReplicatedAsObject a => Collection a where
collectionName :: CollectionName
fallbackParse :: UUID -> ByteString -> Either String (Object a)

class Clock m => MonadStorage m where
-- | TODO rename list -> getList
class (Clock m, MonadError String m) => MonadStorage m where
listCollections :: m [CollectionName]

-- | Must return @[]@ for non-existent collection
listDocuments :: Collection doc => m [DocId doc]
listDocuments :: Collection a => m [DocId a]

-- | Must return @[]@ for non-existent document
listVersions :: Collection doc => DocId doc -> m [Version]
listVersions :: Collection a => DocId a -> m [Version]

-- | Must create collection and document if not exist
createVersion :: Collection doc => DocId doc -> Version -> doc -> m ()
createVersion :: Collection a => Object a -> m ()

readVersion :: Collection a => DocId a -> Version -> m (Object a)

deleteVersion :: Collection a => DocId a -> Version -> m ()

-- | Result of DB reading
data Document a = Document
{ value :: Object a
-- ^ merged value
, versions :: NonEmpty Version
}

load :: (Collection a, MonadStorage m) => DocId a -> m (Document a)
load docId = loadRetry (3 :: Int)
where
loadRetry n
| n > 0 = do
versions0 <- listVersions docId
case versions0 of
[] -> throwError "Invalid document"
v:vs -> do
let versions = v :| vs
let wrapDoc value = Document{value, versions}
e <- fmap (fmap wrapDoc . vsconcat) $ for versions $ \ver ->
fmapL
(( "version " ++ show ver ++ " ("
++ UUID.encodeBase32 ver ++ ")" ++ ": ") ++)
<$> catchExcept (readVersion docId ver)
liftEither e
| otherwise = throwError "Maximum retries exceeded"

-- | Validation-like version of 'sconcat'.
vsconcat :: NonEmpty (Either String (Object a)) -> Either String (Object a)
vsconcat = foldr1 vappend
where
vappend (Left e1) (Left e2) = Left $ e1 <> e2
vappend e1@(Left _ ) _ = e1
vappend (Right a1) (Right a2) = reduceObject' a1 a2
vappend _ e2@(Left _ ) = e2

catchExcept :: MonadError e m => m a -> m (Either e a)
catchExcept ma = catchError (Right <$> ma) (pure . Left)

readVersion
:: Collection doc => DocId doc -> Version -> m (Either String doc)
fmapL :: (a -> b) -> Either a c -> Either b c
fmapL f = \case
Left a -> Left $ f a
Right c -> Right c

deleteVersion :: Collection doc => DocId doc -> Version -> m ()
-- TODO(2018-10-22, cblp) call `deleteVersion` from `createVersion`
modify
:: (Collection a, MonadStorage m)
=> DocId a -> StateT (Object a) m () -> m (Object a)
modify docId f = do
Document{value = docOld, versions} <- load docId
docNew <- execStateT f docOld
when (docNew /= docOld || length versions /= 1) $ do
createVersion docNew
for_ versions (deleteVersion docId)
pure docNew
173 changes: 173 additions & 0 deletions ron-storage/RON/Storage/IO.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module RON.Storage.IO
( Handle
, Storage
, newHandle
, runStorage
, runStorageT
) where

import Control.Exception (catch, throwIO)
import Control.Monad (filterM, unless)
import Control.Monad.Except (ExceptT (ExceptT), MonadError,
catchError, liftEither, runExceptT,
throwError)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (ReaderT (ReaderT), ask, runReaderT)
import Control.Monad.Trans (lift)
import Data.Bits (shiftL)
import qualified Data.ByteString.Lazy as BSL
import Data.Coerce (coerce)
import Data.IORef (IORef, newIORef)
import Data.Maybe (fromMaybe, listToMaybe)
import Data.Traversable (for)
import Data.Word (Word64)
import Network.Info (MAC (MAC), getNetworkInterfaces, mac)
import RON.Event (Clock, EpochClock, EpochTime, Replica, ReplicaId,
advance, applicationSpecific, getCurrentEpochTime,
getEventUuid, getEvents, getPid, runEpochClock)
import RON.Text (parseStateFrame, serializeStateFrame)
import RON.Types (Object (Object), objectFrame, objectId)
import qualified RON.UUID as UUID
import System.Directory (createDirectoryIfMissing, doesDirectoryExist,
listDirectory, removeFile)
import System.FilePath ((</>))
import System.IO.Error (isDoesNotExistError)

import RON.Storage (Collection, DocId (DocId), MonadStorage,
collectionName, createVersion, deleteVersion,
fallbackParse, listCollections, listDocuments,
listVersions, readVersion)

-- | Environment is the dataDir
newtype StorageT clock a = Storage (ExceptT String (ReaderT FilePath clock) a)
deriving (Applicative, Functor, Monad, MonadIO)

runStorageT :: FilePath -> StorageT m a -> m (Either String a)
runStorageT dataDir (Storage except) =
(`runReaderT` dataDir) $ runExceptT except

instance Replica m => Replica (StorageT m) where
getPid = Storage $ lift $ lift getPid

instance Clock m => Clock (StorageT m) where
getEvents = Storage . lift . lift . getEvents
advance = Storage . lift . lift . advance

instance Monad m => MonadError String (StorageT m) where
throwError = Storage . throwError
catchError = coerce . catchError

instance (Clock m, MonadIO m) => MonadStorage (StorageT m) where
listCollections = Storage $ do
dataDir <- ask
liftIO $
listDirectory dataDir
>>= filterM (doesDirectoryExist . (dataDir </>))

listDocuments :: forall doc. Collection doc => StorageT m [DocId doc]
listDocuments = do
docdirs <- listDirectoryIfExists (collectionName @doc)
for docdirs $
fmap DocId . liftEither . maybe (Left "Bad UUID") Right .
UUID.decodeBase32

listVersions (docId :: DocId doc) = do
files <- listDirectoryIfExists $ docDir docId
Storage $
for files $ \file -> do
let path = docDir docId </> file
case UUID.decodeBase32 file of
Just uuid -> pure uuid
Nothing ->
throwError $
"Directory name " ++ path ++ " is not a UUID"

createVersion obj@Object{objectFrame} = do
version <- getEventUuid
Storage $ do
dataDir <- ask
let docdir = dataDir </> objectDir obj
let file = docdir </> UUID.encodeBase32 version
liftIO $ do
createDirectoryIfMissing True docdir
BSL.writeFile file $ serializeStateFrame objectFrame

readVersion docId@(DocId objectId) version = Storage $ do
dataDir <- ask
contents <- liftIO $
BSL.readFile $
dataDir </> docDir docId </> UUID.encodeBase32 version
case parseStateFrame contents of
Right objectFrame -> pure Object{objectId, objectFrame}
Left ronError -> case fallbackParse objectId contents of
Right object -> pure object
Left _fallbackError -> throwError ronError

deleteVersion docId version = Storage $ do
dataDir <- ask
liftIO $ do
let file = dataDir </> docDir docId </> UUID.encodeBase32 version
removeFile file
`catch` \e ->
unless (isDoesNotExistError e) $ throwIO e

data Handle = Handle
{ hClock :: IORef EpochTime
, hDataDir :: FilePath
, hReplica :: ReplicaId
}

type Storage = StorageT EpochClock

newHandle :: FilePath -> IO Handle
newHandle hDataDir = do
time <- getCurrentEpochTime
hClock <- newIORef time
hReplica <- applicationSpecific <$> getMacAddress
pure Handle{hDataDir, hClock, hReplica}

runStorage :: Handle -> Storage a -> IO a
runStorage Handle{hReplica, hDataDir, hClock} action = do
res <- runEpochClock hReplica hClock $ runStorageT hDataDir action
either fail pure res

listDirectoryIfExists :: MonadIO m => FilePath -> StorageT m [FilePath]
listDirectoryIfExists relpath = Storage $ do
dataDir <- ask
let dir = dataDir </> relpath
liftIO $ do
exists <- doesDirectoryExist dir
if exists then listDirectory dir else pure []

docDir :: forall a . Collection a => DocId a -> FilePath
docDir (DocId docId) = collectionName @a </> UUID.encodeBase32 docId

objectDir :: forall a . Collection a => Object a -> FilePath
objectDir Object{objectId} = docDir (DocId objectId :: DocId a)

-- MAC address

getMacAddress :: IO Word64
getMacAddress = decodeMac <$> getMac where
getMac
= fromMaybe
(error "Can't get any non-zero MAC address of this machine")
. listToMaybe
. filter (/= minBound)
. map mac
<$> getNetworkInterfaces
decodeMac (MAC b5 b4 b3 b2 b1 b0)
= fromIntegral b5 `shiftL` 40
+ fromIntegral b4 `shiftL` 32
+ fromIntegral b3 `shiftL` 24
+ fromIntegral b2 `shiftL` 16
+ fromIntegral b1 `shiftL` 8
+ fromIntegral b0
92 changes: 92 additions & 0 deletions ron-storage/RON/Storage/Test.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module RON.Storage.Test (TestDB, runStorageSim) where

import Control.Monad.Except (ExceptT, MonadError, runExceptT,
throwError)
import Control.Monad.State.Strict (StateT, get, gets, modify,
runStateT)
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.Lazy.Char8 as BSLC
import Data.Map.Strict (Map, (!), (!?))
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import RON.Event (Clock, Replica, applicationSpecific, getEventUuid)
import RON.Event.Simulation (ReplicaSim, runNetworkSim, runReplicaSim)
import RON.Text (parseStateFrame, serializeStateFrame)
import RON.Types (Object (Object), UUID, objectFrame, objectId)

import RON.Storage (Collection, CollectionName, DocId (DocId),
MonadStorage, collectionName, createVersion,
deleteVersion, fallbackParse, listCollections,
listDocuments, listVersions, readVersion)

type ByteStringL = BSL.ByteString

type TestDB = Map CollectionName (Map DocumentId (Map Version Document))

type Document = [ByteStringL]

type DocumentId = UUID

type Version = UUID

-- * Storage simulation

newtype StorageSim a = StorageSim (StateT TestDB (ExceptT String ReplicaSim) a)
deriving (Applicative, Clock, Functor, Monad, MonadError String, Replica)

runStorageSim :: TestDB -> StorageSim a -> Either String (a, TestDB)
runStorageSim db (StorageSim action) =
runNetworkSim $ runReplicaSim (applicationSpecific 34) $
runExceptT $ runStateT action db

-- TODO(2018-10-26, cblp) move common implementation between Storage and
-- StorageSim to common functions
instance MonadStorage StorageSim where
listCollections = StorageSim $ gets Map.keys

listDocuments :: forall a . Collection a => StorageSim [DocId a]
listDocuments = StorageSim $ do
db <- get
pure $ map DocId $ Map.keys $ db !. collectionName @a

listVersions (DocId doc :: DocId a) = StorageSim $ do
db <- get
pure $ Map.keys $ db !. collectionName @a !. doc

createVersion (Object{objectId, objectFrame} :: Object a) = do
version <- getEventUuid
let document = BSLC.lines $ serializeStateFrame objectFrame
let insertDocumentVersion =
Just . Map.insertWith (<>) version document . fromMaybe mempty
let alterDocument = Just .
Map.alter insertDocumentVersion objectId . fromMaybe mempty
let alterCollection = Map.alter alterDocument (collectionName @a)
StorageSim $ modify alterCollection

readVersion (DocId objectId :: DocId a) version = StorageSim $ do
db <- get
let contents =
BSLC.unlines $ db !. collectionName @a !. objectId ! version
case parseStateFrame contents of
Right objectFrame -> pure Object{objectId, objectFrame}
Left ronError -> case fallbackParse objectId contents of
Right object -> pure object
Left fallbackError -> throwError $ case BSLC.head contents of
'{' -> fallbackError
_ -> ronError

deleteVersion (DocId doc :: DocId a) version
= StorageSim
. modify
. (`Map.adjust` collectionName @a)
. (`Map.adjust` doc)
$ Map.delete version

(!.) :: Ord a => Map a (Map b c) -> a -> Map b c
m !. a = fromMaybe Map.empty $ m !? a
Loading