Skip to content

Commit

Permalink
Merge pull request #42 from nikita-volkov/settings
Browse files Browse the repository at this point in the history
Introduce flexible config
  • Loading branch information
nikita-volkov committed Feb 26, 2024
2 parents d7162ce + 108c3be commit e72ad55
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 63 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# 0.11
# 1

- Optional observability event stream added. Provides a flexible mechanism for monitoring the healthiness of the pool via logs and metrics.
- Configuration got isolated into a DSL, which will allow to provide new configurations without breaking backward compatibility.

# 0.10.1

Expand Down
7 changes: 6 additions & 1 deletion hasql-pool.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ library
-- cabal-gild: discover src/library/exposed
exposed-modules:
Hasql.Pool
Hasql.Pool.Config
Hasql.Pool.Observation

-- cabal-gild: discover src/library/other
other-modules: Hasql.Pool.Prelude
other-modules:
Hasql.Pool.Config.Config
Hasql.Pool.Config.Setting
Hasql.Pool.Prelude

build-depends:
base >=4.11 && <5,
bytestring >=0.10 && <0.14,
Expand Down
73 changes: 13 additions & 60 deletions src/library/exposed/Hasql/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module Hasql.Pool
( -- * Pool
Pool,
acquire,
acquireDynamically,
use,
release,

Expand All @@ -16,6 +15,7 @@ import qualified Data.Text.Encoding.Error as Text
import qualified Data.UUID.V4 as Uuid
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import qualified Hasql.Pool.Config.Config as Config
import Hasql.Pool.Observation
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session
Expand Down Expand Up @@ -63,61 +63,14 @@ data Pool = Pool
poolObserver :: Observation -> IO ()
}

-- | Create a connection-pool, with default settings.
--
-- No connections actually get established by this function. It is delegated
-- to 'use'.
acquire ::
-- | Pool size.
Int ->
-- | Connection acquisition timeout.
DiffTime ->
-- | Maximal connection lifetime.
DiffTime ->
-- | Maximal connection idle time.
DiffTime ->
-- | Connection settings.
Connection.Settings ->
-- | Observation handler.
--
-- Typically it's used for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
(Observation -> IO ()) ->
IO Pool
acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings observer =
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings) observer

-- | Create a connection-pool.
--
-- In difference to 'acquire' new connection settings get fetched each
-- time a connection is created. This may be useful for some security models.
--
-- No connections actually get established by this function. It is delegated
-- to 'use'.
acquireDynamically ::
-- | Pool size.
Int ->
-- | Connection acquisition timeout.
DiffTime ->
-- | Maximal connection lifetime.
DiffTime ->
-- | Maximal connection idle time.
DiffTime ->
-- | Action fetching connection settings.
IO Connection.Settings ->
-- | Observation handler.
--
-- Use it for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
(Observation -> IO ()) ->
IO Pool
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings observer = do
acquire :: Config.Config -> IO Pool
acquire config = do
connectionQueue <- newTQueueIO
capVar <- newTVarIO poolSize
capVar <- newTVarIO (Config.size config)
reuseVar <- newTVarIO =<< newTVarIO True
reaperRef <- newIORef ()

Expand All @@ -126,31 +79,31 @@ acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSe
now <- getMonotonicTimeNSec
join . atomically $ do
entries <- flushTQueue connectionQueue
let (agedEntries, unagedEntries) = partition (entryIsAged maxLifetimeNanos now) entries
(idleEntries, liveEntries) = partition (entryIsIdle maxLifetimeNanos now) unagedEntries
let (agedEntries, unagedEntries) = partition (entryIsAged agingTimeoutNanos now) entries
(idleEntries, liveEntries) = partition (entryIsIdle agingTimeoutNanos now) unagedEntries
traverse_ (writeTQueue connectionQueue) liveEntries
return $ do
forM_ agedEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason))
(Config.observationHandler config) (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason))
forM_ idleEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason))
(Config.observationHandler config) (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason))

void . mkWeakIORef reaperRef $ do
-- When the pool goes out of scope, stop the manager.
killThread managerTid

return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef observer
return $ Pool (Config.size config) (Config.connectionSettingsProvider config) acqTimeoutMicros agingTimeoutNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef (Config.observationHandler config)
where
acqTimeoutMicros =
div (fromIntegral (diffTimeToPicoseconds acqTimeout)) 1_000_000
maxLifetimeNanos =
div (fromIntegral (diffTimeToPicoseconds maxLifetime)) 1_000
div (fromIntegral (diffTimeToPicoseconds (Config.acquisitionTimeout config))) 1_000_000
agingTimeoutNanos =
div (fromIntegral (diffTimeToPicoseconds (Config.agingTimeout config))) 1_000
maxIdletimeNanos =
div (fromIntegral (diffTimeToPicoseconds maxIdletime)) 1_000
div (fromIntegral (diffTimeToPicoseconds (Config.idlenessTimeout config))) 1_000

-- | Release all the idle connections in the pool, and mark the in-use connections
-- to be released after use. Any connections acquired after the call will be
Expand Down
24 changes: 24 additions & 0 deletions src/library/exposed/Hasql/Pool/Config.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- | DSL for construction of configs.
module Hasql.Pool.Config
( Config.Config,
settings,
Setting.Setting,
Setting.size,
Setting.acquisitionTimeout,
Setting.agingTimeout,
Setting.idlenessTimeout,
Setting.staticConnectionSettings,
Setting.dynamicConnectionSettings,
Setting.observationHandler,
)
where

