constants: Move most paths to separate module
[ganeti-local] / htools / Ganeti / Luxi.hs
index d8c243c..16fb27d 100644 (file)
@@ -27,38 +27,54 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 
 module Ganeti.Luxi
   ( LuxiOp(..)
 
 module Ganeti.Luxi
   ( LuxiOp(..)
-  , QrViaLuxi(..)
-  , ResultStatus(..)
   , LuxiReq(..)
   , Client
   , JobId
   , LuxiReq(..)
   , Client
   , JobId
-  , checkRS
+  , RecvResult(..)
+  , TagObject(..)
+  , strOfOp
   , getClient
   , getClient
+  , getServer
+  , acceptClient
   , closeClient
   , closeClient
+  , closeServer
   , callMethod
   , submitManyJobs
   , queryJobsStatus
   , buildCall
   , callMethod
   , submitManyJobs
   , queryJobsStatus
   , buildCall
+  , buildResponse
   , validateCall
   , decodeCall
   , validateCall
   , decodeCall
+  , recvMsg
+  , recvMsgExt
+  , sendMsg
   ) where
 
   ) where
 
+import Control.Exception (catch)
 import Data.IORef
 import Data.Ratio (numerator, denominator)
 import Data.IORef
 import Data.Ratio (numerator, denominator)
+import qualified Data.ByteString as B
+import qualified Data.ByteString.UTF8 as UTF8
+import Data.Word (Word8)
 import Control.Monad
 import Control.Monad
+import Prelude hiding (catch)
 import Text.JSON (encodeStrict, decodeStrict)
 import qualified Text.JSON as J
 import Text.JSON.Types
 import Text.JSON (encodeStrict, decodeStrict)
 import qualified Text.JSON as J
 import Text.JSON.Types
+import System.Directory (removeFile)
+import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
+import System.IO.Error (isEOFError)
 import System.Timeout
 import qualified Network.Socket as S
 
 import System.Timeout
 import qualified Network.Socket as S
 
-import Ganeti.HTools.JSON
+import Ganeti.JSON
 import Ganeti.HTools.Types
 import Ganeti.HTools.Utils
 
 import Ganeti.Constants
 import Ganeti.Jobs (JobStatus)
 import Ganeti.OpCodes (OpCode)
 import Ganeti.HTools.Types
 import Ganeti.HTools.Utils
 
 import Ganeti.Constants
 import Ganeti.Jobs (JobStatus)
 import Ganeti.OpCodes (OpCode)
+import qualified Ganeti.Query.Language as Qlang
 import Ganeti.THH
 
 -- * Utility functions
 import Ganeti.THH
 
 -- * Utility functions
@@ -73,84 +89,94 @@ withTimeout secs descr action = do
 
 -- * Generic protocol functionality
 
 
 -- * Generic protocol functionality
 
+-- | Result of receiving a message from the socket.
+data RecvResult = RecvConnClosed    -- ^ Connection closed
+                | RecvError String  -- ^ Any other error
+                | RecvOk String     -- ^ Successfull receive
+                  deriving (Show, Read, Eq)
+
 -- | The Ganeti job type.
 type JobId = Int
 
 -- | The Ganeti job type.
 type JobId = Int
 
