If possible, replace symbolic links in place
[ganeti-local] / lib / rpc.py
index e01e2a3..5f9def7 100644 (file)
@@ -126,7 +126,7 @@ def RunWithRPC(fn):
   return wrapper
 
 
-def _Compress(data):
+def _Compress(_, data):
   """Compresses a string for transport over RPC.
 
   Small amounts of data are not compressed.
@@ -460,14 +460,14 @@ class _RpcClientBase:
     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
 
   @staticmethod
-  def _EncodeArg(encoder_fn, (argkind, value)):
+  def _EncodeArg(encoder_fn, node, (argkind, value)):
     """Encode argument.
 
     """
     if argkind is None:
       return value
     else:
-      return encoder_fn(argkind)(value)
+      return encoder_fn(argkind)(node, value)
 
   def _Call(self, cdef, node_list, args):
     """Entry point for automatically generated RPC wrappers.
@@ -489,18 +489,16 @@ class _RpcClientBase:
     if len(args) != len(argdefs):
       raise errors.ProgrammerError("Number of passed arguments doesn't match")
 
-    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
     if prep_fn is None:
-      # for a no-op prep_fn, we serialise the body once, and then we
-      # reuse it in the dictionary values
-      body = serializer.DumpJson(enc_args)
-      pnbody = dict((n, body) for n in node_list)
-    else:
-      # for a custom prep_fn, we pass the encoded arguments and the
-      # node name to the prep_fn, and we serialise its return value
-      assert callable(prep_fn)
-      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
-                    for n in node_list)
+      prep_fn = lambda _, args: args
+    assert callable(prep_fn)
+
+    # encode the arguments for each node individually, pass them and the node
+    # name to the prep_fn, and serialise its return value
+    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
+                                      zip(map(compat.snd, argdefs), args))
+    pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
+                  for n in node_list)
 
     result = self._proc(node_list, procedure, pnbody, read_timeout,
                         req_resolver_opts)
@@ -512,7 +510,7 @@ class _RpcClientBase:
       return result
 
 
-def _ObjectToDict(value):
+def _ObjectToDict(_, value):
   """Converts an object to a dictionary.
 
   @note: See L{objects}.
@@ -521,27 +519,19 @@ def _ObjectToDict(value):
   return value.ToDict()
 
 
-def _ObjectListToDict(value):
+def _ObjectListToDict(node, value):
   """Converts a list of L{objects} to dictionaries.
 
   """
-  return map(_ObjectToDict, value)
+  return map(compat.partial(_ObjectToDict, node), value)
 
 
-def _EncodeNodeToDiskDict(value):
-  """Encodes a dictionary with node name as key and disk objects as values.
-
-  """
-  return dict((name, _ObjectListToDict(disks))
-              for name, disks in value.items())
-
-
-def _PrepareFileUpload(getents_fn, filename):
+def _PrepareFileUpload(getents_fn, node, filename):
   """Loads a file and prepares it for an upload to nodes.
 
   """
   statcb = utils.FileStatHelper()
-  data = _Compress(utils.ReadFile(filename, preread=statcb))
+  data = _Compress(node, utils.ReadFile(filename, preread=statcb))
   st = statcb.st
 
   if getents_fn is None:
@@ -555,7 +545,7 @@ def _PrepareFileUpload(getents_fn, filename):
           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
 
 
-def _PrepareFinalizeExportDisks(snap_disks):
+def _PrepareFinalizeExportDisks(_, snap_disks):
   """Encodes disks for finalizing export.
 
   """
@@ -570,22 +560,7 @@ def _PrepareFinalizeExportDisks(snap_disks):
   return flat_disks
 
 
-def _EncodeImportExportIO((ieio, ieioargs)):
-  """Encodes import/export I/O information.
-
-  """
-  if ieio == constants.IEIO_RAW_DISK:
-    assert len(ieioargs) == 1
-    return (ieio, (ieioargs[0].ToDict(), ))
-
-  if ieio == constants.IEIO_SCRIPT:
-    assert len(ieioargs) == 2
-    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
-
-  return (ieio, ieioargs)
-
-
-def _EncodeBlockdevRename(value):
+def _EncodeBlockdevRename(_, value):
   """Encodes information for renaming block devices.
 
   """
@@ -610,17 +585,15 @@ def _AddSpindlesToLegacyNodeInfo(result, space_info):
   if lvm_pv_info:
     result["spindles_free"] = lvm_pv_info["storage_free"]
     result["spindles_total"] = lvm_pv_info["storage_size"]
+  else:
+    raise errors.OpExecError("No spindle storage information available.")
 
 
-def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info,
-                                           require_vg_info=True):
+def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info):
   """Extracts the storage space information of the default storage type from
   the space info and adds it to the result dictionary.
 
   @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
