Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Database change notifications with Server-sent Events (SSE) #1807

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
784bfd9
Add for use with subscribing to database records with Server Sent Ev…
kodeFant Sep 1, 2023
fc9b092
Fix typo in module name
kodeFant Sep 1, 2023
e592b25
Remove commented code
kodeFant Sep 1, 2023
ee1ab07
Remove duplicate headers
kodeFant Sep 1, 2023
f514043
Try to improve formatting
kodeFant Sep 1, 2023
c21290e
Add some documentation
kodeFant Sep 1, 2023
9b51967
Catch errors on heartbeat
kodeFant Sep 1, 2023
d2a2550
Clean up code a bit
kodeFant Sep 1, 2023
81bdc7c
Document code
kodeFant Sep 1, 2023
eb4bf41
More precise comment
kodeFant Sep 1, 2023
b7d8184
Unsubscribe PGListener and gracefully shut down heartbeat loop after …
kodeFant Sep 2, 2023
6346369
More explicit imports
kodeFant Sep 2, 2023
020b125
Use header names provided by WAI library
kodeFant Sep 2, 2023
610ddd7
Add comment to cleanup runCleanupActions
kodeFant Sep 2, 2023
618f7f4
Whitespace adjustments
kodeFant Sep 2, 2023
1d605fc
Concentrate notification query and do some performance optimizations
kodeFant Sep 2, 2023
6c63526
Add comments
kodeFant Sep 2, 2023
cb2fb1b
Re-order imports so easier to scan
kodeFant Sep 2, 2023
acde116
Re-order imports so easier to scan
kodeFant Sep 2, 2023
2239175
Add comment about why notificationTrigger was written this way
kodeFant Sep 2, 2023
f95e9a1
Nitpick whitespace
kodeFant Sep 2, 2023
aef20b0
Rename to IHP.PGEventStore
kodeFant Sep 2, 2023
3242dbb
Correct comment
kodeFant Sep 2, 2023
13fdd23
Try to organize the order of the functions better
kodeFant Sep 2, 2023
555cb46
Try to organize the order of the functions better
kodeFant Sep 2, 2023
7a4d67d
Add comment to
kodeFant Sep 2, 2023
8f4c8a1
Go back on notificationTrigger. The simple one is probably better and…
kodeFant Sep 2, 2023
a8e608a
Use pge_ prefix on notification and trigger names
kodeFant Sep 2, 2023
fd73387
Improve comment
kodeFant Sep 2, 2023
c542997
Remove stream initialize message as it's not necessary
kodeFant Sep 3, 2023
41b5686
Remove unecessary abstraction
kodeFant Sep 3, 2023
0690dcd
improve heartbeat a bit an use readTVarIO
kodeFant Sep 3, 2023
ee7d54c
Make import syntax consistent with rest of IHP source
kodeFant Sep 3, 2023
cc7b7a0
Go with more readable event payload
kodeFant Sep 3, 2023
cd5532a
Add newline on end of event payload
kodeFant Sep 3, 2023
75f4e42
Add newline on end of event payload
kodeFant Sep 3, 2023
49093b6
Mostly cosmetic change
kodeFant Sep 3, 2023
1787941
Add space between colon and data
kodeFant Sep 3, 2023
1858acc
Add MDN reference
kodeFant Sep 3, 2023
8b717d9
Fix typo and formatting
kodeFant Sep 3, 2023
adc1720
Improve some comments and logging
kodeFant Sep 3, 2023
3b9c47e
Some comment clarifications
kodeFant Sep 3, 2023
f196190
Nitpick
kodeFant Sep 3, 2023
f5bc7f1
Remove unecessary imports
kodeFant Sep 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions IHP/PGEventSource.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
-- | The IHP.PGEventSource module is responsible for dispatching Server-sent Events (SSE) from PostgreSQL notification triggers.
module IHP.PGEventSource (streamPgEvent, initPgEventSource) where

