Skip to content

Commit

Permalink
Commenting a minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jepst committed May 15, 2011
1 parent fc724f2 commit a6009c8
Show file tree
Hide file tree
Showing 18 changed files with 228 additions and 53 deletions.
5 changes: 3 additions & 2 deletions Remote.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Remote ( -- * The process layer
ProcessM, NodeId, ProcessId, MatchM,
getSelfPid, getSelfNode,

send,
send,sendQuiet,

spawn, spawnLocal, spawnAnd,
spawnLink,
Expand All @@ -38,7 +38,8 @@ module Remote ( -- * The process layer

remotable, RemoteCallMetaData, Lookup,

Closure(..), Payload, genericPut, genericGet, Serializable,
Closure, makeClosure, invokeClosure,
Payload, genericPut, genericGet, Serializable,

-- * Channels

Expand Down
10 changes: 2 additions & 8 deletions Remote/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,7 @@ module Remote.Call (
) where

import Language.Haskell.TH
import Data.Maybe (maybe)
import Data.List (intercalate,null)
import qualified Data.Map as Map (Map,insert,lookup,empty,toList)
import Data.Dynamic (toDyn,Dynamic,fromDynamic,dynTypeRep)
import Data.Generics (Data)
import Data.Typeable (typeOf,Typeable)
import Data.Binary (Binary)
import Remote.Encoding (Payload,serialDecode,serialEncode,serialEncodePure,Serializable)
import Remote.Encoding (Payload,serialDecode,serialEncode,serialEncodePure)
import Control.Monad.Trans (liftIO)
import Control.Monad (liftM)
import Remote.Closure (Closure(..))
Expand Down Expand Up @@ -130,6 +123,7 @@ remotable names =
(AppT (ConT n) _) -> (AppT (ConT n) payload)
_ -> toProcessM payload
_ -> toProcessM payload
processmtoclosure (AppT mc x) | mc == ttprocessm && isarrow x = AppT ttclosure x
processmtoclosure (x) = (AppT ttclosure x)
isarrowful = isarrow $ last arglist
isarrow (AppT (AppT ArrowT _) _) = True
Expand Down
47 changes: 47 additions & 0 deletions Remote/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,20 @@ import Control.Concurrent.STM.TVar (TVar,newTVarIO,readTVar,writeTVar)
-- * Channels
----------------------------------------------

-- | A channel is a unidirectional communication pipeline
-- with two ends: a sending port, and a receiving port.
-- This is the sending port. A process holding this
-- value can insert messages into the channel. SendPorts
-- themselves can also be sent to other processes.
-- The other side of the channel is the 'ReceivePort'.
newtype SendPort a = SendPort ProcessId deriving (Typeable)

-- | A process holding a ReceivePort can extract messages
-- from the channel, which we inserted by
-- the holder(s) of the corresponding 'SendPort'.
-- Critically, ReceivePorts, unlike SendPorts, are not serializable.
-- This means that you can only receive messages through a channel
-- on the node on which the channel was created.
data ReceivePort a = ReceivePortSimple ProcessId (MVar ())
| ReceivePortBiased [Node -> STM a]
| ReceivePortRR (TVar [Node -> STM a])
Expand All @@ -42,16 +55,20 @@ instance Binary (SendPort a) where
put (SendPort pid) = put pid
get = get >>= return . SendPort

-- | Create a new channel, and returns both the 'SendPort'
-- and 'ReceivePort' thereof.
newChannel :: (Serializable a) => ProcessM (SendPort a, ReceivePort a)
newChannel = do mv <- liftIO $ newEmptyMVar
pid <- spawnLocalAnd (body mv) setDaemonic
return (SendPort pid,
ReceivePortSimple pid mv)
where body mv = liftIO (takeMVar mv)

-- | Inserts a new value into the channel.
sendChannel :: (Serializable a) => SendPort a -> a -> ProcessM ()
sendChannel (SendPort pid) a = send pid a

-- | Extract a value from the channel, in FIFO order.
receiveChannel :: (Serializable a) => ReceivePort a -> ProcessM a
receiveChannel rc = do p <- getProcess
channelCheckPids [rc]
Expand All @@ -71,22 +88,47 @@ receiveChannelImpl node rc =

data CombinedChannelAction b = forall a. (Serializable a) => CombinedChannelAction (ReceivePort a) (a -> b)

-- | Specifies a port and an adapter for combining ports via 'combinePortsBiased' and
-- 'combinePortsRR'.
combinedChannelAction :: (Serializable a) => ReceivePort a -> (a -> b) -> CombinedChannelAction b
combinedChannelAction = CombinedChannelAction

-- | This function lets us respond to messages on multiple channels
-- by combining several 'ReceivePort's into one. The resulting port
-- is the sum of the input ports, and will extract messages from all
-- of them in FIFO order. The input ports are specified by
-- 'combinedChannelAction', which also gives a converter function.
-- After combining the underlying receive ports can still
-- be used independently, as well.
-- We provide two ways to combine ports, which differ bias
-- they demonstrate in returning messages when more than one
-- underlying channel is nonempty. combinePortsBiased will
-- check ports in the order given by its argument, and so
-- if the first channel always was a message waiting, it will.
-- starve the other channels. The alternative is 'combinePortsRR'.
combinePortsBiased :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)
combinePortsBiased chns = do mapM_ (\(CombinedChannelAction chn _ ) -> channelCheckPids [chn]) chns
return $ ReceivePortBiased [(\node -> receiveChannelImpl node chn >>= return . fun) | (CombinedChannelAction chn fun) <- chns]

-- | See 'combinePortsBiased'. This function differs from that one
-- in that the order that the underlying ports are checked is rotated
-- with each invocation, guaranteeing that, given enough invocations,
-- every channel will have a chance to contribute a message.
combinePortsRR :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)
combinePortsRR chns = do mapM_ (\(CombinedChannelAction chn _ ) -> channelCheckPids [chn]) chns
tv <- liftIO $ newTVarIO [(\node -> receiveChannelImpl node chn >>= return . fun) | (CombinedChannelAction chn fun) <- chns]
return $ ReceivePortRR tv