-  @type require_vg_info: boolean
-  @param require_vg_info: indicates whether volume group information is
-    required or not
 
   """
   # Check if there is at least one row for non-spindle storage info.
@@ -633,41 +606,29 @@ def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info,
   else:
     default_space_info = space_info[0]
 
-  if require_vg_info:
-    # if lvm storage is required, ignore the actual default and look for LVM
-    lvm_info_found = False
-    for space_entry in space_info:
-      if space_entry["type"] == constants.ST_LVM_VG:
-        default_space_info = space_entry
-        lvm_info_found = True
-        continue
-    if not lvm_info_found:
-      raise errors.OpExecError("LVM volume group info required, but not"
-                               " provided.")
-
   if default_space_info:
     result["name"] = default_space_info["name"]
     result["storage_free"] = default_space_info["storage_free"]
     result["storage_size"] = default_space_info["storage_size"]
 
 
-def MakeLegacyNodeInfo(data, require_vg_info=True):
+def MakeLegacyNodeInfo(data, require_spindles=False):
   """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
 
   Converts the data into a single dictionary. This is fine for most use cases,
   but some require information from more than one volume group or hypervisor.
 
-  @param require_vg_info: raise an error if the returnd vg_info
-      doesn't have any values
+  @param require_spindles: add spindle storage information to the legacy node
+      info
 
   """
   (bootid, space_info, (hv_info, )) = data
 
   ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
 
-  _AddSpindlesToLegacyNodeInfo(ret, space_info)
-  _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info,
-                                         require_vg_info=require_vg_info)
+  if require_spindles:
+    _AddSpindlesToLegacyNodeInfo(ret, space_info)
+  _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info)
 
   return ret
 
@@ -676,7 +637,7 @@ def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
   """Annotates just DRBD disks layouts.
 
   """
-  assert disk.dev_type == constants.LD_DRBD8
+  assert disk.dev_type == constants.DT_DRBD8
 
   disk.params = objects.FillDict(drbd_params, disk.params)
   (dev_data, dev_meta) = disk.children
@@ -690,35 +651,36 @@ def _AnnotateDParamsGeneric(disk, (params, )):
   """Generic disk parameter annotation routine.
 
   """
-  assert disk.dev_type != constants.LD_DRBD8
+  assert disk.dev_type != constants.DT_DRBD8
 
   disk.params = objects.FillDict(params, disk.params)
 
   return disk
 
 
-def AnnotateDiskParams(template, disks, disk_params):
+def AnnotateDiskParams(disks, disk_params):
   """Annotates the disk objects with the disk parameters.
 
-  @param template: The disk template used
   @param disks: The list of disks objects to annotate
-  @param disk_params: The disk paramaters for annotation
+  @param disk_params: The disk parameters for annotation
   @returns: A list of disk objects annotated
 
   """
-  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
+  def AnnotateDisk(disk):
+    if disk.dev_type == constants.DT_DISKLESS:
+      return disk
 
-  if template == constants.DT_DRBD8:
-    annotation_fn = _AnnotateDParamsDRBD
-  elif template == constants.DT_DISKLESS:
-    annotation_fn = lambda disk, _: disk
-  else:
-    annotation_fn = _AnnotateDParamsGeneric
+    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
+
+    if disk.dev_type == constants.DT_DRBD8:
+      return _AnnotateDParamsDRBD(disk, ld_params)
+    else:
+      return _AnnotateDParamsGeneric(disk, ld_params)
 
-  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
+  return [AnnotateDisk(disk.Copy()) for disk in disks]
 
 
-def _GetESFlag(cfg, node_uuid):
+def _GetExclusiveStorageFlag(cfg, node_uuid):
   ni = cfg.GetNodeInfo(node_uuid)
   if ni is None:
     raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
@@ -726,6 +688,31 @@ def _GetESFlag(cfg, node_uuid):
   return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
 
 
+def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
+  """Adds the exclusive storage flag to lvm units.
+
+  This function creates a copy of the storage_units lists, with the
+  es_flag being added to all lvm storage units.
+
+  @type storage_units: list of pairs (string, string)
+  @param storage_units: list of 'raw' storage units, consisting only of
+    (storage_type, storage_key)
+  @type es_flag: boolean
+  @param es_flag: exclusive storage flag
+  @rtype: list of tuples (string, string, list)
+  @return: list of storage units (storage_type, storage_key, params) with
+    the params containing the es_flag for lvm-vg storage units
+
+  """
+  result = []
+  for (storage_type, storage_key) in storage_units:
+    if storage_type in [constants.ST_LVM_VG, constants.ST_LVM_PV]:
+      result.append((storage_type, storage_key, [es_flag]))
+    else:
+      result.append((storage_type, storage_key, []))
+  return result
+
+
 def GetExclusiveStorageForNodes(cfg, node_uuids):
   """Return the exclusive storage flag for all the given nodes.
 
@@ -734,24 +721,48 @@ def GetExclusiveStorageForNodes(cfg, node_uuids):
   @type node_uuids: list or tuple
   @param node_uuids: node UUIDs for which to read the flag
   @rtype: dict
