LUClusterSetParams: When ipolicy is updated warn for new violations
[ganeti-local] / lib / rpc.py
index 1269845..437df98 100644 (file)
@@ -30,7 +30,6 @@
 # if they need to start using instance attributes
 # R0904: Too many public methods
 
-import os
 import logging
 import zlib
 import base64
@@ -363,7 +362,7 @@ class _RpcProcessor:
       else:
         requests[name] = \
           http.client.HttpClientRequest(str(ip), port,
-                                        http.HTTP_PUT, str("/%s" % procedure),
+                                        http.HTTP_POST, str("/%s" % procedure),
                                         headers=_RPC_CLIENT_HEADERS,
                                         post_data=body[name],
                                         read_timeout=read_timeout,
@@ -397,7 +396,7 @@ class _RpcProcessor:
     return results
 
   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
-               _req_process_fn=http.client.ProcessRequests):
+               _req_process_fn=None):
     """Makes an RPC request to a number of nodes.
 
     @type hosts: sequence
@@ -413,6 +412,9 @@ class _RpcProcessor:
     assert read_timeout is not None, \
       "Missing RPC read timeout for procedure '%s'" % procedure
 
+    if _req_process_fn is None:
+      _req_process_fn = http.client.ProcessRequests
+
     (results, requests) = \
       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
                             procedure, body, read_timeout)
@@ -425,13 +427,15 @@ class _RpcProcessor:
 
 
 class _RpcClientBase:
-  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
+  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
+               _req_process_fn=None):
     """Initializes this class.
 
     """
-    self._proc = _RpcProcessor(resolver,
-                               netutils.GetDaemonPort(constants.NODED),
-                               lock_monitor_cb=lock_monitor_cb)
+    proc = _RpcProcessor(resolver,
+                         netutils.GetDaemonPort(constants.NODED),
+                         lock_monitor_cb=lock_monitor_cb)
+    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
 
   @staticmethod
@@ -461,6 +465,9 @@ class _RpcClientBase:
     else:
       req_resolver_opts = resolver_opts
 
+    if len(args) != len(argdefs):
+      raise errors.ProgrammerError("Number of passed arguments doesn't match")
+
     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
     if prep_fn is None:
       # for a no-op prep_fn, we serialise the body once, and then we
@@ -470,7 +477,7 @@ class _RpcClientBase:
     else:
       # for a custom prep_fn, we pass the encoded arguments and the
       # node name to the prep_fn, and we serialise its return value
-      assert(callable(prep_fn))
+      assert callable(prep_fn)
       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
                     for n in node_list)
 
@@ -508,13 +515,19 @@ def _EncodeNodeToDiskDict(value):
               for name, disks in value.items())
 
 
-def _PrepareFileUpload(filename):
+def _PrepareFileUpload(getents_fn, filename):
   """Loads a file and prepares it for an upload to nodes.
 
   """
-  data = _Compress(utils.ReadFile(filename))
-  st = os.stat(filename)
-  getents = runtime.GetEnts()
+  statcb = utils.FileStatHelper()
+  data = _Compress(utils.ReadFile(filename, preread=statcb))
+  st = statcb.st
+
+  if getents_fn is None:
+    getents_fn = runtime.GetEnts
+
+  getents = getents_fn()
+
   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
 
@@ -561,7 +574,6 @@ _ENCODERS = {
   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
-  rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
   rpc_defs.ED_COMPRESS: _Compress,
   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
@@ -576,34 +588,40 @@ class RpcRunner(_RpcClientBase,
   """RPC runner class.
 
   """
-  def __init__(self, context):
+  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
     """Initialized the RPC runner.
 
-    @type context: C{masterd.GanetiContext}
-    @param context: Ganeti context
+    @type cfg: L{config.ConfigWriter}
+    @param cfg: Configuration
+    @type lock_monitor_cb: callable
+    @param lock_monitor_cb: Lock monitor callback
 
     """
-    self._cfg = context.cfg
+    self._cfg = cfg
 
     encoders = _ENCODERS.copy()
 
-    # Add encoders requiring configuration object
     encoders.update({
+      # Encoders requiring configuration object
       rpc_defs.ED_INST_DICT: self._InstDict,
       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
       rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
+
+      # Encoders with special requirements
+      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
       })
 
     # Resolver using configuration
-    resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
-                              self._cfg.GetAllNodesInfo)
+    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
+                              cfg.GetAllNodesInfo)
 
     # Pylint doesn't recognize multiple inheritance properly, see
     # <http://www.logilab.org/ticket/36586> and
     # <http://www.logilab.org/ticket/35642>
     # pylint: disable=W0233
     _RpcClientBase.__init__(self, resolver, encoders.get,
-                            lock_monitor_cb=context.glm.AddToLockMonitor)
+                            lock_monitor_cb=lock_monitor_cb,
+                            _req_process_fn=_req_process_fn)
     _generated_rpc.RpcClientConfig.__init__(self)
     _generated_rpc.RpcClientBootstrap.__init__(self)
     _generated_rpc.RpcClientDefault.__init__(self)
@@ -692,7 +710,8 @@ class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
   """RPC wrappers for L{config}.
 
   """
-  def __init__(self, context, address_list):
+  def __init__(self, context, address_list, _req_process_fn=None,
+               _getents=None):
     """Initializes this class.
 
     """
@@ -707,6 +726,13 @@ class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
       # Caller provided an address list
       resolver = _StaticResolver(address_list)
 
-    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
-                            lock_monitor_cb=lock_monitor_cb)
+    encoders = _ENCODERS.copy()
+
+    encoders.update({
+      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+      })
+
+    _RpcClientBase.__init__(self, resolver, encoders.get,
+                            lock_monitor_cb=lock_monitor_cb,
+                            _req_process_fn=_req_process_fn)
     _generated_rpc.RpcClientConfig.__init__(self)