Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalise table merge with a monoid on values #142

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions zebra-cli/main/zebra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,30 @@ pMerge =
<*> pOutputFormat
<*> pMergeRowsPerBlock
<*> optional pMergeMaximumRowSize
<*> pMergeMode

pMergeMode :: Parser MergeMode
pMergeMode =
fromMaybe MergeValue <$> optional (pMergeValue <|> pMergeMeasure <|> pMergeMeasureGreaterThan)

pMergeValue :: Parser MergeMode
pMergeValue =
Options.flag' MergeValue $
Options.long "value" <>
Options.help "Standard merge of input values. Each key needs to fit wholly in memory."

pMergeMeasure :: Parser MergeMode
pMergeMeasure =
Options.flag' MergeMeasure $
Options.long "measure" <>
Options.help "Merge by measuring the size of input values."

pMergeMeasureGreaterThan :: Parser MergeMode
pMergeMeasureGreaterThan =
fmap MergeMeasureGreaterThanMegabytes .
Options.option Options.auto $
Options.long "measure-greater-than-megabytes" <>
Options.help "Merge by measuring the size of input values, only output keys whose size is greater than the specified number in megabytes."

pMergeRowsPerBlock :: Parser MergeRowsPerBlock
pMergeRowsPerBlock =
Expand Down
30 changes: 28 additions & 2 deletions zebra-cli/src/Zebra/Command/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Zebra.Command.Merge (
Merge(..)
, MergeRowsPerBlock(..)
, MergeMaximumRowSize(..)
, MergeMode(..)
, zebraMerge

, MergeError(..)
Expand Down Expand Up @@ -56,8 +57,15 @@ data Merge =
, mergeVersion :: !BinaryVersion
, mergeRowsPerChunk :: !MergeRowsPerBlock
, mergeMaximumRowSize :: !(Maybe MergeMaximumRowSize)
, mergeMode :: MergeMode
} deriving (Eq, Ord, Show)

data MergeMode =
MergeValue
| MergeMeasure
| MergeMeasureGreaterThanMegabytes Int64
deriving (Eq, Ord, Show)