-$(declareSADT "QrViaLuxi"
-  [ ("QRLock", 'qrLock)
-  , ("QRInstance", 'qrInstance)
-  , ("QRNode", 'qrNode)
-  , ("QRGroup", 'qrGroup)
-  , ("QROs", 'qrOs)
+-- | Data type representing what items do the tag operations apply to.
+$(declareSADT "TagObject"
+  [ ("TagInstance", 'tagInstance)
+  , ("TagNode",     'tagNode)
+  , ("TagGroup",    'tagNodegroup)
+  , ("TagCluster",  'tagCluster)
   ])
   ])
-$(makeJSONInstance ''QrViaLuxi)
+$(makeJSONInstance ''TagObject)
 
 -- | Currently supported Luxi operations and JSON serialization.
 $(genLuxiOp "LuxiOp"
 
 -- | Currently supported Luxi operations and JSON serialization.
 $(genLuxiOp "LuxiOp"
-  [(luxiReqQuery,
-    [ ("what",    [t| QrViaLuxi |], [| id |])
-    , ("fields",  [t| [String]  |], [| id |])
-    , ("qfilter", [t| ()        |], [| const JSNull |])
+  [ (luxiReqQuery,
+    [ ("what",    [t| Qlang.ItemType |])
+    , ("fields",  [t| [String]  |])
+    , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
+    ])
+  , (luxiReqQueryFields,
+    [ ("what",    [t| Qlang.ItemType |])
+    , ("fields",  [t| [String]  |])
     ])
   , (luxiReqQueryNodes,
     ])
   , (luxiReqQueryNodes,
-     [ ("names",  [t| [String] |], [| id |])
-     , ("fields", [t| [String] |], [| id |])
-     , ("lock",   [t| Bool     |], [| id |])
+     [ ("names",  [t| [String] |])
+     , ("fields", [t| [String] |])
+     , ("lock",   [t| Bool     |])
      ])
   , (luxiReqQueryGroups,
      ])
   , (luxiReqQueryGroups,
-     [ ("names",  [t| [String] |], [| id |])
-     , ("fields", [t| [String] |], [| id |])
-     , ("lock",   [t| Bool     |], [| id |])
+     [ ("names",  [t| [String] |])
+     , ("fields", [t| [String] |])
+     , ("lock",   [t| Bool     |])
      ])
   , (luxiReqQueryInstances,
      ])
   , (luxiReqQueryInstances,
-     [ ("names",  [t| [String] |], [| id |])
-     , ("fields", [t| [String] |], [| id |])
-     , ("lock",   [t| Bool     |], [| id |])
+     [ ("names",  [t| [String] |])
+     , ("fields", [t| [String] |])
+     , ("lock",   [t| Bool     |])
      ])
   , (luxiReqQueryJobs,
      ])
   , (luxiReqQueryJobs,
-     [ ("ids",    [t| [Int]    |], [| id |])
-     , ("fields", [t| [String] |], [| id |])
+     [ ("ids",    [t| [Int]    |])
+     , ("fields", [t| [String] |])
      ])
   , (luxiReqQueryExports,
      ])
   , (luxiReqQueryExports,
-     [ ("nodes", [t| [String] |], [| id |])
-     , ("lock",  [t| Bool     |], [| id |])
+     [ ("nodes", [t| [String] |])
+     , ("lock",  [t| Bool     |])
      ])
   , (luxiReqQueryConfigValues,
      ])
   , (luxiReqQueryConfigValues,
-     [ ("fields", [t| [String] |], [| id |]) ]
+     [ ("fields", [t| [String] |]) ]
     )
   , (luxiReqQueryClusterInfo, [])
   , (luxiReqQueryTags,
     )
   , (luxiReqQueryClusterInfo, [])
   , (luxiReqQueryTags,
-     [ ("kind", [t| String |], [| id |])
-     , ("name", [t| String |], [| id |])
+     [ ("kind", [t| TagObject |])
+     , ("name", [t| String |])
      ])
   , (luxiReqSubmitJob,
      ])
   , (luxiReqSubmitJob,
-     [ ("job", [t| [OpCode] |], [| id |]) ]
+     [ ("job", [t| [OpCode] |]) ]
     )
   , (luxiReqSubmitManyJobs,
     )
   , (luxiReqSubmitManyJobs,
-     [ ("ops", [t| [[OpCode]] |], [| id |]) ]
+     [ ("ops", [t| [[OpCode]] |]) ]
     )
   , (luxiReqWaitForJobChange,
     )
   , (luxiReqWaitForJobChange,
-     [ ("job",      [t| Int     |], [| id |])
-     , ("fields",   [t| [String]|], [| id |])
-     , ("prev_job", [t| JSValue |], [| id |])
-     , ("prev_log", [t| JSValue |], [| id |])
-     , ("tmout",    [t| Int     |], [| id |])
+     [ ("job",      [t| Int     |])
+     , ("fields",   [t| [String]|])
+     , ("prev_job", [t| JSValue |])
+     , ("prev_log", [t| JSValue |])
+     , ("tmout",    [t| Int     |])
      ])
   , (luxiReqArchiveJob,
      ])
   , (luxiReqArchiveJob,
-     [ ("job", [t| Int |], [| id |]) ]
+     [ ("job", [t| Int |]) ]
     )
   , (luxiReqAutoArchiveJobs,
     )
   , (luxiReqAutoArchiveJobs,
-     [ ("age",   [t| Int |], [| id |])
-     , ("tmout", [t| Int |], [| id |])
+     [ ("age",   [t| Int |])
+     , ("tmout", [t| Int |])
      ])
   , (luxiReqCancelJob,
      ])
   , (luxiReqCancelJob,
-     [ ("job", [t| Int |], [| id |]) ]
+     [ ("job", [t| Int |]) ]
     )
   , (luxiReqSetDrainFlag,
     )
   , (luxiReqSetDrainFlag,
-     [ ("flag", [t| Bool |], [| id |]) ]
+     [ ("flag", [t| Bool |]) ]
     )
   , (luxiReqSetWatcherPause,
     )
   , (luxiReqSetWatcherPause,
-     [ ("duration", [t| Double |], [| id |]) ]
+     [ ("duration", [t| Double |]) ]
     )
   ])
 
     )
   ])
 
@@ -159,30 +185,16 @@ $(makeJSONInstance ''LuxiReq)
 -- | The serialisation of LuxiOps into strings in messages.
 $(genStrOfOp ''LuxiOp "strOfOp")
 
 -- | The serialisation of LuxiOps into strings in messages.
 $(genStrOfOp ''LuxiOp "strOfOp")
 
-$(declareIADT "ResultStatus"
-  [ ("RSNormal", 'rsNormal)
-  , ("RSUnknown", 'rsUnknown)
-  , ("RSNoData", 'rsNodata)
-  , ("RSUnavailable", 'rsUnavail)
-  , ("RSOffline", 'rsOffline)
-  ])
-
-$(makeJSONInstance ''ResultStatus)
-
 -- | Type holding the initial (unparsed) Luxi call.
 data LuxiCall = LuxiCall LuxiReq JSValue
 
 -- | Type holding the initial (unparsed) Luxi call.
 data LuxiCall = LuxiCall LuxiReq JSValue
 
--- | Check that ResultStatus is success or fail with descriptive message.
-checkRS :: (Monad m) => ResultStatus -> a -> m a
-checkRS RSNormal val    = return val
-checkRS RSUnknown _     = fail "Unknown field"
-checkRS RSNoData _      = fail "No data for a field"
-checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
-checkRS RSOffline _     = fail "Ganeti reports resource as offline"
-
 -- | The end-of-message separator.
 -- | The end-of-message separator.
-eOM :: Char
-eOM = '\3'
+eOM :: Word8
+eOM = 3
+
+-- | The end-of-message encoded as a ByteString.
+bEOM :: B.ByteString
+bEOM = B.singleton eOM
 
 -- | Valid keys in the requests and responses.
 data MsgKeys = Method
 
 -- | Valid keys in the requests and responses.
 data MsgKeys = Method
@@ -194,8 +206,8 @@ data MsgKeys = Method
 $(genStrOfKey ''MsgKeys "strOfKey")
 
 -- | Luxi client encapsulation.
 $(genStrOfKey ''MsgKeys "strOfKey")
 
 -- | Luxi client encapsulation.
-data Client = Client { socket :: S.Socket   -- ^ The socket of the client
-                     , rbuf :: IORef String -- ^ Already received buffer
+data Client = Client { socket :: Handle           -- ^ The socket of the client
+                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
                      }
 
 -- | Connects to the master daemon and returns a luxi Client.
                      }
 
 -- | Connects to the master daemon and returns a luxi Client.
@@ -204,56 +216,106 @@ getClient path = do
   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
   withTimeout connTimeout "creating luxi connection" $
               S.connect s (S.SockAddrUnix path)
   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
   withTimeout connTimeout "creating luxi connection" $
               S.connect s (S.SockAddrUnix path)
-  rf <- newIORef ""
-  return Client { socket=s, rbuf=rf}
+  rf <- newIORef B.empty
+  h <- S.socketToHandle s ReadWriteMode
+  return Client { socket=h, rbuf=rf }
+
+-- | Creates and returns a server endpoint.
+getServer :: FilePath -> IO S.Socket
+getServer path = do
+  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
+  S.bindSocket s (S.SockAddrUnix path)
+  S.listen s 5 -- 5 is the max backlog
+  return s
+
+-- | Closes a server endpoint.
+-- FIXME: this should be encapsulated into a nicer type.
+closeServer :: FilePath -> S.Socket -> IO ()
+closeServer path sock = do
+  S.sClose sock
+  removeFile path
+
+-- | Accepts a client
+acceptClient :: S.Socket -> IO Client
+acceptClient s = do
+  -- second return is the address of the client, which we ignore here
+  (client_socket, _) <- S.accept s
+  new_buffer <- newIORef B.empty
+  handle <- S.socketToHandle client_socket ReadWriteMode
+  return Client { socket=handle, rbuf=new_buffer }
 
 -- | Closes the client socket.
 closeClient :: Client -> IO ()
 
 -- | Closes the client socket.
 closeClient :: Client -> IO ()
-closeClient = S.sClose . socket
+closeClient = hClose . socket
 
 -- | Sends a message over a luxi transport.
 sendMsg :: Client -> String -> IO ()
 
 -- | Sends a message over a luxi transport.
 sendMsg :: Client -> String -> IO ()
-sendMsg s buf =
-  let _send obuf = do
-        sbytes <- withTimeout queryTimeout
-                  "sending luxi message" $
-                  S.send (socket s) obuf
-        unless (sbytes == length obuf) $ _send (drop sbytes obuf)
-  in _send (buf ++ [eOM])
+sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
+  let encoded = UTF8.fromString buf
+      handle = socket s
+  B.hPut handle encoded
+  B.hPut handle bEOM
+  hFlush handle
+
+-- | Given a current buffer and the handle, it will read from the
+-- network until we get a full message, and it will return that
+-- message and the leftover buffer contents.
+recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
+recvUpdate handle obuf = do
+  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
+            _ <- hWaitForInput handle (-1)
+            B.hGetNonBlocking handle 4096
+  let (msg, remaining) = B.break (eOM ==) nbuf
+      newbuf = B.append obuf msg
+  if B.null remaining
+    then recvUpdate handle newbuf
+    else return (newbuf, B.tail remaining)
 
 -- | Waits for a message over a luxi transport.
 recvMsg :: Client -> IO String
 recvMsg s = do
 
 -- | Waits for a message over a luxi transport.
 recvMsg :: Client -> IO String
 recvMsg s = do
-  let _recv obuf = do
-              nbuf <- withTimeout queryTimeout "reading luxi response" $
-                      S.recv (socket s) 4096
-              let (msg, remaining) = break (eOM ==) nbuf
-              if null remaining
-                then _recv (obuf ++ msg)
-                else return (obuf ++ msg, tail remaining)
   cbuf <- readIORef $ rbuf s
   cbuf <- readIORef $ rbuf s
-  let (imsg, ibuf) = break (eOM ==) cbuf
+  let (imsg, ibuf) = B.break (eOM ==) cbuf
   (msg, nbuf) <-
   (msg, nbuf) <-
-    if null ibuf      -- if old buffer didn't contain a full message
-      then _recv cbuf   -- then we read from network
-      else return (imsg, tail ibuf) -- else we return data from our buffer
+    if B.null ibuf      -- if old buffer didn't contain a full message
+      then recvUpdate (socket s) cbuf   -- then we read from network
+      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
   writeIORef (rbuf s) nbuf
   writeIORef (rbuf s) nbuf
-  return msg
+  return $ UTF8.toString msg
+
+-- | Extended wrapper over recvMsg.
+recvMsgExt :: Client -> IO RecvResult
+recvMsgExt s =
+  catch (liftM RecvOk (recvMsg s)) $ \e ->
+    if isEOFError e
+      then return RecvConnClosed
+      else return $ RecvError (show e)
 
 -- | Serialize a request to String.
 buildCall :: LuxiOp  -- ^ The method
           -> String  -- ^ The serialized form
 buildCall lo =
 
 -- | Serialize a request to String.
 buildCall :: LuxiOp  -- ^ The method
           -> String  -- ^ The serialized form
 buildCall lo =
-  let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
-           , (strOfKey Args, opToArgs lo::JSValue)
+  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
+           , (strOfKey Args, opToArgs lo)
            ]
       jo = toJSObject ja
   in encodeStrict jo
 
            ]
       jo = toJSObject ja
   in encodeStrict jo
 
+-- | Serialize the response to String.
+buildResponse :: Bool    -- ^ Success
+              -> JSValue -- ^ The arguments
+              -> String  -- ^ The serialized form
+buildResponse success args =
+  let ja = [ (strOfKey Success, JSBool success)
+           , (strOfKey Result, args)]
+      jo = toJSObject ja
+  in encodeStrict jo
+
 -- | Check that luxi request contains the required keys and parse it.
 validateCall :: String -> Result LuxiCall
 validateCall s = do
 -- | Check that luxi request contains the required keys and parse it.
 validateCall :: String -> Result LuxiCall
 validateCall s = do
-  arr <- fromJResult "luxi call" $ decodeStrict s::Result (JSObject JSValue)
+  arr <- fromJResult "parsing top-level luxi message" $
+         decodeStrict s::Result (JSObject JSValue)
   let aobj = fromJSObject arr
   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
   args <- fromObj aobj (strOfKey Args)
   let aobj = fromJSObject arr
   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
   args <- fromObj aobj (strOfKey Args)
@@ -280,12 +342,17 @@ decodeCall (LuxiCall call args) =
     ReqQueryGroups -> do
               (names, fields, locking) <- fromJVal args
               return $ QueryGroups names fields locking
     ReqQueryGroups -> do
               (names, fields, locking) <- fromJVal args
               return $ QueryGroups names fields locking
-    ReqQueryClusterInfo -> do
+    ReqQueryClusterInfo ->
               return QueryClusterInfo
     ReqQuery -> do
               return QueryClusterInfo
     ReqQuery -> do
-              (what, fields, _) <-
-                fromJVal args::Result (QrViaLuxi, [String], JSValue)
-              return $ Query what fields ()
+              (what, fields, qfilter) <- fromJVal args
+              return $ Query what fields qfilter
+    ReqQueryFields -> do
+              (what, fields) <- fromJVal args
+              fields' <- case fields of
+                           JSNull -> return []
+                           _ -> fromJVal fields
+              return $ QueryFields what fields'
     ReqSubmitJob -> do
               [ops1] <- fromJVal args
               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
     ReqSubmitJob -> do
               [ops1] <- fromJVal args
               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
@@ -341,6 +408,8 @@ decodeCall (LuxiCall call args) =
 -- call was successful.
 validateResult :: String -> Result JSValue
 validateResult s = do
 -- call was successful.
 validateResult :: String -> Result JSValue
 validateResult s = do
+  when (UTF8.replacement_char `elem` s) $
+       fail "Failed to decode UTF-8, detected replacement char after decoding"
   oarr <- fromJResult "Parsing LUXI response"
           (decodeStrict s)::Result (JSObject JSValue)
   let arr = J.fromJSObject oarr
   oarr <- fromJResult "Parsing LUXI response"
           (decodeStrict s)::Result (JSObject JSValue)
   let arr = J.fromJSObject oarr