import qualified Hasql.Pool.Config.Config as Config
import qualified Hasql.Pool.Config.Setting as Setting
import Hasql.Pool.Prelude

-- | Compile config from a list of settings.
-- Latter settings override the preceding in cases of conflicts.
settings :: [Setting.Setting] -> Config.Config
settings =
foldr ($) Config.defaults . fmap Setting.apply
27 changes: 27 additions & 0 deletions src/library/other/Hasql/Pool/Config/Config.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module Hasql.Pool.Config.Config where

import qualified Hasql.Connection as Connection
import Hasql.Pool.Observation (Observation)
import Hasql.Pool.Prelude

-- | Configufation for Hasql connection pool.
data Config = Config
{ size :: Int,
acquisitionTimeout :: DiffTime,
agingTimeout :: DiffTime,
idlenessTimeout :: DiffTime,
connectionSettingsProvider :: IO Connection.Settings,
observationHandler :: Observation -> IO ()
}

-- | Reasonable defaults, which can be built upon.
defaults :: Config
defaults =
Config
{ size = 3,
acquisitionTimeout = 10,
agingTimeout = 60 * 60 * 24,
idlenessTimeout = 60 * 10,
connectionSettingsProvider = pure "postgresql://postgres:postgres@localhost:5432/postgres",
observationHandler = const (pure ())
}
82 changes: 82 additions & 0 deletions src/library/other/Hasql/Pool/Config/Setting.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
module Hasql.Pool.Config.Setting where

import qualified Hasql.Connection as Connection
import Hasql.Pool.Config.Config (Config)
import qualified Hasql.Pool.Config.Config as Config
import Hasql.Pool.Observation (Observation)
import Hasql.Pool.Prelude

apply :: Setting -> Config -> Config
apply (Setting run) = run

-- | A single setting of a config.
newtype Setting
= Setting (Config -> Config)

-- | Pool size.
--
-- 3 by default.
size :: Int -> Setting
size x =
Setting (\config -> config {Config.size = x})

-- | Connection acquisition timeout.
--
-- 10 seconds by default.
acquisitionTimeout :: DiffTime -> Setting
acquisitionTimeout x =
Setting (\config -> config {Config.acquisitionTimeout = x})

-- | Maximal connection lifetime.
--
-- Determines how long is available for reuse.
-- After the timeout passes and an active session is finished the connection will be closed releasing a slot in the pool for a fresh connection to be established.
--
-- This is useful as a healthy measure for resetting the server-side caches.
--
-- 1 day by default.
agingTimeout :: DiffTime -> Setting
agingTimeout x =
Setting (\config -> config {Config.agingTimeout = x})

-- | Maximal connection idle time.
--
-- How long to keep a connection open when it's not being used.
--
-- 10 minutes by default.
idlenessTimeout :: DiffTime -> Setting
idlenessTimeout x =
Setting (\config -> config {Config.idlenessTimeout = x})

-- | Connection string.
--
-- You can use 'Hasql.Connection.settings' to construct it.
--
-- @\"postgresql://postgres:postgres@localhost:5432/postgres\"@ by default.
staticConnectionSettings :: Connection.Settings -> Setting
staticConnectionSettings x =
Setting (\config -> config {Config.connectionSettingsProvider = pure x})

-- | Action providing connection settings.
--
-- Gets used each time a connection gets established by the pool.
-- This may be useful for some authorization models.
--
-- You can use 'Hasql.Connection.settings' to construct it.
--
-- @pure \"postgresql://postgres:postgres@localhost:5432/postgres\"@ by default.
dynamicConnectionSettings :: IO Connection.Settings -> Setting
dynamicConnectionSettings x =
Setting (\config -> config {Config.connectionSettingsProvider = x})

-- | Observation handler.
--
-- Typically it's used for monitoring the state of the pool via metrics and logging.
--
-- If the provided action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue to avoid occupying the pool management thread for too long.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
--
-- @const (pure ())@ by default.
observationHandler :: (Observation -> IO ()) -> Setting
observationHandler x =
Setting (\config -> config {Config.observationHandler = x})
14 changes: 13 additions & 1 deletion src/test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import qualified Hasql.Connection as Connection
import qualified Hasql.Decoders as Decoders
import qualified Hasql.Encoders as Encoders
import Hasql.Pool
import qualified Hasql.Pool.Config as Config
import qualified Hasql.Session as Session
import qualified Hasql.Statement as Statement
import qualified System.Environment
Expand All @@ -18,7 +19,18 @@ main :: IO ()
main = do
connectionSettings <- getConnectionSettings
let withPool poolSize acqTimeout maxLifetime maxIdletime connectionSettings =
bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings (const (pure ()))) release
bracket
( acquire
( Config.settings
[ Config.size poolSize,
Config.acquisitionTimeout acqTimeout,
Config.agingTimeout maxLifetime,
Config.idlenessTimeout maxIdletime,
Config.staticConnectionSettings connectionSettings
]
)
)
release
withDefaultPool =
withPool 3 10 1_800 1_800 connectionSettings

Expand Down

0 comments on commit e72ad55

Please sign in to comment.