newtype MergeRowsPerBlock =
MergeRowsPerBlock {
unMergeRowsPerBlock :: Int
Expand Down Expand Up @@ -121,7 +129,25 @@ zebraMerge x = do
fmap (Merge.MaximumRowSize . unMergeMaximumRowSize) $ mergeMaximumRowSize x

union =
maybe Merge.unionStriped Merge.unionStripedWith mschema
case mergeMode x of
MergeValue ->
maybe
(Merge.unionStriped Merge.valueMonoid Merge.identityExtraction)
(Merge.unionStripedWith Merge.valueMonoid Merge.identityExtraction)

MergeMeasure ->
maybe
(Merge.unionStriped Merge.measureMonoid Merge.identityExtraction)
(Merge.unionStripedWith Merge.measureMonoid Merge.identityExtraction)

MergeMeasureGreaterThanMegabytes m ->
let
b =
m * 1024 * 1024
in
maybe
(Merge.unionStriped Merge.measureMonoid (Merge.minimumExtraction b))
(Merge.unionStripedWith Merge.measureMonoid (Merge.minimumExtraction b))

firstJoin MergeIOError .
writeFileOrStdout (mergeOutput x) .
Expand All @@ -130,5 +156,5 @@ zebraMerge x = do
hoist (firstJoin MergeStripedError) .
Striped.rechunk (unMergeRowsPerBlock $ mergeRowsPerChunk x) .
hoist (firstJoin MergeUnionTableError) $
union msize inputs
union mschema msize inputs
{-# SPECIALIZE zebraMerge :: Merge -> EitherT MergeError (ResourceT IO) () #-}
149 changes: 110 additions & 39 deletions zebra-core/src/Zebra/Merge/Table.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ module Zebra.Merge.Table (
, unionLogical
, unionStriped
, unionStripedWith

, Monoidal(..)
, valueMonoid
, measureMonoid
, summationMonoid

, Extract(..)
, identityExtraction
, minimumExtraction
) where

import Control.Monad.Morph (hoist, squash)
Expand All @@ -32,6 +41,8 @@ import X.Control.Monad.Trans.Either (EitherT, hoistEither, left)
import X.Data.Vector.Cons (Cons)
import qualified X.Data.Vector.Cons as Cons

import Zebra.Table.Data
import qualified Zebra.Table.Encoding as Encoding
import Zebra.Table.Logical (LogicalSchemaError, LogicalMergeError)
import qualified Zebra.Table.Logical as Logical
import Zebra.Table.Schema (SchemaUnionError)
Expand All @@ -45,24 +56,66 @@ newtype MaximumRowSize =
unMaximumRowSize :: Int64
} deriving (Eq, Ord, Show)

data Input m =
data Input m a =
Input {
inputData :: !(Map Logical.Value Logical.Value)
inputData :: !(Map Logical.Value a)
, inputStream :: !(Maybe (Stream (Of Logical.Table) m ()))
}

data Step m =
data Step m a =
Step {
_stepComplete :: !(Map Logical.Value Logical.Value)
, _stepRemaining :: !(Cons Boxed.Vector (Input m))
_stepComplete :: !(Map Logical.Value a)
, _stepRemaining :: !(Cons Boxed.Vector (Input m a))
}

data Extract a =
Extract {
extractResult :: Map Logical.Value a -> Map Logical.Value a
}

identityExtraction :: Extract a
identityExtraction =
Extract id

minimumExtraction :: Int64 -> Extract Int64
minimumExtraction x =
Extract $ Map.filter (> x)

data Monoidal a =
Monoidal {
monoidFromValue :: Logical.Value -> a
, monoidToValue :: a -> Logical.Value
, monoidSchema :: Schema.Table -> Either UnionTableError Schema.Table
, monoidOp :: a -> a -> Either LogicalMergeError a
}

valueMonoid :: Monoidal Logical.Value
valueMonoid =
Monoidal id id (pure . id) Logical.mergeValue

measureMonoid :: Monoidal Int64
measureMonoid =
Monoidal Logical.sizeValue Logical.Int
(\schema ->
case schema of
Schema.Map def k _ ->
pure $ Schema.Map def k (Schema.Int DenyDefault Encoding.Int)
t0 ->
Left $ UnionTableMonoidNotDefined t0
)
(\x0 x1 -> pure $ x0 + x1)

summationMonoid :: Monoidal Logical.Value
summationMonoid =
Monoidal id id (pure . id) Logical.sumValue

data UnionTableError =
UnionEmptyInput
| UnionStripedError !StripedError
| UnionLogicalSchemaError !LogicalSchemaError
| UnionLogicalMergeError !LogicalMergeError
| UnionSchemaError !SchemaUnionError
| UnionTableMonoidNotDefined !Schema.Table
deriving (Eq, Show)

renderUnionTableError :: UnionTableError -> Text
Expand All @@ -77,6 +130,8 @@ renderUnionTableError = \case
Logical.renderLogicalMergeError err
UnionSchemaError err ->
Schema.renderSchemaUnionError err
UnionTableMonoidNotDefined _ ->
"Monoid not defined for schema of this shape."

------------------------------------------------------------------------
-- General
Expand All @@ -96,41 +151,41 @@ peekHead input = do
pure (hd, Stream.cons hd tl)
{-# INLINABLE peekHead #-}

hasData :: Input m -> Bool
hasData :: Input m a -> Bool
hasData =
not . Map.null . inputData
{-# INLINABLE hasData #-}

replaceData :: Map Logical.Value Logical.Value -> Input m -> Input m
replaceData :: Map Logical.Value a -> Input m a -> Input m a
replaceData values input =
input {
inputData =
values
}
{-# INLINABLE replaceData #-}

dropData :: Map Logical.Value a -> Input m -> Input m
dropData :: Map Logical.Value b -> Input m a -> Input m a
dropData drops input =
input {
inputData =
inputData input `Map.difference` drops
}
{-# INLINABLE dropData #-}

replaceStream :: Stream (Of Logical.Table) m () -> Input m -> Input m
replaceStream :: Stream (Of Logical.Table) m () -> Input m a -> Input m a
replaceStream stream input =
input {
inputStream =
Just stream
}
{-# INLINABLE replaceStream #-}

isClosed :: Input m -> Bool
isClosed :: Input m a -> Bool
isClosed =
isNothing . inputStream
{-# INLINABLE isClosed #-}

closeStream :: Input m -> Input m
closeStream :: Input m a -> Input m a
closeStream input =
input {
inputStream =
Expand All @@ -140,9 +195,10 @@ closeStream input =

updateInput ::
Monad m
=> Input m
-> StateT (Map Logical.Value Int64) (EitherT UnionTableError m) (Input m)
updateInput input =
=> (Logical.Value -> a)
-> Input m a
-> StateT (Map Logical.Value Int64) (EitherT UnionTableError m) (Input m a)
updateInput f input =
case inputStream input of
Nothing ->
pure input
Expand All @@ -159,7 +215,7 @@ updateInput input =
Right (hd, tl) -> do
values <- lift . firstT UnionLogicalSchemaError . hoistEither $ Logical.takeMap hd
modify $ Map.unionWith (+) (Map.map Logical.sizeValue values)
pure . replaceStream tl $ replaceData values input
pure . replaceStream tl $ replaceData (fmap f values) input
{-# INLINABLE updateInput #-}

takeExcessiveValues :: Maybe MaximumRowSize -> Map Logical.Value Int64 -> Map Logical.Value Int64
Expand All @@ -170,23 +226,30 @@ takeExcessiveValues = \case
Map.filter (> unMaximumRowSize size)
{-# INLINABLE takeExcessiveValues #-}

unionStep :: Monad m => Cons Boxed.Vector (Input m) -> EitherT UnionTableError m (Step m)
unionStep inputs = do
step <- firstT UnionLogicalMergeError . hoistEither . Logical.unionStep $ fmap inputData inputs
unionStep ::
Monad m
=> (a -> a -> Either LogicalMergeError a)
-> (Map Logical.Value a -> Map Logical.Value a)
-> Cons Boxed.Vector (Input m a)
-> EitherT UnionTableError m (Step m a)
unionStep f g inputs = do
step <- firstT UnionLogicalMergeError . hoistEither . Logical.unionStep f $ fmap inputData inputs
pure $
Step
(Logical.unionComplete step)
(Cons.zipWith replaceData (Logical.unionRemaining step) inputs)
(g $ Logical.unionComplete step)
(Cons.zipWith replaceData(Logical.unionRemaining step) inputs)
{-# INLINABLE unionStep #-}

unionInput ::
Monad m
=> Maybe MaximumRowSize
-> Cons Boxed.Vector (Input m)
=> Monoidal a
-> Extract a
-> Maybe MaximumRowSize
-> Cons Boxed.Vector (Input m a)
-> Map Logical.Value Int64
-> Stream (Of Logical.Table) (EitherT UnionTableError m) ()
unionInput msize inputs0 sizes0 = do
(inputs1, sizes1) <- lift $ runStateT (traverse updateInput inputs0) sizes0
unionInput m e msize inputs0 sizes0 = do
(inputs1, sizes1) <- lift $ runStateT (traverse (updateInput (monoidFromValue m)) inputs0) sizes0

let
drops =
Expand All @@ -198,51 +261,59 @@ unionInput msize inputs0 sizes0 = do
if Cons.all isClosed inputs2 then do
pure ()
else do
Step values inputs3 <- lift $ unionStep inputs2
Step values inputs3 <- lift $ unionStep (monoidOp m) (extractResult e) inputs2

if Map.null values then
unionInput msize inputs3 sizes1
unionInput m e msize inputs3 sizes1
else do
Stream.yield $ Logical.Map values
unionInput msize inputs3 sizes1
Stream.yield . Logical.Map . fmap (monoidToValue m) $ values
unionInput m e msize inputs3 sizes1
{-# INLINABLE unionInput #-}

unionLogical ::
Monad m
=> Schema.Table
=> Monoidal a
-> Extract a
-> Schema.Table
-> Maybe MaximumRowSize
-> Cons Boxed.Vector (Stream (Of Logical.Table) m ())
-> Stream (Of Logical.Table) (EitherT UnionTableError m) ()
unionLogical schema msize inputs = do
unionLogical m e schema msize inputs = do
Stream.whenEmpty (Logical.empty schema) $
unionInput msize (fmap (Input Map.empty . Just) inputs) Map.empty
unionInput m e msize (fmap (Input Map.empty . Just) inputs) Map.empty
{-# INLINABLE unionLogical #-}

unionStripedWith ::
Monad m
=> Schema.Table
=> Monoidal a
-> Extract a
-> Schema.Table
-> Maybe MaximumRowSize
-> Cons Boxed.Vector (Stream (Of Striped.Table) m ())
-> Stream (Of Striped.Table) (EitherT UnionTableError m) ()
unionStripedWith schema msize inputs0 = do
unionStripedWith m e schema0 msize inputs0 = do
let
fromStriped =
Stream.mapM (hoistEither . first UnionStripedError . Striped.toLogical) .
Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema) .
Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema0) .
hoist lift

schema1 <- lift . hoistEither $ monoidSchema m schema0

hoist squash .
Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema) $
unionLogical schema msize (fmap fromStriped inputs0)
Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema1) $
unionLogical m e schema0 msize (fmap fromStriped inputs0)
{-# INLINABLE unionStripedWith #-}

unionStriped ::
Monad m
=> Maybe MaximumRowSize
=> Monoidal a
-> Extract a
-> Maybe MaximumRowSize
-> Cons Boxed.Vector (Stream (Of Striped.Table) m ())
-> Stream (Of Striped.Table) (EitherT UnionTableError m) ()
unionStriped msize inputs0 = do
unionStriped m e msize inputs0 = do
(heads, inputs1) <- fmap Cons.unzip . lift $ traverse peekHead inputs0
schema <- lift . hoistEither . unionSchemas $ fmap Striped.schema heads
unionStripedWith schema msize inputs1
unionStripedWith m e schema msize inputs1
{-# INLINABLE unionStriped #-}
Loading