ipolicy: Make the keys of the dict consistent
[ganeti-local] / lib / rpc.py
index 256dd70..437df98 100644 (file)
@@ -30,7 +30,6 @@
 # if they need to start using instance attributes
 # R0904: Too many public methods
 
-import os
 import logging
 import zlib
 import base64
@@ -240,7 +239,7 @@ class RpcResult(object):
     raise ec(*args) # pylint: disable=W0142
 
 
-def _SsconfResolver(node_list,
+def _SsconfResolver(node_list, _,
                     ssc=ssconf.SimpleStore,
                     nslookup_fn=netutils.Hostname.GetIP):
   """Return addresses for given node names.
@@ -277,7 +276,7 @@ class _StaticResolver:
     """
     self._addresses = addresses
 
-  def __call__(self, hosts):
+  def __call__(self, hosts, _):
     """Returns static addresses for hosts.
 
     """
@@ -285,7 +284,7 @@ class _StaticResolver:
     return zip(hosts, self._addresses)
 
 
-def _CheckConfigNode(name, node):
+def _CheckConfigNode(name, node, accept_offline_node):
   """Checks if a node is online.
 
   @type name: string
@@ -297,24 +296,29 @@ def _CheckConfigNode(name, node):
   if node is None:
     # Depend on DNS for name resolution
     ip = name
-  elif node.offline:
+  elif node.offline and not accept_offline_node:
     ip = _OFFLINE
   else:
     ip = node.primary_ip
   return (name, ip)
 
 
-def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
+def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
   """Calculate node addresses using configuration.
 
   """
+  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
+
+  assert accept_offline_node or opts is None, "Unknown option"
+
   # Special case for single-host lookups
   if len(hosts) == 1:
     (name, ) = hosts
-    return [_CheckConfigNode(name, single_node_fn(name))]
+    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
   else:
     all_nodes = all_nodes_fn()
-    return [_CheckConfigNode(name, all_nodes.get(name, None))
+    return [_CheckConfigNode(name, all_nodes.get(name, None),
+                             accept_offline_node)
             for name in hosts]
 
 
@@ -358,7 +362,7 @@ class _RpcProcessor:
       else:
         requests[name] = \
           http.client.HttpClientRequest(str(ip), port,
-                                        http.HTTP_PUT, str("/%s" % procedure),
+                                        http.HTTP_POST, str("/%s" % procedure),
                                         headers=_RPC_CLIENT_HEADERS,
                                         post_data=body[name],
                                         read_timeout=read_timeout,
@@ -391,8 +395,8 @@ class _RpcProcessor:
 
     return results
 
-  def __call__(self, hosts, procedure, body, read_timeout,
-               _req_process_fn=http.client.ProcessRequests):
+  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
+               _req_process_fn=None):
     """Makes an RPC request to a number of nodes.
 
     @type hosts: sequence
@@ -408,9 +412,12 @@ class _RpcProcessor:
     assert read_timeout is not None, \
       "Missing RPC read timeout for procedure '%s'" % procedure
 
+    if _req_process_fn is None:
+      _req_process_fn = http.client.ProcessRequests
+
     (results, requests) = \
-      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
-                            body, read_timeout)
+      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
+                            procedure, body, read_timeout)
 
     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
 
@@ -420,13 +427,15 @@ class _RpcProcessor:
 
 
 class _RpcClientBase:
-  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
+  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
+               _req_process_fn=None):
     """Initializes this class.
 
     """
-    self._proc = _RpcProcessor(resolver,
-                               netutils.GetDaemonPort(constants.NODED),
-                               lock_monitor_cb=lock_monitor_cb)
+    proc = _RpcProcessor(resolver,
+                         netutils.GetDaemonPort(constants.NODED),
+                         lock_monitor_cb=lock_monitor_cb)
+    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
 
   @staticmethod
@@ -443,13 +452,22 @@ class _RpcClientBase:
     """Entry point for automatically generated RPC wrappers.
 
     """
-    (procedure, _, timeout, argdefs, prep_fn, postproc_fn, _) = cdef
+    (procedure, _, resolver_opts, timeout, argdefs,
+     prep_fn, postproc_fn, _) = cdef
 
     if callable(timeout):
       read_timeout = timeout(args)
     else:
       read_timeout = timeout
 
