diff --git a/zebra-cli/main/zebra.hs b/zebra-cli/main/zebra.hs
index ce9fdf4..82c7eb8 100644
--- a/zebra-cli/main/zebra.hs
+++ b/zebra-cli/main/zebra.hs
@@ -214,6 +214,35 @@ pMerge =
     <*> pOutputFormat
     <*> pMergeRowsPerBlock
     <*> optional pMergeMaximumRowSize
+    <*> pMergeMode
+
+-- | Select the merge mode, defaulting to 'MergeValue' when no mode flag is given.
+pMergeMode :: Parser MergeMode
+pMergeMode =
+  pMergeValue <|> pMergeMeasure <|> pMergeMeasureGreaterThan <|> pure MergeValue
+
+-- | @--value@: merge the input values themselves.
+pMergeValue :: Parser MergeMode
+pMergeValue =
+  Options.flag' MergeValue $
+    Options.long "value" <>
+    Options.help "Standard merge of input values. Each key needs to fit wholly in memory."
+
+-- | @--measure@: merge by measuring the size of the input values.
+pMergeMeasure :: Parser MergeMode
+pMergeMeasure =
+  Options.flag' MergeMeasure $
+    Options.long "measure" <>
+    Options.help "Merge by measuring the size of input values."
+
+-- | @--measure-greater-than-megabytes N@: as @--measure@, keeping only keys larger than @N@ megabytes.
+pMergeMeasureGreaterThan :: Parser MergeMode
+pMergeMeasureGreaterThan =
+  fmap MergeMeasureGreaterThanMegabytes .
+  Options.option Options.auto $
+    Options.long "measure-greater-than-megabytes" <>
+    Options.metavar "MEGABYTES" <>
+    Options.help "Merge by measuring the size of input values, only output keys whose size is greater than the specified number in megabytes."
 
 pMergeRowsPerBlock :: Parser MergeRowsPerBlock
 pMergeRowsPerBlock =
diff --git a/zebra-cli/src/Zebra/Command/Merge.hs b/zebra-cli/src/Zebra/Command/Merge.hs
index 2790936..d602afe 100644
--- a/zebra-cli/src/Zebra/Command/Merge.hs
+++ b/zebra-cli/src/Zebra/Command/Merge.hs
@@ -9,6 +9,7 @@ module Zebra.Command.Merge (
     Merge(..)
   , MergeRowsPerBlock(..)
   , MergeMaximumRowSize(..)
+  , MergeMode(..)
   , zebraMerge
 
   , MergeError(..)
@@ -56,8 +57,16 @@ data Merge =
     , mergeVersion :: !BinaryVersion
     , mergeRowsPerChunk :: !MergeRowsPerBlock
     , mergeMaximumRowSize :: !(Maybe MergeMaximumRowSize)
+    , mergeMode :: !MergeMode
     } deriving (Eq, Ord, Show)
 
