If possible, replace symbolic links in place
[ganeti-local] / lib / rpc.py
index 1058dcc..5f9def7 100644 (file)
@@ -126,7 +126,7 @@ def RunWithRPC(fn):
   return wrapper
 
 
-def _Compress(data):
+def _Compress(_, data):
   """Compresses a string for transport over RPC.
 
   Small amounts of data are not compressed.
@@ -460,14 +460,14 @@ class _RpcClientBase:
     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
 
   @staticmethod
-  def _EncodeArg(encoder_fn, (argkind, value)):
+  def _EncodeArg(encoder_fn, node, (argkind, value)):
     """Encode argument.
 
     """
     if argkind is None:
       return value
     else:
-      return encoder_fn(argkind)(value)
+      return encoder_fn(argkind)(node, value)
 
   def _Call(self, cdef, node_list, args):
     """Entry point for automatically generated RPC wrappers.
@@ -489,18 +489,16 @@ class _RpcClientBase:
     if len(args) != len(argdefs):
       raise errors.ProgrammerError("Number of passed arguments doesn't match")
 
-    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
     if prep_fn is None:
-      # for a no-op prep_fn, we serialise the body once, and then we
-      # reuse it in the dictionary values
-      body = serializer.DumpJson(enc_args)
-      pnbody = dict((n, body) for n in node_list)
-    else:
-      # for a custom prep_fn, we pass the encoded arguments and the
-      # node name to the prep_fn, and we serialise its return value
-      assert callable(prep_fn)
-      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
-                    for n in node_list)
+      prep_fn = lambda _, args: args
+    assert callable(prep_fn)
+
+    # encode the arguments for each node individually, pass them and the node
+    # name to the prep_fn, and serialise its return value
+    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
+                                      zip(map(compat.snd, argdefs), args))
+    pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
+                  for n in node_list)
 
     result = self._proc(node_list, procedure, pnbody, read_timeout,
                         req_resolver_opts)
@@ -512,7 +510,7 @@ class _RpcClientBase:
       return result
 
 
-def _ObjectToDict(value):
+def _ObjectToDict(_, value):
   """Converts an object to a dictionary.
 
   @note: See L{objects}.
@@ -521,27 +519,19 @@ def _ObjectToDict(value):
   return value.ToDict()
 
 
-def _ObjectListToDict(value):
+def _ObjectListToDict(node, value):
   """Converts a list of L{objects} to dictionaries.
 
   """
-  return map(_ObjectToDict, value)
+  return map(compat.partial(_ObjectToDict, node), value)
 
 
-def _EncodeNodeToDiskDict(value):
-  """Encodes a dictionary with node name as key and disk objects as values.
-
-  """
-  return dict((name, _ObjectListToDict(disks))
-              for name, disks in value.items())
-
-
-def _PrepareFileUpload(getents_fn, filename):
+def _PrepareFileUpload(getents_fn, node, filename):
   """Loads a file and prepares it for an upload to nodes.
 
   """
   statcb = utils.FileStatHelper()
-  data = _Compress(utils.ReadFile(filename, preread=statcb))
+  data = _Compress(node, utils.ReadFile(filename, preread=statcb))
   st = statcb.st
 
   if getents_fn is None:
@@ -555,7 +545,7 @@ def _PrepareFileUpload(getents_fn, filename):
           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
 
 
-def _PrepareFinalizeExportDisks(snap_disks):
+def _PrepareFinalizeExportDisks(_, snap_disks):
   """Encodes disks for finalizing export.
 
   """
@@ -570,22 +560,7 @@ def _PrepareFinalizeExportDisks(snap_disks):
   return flat_disks
 
 
-def _EncodeImportExportIO((ieio, ieioargs)):
-  """Encodes import/export I/O information.
-
-  """
-  if ieio == constants.IEIO_RAW_DISK:
-    assert len(ieioargs) == 1
-    return (ieio, (ieioargs[0].ToDict(), ))
-
-  if ieio == constants.IEIO_SCRIPT:
-    assert len(ieioargs) == 2
-    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
-
-  return (ieio, ieioargs)
-
-
-def _EncodeBlockdevRename(value):
+def _EncodeBlockdevRename(_, value):
   """Encodes information for renaming block devices.
 
   """
@@ -662,7 +637,7 @@ def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
   """Annotates just DRBD disks layouts.
 
   """
-  assert disk.dev_type == constants.LD_DRBD8
+  assert disk.dev_type == constants.DT_DRBD8
 
   disk.params = objects.FillDict(drbd_params, disk.params)
   (dev_data, dev_meta) = disk.children
@@ -676,32 +651,33 @@ def _AnnotateDParamsGeneric(disk, (params, )):
   """Generic disk parameter annotation routine.
 
   """
-  assert disk.dev_type != constants.LD_DRBD8
+  assert disk.dev_type != constants.DT_DRBD8
 
   disk.params = objects.FillDict(params, disk.params)
 
   return disk
 
 
-def AnnotateDiskParams(template, disks, disk_params):
+def AnnotateDiskParams(disks, disk_params):
   """Annotates the disk objects with the disk parameters.
 
-  @param template: The disk template used
   @param disks: The list of disks objects to annotate