+    if callable(resolver_opts):
+      req_resolver_opts = resolver_opts(args)
+    else:
+      req_resolver_opts = resolver_opts
+
+    if len(args) != len(argdefs):
+      raise errors.ProgrammerError("Number of passed arguments doesn't match")
+
     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
     if prep_fn is None:
       # for a no-op prep_fn, we serialise the body once, and then we
@@ -459,11 +477,12 @@ class _RpcClientBase:
     else:
       # for a custom prep_fn, we pass the encoded arguments and the
       # node name to the prep_fn, and we serialise its return value
-      assert(callable(prep_fn))
+      assert callable(prep_fn)
       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
                     for n in node_list)
 
-    result = self._proc(node_list, procedure, pnbody, read_timeout)
+    result = self._proc(node_list, procedure, pnbody, read_timeout,
+                        req_resolver_opts)
 
     if postproc_fn:
       return dict(map(lambda (key, value): (key, postproc_fn(value)),
@@ -496,13 +515,19 @@ def _EncodeNodeToDiskDict(value):
               for name, disks in value.items())
 
 
-def _PrepareFileUpload(filename):
+def _PrepareFileUpload(getents_fn, filename):
   """Loads a file and prepares it for an upload to nodes.
 
   """
-  data = _Compress(utils.ReadFile(filename))
-  st = os.stat(filename)
-  getents = runtime.GetEnts()
+  statcb = utils.FileStatHelper()
+  data = _Compress(utils.ReadFile(filename, preread=statcb))
+  st = statcb.st
+
+  if getents_fn is None:
+    getents_fn = runtime.GetEnts
+
+  getents = getents_fn()
+
   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
 
@@ -549,7 +574,6 @@ _ENCODERS = {
   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
-  rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
   rpc_defs.ED_COMPRESS: _Compress,
   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
@@ -564,34 +588,40 @@ class RpcRunner(_RpcClientBase,
   """RPC runner class.
 
   """
-  def __init__(self, context):
+  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
     """Initialized the RPC runner.
 
-    @type context: C{masterd.GanetiContext}
-    @param context: Ganeti context
+    @type cfg: L{config.ConfigWriter}
+    @param cfg: Configuration
+    @type lock_monitor_cb: callable
+    @param lock_monitor_cb: Lock monitor callback
 
     """
-    self._cfg = context.cfg
+    self._cfg = cfg
 
     encoders = _ENCODERS.copy()
 
-    # Add encoders requiring configuration object
     encoders.update({
+      # Encoders requiring configuration object
       rpc_defs.ED_INST_DICT: self._InstDict,
       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
       rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
+
+      # Encoders with special requirements
+      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
       })
 
     # Resolver using configuration
-    resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
-                              self._cfg.GetAllNodesInfo)
+    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
+                              cfg.GetAllNodesInfo)
 
     # Pylint doesn't recognize multiple inheritance properly, see
     # <http://www.logilab.org/ticket/36586> and
     # <http://www.logilab.org/ticket/35642>
     # pylint: disable=W0233
     _RpcClientBase.__init__(self, resolver, encoders.get,
-                            lock_monitor_cb=context.glm.AddToLockMonitor)
+                            lock_monitor_cb=lock_monitor_cb,
+                            _req_process_fn=_req_process_fn)
     _generated_rpc.RpcClientConfig.__init__(self)
     _generated_rpc.RpcClientBootstrap.__init__(self)
     _generated_rpc.RpcClientDefault.__init__(self)
@@ -680,7 +710,8 @@ class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
   """RPC wrappers for L{config}.
 
   """
-  def __init__(self, context, address_list):
+  def __init__(self, context, address_list, _req_process_fn=None,
+               _getents=None):
     """Initializes this class.
 
     """
@@ -695,6 +726,13 @@ class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
       # Caller provided an address list
       resolver = _StaticResolver(address_list)
 
-    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
-                            lock_monitor_cb=lock_monitor_cb)
+    encoders = _ENCODERS.copy()
+
+    encoders.update({
+      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+      })
+
+    _RpcClientBase.__init__(self, resolver, encoders.get,
+                            lock_monitor_cb=lock_monitor_cb,
+                            _req_process_fn=_req_process_fn)
     _generated_rpc.RpcClientConfig.__init__(self)