Switch the RPC module over to the multi interface
authorIustin Pop <iustin@google.com>
Wed, 20 Feb 2013 11:14:57 +0000 (12:14 +0100)
committerIustin Pop <iustin@google.com>
Wed, 20 Feb 2013 16:47:42 +0000 (17:47 +0100)
This replaces the very-basic parMap of IO actions (fully serialised,
as parMap won't work here), to the multi interface.

This makes a simple "time gnt-node list" on a 6-node cluster go from
3.2s to ~0.9s, and allows even better parallelisation - before,
curlGetString was blocking, whereas the new interface does allow some
interleaving.

Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Michele Tartara <mtartara@google.com>

src/Ganeti/Rpc.hs

index 78658ba..38df585 100644 (file)
@@ -78,11 +78,13 @@ import Text.JSON.Pretty (pp_value)
 import Network.Curl
 import qualified Ganeti.Path as P
 
+import Ganeti.BasicTypes
 import qualified Ganeti.Constants as C
 import Ganeti.Objects
 import Ganeti.THH
 import Ganeti.Types
-import Ganeti.Compat
+import Ganeti.Curl.Multi
+import Ganeti.Utils
 
 -- * Base RPC functionality and types
 
@@ -151,19 +153,6 @@ data HttpClientRequest = HttpClientRequest
   , requestOpts :: [CurlOption] -- ^ The various curl options
   }
 
--- | Execute the request and return the result as a plain String. When
--- curl reports an error, we propagate it.
-executeHttpRequest :: ERpcError HttpClientRequest -> IO (ERpcError String)
-executeHttpRequest (Left rpc_err) = return $ Left rpc_err
-executeHttpRequest (Right request) = do
-  let reqOpts = CurlPostFields [requestData request]:requestOpts request
-      url = requestUrl request
-  -- FIXME: This is very similar to getUrl in Htools/Rapi.hs
-  (code, !body) <- curlGetString url $ curlOpts ++ reqOpts
-  return $ case code of
-             CurlOK -> Right body
-             _ -> Left $ CurlLayerError (show code)
-
 -- | Prepare url for the HTTP request.
 prepareUrl :: (RpcCall a) => Node -> a -> String
 prepareUrl node call =
@@ -184,10 +173,17 @@ prepareHttpRequest opts node call
                               }
   | otherwise = Left OfflineNodeError
 
+-- | Parse an HTTP reply.
+parseHttpReply :: (Rpc a b) =>
+                  a -> ERpcError (CurlCode, String) -> ERpcError b
+parseHttpReply _ (Left e) = Left e
+parseHttpReply call (Right (CurlOK, body)) = parseHttpResponse call body
+parseHttpReply _ (Right (code, err)) =
+  Left . CurlLayerError $ "code: " ++ show code ++ ", explanation: " ++ err
+
 -- | Parse a result based on the received HTTP response.
-parseHttpResponse :: (Rpc a b) => a -> ERpcError String -> ERpcError b
-parseHttpResponse _ (Left err) = Left err
-parseHttpResponse call (Right res) =
+parseHttpResponse :: (Rpc a b) => a -> String -> ERpcError b
+parseHttpResponse call res =
   case J.decode res of
     J.Error val -> Left $ JsonDecodeError val
     J.Ok (True, res'') -> rpcResultFill call res''
@@ -195,15 +191,6 @@ parseHttpResponse call (Right res) =
        J.JSString msg -> Left $ RpcResultError (J.fromJSString msg)
        _ -> Left . JsonDecodeError $ show (pp_value jerr)
 
--- | Execute RPC call for a sigle node.
-executeSingleRpcCall :: (Rpc a b) =>
-                        [CurlOption] -> Node -> a -> IO (Node, ERpcError b)
-executeSingleRpcCall opts node call = do
-  let request = prepareHttpRequest opts node call
-  response <- executeHttpRequest request
-  let result = parseHttpResponse call response
-  return (node, result)
-
 -- | Execute RPC call for many nodes in parallel.
 executeRpcCall :: (Rpc a b) => [Node] -> a -> IO [(Node, ERpcError b)]
 executeRpcCall nodes call = do
@@ -213,7 +200,24 @@ executeRpcCall nodes call = do
              , CurlSSLKey cert_file
              , CurlCAInfo cert_file
              ]
-  sequence $ parMap rwhnf (\n -> executeSingleRpcCall opts n call) nodes
+      opts_urls = map (\n ->
+                         case prepareHttpRequest opts n call of
+                           Left v -> Left v
+                           Right request ->
+                             Right (CurlPostFields [requestData request]:
+                                    requestOpts request,
+                                    requestUrl request)
+                      ) nodes
+  -- split the opts_urls list; we don't want to pass the
+  -- failed-already nodes to Curl
+  let (lefts, rights, trail) = splitEithers opts_urls
+  results <- execMultiCall rights
+  results' <- case recombineEithers lefts results trail of
+                Bad msg -> error msg
+                Ok r -> return r
+  -- now parse the replies
+  let results'' = map (parseHttpReply call) results'
+  return $ zip nodes results''
 
 -- | Helper function that is used to read dictionaries of values.
 sanitizeDictResults :: [(String, J.Result a)] -> ERpcError [(String, a)]