, LuxiReq(..)
, Client
, JobId
+ , fromJobId
+ , makeJobId
, RecvResult(..)
- , TagObject(..)
, strOfOp
, getClient
, getServer
, recvMsg
, recvMsgExt
, sendMsg
+ , allLuxiCalls
) where
import Control.Exception (catch)
import Data.IORef
-import Data.Ratio (numerator, denominator)
import qualified Data.ByteString as B
import qualified Data.ByteString.UTF8 as UTF8
import Data.Word (Word8)
import Control.Monad
-import Prelude hiding (catch)
import Text.JSON (encodeStrict, decodeStrict)
import qualified Text.JSON as J
+import Text.JSON.Pretty (pp_value)
import Text.JSON.Types
import System.Directory (removeFile)
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
import System.Timeout
import qualified Network.Socket as S
-import Ganeti.HTools.JSON
-import Ganeti.HTools.Types
-import Ganeti.HTools.Utils
-
+import Ganeti.BasicTypes
import Ganeti.Constants
-import Ganeti.Jobs (JobStatus)
-import Ganeti.OpCodes (OpCode)
+import Ganeti.Errors
+import Ganeti.JSON
+import Ganeti.OpParams (pTagsObject)
+import Ganeti.OpCodes
import qualified Ganeti.Query.Language as Qlang
import Ganeti.THH
+import Ganeti.Types
-- * Utility functions
data RecvResult = RecvConnClosed -- ^ Connection closed
| RecvError String -- ^ Any other error
| RecvOk String -- ^ Successfull receive
- deriving (Show, Read, Eq)
-
--- | The Ganeti job type.
-type JobId = Int
-
--- | Data type representing what items do the tag operations apply to.
-$(declareSADT "TagObject"
- [ ("TagInstance", 'tagInstance)
- , ("TagNode", 'tagNode)
- , ("TagGroup", 'tagNodegroup)
- , ("TagCluster", 'tagCluster)
- ])
-$(makeJSONInstance ''TagObject)
+ deriving (Show, Eq)
-- | Currently supported Luxi operations and JSON serialization.
$(genLuxiOp "LuxiOp"
[ (luxiReqQuery,
- [ ("what", [t| Qlang.ItemType |])
- , ("fields", [t| [String] |])
- , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
+ [ simpleField "what" [t| Qlang.ItemType |]
+ , simpleField "fields" [t| [String] |]
+ , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
])
, (luxiReqQueryFields,
- [ ("what", [t| Qlang.ItemType |])
- , ("fields", [t| [String] |])
+ [ simpleField "what" [t| Qlang.ItemType |]
+ , simpleField "fields" [t| [String] |]
])
, (luxiReqQueryNodes,
- [ ("names", [t| [String] |])
- , ("fields", [t| [String] |])
- , ("lock", [t| Bool |])
+ [ simpleField "names" [t| [String] |]
+ , simpleField "fields" [t| [String] |]
+ , simpleField "lock" [t| Bool |]
])
, (luxiReqQueryGroups,
- [ ("names", [t| [String] |])
- , ("fields", [t| [String] |])
- , ("lock", [t| Bool |])
+ [ simpleField "names" [t| [String] |]
+ , simpleField "fields" [t| [String] |]
+ , simpleField "lock" [t| Bool |]
])
, (luxiReqQueryInstances,
- [ ("names", [t| [String] |])
- , ("fields", [t| [String] |])
- , ("lock", [t| Bool |])
+ [ simpleField "names" [t| [String] |]
+ , simpleField "fields" [t| [String] |]
+ , simpleField "lock" [t| Bool |]
])
, (luxiReqQueryJobs,
- [ ("ids", [t| [Int] |])
- , ("fields", [t| [String] |])
+ [ simpleField "ids" [t| [JobId] |]
+ , simpleField "fields" [t| [String] |]
])
, (luxiReqQueryExports,
- [ ("nodes", [t| [String] |])
- , ("lock", [t| Bool |])
+ [ simpleField "nodes" [t| [String] |]
+ , simpleField "lock" [t| Bool |]
])
, (luxiReqQueryConfigValues,
- [ ("fields", [t| [String] |]) ]
+ [ simpleField "fields" [t| [String] |] ]
)
, (luxiReqQueryClusterInfo, [])
, (luxiReqQueryTags,
- [ ("kind", [t| TagObject |])
- , ("name", [t| String |])
- ])
+ [ pTagsObject ])
, (luxiReqSubmitJob,
- [ ("job", [t| [OpCode] |]) ]
+ [ simpleField "job" [t| [MetaOpCode] |] ]
)
, (luxiReqSubmitManyJobs,
- [ ("ops", [t| [[OpCode]] |]) ]
+ [ simpleField "ops" [t| [[MetaOpCode]] |] ]
)
, (luxiReqWaitForJobChange,
- [ ("job", [t| Int |])
- , ("fields", [t| [String]|])
- , ("prev_job", [t| JSValue |])
- , ("prev_log", [t| JSValue |])
- , ("tmout", [t| Int |])
+ [ simpleField "job" [t| JobId |]
+ , simpleField "fields" [t| [String]|]
+ , simpleField "prev_job" [t| JSValue |]
+ , simpleField "prev_log" [t| JSValue |]
+ , simpleField "tmout" [t| Int |]
])
, (luxiReqArchiveJob,
- [ ("job", [t| Int |]) ]
+ [ simpleField "job" [t| JobId |] ]
)
, (luxiReqAutoArchiveJobs,
- [ ("age", [t| Int |])
- , ("tmout", [t| Int |])
+ [ simpleField "age" [t| Int |]
+ , simpleField "tmout" [t| Int |]
])
, (luxiReqCancelJob,
- [ ("job", [t| Int |]) ]
+ [ simpleField "job" [t| JobId |] ]
+ )
+ , (luxiReqChangeJobPriority,
+ [ simpleField "job" [t| JobId |]
+ , simpleField "priority" [t| Int |] ]
)
, (luxiReqSetDrainFlag,
- [ ("flag", [t| Bool |]) ]
+ [ simpleField "flag" [t| Bool |] ]
)
, (luxiReqSetWatcherPause,
- [ ("duration", [t| Double |]) ]
+ [ simpleField "duration" [t| Double |] ]
)
])
$(makeJSONInstance ''LuxiReq)
+-- | List of all defined Luxi calls.
+$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
+
-- | The serialisation of LuxiOps into strings in messages.
$(genStrOfOp ''LuxiOp "strOfOp")
getClient :: String -> IO Client
getClient path = do
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
- withTimeout connTimeout "creating luxi connection" $
+ withTimeout luxiDefCtmo "creating luxi connection" $
S.connect s (S.SockAddrUnix path)
rf <- newIORef B.empty
h <- S.socketToHandle s ReadWriteMode
-- | Sends a message over a luxi transport.
sendMsg :: Client -> String -> IO ()
-sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
+sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
let encoded = UTF8.fromString buf
handle = socket s
B.hPut handle encoded
-- message and the leftover buffer contents.
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
recvUpdate handle obuf = do
- nbuf <- withTimeout queryTimeout "reading luxi response" $ do
+ nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
_ <- hWaitForInput handle (-1)
B.hGetNonBlocking handle 4096
let (msg, remaining) = B.break (eOM ==) nbuf
-- | Extended wrapper over recvMsg.
recvMsgExt :: Client -> IO RecvResult
recvMsgExt s =
- catch (liftM RecvOk (recvMsg s)) $ \e ->
- if isEOFError e
- then return RecvConnClosed
- else return $ RecvError (show e)
+ Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
+ return $ if isEOFError e
+ then RecvConnClosed
+ else RecvError (show e)
-- | Serialize a request to String.
buildCall :: LuxiOp -- ^ The method
decodeCall (LuxiCall call args) =
case call of
ReqQueryJobs -> do
- (jid, jargs) <- fromJVal args
- rid <- mapM parseJobId jid
- let rargs = map fromJSString jargs
- return $ QueryJobs rid rargs
+ (jids, jargs) <- fromJVal args
+ jids' <- case jids of
+ JSNull -> return []
+ _ -> fromJVal jids
+ return $ QueryJobs jids' jargs
ReqQueryInstances -> do
(names, fields, locking) <- fromJVal args
return $ QueryInstances names fields locking
ReqQueryGroups -> do
(names, fields, locking) <- fromJVal args
return $ QueryGroups names fields locking
- ReqQueryClusterInfo -> do
+ ReqQueryClusterInfo ->
return QueryClusterInfo
ReqQuery -> do
(what, fields, qfilter) <- fromJVal args
J.readJSON d `ap`
J.readJSON e
_ -> J.Error "Not enough values"
- rid <- parseJobId jid
- return $ WaitForJobChange rid fields pinfo pidx wtmout
+ return $ WaitForJobChange jid fields pinfo pidx wtmout
ReqArchiveJob -> do
[jid] <- fromJVal args
- rid <- parseJobId jid
- return $ ArchiveJob rid
+ return $ ArchiveJob jid
ReqAutoArchiveJobs -> do
(age, tmout) <- fromJVal args
return $ AutoArchiveJobs age tmout
return $ QueryConfigValues fields
ReqQueryTags -> do
(kind, name) <- fromJVal args
- return $ QueryTags kind name
+ item <- tagObjectFrom kind name
+ return $ QueryTags item
ReqCancelJob -> do
- [job] <- fromJVal args
- rid <- parseJobId job
- return $ CancelJob rid
+ [jid] <- fromJVal args
+ return $ CancelJob jid
+ ReqChangeJobPriority -> do
+ (jid, priority) <- fromJVal args
+ return $ ChangeJobPriority jid priority
ReqSetDrainFlag -> do
[flag] <- fromJVal args
return $ SetDrainFlag flag
-- | Check that luxi responses contain the required keys and that the
-- call was successful.
-validateResult :: String -> Result JSValue
+validateResult :: String -> ErrorResult JSValue
validateResult s = do
when (UTF8.replacement_char `elem` s) $
fail "Failed to decode UTF-8, detected replacement char after decoding"
- oarr <- fromJResult "Parsing LUXI response"
- (decodeStrict s)::Result (JSObject JSValue)
+ oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
let arr = J.fromJSObject oarr
- status <- fromObj arr (strOfKey Success)::Result Bool
- let rkey = strOfKey Result
+ status <- fromObj arr (strOfKey Success)
+ result <- fromObj arr (strOfKey Result)
if status
- then fromObj arr rkey
- else fromObj arr rkey >>= fail
+ then return result
+ else decodeError result
+
+-- | Try to decode an error from the server response. This function
+-- will always fail, since it's called only on the error path (when
+-- status is False).
+decodeError :: JSValue -> ErrorResult JSValue
+decodeError val =
+ case fromJVal val of
+ Ok e -> Bad e
+ Bad msg -> Bad $ GenericError msg
-- | Generic luxi method call.
-callMethod :: LuxiOp -> Client -> IO (Result JSValue)
+callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
callMethod method s = do
sendMsg s $ buildCall method
result <- recvMsg s
let rval = validateResult result
return rval
--- | Parses a job ID.
-parseJobId :: JSValue -> Result JobId
-parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
-parseJobId (JSRational _ x) =
- if denominator x /= 1
- then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
- -- FIXME: potential integer overflow here on 32-bit platforms
- else Ok . fromIntegral . numerator $ x
-parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
-
-- | Parse job submission result.
-parseSubmitJobResult :: JSValue -> Result JobId
-parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
+parseSubmitJobResult :: JSValue -> ErrorResult JobId
+parseSubmitJobResult (JSArray [JSBool True, v]) =
+ case J.readJSON v of
+ J.Error msg -> Bad $ LuxiError msg
+ J.Ok v' -> Ok v'
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
- Bad (fromJSString x)
-parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
- show v
+ Bad . LuxiError $ fromJSString x
+parseSubmitJobResult v =
+ Bad . LuxiError $ "Unknown result from the master daemon: " ++
+ show (pp_value v)
-- | Specialized submitManyJobs call.
-submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
+submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
submitManyJobs s jobs = do
rval <- callMethod (SubmitManyJobs jobs) s
-- map each result (status, payload) pair into a nice Result ADT
return $ case rval of
Bad x -> Bad x
Ok (JSArray r) -> mapM parseSubmitJobResult r
- x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
+ x -> Bad . LuxiError $
+ "Cannot parse response from Ganeti: " ++ show x
-- | Custom queryJobs call.
-queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
+queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
queryJobsStatus s jids = do
rval <- callMethod (QueryJobs jids ["status"]) s
return $ case rval of
Bad x -> Bad x
Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
J.Ok vals -> if any null vals
- then Bad "Missing job status field"
+ then Bad $
+ LuxiError "Missing job status field"
else Ok (map head vals)
- J.Error x -> Bad x
+ J.Error x -> Bad $ LuxiError x