-  @param disk_params: The disk paramaters for annotation
+  @param disk_params: The disk parameters for annotation
   @returns: A list of disk objects annotated
 
   """
-  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
+  def AnnotateDisk(disk):
+    if disk.dev_type == constants.DT_DISKLESS:
+      return disk
 
-  if template == constants.DT_DRBD8:
-    annotation_fn = _AnnotateDParamsDRBD
-  elif template == constants.DT_DISKLESS:
-    annotation_fn = lambda disk, _: disk
-  else:
-    annotation_fn = _AnnotateDParamsGeneric
+    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
 
-  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
+    if disk.dev_type == constants.DT_DRBD8:
+      return _AnnotateDParamsDRBD(disk, ld_params)
+    else:
+      return _AnnotateDParamsGeneric(disk, ld_params)
+
+  return [AnnotateDisk(disk.Copy()) for disk in disks]
 
 
 def _GetExclusiveStorageFlag(cfg, node_uuid):
@@ -785,10 +761,8 @@ def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
 _ENCODERS = {
   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
-  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
   rpc_defs.ED_COMPRESS: _Compress,
   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
-  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
   }
 
@@ -825,9 +799,12 @@ class RpcRunner(_RpcClientBase,
       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
       rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
+      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
 
       # Encoders with special requirements
       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+
+      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
       })
 
     # Resolver using configuration
@@ -846,7 +823,7 @@ class RpcRunner(_RpcClientBase,
     _generated_rpc.RpcClientDnsOnly.__init__(self)
     _generated_rpc.RpcClientDefault.__init__(self)
 
-  def _NicDict(self, nic):
+  def _NicDict(self, _, nic):
     """Convert the given nic to a dict and encapsulate netinfo
 
     """
@@ -858,7 +835,7 @@ class RpcRunner(_RpcClientBase,
         n.netinfo = objects.Network.ToDict(nobj)
     return n.ToDict()
 
-  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
+  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
     """Convert the given instance to a dict.
 
     This is done via the instance's ToDict() method and additionally
@@ -888,7 +865,7 @@ class RpcRunner(_RpcClientBase,
     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
     if osp is not None:
       idict["osparams"].update(osp)
-    idict["disks"] = self._DisksDictDP((instance.disks, instance))
+    idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
     for nic in idict["nics"]:
       nic["nicparams"] = objects.FillDict(
         cluster.nicparams[constants.PP_DEFAULT],
@@ -901,42 +878,71 @@ class RpcRunner(_RpcClientBase,
           nic["netinfo"] = objects.Network.ToDict(nobj)
     return idict
 
-  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
+  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
     """Wrapper for L{_InstDict}.
 
     """
-    return self._InstDict(instance, hvp=hvp, bep=bep)
+    return self._InstDict(node, instance, hvp=hvp, bep=bep)
 
-  def _InstDictOspDp(self, (instance, osparams)):
+  def _InstDictOspDp(self, node, (instance, osparams)):
     """Wrapper for L{_InstDict}.
 
     """
-    return self._InstDict(instance, osp=osparams)
+    return self._InstDict(node, instance, osp=osparams)
 
-  def _DisksDictDP(self, (disks, instance)):
+  def _DisksDictDP(self, node, (disks, instance)):
     """Wrapper for L{AnnotateDiskParams}.
 
     """
     diskparams = self._cfg.GetInstanceDiskParams(instance)
-    return [disk.ToDict()
-            for disk in AnnotateDiskParams(instance.disk_template,
-                                           disks, diskparams)]
+    ret = []
+    for disk in AnnotateDiskParams(disks, diskparams):
+      disk_node_uuids = disk.GetNodes(instance.primary_node)
+      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
+                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))
+
+      disk.UpdateDynamicDiskParams(node, node_ips)
+
+      ret.append(disk.ToDict(include_dynamic_params=True))
 
-  def _MultiDiskDictDP(self, disks_insts):
+    return ret
+
+  def _MultiDiskDictDP(self, node, disks_insts):
     """Wrapper for L{AnnotateDiskParams}.
 
     Supports a list of (disk, instance) tuples.
     """
     return [disk for disk_inst in disks_insts
-            for disk in self._DisksDictDP(disk_inst)]
+            for disk in self._DisksDictDP(node, disk_inst)]
 
-  def _SingleDiskDictDP(self, (disk, instance)):
+  def _SingleDiskDictDP(self, node, (disk, instance)):
     """Wrapper for L{AnnotateDiskParams}.
 
     """
-    (anno_disk,) = self._DisksDictDP(([disk], instance))
+    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
     return anno_disk
 
+  def _EncodeNodeToDiskDictDP(self, node, value):
+    """Encode dict of node name -> list of (disk, instance) tuples as values.
+
+    """
+    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
+                for name, disks in value.items())
+
+  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
+    """Encodes import/export I/O information.
+
+    """
+    if ieio == constants.IEIO_RAW_DISK:
+      assert len(ieioargs) == 1
+      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ))
+
+    if ieio == constants.IEIO_SCRIPT:
+      assert len(ieioargs) == 2
+      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))
+
+    return (ieio, ieioargs)
+
 
 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
   """RPC wrappers for job queue.