Skip to content

Commit

Permalink
Improved handling of promise complaints
Browse files Browse the repository at this point in the history
  • Loading branch information
jepst committed May 16, 2011
1 parent a6009c8 commit e02cd9a
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Remote/Encoding.hs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ genericPut = generic `extQ` genericString
genericString :: String -> Put
genericString = put.encode

-- | See 'genericPut'
-- | This is the counterpart to 'genericPut'
genericGet :: Data a => Get a
genericGet = generic `extR` genericString
where generic = (\id -> liftM id $ deserializeConstr $ \constr_rep ->
Expand Down
39 changes: 21 additions & 18 deletions Remote/Task.hs
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ diskify fp mps reallywrite =
return False
Right v -> return v

startNodeWorker :: ProcessId -> MVar (Map.Map PromiseId (MVar PromiseStorage)) ->
startNodeWorker :: ProcessId -> NodeBossState ->
MVar PromiseStorage -> Closure Payload -> ProcessM ()
startNodeWorker masterpid mpc mps clo@(Closure cloname cloarg) =
startNodeWorker masterpid nbs mps clo@(Closure cloname cloarg) =
do self <- getSelfPid
spawnLocalAnd (starter self) (prefix self)
return ()
Expand All @@ -416,7 +416,7 @@ startNodeWorker masterpid mpc mps clo@(Closure cloname cloarg) =
setDaemonic
starter nodeboss = -- TODO try to do an undiskify here, if the promise is left over from a previous, failed run
let initialState = TaskState {tsMaster=masterpid,tsNodeBoss=Just nodeboss,
tsPromiseCache=mpc, tsRedeemerForwarding=Map.empty,
tsPromiseCache=nsPromiseCache nbs, tsRedeemerForwarding=nsRedeemerForwarding nbs,
tsMonitoring=Map.empty}
tasker = do tbl <- liftTask $ getLookup
case getEntryByIdent tbl cloname of
Expand All @@ -436,7 +436,8 @@ startNodeWorker masterpid mpc mps clo@(Closure cloname cloarg) =
data NodeBossState =
NodeBossState
{
nsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage))
nsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
nsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId)
}

startNodeManager :: ProcessId -> ProcessM ()
Expand All @@ -451,7 +452,7 @@ startNodeManager masterpid =
(\pc -> let newpc = Map.insert promise promisestore pc
in return (newpc,True))
when (ret)
(startNodeWorker masterpid promisecache promisestore clo)
(startNodeWorker masterpid state promisestore clo)
return (NmStartResponse ret,state))
nmTermination = matchProcessDown masterpid $
do forwardLogs Nothing
Expand Down Expand Up @@ -482,7 +483,8 @@ startNodeManager masterpid =
monitorProcess mypid masterpid MaMonitor
logS "TSK" LoInformation $ "Starting a nodeboss owned by " ++ show masterpid
pc <- liftIO $ newMVar Map.empty
let initState = NodeBossState {nsPromiseCache=pc}
pf <- liftIO $ newMVar Map.empty
let initState = NodeBossState {nsPromiseCache=pc,nsRedeemerForwarding=pf}
handler initState

-- | Starts a new context for executing a 'TaskM' environment.
Expand All @@ -509,8 +511,9 @@ startMaster proc =
where masterproc mvdone mvmaster nodeboss =
do master <- liftIO $ takeMVar mvmaster
pc <- liftIO $ newMVar Map.empty
pf <- liftIO $ newMVar Map.empty
let initialState = TaskState {tsMaster=master,tsNodeBoss=Just nodeboss,
tsPromiseCache=pc, tsRedeemerForwarding=Map.empty,
tsPromiseCache=pc, tsRedeemerForwarding=pf,
tsMonitoring=Map.empty}
res <- liftM snd $ runTaskM proc initialState
liftIO $ putMVar mvdone res
Expand Down Expand Up @@ -779,7 +782,7 @@ readPromise (PromiseImmediate a) = return a
readPromise thepromise@(PromiseBasic prhost prid) =
do mp <- lookupCachedPromise prid
case mp of
Nothing -> do fprhost <- lookupForwardedRedeemer prhost
Nothing -> do fprhost <- liftM (maybe prhost id) $ lookupForwardedRedeemer prid
res <- roundtrip fprhost (NmRedeem prid)
case res of
Left e -> do tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because of "++show e
Expand Down Expand Up @@ -826,15 +829,15 @@ readPromise thepromise@(PromiseBasic prhost prid) =
Left a -> taskError $ "Couldn't file complaint with master about " ++ show fprhost ++ " because " ++ show a
Right (MmComplainResponse newhost)
| newhost == nullPid -> taskError $ "Couldn't file complaint with master about " ++ show fprhost
| otherwise -> do setForwardedRedeemer prhost newhost
| otherwise -> do setForwardedRedeemer prid newhost
readPromise thepromise