-  @return: mapping from node names to exclusive storage flags
+  @return: mapping from node uuids to exclusive storage flags
   @raise errors.OpPrereqError: if any given node name has no corresponding
   node
 
   """
-  getflag = lambda n: _GetESFlag(cfg, n)
+  getflag = lambda n: _GetExclusiveStorageFlag(cfg, n)
   flags = map(getflag, node_uuids)
   return dict(zip(node_uuids, flags))
 
 
+def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
+  """Return the lvm storage unit for all the given nodes.
+
+  Main purpose of this function is to map the exclusive storage flag, which
+  can be different for each node, to the default LVM storage unit.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: cluster configuration
+  @type storage_units: list of pairs (string, string)
+  @param storage_units: list of 'raw' storage units, e.g. pairs of
+    (storage_type, storage_key)
+  @type node_uuids: list or tuple
+  @param node_uuids: node UUIDs for which to read the flag
+  @rtype: dict
+  @return: mapping from node uuids to a list of storage units which include
+    the exclusive storage flag for lvm storage
+  @raise errors.OpPrereqError: if any given node name has no corresponding
+  node
+
+  """
+  getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits(
+      storage_units, _GetExclusiveStorageFlag(cfg, n))
+  flags = map(getunit, node_uuids)
+  return dict(zip(node_uuids, flags))
+
+
 #: Generic encoders
 _ENCODERS = {
   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
-  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
   rpc_defs.ED_COMPRESS: _Compress,
   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
-  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
   }
 
@@ -786,10 +797,14 @@ class RpcRunner(_RpcClientBase,
 
       # Encoders annotating disk parameters
       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
+      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
+      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
 
       # Encoders with special requirements
       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+
+      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
       })
 
     # Resolver using configuration
@@ -808,7 +823,7 @@ class RpcRunner(_RpcClientBase,
     _generated_rpc.RpcClientDnsOnly.__init__(self)
     _generated_rpc.RpcClientDefault.__init__(self)
 
-  def _NicDict(self, nic):
+  def _NicDict(self, _, nic):
     """Convert the given nic to a dict and encapsulate netinfo
 
     """
@@ -820,7 +835,7 @@ class RpcRunner(_RpcClientBase,
         n.netinfo = objects.Network.ToDict(nobj)
     return n.ToDict()
 
-  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
+  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
     """Convert the given instance to a dict.
 
     This is done via the instance's ToDict() method and additionally
@@ -850,7 +865,7 @@ class RpcRunner(_RpcClientBase,
     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
     if osp is not None:
       idict["osparams"].update(osp)
-    idict["disks"] = self._DisksDictDP((instance.disks, instance))
+    idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
     for nic in idict["nics"]:
       nic["nicparams"] = objects.FillDict(
         cluster.nicparams[constants.PP_DEFAULT],
@@ -863,34 +878,71 @@ class RpcRunner(_RpcClientBase,
           nic["netinfo"] = objects.Network.ToDict(nobj)
     return idict
 
-  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
+  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
     """Wrapper for L{_InstDict}.
 
     """
-    return self._InstDict(instance, hvp=hvp, bep=bep)
+    return self._InstDict(node, instance, hvp=hvp, bep=bep)
 
-  def _InstDictOspDp(self, (instance, osparams)):
+  def _InstDictOspDp(self, node, (instance, osparams)):
     """Wrapper for L{_InstDict}.
 
     """
-    return self._InstDict(instance, osp=osparams)
+    return self._InstDict(node, instance, osp=osparams)
 
-  def _DisksDictDP(self, (disks, instance)):
+  def _DisksDictDP(self, node, (disks, instance)):
     """Wrapper for L{AnnotateDiskParams}.
 
     """
     diskparams = self._cfg.GetInstanceDiskParams(instance)
-    return [disk.ToDict()
-            for disk in AnnotateDiskParams(instance.disk_template,
-                                           disks, diskparams)]
+    ret = []
+    for disk in AnnotateDiskParams(disks, diskparams):
+      disk_node_uuids = disk.GetNodes(instance.primary_node)
+      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
+                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))
+
+      disk.UpdateDynamicDiskParams(node, node_ips)
 
-  def _SingleDiskDictDP(self, (disk, instance)):
+      ret.append(disk.ToDict(include_dynamic_params=True))
+
+    return ret
+
+  def _MultiDiskDictDP(self, node, disks_insts):
     """Wrapper for L{AnnotateDiskParams}.
 
+    Supports a list of (disk, instance) tuples.
     """
-    (anno_disk,) = self._DisksDictDP(([disk], instance))
+    return [disk for disk_inst in disks_insts
+            for disk in self._DisksDictDP(node, disk_inst)]
+
+  def _SingleDiskDictDP(self, node, (disk, instance)):
+    """Wrapper for L{AnnotateDiskParams}.
+
+    """
+    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
     return anno_disk
 
+  def _EncodeNodeToDiskDictDP(self, node, value):
+    """Encode dict of node name -> list of (disk, instance) tuples as values.
+
+    """
+    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
+                for name, disks in value.items())
+
+  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
+    """Encodes import/export I/O information.
+
+    """
+    if ieio == constants.IEIO_RAW_DISK:
+      assert len(ieioargs) == 1
+      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ))
+
+    if ieio == constants.IEIO_SCRIPT:
+      assert len(ieioargs) == 2
+      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))
+
+    return (ieio, ieioargs)
+
 
 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
   """RPC wrappers for job queue.