+-- | How the values that share a key are combined when merging.
+data MergeMode =
+    MergeValue
+  | MergeMeasure
+  | MergeMeasureGreaterThanMegabytes !Int64
+  deriving (Eq, Ord, Show)
+
 newtype MergeRowsPerBlock =
   MergeRowsPerBlock {
       unMergeRowsPerBlock :: Int
@@ -121,7 +130,25 @@ zebraMerge x = do
       fmap (Merge.MaximumRowSize . unMergeMaximumRowSize) $ mergeMaximumRowSize x
 
     union =
-      maybe Merge.unionStriped Merge.unionStripedWith mschema
+      case mergeMode x of
+        MergeValue ->
+          maybe
+            (Merge.unionStriped Merge.valueMonoid Merge.identityExtraction)
+            (Merge.unionStripedWith Merge.valueMonoid Merge.identityExtraction)
+
+        MergeMeasure ->
+          maybe
+            (Merge.unionStriped Merge.measureMonoid Merge.identityExtraction)
+            (Merge.unionStripedWith Merge.measureMonoid Merge.identityExtraction)
+
+        MergeMeasureGreaterThanMegabytes m ->
+          let
+            b =
+              m * 1024 * 1024
+          in
+            maybe
+              (Merge.unionStriped Merge.measureMonoid (Merge.minimumExtraction b))
+              (Merge.unionStripedWith Merge.measureMonoid (Merge.minimumExtraction b))
 
   firstJoin MergeIOError .
     writeFileOrStdout (mergeOutput x) .
@@ -130,5 +157,5 @@ zebraMerge x = do
     hoist (firstJoin MergeStripedError) .
     Striped.rechunk (unMergeRowsPerBlock $ mergeRowsPerChunk x) .
     hoist (firstJoin MergeUnionTableError) $
-    union msize inputs
+    union mschema msize inputs
 {-# SPECIALIZE zebraMerge :: Merge -> EitherT MergeError (ResourceT IO) () #-}
diff --git a/zebra-core/src/Zebra/Merge/Table.hs b/zebra-core/src/Zebra/Merge/Table.hs
index 94fc6d3..029b230 100644
--- a/zebra-core/src/Zebra/Merge/Table.hs
+++ b/zebra-core/src/Zebra/Merge/Table.hs
@@ -13,6 +13,15 @@ module Zebra.Merge.Table (
   , unionLogical
   , unionStriped
   , unionStripedWith
+
+  , Monoidal(..)
+  , valueMonoid
+  , measureMonoid
+  , summationMonoid
+
+  , Extract(..)
+  , identityExtraction
+  , minimumExtraction
   ) where
 
 import Control.Monad.Morph (hoist, squash)
@@ -32,6 +41,8 @@ import X.Control.Monad.Trans.Either (EitherT, hoistEither, left)
 import X.Data.Vector.Cons (Cons)
 import qualified X.Data.Vector.Cons as Cons
 
+import Zebra.Table.Data
+import qualified Zebra.Table.Encoding as Encoding
 import Zebra.Table.Logical (LogicalSchemaError, LogicalMergeError)
 import qualified Zebra.Table.Logical as Logical
 import Zebra.Table.Schema (SchemaUnionError)
@@ -45,24 +56,73 @@ newtype MaximumRowSize =
       unMaximumRowSize :: Int64
     } deriving (Eq, Ord, Show)
 
-data Input m =
+data Input m a =
   Input {
-      inputData :: !(Map Logical.Value Logical.Value)
+      inputData :: !(Map Logical.Value a)
     , inputStream :: !(Maybe (Stream (Of Logical.Table) m ()))
     }
 
-data Step m =
+data Step m a =
   Step {
-      _stepComplete :: !(Map Logical.Value Logical.Value)
-    , _stepRemaining :: !(Cons Boxed.Vector (Input m))
+      _stepComplete :: !(Map Logical.Value a)
+    , _stepRemaining :: !(Cons Boxed.Vector (Input m a))
+    }
+
+-- | Post-processing applied to the completed key/value pairs of each union step.
+data Extract a =
+  Extract {
+      extractResult :: Map Logical.Value a -> Map Logical.Value a
     }
 
+-- | Keep every completed key.
+identityExtraction :: Extract a
+identityExtraction =
+  Extract id
+
+-- | Keep only the keys whose accumulated value is strictly greater than the given threshold.
+minimumExtraction :: Int64 -> Extract Int64
+minimumExtraction x =
+  Extract $ Map.filter (> x)
+
+-- | How values are converted, combined and converted back during a merge, and the schema of the result.
+data Monoidal a =
+  Monoidal {
+      monoidFromValue :: Logical.Value -> a
+    , monoidToValue :: a -> Logical.Value
+    , monoidSchema :: Schema.Table -> Either UnionTableError Schema.Table
+    , monoidOp :: a -> a -> Either LogicalMergeError a
+    }
+
+-- | Merge the values themselves using 'Logical.mergeValue'; the schema is left unchanged.
+valueMonoid :: Monoidal Logical.Value
+valueMonoid =
+  Monoidal id id pure Logical.mergeValue
+
+-- | Replace each value by its 'Logical.sizeValue' and add the sizes; only defined for map schemas.
+measureMonoid :: Monoidal Int64
+measureMonoid =
+  Monoidal Logical.sizeValue Logical.Int
+    (\schema ->
+      case schema of
+        Schema.Map def k _ ->
+          pure $ Schema.Map def k (Schema.Int DenyDefault Encoding.Int)
+        t0 ->
+          Left $ UnionTableMonoidNotDefined t0
+    )
+    (\x0 x1 -> pure $ x0 + x1)
+
+-- | Combine values with 'Logical.sumValue'; the schema is left unchanged.
+summationMonoid :: Monoidal Logical.Value
+summationMonoid =
+  Monoidal id id pure Logical.sumValue
+
 data UnionTableError =
     UnionEmptyInput
   | UnionStripedError !StripedError
   | UnionLogicalSchemaError !LogicalSchemaError
   | UnionLogicalMergeError !LogicalMergeError
   | UnionSchemaError !SchemaUnionError
+  | UnionTableMonoidNotDefined !Schema.Table
     deriving (Eq, Show)
 
 renderUnionTableError :: UnionTableError -> Text
@@ -77,6 +137,8 @@ renderUnionTableError = \case
     Logical.renderLogicalMergeError err
   UnionSchemaError err ->
     Schema.renderSchemaUnionError err
+  UnionTableMonoidNotDefined _ ->
+    "Monoid not defined for schema of this shape."
 
 ------------------------------------------------------------------------
 -- General
@@ -96,12 +158,12 @@ peekHead input = do
       pure (hd, Stream.cons hd tl)
 {-# INLINABLE peekHead #-}
 
-hasData :: Input m -> Bool
+hasData :: Input m a -> Bool
 hasData =
   not . Map.null . inputData
 {-# INLINABLE hasData #-}
 
-replaceData :: Map Logical.Value Logical.Value -> Input m -> Input m
+replaceData :: Map Logical.Value a -> Input m a -> Input m a
 replaceData values input =
   input {
       inputData =
@@ -109,7 +171,7 @@ replaceData values input =
     }
 {-# INLINABLE replaceData #-}
 
-dropData :: Map Logical.Value a -> Input m -> Input m
+dropData :: Map Logical.Value b -> Input m a -> Input m a
 dropData drops input =
   input {
       inputData =
@@ -117,7 +179,7 @@ dropData drops input =
     }
 {-# INLINABLE dropData #-}
 
-replaceStream :: Stream (Of Logical.Table) m () -> Input m -> Input m
+replaceStream :: Stream (Of Logical.Table) m () -> Input m a -> Input m a
 replaceStream stream input =
   input {
       inputStream =
@@ -125,12 +187,12 @@ replaceStream stream input =
     }
 {-# INLINABLE replaceStream #-}
 
-isClosed :: Input m -> Bool
+isClosed :: Input m a -> Bool
 isClosed =
   isNothing . inputStream
 {-# INLINABLE isClosed #-}
 
-closeStream :: Input m -> Input m
+closeStream :: Input m a -> Input m a
 closeStream input =
   input {
       inputStream =
@@ -140,9 +202,10 @@ closeStream input =
 
 updateInput ::
      Monad m
-  => Input m
-  -> StateT (Map Logical.Value Int64) (EitherT UnionTableError m) (Input m)
-updateInput input =
+  => (Logical.Value -> a)
+  -> Input m a
+  -> StateT (Map Logical.Value Int64) (EitherT UnionTableError m) (Input m a)
+updateInput f input =
   case inputStream input of
     Nothing ->
       pure input
@@ -159,7 +222,7 @@ updateInput input =
         Right (hd, tl) -> do
           values <- lift . firstT UnionLogicalSchemaError . hoistEither $ Logical.takeMap hd
           modify $ Map.unionWith (+) (Map.map Logical.sizeValue values)
-          pure . replaceStream tl $ replaceData values input
+          pure . replaceStream tl $ replaceData (fmap f values) input
 {-# INLINABLE updateInput #-}
 
 takeExcessiveValues :: Maybe MaximumRowSize -> Map Logical.Value Int64 -> Map Logical.Value Int64
@@ -170,23 +233,30 @@ takeExcessiveValues = \case
     Map.filter (> unMaximumRowSize size)
 {-# INLINABLE takeExcessiveValues #-}
 
-unionStep :: Monad m => Cons Boxed.Vector (Input m) -> EitherT UnionTableError m (Step m)
-unionStep inputs = do
-  step <- firstT UnionLogicalMergeError . hoistEither . Logical.unionStep $ fmap inputData inputs
+unionStep ::
+     Monad m
+  => (a -> a -> Either LogicalMergeError a)
+  -> (Map Logical.Value a -> Map Logical.Value a)
+  -> Cons Boxed.Vector (Input m a)
+  -> EitherT UnionTableError m (Step m a)
+unionStep f g inputs = do
+  step <- firstT UnionLogicalMergeError . hoistEither . Logical.unionStep f $ fmap inputData inputs
   pure $
     Step
-      (Logical.unionComplete step)
-      (Cons.zipWith replaceData (Logical.unionRemaining step) inputs)
+      (g $ Logical.unionComplete step)
+      (Cons.zipWith replaceData (Logical.unionRemaining step) inputs)
 {-# INLINABLE unionStep #-}
 
 unionInput ::
      Monad m
-  => Maybe MaximumRowSize
-  -> Cons Boxed.Vector (Input m)
+  => Monoidal a
+  -> Extract a
+  -> Maybe MaximumRowSize
+  -> Cons Boxed.Vector (Input m a)
   -> Map Logical.Value Int64
   -> Stream (Of Logical.Table) (EitherT UnionTableError m) ()
-unionInput msize inputs0 sizes0 = do
-  (inputs1, sizes1) <- lift $ runStateT (traverse updateInput inputs0) sizes0
+unionInput m e msize inputs0 sizes0 = do
+  (inputs1, sizes1) <- lift $ runStateT (traverse (updateInput (monoidFromValue m)) inputs0) sizes0
 
   let
     drops =
@@ -198,51 +268,59 @@ unionInput msize inputs0 sizes0 = do
   if Cons.all isClosed inputs2 then do
     pure ()
   else do
-    Step values inputs3 <- lift $ unionStep inputs2
+    Step values inputs3 <- lift $ unionStep (monoidOp m) (extractResult e) inputs2
 
     if Map.null values then
-      unionInput msize inputs3 sizes1
+      unionInput m e msize inputs3 sizes1
     else do
-      Stream.yield $ Logical.Map values
-      unionInput msize inputs3 sizes1
+      Stream.yield . Logical.Map . fmap (monoidToValue m) $ values
+      unionInput m e msize inputs3 sizes1
 {-# INLINABLE unionInput #-}
 
 unionLogical ::
      Monad m
-  => Schema.Table
+  => Monoidal a
+  -> Extract a
+  -> Schema.Table
   -> Maybe MaximumRowSize
   -> Cons Boxed.Vector (Stream (Of Logical.Table) m ())
   -> Stream (Of Logical.Table) (EitherT UnionTableError m) ()
-unionLogical schema msize inputs = do
+unionLogical m e schema msize inputs = do
   Stream.whenEmpty (Logical.empty schema) $
-    unionInput msize (fmap (Input Map.empty . Just) inputs) Map.empty
+    unionInput m e msize (fmap (Input Map.empty . Just) inputs) Map.empty
 {-# INLINABLE unionLogical #-}
 
 unionStripedWith ::
      Monad m
-  => Schema.Table
+  => Monoidal a
+  -> Extract a
+  -> Schema.Table
   -> Maybe MaximumRowSize
   -> Cons Boxed.Vector (Stream (Of Striped.Table) m ())
   -> Stream (Of Striped.Table) (EitherT UnionTableError m) ()
-unionStripedWith schema msize inputs0 = do
+unionStripedWith m e schema0 msize inputs0 = do
   let
     fromStriped =
       Stream.mapM (hoistEither . first UnionStripedError . Striped.toLogical) .
-      Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema) .
+      Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema0) .
       hoist lift
 
+  schema1 <- lift . hoistEither $ monoidSchema m schema0
+
   hoist squash .
-    Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema) $
-    unionLogical schema msize (fmap fromStriped inputs0)
+    Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema1) $
+    unionLogical m e schema0 msize (fmap fromStriped inputs0)
 {-# INLINABLE unionStripedWith #-}
 
 unionStriped ::
      Monad m
-  => Maybe MaximumRowSize
+  => Monoidal a
+  -> Extract a
+  -> Maybe MaximumRowSize
   -> Cons Boxed.Vector (Stream (Of Striped.Table) m ())
   -> Stream (Of Striped.Table) (EitherT UnionTableError m) ()
-unionStriped msize inputs0 = do
+unionStriped m e msize inputs0 = do
   (heads, inputs1) <- fmap Cons.unzip . lift $ traverse peekHead inputs0
   schema <- lift . hoistEither . unionSchemas $ fmap Striped.schema heads
-  unionStripedWith schema msize inputs1
+  unionStripedWith m e schema msize inputs1
 {-# INLINABLE unionStriped #-}
diff --git a/zebra-core/src/Zebra/Table/Logical.hs b/zebra-core/src/Zebra/Table/Logical.hs
index 0273577..2d258e7 100644
--- a/zebra-core/src/Zebra/Table/Logical.hs
+++ b/zebra-core/src/Zebra/Table/Logical.hs
@@ -53,6 +53,8 @@ module Zebra.Table.Logical (
   , UnionStep(..)
   , unionStep
 
+  , sumValue
+
   -- * Internal
   , renderField
   ) where
@@ -256,20 +258,27 @@ merge x0 x1 =
       pure $ Array (xs0 <> xs1)
 
     (Map kvs0, Map kvs1) ->
-      Map <$> mergeMap kvs0 kvs1
+      Map <$> mergeMap mergeValue kvs0 kvs1
 
     _ ->
       Left $ LogicalCannotMergeMismatchedCollections x0 x1
 {-# INLINABLE merge #-}
 
-mergeMap :: Map Value Value -> Map Value Value -> Either LogicalMergeError (Map Value Value)
-mergeMap xs0 xs1 =
+mergeMap ::
+     (a -> a -> Either LogicalMergeError a)
+  -> Map Value a
+  -> Map Value a
+  -> Either LogicalMergeError (Map Value a)
+mergeMap f xs0 xs1 =
   sequenceA $
-    Map.mergeWithKey (\_ x y -> Just (mergeValue x y)) (fmap pure) (fmap pure) xs0 xs1
+    Map.mergeWithKey (\_ x y -> Just (f x y)) (fmap pure) (fmap pure) xs0 xs1
 {-# INLINABLE mergeMap #-}
 
-mergeMaps :: Boxed.Vector (Map Value Value) -> Either LogicalMergeError (Map Value Value)
-mergeMaps kvss =
+mergeMaps ::
+     (a -> a -> Either LogicalMergeError a)
+  -> Boxed.Vector (Map Value a)
+  -> Either LogicalMergeError (Map Value a)
+mergeMaps f kvss =
   case Boxed.length kvss of
     0 ->
       pure $ Map.empty
@@ -278,7 +287,7 @@ mergeMaps kvss =
       pure $ kvss Boxed.! 0
 
     2 ->
-      mergeMap
+      mergeMap f
         (kvss Boxed.! 0)
         (kvss Boxed.! 1)
 
@@ -287,10 +296,10 @@ mergeMaps kvss =
         (kvss0, kvss1) =
           Boxed.splitAt (n `div` 2) kvss
 
-      kvs0 <- mergeMaps kvss0
-      kvs1 <- mergeMaps kvss1
+      kvs0 <- mergeMaps f kvss0
+      kvs1 <- mergeMaps f kvss1
 
-      mergeMap kvs0 kvs1
+      mergeMap f kvs0 kvs1
 {-# INLINABLE mergeMaps #-}
 
 mergeValue :: Value -> Value -> Either LogicalMergeError Value
@@ -321,15 +330,44 @@ mergeValue x0 x1 =
       Left $ LogicalCannotMergeMismatchedValues x0 x1
 {-# INLINABLE mergeValue #-}
 
+-- | Add two values of the same shape: ints and doubles are summed, structs and reversed values recurse, nested tables go through 'merge'; enums are rejected.
+sumValue :: Value -> Value -> Either LogicalMergeError Value
+sumValue x0 x1 =
+  case (x0, x1) of
+    (Unit, Unit) ->
+      pure Unit
+
+    (Int v0, Int v1) ->
+      pure . Int $ v0 + v1
+
+    (Double v0, Double v1) ->
+      pure . Double $ v0 + v1
+
+    (Enum tag0 v0, Enum tag1 v1) ->
+      Left $ LogicalCannotMergeEnum (tag0, v0) (tag1, v1)
+
+    (Struct fs0, Struct fs1) ->
+      Struct <$> Cons.zipWithM sumValue fs0 fs1
+
+    (Nested xs0, Nested xs1) ->
+      Nested <$> merge xs0 xs1
+
+    (Reversed v0, Reversed v1) ->
+      Reversed <$> sumValue v0 v1
+
+    _ ->
+      Left $ LogicalCannotMergeMismatchedValues x0 x1
+{-# INLINABLE sumValue #-}
+
 ------------------------------------------------------------------------
 
-data UnionStep =
+data UnionStep a =
   UnionStep {
-      unionComplete :: !(Map Value Value)
-    , unionRemaining :: !(Cons Boxed.Vector (Map Value Value))
+      unionComplete :: !(Map Value a)
+    , unionRemaining :: !(Cons Boxed.Vector (Map Value a))
     } deriving (Eq, Ord, Show)
 
-maximumKey :: Map Value Value -> Maybe Value
+maximumKey :: Map Value a -> Maybe Value
 maximumKey kvs =
   if Map.null kvs then
     Nothing
@@ -337,8 +375,11 @@ maximumKey kvs =
     pure . fst $ Map.findMax kvs
 {-# INLINABLE maximumKey #-}
 
-unionStep :: Cons Boxed.Vector (Map Value Value) -> Either LogicalMergeError UnionStep
-unionStep kvss =
+unionStep ::
+     (a -> a -> Either LogicalMergeError a)
+  -> Cons Boxed.Vector (Map Value a)
+  -> Either LogicalMergeError (UnionStep a)
+unionStep f kvss =
   let
     maximums =
       Cons.mapMaybe maximumKey kvss
@@ -362,7 +403,7 @@ unionStep kvss =
         dones =
           Cons.zipWith insert done1 done0
 
-      done <- mergeMaps $ Cons.toVector dones
+      done <- mergeMaps f $ Cons.toVector dones
 
       pure $ UnionStep done incomplete
 {-# INLINABLE unionStep #-}
diff --git a/zebra-core/test/Test/Zebra/Merge/Table.hs b/zebra-core/test/Test/Zebra/Merge/Table.hs
index 618a452..dbcfc46 100644
--- a/zebra-core/test/Test/Zebra/Merge/Table.hs
+++ b/zebra-core/test/Test/Zebra/Merge/Table.hs
@@ -79,11 +79,12 @@ unionSimple xss0 =
         pure $ pure x
 
 unionList ::
-     Maybe Merge.MaximumRowSize
+     Merge.Monoidal a
+  -> Maybe Merge.MaximumRowSize
   -> Cons Boxed.Vector (NonEmpty Striped.Table)
   -> Either String (Maybe Striped.Table)
-unionList msize xss0 =
-  case runIdentity . runEitherT . Stream.toList . Merge.unionStriped msize $ fmap Stream.each xss0 of
+unionList m msize xss0 =
+  case runIdentity . runEitherT . Stream.toList . Merge.unionStriped m Merge.identityExtraction msize $ fmap Stream.each xss0 of
     Left (UnionLogicalMergeError _) ->
       pure Nothing
     Left err ->
@@ -112,7 +113,7 @@ prop_union_identity =
           Striped.unsafeConcat $
           Cons.fromNonEmpty file0
 
-    x <- first ppShow $ unionList Nothing files
+    x <- first ppShow $ unionList Merge.valueMonoid Nothing files
     pure $
       Just (normalizeStriped file)
       ===
@@ -124,7 +125,7 @@ prop_union_files_same_schema =
   gamble (Cons.unsafeFromList <$> listOfN 1 10 (jFile schema)) $ \files ->
   either (flip counterexample False) id $ do
     x <- first ppShow $ unionSimple files
-    y <- first ppShow $ unionList Nothing files
+    y <- first ppShow $ unionList Merge.valueMonoid Nothing files
     pure $
       fmap normalizeStriped x
       ===
@@ -135,7 +136,7 @@ prop_union_files_empty =
   gamble jMapSchema $ \schema ->
   gamble (Cons.unsafeFromList <$> listOfN 1 10 (jFile schema)) $ \files ->
   either (flip counterexample False) id $ do
-    x <- first ppShow $ unionList (Just (Merge.MaximumRowSize (-1))) files
+    x <- first ppShow $ unionList Merge.valueMonoid (Just (Merge.MaximumRowSize (-1))) files
     pure $
       Just (Striped.empty schema) === x
 
@@ -150,12 +151,88 @@ prop_union_files_diff_schema =
   gamble (traverse jFile schemas) $ \files ->
   either (flip counterexample False) id $ do
     x <- first ppShow $ unionSimple files
-    y <- first ppShow $ unionList Nothing files
+    y <- first ppShow $ unionList Merge.valueMonoid Nothing files
     pure $
       fmap normalizeStriped x
       ===
       fmap normalizeStriped y
 
+-- With a maximum row size of -1 the measured output is the empty table of the measure schema.
+prop_measure_empty :: Property
+prop_measure_empty =
+  gamble jMapSchema $ \schema ->
+  gamble (Cons.unsafeFromList <$> listOfN 1 10 (jFile schema)) $ \files ->
+  either (flip counterexample False) id $ do
+    x <- first ppShow $ unionList Merge.measureMonoid (Just (Merge.MaximumRowSize (-1))) files
+    schema0 <-
+      first ppShow $
+        Merge.monoidSchema Merge.measureMonoid schema
+    pure $
+      Just (Striped.empty schema0) === x
+
+-- Measuring two files gives the same result whichever order they are supplied in.
+prop_measure_commutative :: Property
+prop_measure_commutative =
+  gamble jMapSchema $ \schema ->
+  gamble (jFile schema) $ \file0 ->
+  gamble (jFile schema) $ \file1 ->
+  either (flip counterexample False) id $ do
+    let
+      files0 =
+        Cons.from2 file0 file1
+
+      files1 =
+        Cons.from2 file1 file0
+
+    x0 <- first ppShow $ unionList Merge.measureMonoid Nothing files0
+    x1 <- first ppShow $ unionList Merge.measureMonoid Nothing files1
+
+    pure $
+      x0
+      ===
+      x1
+
+-- Summing partial measurements gives the same result however the three files are grouped.
+prop_measure_associative :: Property
+prop_measure_associative =
+  gamble jMapSchema $ \schema ->
+  gamble (jFile schema) $ \file0 ->
+  gamble (jFile schema) $ \file1 ->
+  gamble (jFile schema) $ \file2 ->
+  either (flip counterexample False) id $ do
+    let
+      files01 =
+        Cons.from2 file0 file1
+
+      files2 =
+        Cons.from2 file2 (Striped.empty schema :| [])
+
+      files12 =
+        Cons.from2 file1 file2
+
+      files0 =
+        Cons.from2 file0 (Striped.empty schema :| [])
+
+    x01 <- maybe (Left "measure failed: files01") pure =<< first ppShow (unionList Merge.measureMonoid Nothing files01)
+    x2 <- maybe (Left "measure failed: files2") pure =<< first ppShow (unionList Merge.measureMonoid Nothing files2)
+    x12 <- maybe (Left "measure failed: files12") pure =<< first ppShow (unionList Merge.measureMonoid Nothing files12)
+    x0 <- maybe (Left "measure failed: files0") pure =<< first ppShow (unionList Merge.measureMonoid Nothing files0)
+
+    let
+      y0 =
+        Cons.from2 (x01 :| []) (x2 :| [])
+
+      y1 =
+        Cons.from2 (x0 :| []) (x12 :| [])
+
+    measure0 <- first ppShow $ unionList Merge.summationMonoid Nothing y0
+    measure1 <- first ppShow $ unionList Merge.summationMonoid Nothing y1
+
+    pure $
+      measure0
+      ===
+      measure1
+
 return []
 tests :: IO Bool
 tests =