data TaskState = TaskState
{
tsMaster :: ProcessId,
tsNodeBoss :: Maybe ProcessId,
tsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
tsRedeemerForwarding :: Map.Map ProcessId ProcessId,
tsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId),
tsMonitoring :: Map.Map ProcessId ()
}

Expand All @@ -847,18 +850,18 @@ instance Monad TaskM where
return (ts'',a')
return x = TaskM $ \ts -> return $ (ts,x)

lookupForwardedRedeemer :: ProcessId -> TaskM ProcessId
lookupForwardedRedeemer :: PromiseId -> TaskM (Maybe ProcessId)
lookupForwardedRedeemer q =
TaskM $ \ts ->
case Map.lookup q (tsRedeemerForwarding ts) of
Nothing -> return (ts,q)
Just a -> return (ts,a)
liftIO $ withMVar (tsRedeemerForwarding ts) $ (\fwd ->
let lo = Map.lookup q fwd
in return (ts,lo))

setForwardedRedeemer :: ProcessId -> ProcessId -> TaskM ()
setForwardedRedeemer :: PromiseId -> ProcessId -> TaskM ()
setForwardedRedeemer from to =
TaskM $ \ts ->
let newmap = Map.insert from to (tsRedeemerForwarding ts)
in return (ts {tsRedeemerForwarding=newmap},())
TaskM $ \ts -> liftIO $ modifyMVar (tsRedeemerForwarding ts) (\fwd ->
let newmap = Map.insert from to fwd
in return ( newmap,(ts,()) ) )

lookupCachedPromise :: PromiseId -> TaskM (Maybe (MVar PromiseStorage))
lookupCachedPromise prid = TaskM $ \ts ->
Expand Down
15 changes: 11 additions & 4 deletions util/Diag.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ module Main (main) where
-- nodes, run this program to check your configuration.

import Remote (remoteInit, getPeers, getSelfNode, ProcessM)
import Remote.Process (hostFromNid,getConfig,cfgNetworkMagic,cfgKnownHosts)
import Remote.Process (hostFromNid,getConfig,cfgNetworkMagic,cfgKnownHosts,cfgPeerDiscoveryPort)

import qualified Data.Map as Map (elems)
import Control.Monad.Trans (liftIO)
import Data.List (intercalate)
import Data.List (intercalate,nub)

s :: String -> ProcessM ()
s = liftIO . putStrLn

orNone :: [String] -> String
orNone [] = "None"
orNone a = intercalate "," a

initialProcess myRole =
do s "Cloud Haskell diagnostics\n"
mynid <- getSelfNode
Expand All @@ -24,8 +28,11 @@ initialProcess myRole =
s $ "I seem to be running on host \""++hostFromNid mynid++"\".\nIf that's wrong, set it using the cfgHostName option.\n"
s $ "My role is \""++myRole++"\".\nIf that's wrong, set it using the cfgRole option.\n"
s $ "My magic is \""++cfgNetworkMagic cfg++"\".\nIf that's wrong, set it using the cfgNetworkMagic option.\n"
s $ "I will look for nodes on the following hosts,\n as well as any hosts on the local network: " ++ intercalate "," (cfgKnownHosts cfg)
let hosts = intercalate ", " $ map (hostFromNid) (concat $ Map.elems peers)
s $ "I will look for nodes on the following hosts: " ++ orNone (cfgKnownHosts cfg)
s $ if cfgPeerDiscoveryPort cfg > 0
then "I will also look for nodes on the local network."
else "I will not look for nodes on the local network other than those named above."
let hosts = orNone $ nub $ map (hostFromNid) (concat $ Map.elems peers)
s $ "I have found nodes on the following hosts: "++hosts++".\nIf I'm not finding all the nodes you expected, make sure they:"
s $ "\tare running\n\tare not behind a firewall\n\thave the same magic\n\tare listed in cfgKnownHosts"

Expand Down

0 comments on commit e02cd9a

Please sign in to comment.