-- | Similar to 'combinePortsBiased', with the difference that the
-- the underlying ports must be of the same type, and you don't
-- have the opportunity to provide an adapter function.
mergePortsBiased :: (Serializable a) => [ReceivePort a] -> ProcessM (ReceivePort a)
mergePortsBiased chns = do channelCheckPids chns
return $ ReceivePortBiased [(\node -> receiveChannelImpl node chn) | chn <- chns]

-- | Similar to 'combinePortsRR', with the difference that the
-- the underlying ports must be of the same type, and you don't
-- have the opportunity to provide an adapter function.
mergePortsRR :: (Serializable a) => [ReceivePort a] -> ProcessM (ReceivePort a)
mergePortsRR chns = do channelCheckPids chns
tv <- liftIO $ newTVarIO [(\node -> receiveChannelImpl node chn) | chn <- chns]
Expand All @@ -109,6 +151,11 @@ receiveChannelSimple node (ReceivePortSimple chpid _) =
Just q -> return q
where badPid = throw $ TransmitException QteUnknownPid

-- | Terminate a channel. After calling this function, 'receiveChannel'
-- on that port (or on any combined port based on it) will either
-- fail or block indefinitely, and 'sendChannel' on the corresponding
-- 'SendPort' will fail. Any unread messages remaining in the channel
-- will be lost.
terminateChannel :: (Serializable a) => ReceivePort a -> ProcessM ()
terminateChannel (ReceivePortSimple _ term) = liftIO $ putMVar (term) ()
terminateChannel _ = throw $ TransmitException QteUnknownPid
4 changes: 2 additions & 2 deletions Remote/Closure.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import Remote.Encoding (Payload)
-- In spirit, this is actually:
--
-- > data Closure a where
-- > Closure :: Serializable v => (v -#> a) -> v -> Closure a
-- > Closure :: Serializable v => Static (v -> a) -> v -> Closure a
--
-- where funny arrow (-#>) identifies a function with no free variables.
-- where the Static type wraps a function with no non-static free variables.
-- We simulate this behavior by identifying top-level functions as strings.
-- See the paper for clarification.
data Closure a = Closure String Payload
Expand Down
31 changes: 26 additions & 5 deletions Remote/Encoding.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ import System.IO (Handle)
import Data.Typeable (typeOf,Typeable)
import Data.Generics (Data,gfoldl,gunfold, toConstr,constrRep,ConstrRep(..),repConstr,extQ,extR,dataTypeOf)

-- | Data that can be sent as a message must implement
-- this class. The class has no functions of its own,
-- but instead simply requires that the type implement
-- both 'Typeable' and 'Binary'. Typeable can usually
-- be derived automatically. Binary requires the put and get
-- functions, which can be easily implemented by hand,
-- or you can use the 'genericGet' and 'genericPut' flavors,
-- which will work automatically for types implementing
-- 'Data'.
class (Binary a,Typeable a) => Serializable a
instance (Binary a,Typeable a) => Serializable a

Expand Down Expand Up @@ -104,11 +113,22 @@ serialDecode a = (\id ->
else return Nothing ) id


{- By default, gfoldl will try to store a String as a list of Chars,
which is pretty inefficient. So, we special-case string serialization
to use the serialization provided by Binary. Other types could
also be easily special-cased
-}
-- | Data types that can be used in messaging must
-- be serializable, which means that they must implement
-- the 'get' and 'put' methods from 'Binary'. If you
-- are too lazy to write these functions yourself,
-- you can delegate responsibility to this function.
-- It's usually sufficient to do something like this:
--
-- > import Data.Data (Data)
-- > import Data.Typeable (Typeable)
-- > import Data.Binary (Binary, get, put)
-- > data MyType = MkMyType Foobar Int [(String, Waddle Baz)]
-- > | MkSpatula
-- > deriving (Data, Typeable)
-- > instance Binary MyType where
-- > put = genericPut
-- > get = genericGet
genericPut :: (Data a) => a -> Put
genericPut = generic `extQ` genericString
where generic what = fst $ gfoldl
Expand All @@ -118,6 +138,7 @@ genericPut = generic `extQ` genericString
genericString :: String -> Put
genericString = put.encode

-- | See 'genericPut'
genericGet :: Data a => Get a
genericGet = generic `extR` genericString
where generic = (\id -> liftM id $ deserializeConstr $ \constr_rep ->
Expand Down
6 changes: 5 additions & 1 deletion Remote/Peer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ sendBroadcast port str
return ()
)

-- | Find peers with 'getPeersStatic' and 'getPeersDynamic' and combines their results.
-- | Returns information about all nodes on the current network
-- that this node knows about. This function combines dynamic
-- and static mechanisms. See documentation on 'getPeersStatic'
-- and 'getPeersDynamic' for more info. This function depends
-- on the configuration values @cfgKnownHosts@ and @cfgPeerDiscoveryPort@.
getPeers :: ProcessM PeerInfo
getPeers = do a <- getPeersStatic
b <- getPeersDynamic 500000
Expand Down
29 changes: 17 additions & 12 deletions Remote/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module Remote.Process (
match,matchIf,matchUnknown,matchUnknownThrow,matchProcessDown,

-- * Message sending
send,
send,sendQuiet,

-- * Logging functions
logS,say,
Expand Down Expand Up @@ -62,24 +62,24 @@ module Remote.Process (
where

import Control.Concurrent (forkIO,ThreadId,threadDelay)
import Control.Concurrent.MVar (withMVar,MVar,newMVar, newEmptyMVar,isEmptyMVar,takeMVar,putMVar,modifyMVar,modifyMVar_,readMVar)
import Control.Concurrent.MVar (MVar,newMVar, newEmptyMVar,isEmptyMVar,takeMVar,putMVar,modifyMVar,modifyMVar_,readMVar)
import Prelude hiding (catch)
import Control.Exception (ErrorCall(..),throwTo,bracket,try,Exception,throw,evaluate,finally,SomeException,PatternMatchFail(..),catch)
import Control.Exception (ErrorCall(..),throwTo,bracket,try,Exception,throw,evaluate,finally,SomeException,catch)
import Control.Monad (foldM,when,liftM,forever)
import Control.Monad.Trans (MonadTrans,lift,MonadIO,liftIO)
import Control.Monad.Trans (MonadIO,liftIO)
import Data.Binary (Binary,put,get,putWord8,getWord8)
import Data.Char (isSpace,isDigit)
import Data.List (isSuffixOf,foldl', isPrefixOf,nub)
import Data.Maybe (listToMaybe,catMaybes,fromJust,isNothing)
import Data.Maybe (catMaybes,isNothing)
import Data.Typeable (Typeable)
import Data.Unique (newUnique,hashUnique)
import System.IO (Handle,hIsEOF,hClose,hSetBuffering,hGetChar,hPutChar,BufferMode(..),hFlush)
import System.IO (Handle,hClose,hSetBuffering,hGetChar,hPutChar,BufferMode(..),hFlush)
import System.IO.Error (isEOFError,isDoesNotExistError,isUserError)
import System.FilePath (FilePath)
import Network.BSD (HostEntry(..),getHostName)
import Network (HostName,PortID(..),PortNumber(..),listenOn,accept,sClose,connectTo,Socket)
import Network (HostName,PortID(..),PortNumber,listenOn,accept,sClose,connectTo,Socket)
import Network.Socket (PortNumber(..),setSocketOption,SocketOption(..),socketPort,aNY_PORT )
import qualified Data.Map as Map (Map,keys,fromList,unionWith,elems,singleton,member,update,map,empty,adjust,alter,insert,delete,lookup,toList,size,insertWith')
import qualified Data.Map as Map (Map,keys,fromList,unionWith,elems,singleton,member,update,empty,adjust,alter,insert,delete,lookup,toList,size,insertWith')
import Remote.Reg (getEntryByIdent,Lookup,empty)
import Remote.Encoding (serialEncode,serialDecode,serialEncodePure,serialDecodePure,Payload,Serializable,PayloadLength,genericPut,genericGet,hPutPayload,hGetPayload,payloadLength,getPayloadType)
import System.Environment (getArgs)
Expand Down Expand Up @@ -651,7 +651,7 @@ forkProcessWeak f = do p <- getProcess
return ()

-- | Create a new process on the current node. Returns the new process's identifier.
-- Unlike 'spawnRemote', this function does not need a 'Closure' or a 'NodeId'.
-- Unlike 'spawn', this function does not need a 'Closure' or a 'NodeId'.
spawnLocal :: ProcessM () -> ProcessM ProcessId
spawnLocal fun = do p <- getProcess
liftIO $ runLocalProcess (prNodeRef p) fun
Expand Down Expand Up @@ -869,6 +869,11 @@ send pid msg = sendSimple pid msg PldUser >>=
_ -> throw $ TransmitException x
)

-- | Like 'send', but in case of error returns a value rather than throw
-- an exception.
sendQuiet :: (Serializable a) => ProcessId -> a -> ProcessM TransmitStatus
sendQuiet p m = sendSimple p m PldUser

sendSimple :: (Serializable a) => ProcessId -> a -> PayloadDisposition -> ProcessM TransmitStatus
sendSimple pid dat pld = sendTry pid dat (Nothing :: Maybe ()) pld

Expand Down Expand Up @@ -2282,13 +2287,13 @@ terminate = throw ProcessTerminationException
unpause :: ProcessId -> ProcessM ()
unpause pid = send pid AmSpawnUnpause

-- | A variant of 'spawnRemote' that starts the remote process with
-- | A variant of 'spawn' that starts the remote process with
-- bidirectoinal monitoring, as in 'linkProcess'
spawnLink :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId
spawnLink node clo = do mypid <- getSelfPid
spawnAnd node clo defaultSpawnOptions {amsoLink=Just mypid}

-- | A variant of spawnRemote that allows greater control over how the remote process is started.
-- | A variant of 'spawn' that allows greater control over how the remote process is started.
spawnAnd :: NodeId -> Closure (ProcessM ()) -> AmSpawnOptions -> ProcessM ProcessId
spawnAnd node clo opt =
do res <- roundtripQueryUnsafe PldAdmin (adminGetPid node ServiceSpawner) (AmSpawn clo opt)
Expand Down Expand Up @@ -2338,7 +2343,7 @@ startSpawnerService = serviceThread ServiceSpawner spawner
Right (Just pl) -> responder (Just pl)
spawnWorker c = do a <- invokeClosure c
case a of
Nothing -> (logS "SYS" LoCritical $ "Failed to invoke closure "++(show c)) --TODO it would be nice if this error could be propagated to the caller of spawnRemote, at the very least it should throw an exception so a linked process will be notified
Nothing -> (logS "SYS" LoCritical $ "Failed to invoke closure "++(show c)) --TODO it would be nice if this error could be propagated to the caller of spawn, at the very least it should throw an exception so a linked process will be notified
Just q -> q
matchCallRequest = roundtripResponseAsync
(\cmd sender -> case cmd of
Expand Down
4 changes: 2 additions & 2 deletions Remote/Reg.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- | Compile-time metadata functions, part of the
-- | Runtime metadata functions, part of the
-- RPC mechanism
module Remote.Reg (
-- * Runtime metadata
Expand All @@ -16,7 +16,7 @@ import Data.Typeable (Typeable)
import qualified Data.Map as Map (insert,lookup,Map,empty)

----------------------------------------------
-- * Run-time metadata
-- * Runtime metadata
------------------------------

-- | Data of this type is generated at compile-time
Expand Down
Loading

0 comments on commit a6009c8

Please sign in to comment.