import IHP.Prelude
import IHP.ApplicationContext (ApplicationContext(pgListener))
import IHP.Controller.Context (fromContext, putContext)
import IHP.ControllerSupport
import IHP.ModelSupport (withTableReadTracker, withRowLevelSecurityDisabled, sqlExec, trackTableRead)
import qualified IHP.Log as Log
import qualified IHP.PGListener as PGListener
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM (atomically, TVar, newTVarIO, readTVar, writeTVar, modifyTVar', readTVarIO)
import qualified Control.Exception as Exception
import qualified Data.ByteString.Builder as B
import Data.String.Interpolate.IsString (i)
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import qualified Database.PostgreSQL.Simple.Types as PG
import Database.PostgreSQL.Simple.Notification (notificationPid, Notification)
import qualified Network.Wai as Wai
import Network.HTTP.Types (status200, hConnection)
import Network.HTTP.Types.Header (HeaderName, hContentType, hCacheControl)
import qualified Data.Set as Set
import Data.String.Interpolate.Util (unindent)


-- | Initialize database events functionality. This makes the PGListener
-- from the `ApplicationContext` available in the `ControllerContext`.
initPgEventSource :: (?context :: ControllerContext, ?applicationContext :: ApplicationContext) => IO ()
initPgEventSource = do
putContext ?applicationContext.pgListener


-- | Stream database change events to clients as Server-sent Events (SSE).
-- This function dispatches events to the client (most commonly the web browser) when the PGListener subscription triggers a notification.
streamPgEvent :: (?modelContext :: ModelContext, ?context :: ControllerContext, ?touchedTables::IORef (Set ByteString)) => ByteString -> IO ()
streamPgEvent eventName = do
touchedTables <- Set.toList <$> readIORef ?touchedTables
pgListener <- fromContext @PGListener.PGListener

-- Keep track of whether the client is still connected
isActive <- newTVarIO True
-- Cleanup actions to be executed when the client disconnects
cleanupActions <- newTVarIO [] :: IO (TVar [IO ()])

let addCleanupAction action = atomically $ modifyTVar' cleanupActions (action:)

let streamBody sendChunk flush = do
-- For each touched table, create a trigger in the database and subscribe to notifications
touchedTables
|> mapM \table -> do
createTriggerForTable table

let notificationCallback = handleNotificationTrigger sendChunk flush eventName table
subscription <- PGListener.subscribe (channelName table) notificationCallback pgListener

-- Add a cleanup action to unsubscribe from the channel when the client disconnects
addCleanupAction $ PGListener.unsubscribe subscription pgListener

-- Send a heartbeat to the client every 30 seconds to keep the connection alive
sendHeartbeats sendChunk flush isActive
`Exception.finally` runCleanupActions cleanupActions

-- Send the stream to the client
respondEventSource streamBody


-- | Required headers for SSE responses.
sseHeaders :: [(HeaderName, ByteString)]
sseHeaders =
[ (hCacheControl, "no-store")
, (hConnection, "keep-alive")
, (hContentType, "text/event-stream")
]

-- | Responds with a streaming body as an SSE to the client.
-- This function takes a 'Wai.StreamingBody' (essentially a stream of data chunks)
-- and sends it to the client with the appropriate headers
respondEventSource :: (?context::ControllerContext) => Wai.StreamingBody -> IO ()
respondEventSource streamBody = respondAndExit $ Wai.responseStream status200 sseHeaders streamBody


-- | Send periodic heartbeats to the client to keep the connection alive.
sendHeartbeats :: (?context :: ControllerContext) => (B.Builder -> IO a) -> IO () -> TVar Bool -> IO ()
sendHeartbeats sendChunk flush isActive = do
active <- readTVarIO isActive
when active $ do
threadDelay (30 * 1000000)
handleDisconnect isActive $ do
sendChunk (B.stringUtf8 ": heartbeat\n\n") >> flush
sendHeartbeats sendChunk flush isActive


-- Gracefully handle the client disconnect exception
handleDisconnect :: (?context :: ControllerContext) => TVar Bool -> IO () -> IO ()
handleDisconnect isActive action = action `Exception.catch` \e ->
if isDisconnectException e
then do
Log.info ("PGEventSource disconnected gracefully" :: Text)
atomically $ writeTVar isActive False
else Log.error $ "PGEventSource Error: " ++ show (e :: Exception.SomeException)
where
isDisconnectException e = "Client closed connection prematurely" `isInfixOf` show (e :: Exception.SomeException)


-- | Executes all cleanup actions stored in the provided 'TVar'.
--
-- After executing the cleanup actions, the 'TVar' is emptied.
--
-- @param cleanupActions A 'TVar' containing a list of IO actions representing cleanup operations.
runCleanupActions :: (?context :: ControllerContext) => TVar [IO a] -> IO ()
runCleanupActions cleanupActions = do
actions <- atomically $ do
a <- readTVar cleanupActions
writeTVar cleanupActions []
return a
forM_ actions id
Log.debug ("PGEventSource cleanup actions executed" :: Text)


-- | Handle notifications triggered by table changes. Sends the notification data as an SSE.
handleNotificationTrigger :: (?context :: ControllerContext) => (B.Builder -> IO a) -> IO () -> ByteString -> ByteString -> Notification -> IO ()
handleNotificationTrigger sendChunk flush eventName table notification = do
-- See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
let eventPayload =
B.byteString $ cs $ unindent (
[i|
id: #{fromIntegral $ notificationPid notification}
event: #{eventName}
data: #{table} change event triggered
|] <> "\n\n" -- Ending with at least two newlines is required to separate the event payload from the next one in the stream.
)

sendChunk eventPayload >> flush
`Exception.catch` (\e -> Log.error $ "PGEventSource error: Error sending chunk: " ++ show (e :: Exception.SomeException))
pure ()



-- | Creates a database trigger that notifies on table changes (insert, update, delete).
createTriggerForTable :: (?modelContext::ModelContext) => ByteString -> IO ()
createTriggerForTable table = do
let createTriggerSql = notificationTrigger table
withRowLevelSecurityDisabled do
sqlExec createTriggerSql ()
pure ()

-- | Generate the channel name for PostgreSQL notifications based on the table name.
channelName :: ByteString -> ByteString
channelName tableName = "pge_did_change_" <> tableName


-- | Constructs the SQL for creating triggers on table changes and sending notifications to the corresponding channel.
notificationTrigger :: ByteString -> PG.Query
notificationTrigger tableName = PG.Query [i|
BEGIN;
CREATE OR REPLACE FUNCTION #{functionName}() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('#{channelName tableName}', '');
RETURN new;
END;
$$ language plpgsql;
DROP TRIGGER IF EXISTS #{insertTriggerName} ON #{tableName};
CREATE TRIGGER #{insertTriggerName} AFTER INSERT ON "#{tableName}" FOR EACH STATEMENT EXECUTE PROCEDURE #{functionName}();

DROP TRIGGER IF EXISTS #{updateTriggerName} ON #{tableName};
CREATE TRIGGER #{updateTriggerName} AFTER UPDATE ON "#{tableName}" FOR EACH STATEMENT EXECUTE PROCEDURE #{functionName}();

DROP TRIGGER IF EXISTS #{deleteTriggerName} ON #{tableName};
CREATE TRIGGER #{deleteTriggerName} AFTER DELETE ON "#{tableName}" FOR EACH STATEMENT EXECUTE PROCEDURE #{functionName}();

COMMIT;
|]
where
functionName = "pge_notify_did_change_" <> tableName
insertTriggerName = "pge_did_insert_" <> tableName
updateTriggerName = "pge_did_update_" <> tableName
deleteTriggerName = "pge_did_delete_" <> tableName