#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import base64
import pycurl
import threading
+import copy
from ganeti import utils
from ganeti import objects
from ganeti import runtime
from ganeti import compat
from ganeti import rpc_defs
+from ganeti import pathutils
+from ganeti import vcluster
# Special module generated at build time
from ganeti import _generated_rpc
import ganeti.http.client # pylint: disable=W0611
-# Timeout for connecting to nodes (seconds)
-_RPC_CONNECT_TIMEOUT = 5
-
_RPC_CLIENT_HEADERS = [
"Content-type: %s" % http.HTTP_APP_JSON,
"Expect:",
]
-# Various time constants for the timeout table
-_TMO_URGENT = 60 # one minute
-_TMO_FAST = 5 * 60 # five minutes
-_TMO_NORMAL = 15 * 60 # 15 minutes
-_TMO_SLOW = 3600 # one hour
-_TMO_4HRS = 4 * 3600
-_TMO_1DAY = 86400
-
#: Special value to describe an offline host
_OFFLINE = object()
def _ConfigRpcCurl(curl):
+  # The same noded certificate file is deliberately used as CA, client
+  # certificate and client key below
-  noded_cert = str(constants.NODED_CERT_FILE)
+  noded_cert = str(pathutils.NODED_CERT_FILE)
  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
-  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
+  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
def RunWithRPC(fn):
"""RPC Result class.
This class holds an RPC result. It is needed since in multi-node
- calls we can't raise an exception just because one one out of many
+ calls we can't raise an exception just because one out of many
failed, and therefore we use this class to encapsulate the result.
@ivar data: the data payload, for successful results, or None
args = (msg, )
raise ec(*args) # pylint: disable=W0142
+  def Warn(self, msg, feedback_fn):
+    """If the result has failed, call the feedback_fn.
+
+    This is used in cases where the LU wants to warn the
+    user about a failure, but continue anyway.
+
+    @type msg: string
+    @param msg: message prepended to the failure text
+    @type feedback_fn: callable
+    @param feedback_fn: called with the combined warning message when the
+      result has failed; not called otherwise
+
+    """
+    if not self.fail_msg:
+      # Nothing failed, nothing to warn about
+      return
+
+    msg = "%s: %s" % (msg, self.fail_msg)
+    feedback_fn(msg)
+
def _SsconfResolver(ssconf_ips, node_list, _,
ssc=ssconf.SimpleStore,
ip = ipmap.get(node)
if ip is None:
ip = nslookup_fn(node, family=family)
- result.append((node, ip))
+ result.append((node, ip, node))
return result
"""
assert len(hosts) == len(self._addresses)
- return zip(hosts, self._addresses)
+ return zip(hosts, self._addresses, hosts)
-def _CheckConfigNode(name, node, accept_offline_node):
+def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
  """Checks if a node is online.
-  @type name: string
-  @param name: Node name
+  @type node_uuid_or_name: string
+  @param node_uuid_or_name: Node UUID (or node name if the UUID was not found
+    in the configuration)
  @type node: L{objects.Node} or None
  @param node: Node object
+  @rtype: tuple of three strings (IP may also be L{_OFFLINE})
+  @return: (node name, IP address, original UUID/name as passed in)
  """
  if node is None:
-    # Depend on DNS for name resolution
-    ip = name
-  elif node.offline and not accept_offline_node:
-    ip = _OFFLINE
+    # Assume that the passed parameter was actually a node name, so depend on
+    # DNS for name resolution
+    return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
  else:
-    ip = node.primary_ip
-    return (name, ip)
+    if node.offline and not accept_offline_node:
+      ip = _OFFLINE
+    else:
+      ip = node.primary_ip
+    return (node.name, ip, node_uuid_or_name)
-def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
+def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.
+  Note that strings in node_uuids are treated as node names if the UUID is not
+  found in the configuration.
+
+  @param single_node_fn: callable returning the node object (or None) for a
+    single UUID
+  @param all_nodes_fn: callable returning a mapping of node UUIDs to node
+    objects
+  @param node_uuids: list of node UUIDs (or names, see note above)
+  @param opts: either None or L{rpc_defs.ACCEPT_OFFLINE_NODE}
+
  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
  assert accept_offline_node or opts is None, "Unknown option"
  # Special case for single-host lookups
-  if len(hosts) == 1:
-    (name, ) = hosts
-    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
+  if len(node_uuids) == 1:
+    (uuid, ) = node_uuids
+    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
  else:
    all_nodes = all_nodes_fn()
-    return [_CheckConfigNode(name, all_nodes.get(name, None),
+    return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
            accept_offline_node)
-    for name in hosts]
+    for uuid in node_uuids]
class _RpcProcessor:
def __init__(self, resolver, port, lock_monitor_cb=None):
"""Initializes this class.
- @param resolver: callable accepting a list of hostnames, returning a list
- of tuples containing name and IP address (IP address can be the name or
- the special value L{_OFFLINE} to mark offline machines)
+ @param resolver: callable accepting a list of node UUIDs or hostnames,
+ returning a list of tuples containing name, IP address and original name
+ of the resolved node. IP address can be the name or the special value
+ L{_OFFLINE} to mark offline machines.
@type port: int
@param port: TCP port
@param lock_monitor_cb: Callable for registering with lock monitor
assert isinstance(body, dict)
assert len(body) == len(hosts)
assert compat.all(isinstance(v, str) for v in body.values())
- assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
+ assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
"%s != %s" % (hosts, body.keys())
- for (name, ip) in hosts:
+ for (name, ip, original_name) in hosts:
if ip is _OFFLINE:
# Node is marked as offline
- results[name] = RpcResult(node=name, offline=True, call=procedure)
+ results[original_name] = RpcResult(node=name,
+ offline=True,
+ call=procedure)
else:
- requests[name] = \
+ requests[original_name] = \
http.client.HttpClientRequest(str(ip), port,
http.HTTP_POST, str("/%s" % procedure),
headers=_RPC_CLIENT_HEADERS,
- post_data=body[name],
+ post_data=body[original_name],
read_timeout=read_timeout,
nicename="%s/%s" % (name, procedure),
curl_config_fn=_ConfigRpcCurl)
return results
- def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
+ def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
_req_process_fn=None):
"""Makes an RPC request to a number of nodes.
- @type hosts: sequence
- @param hosts: Hostnames
+ @type nodes: sequence
+ @param nodes: node UUIDs or Hostnames
@type procedure: string
@param procedure: Request path
@type body: dictionary
@param body: dictionary with request bodies per host
@type read_timeout: int or None
@param read_timeout: Read timeout for request
+ @rtype: dictionary
+ @return: a dictionary mapping host names to rpc.RpcResult objects
"""
assert read_timeout is not None, \
_req_process_fn = http.client.ProcessRequests
(results, requests) = \
- self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
+ self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
procedure, body, read_timeout)
_req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
getents = getents_fn()
- return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
+ virt_filename = vcluster.MakeVirtualPath(filename)
+
+ return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
return [(d.ToDict(), uid) for d, uid in value]
+def _AddSpindlesToLegacyNodeInfo(result, space_info):
+  """Extracts the spindle information from the space info and adds
+  it to the result dictionary.
+
+  @type result: dict of strings
+  @param result: dictionary holding the result of the legacy node info
+  @type space_info: list of dicts of strings
+  @param space_info: list, each row holding space information of one storage
+    unit
+  @rtype: None
+  @return: does not return anything, manipulates the C{result} variable
+
+  """
+  pv_info = utils.storage.LookupSpaceInfoByStorageType(space_info,
+                                                       constants.ST_LVM_PV)
+  if not pv_info:
+    # No LVM PV row present, leave the result dictionary untouched
+    return
+  result["spindles_free"] = pv_info["storage_free"]
+  result["spindles_total"] = pv_info["storage_size"]
+
+
+def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info,
+                                           require_vg_info=True):
+  """Extracts the storage space information of the default storage type from
+  the space info and adds it to the result dictionary.
+
+  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
+  @type require_vg_info: boolean
+  @param require_vg_info: indicates whether volume group information is
+    required or not
+  @raise errors.OpExecError: if C{require_vg_info} is set but C{space_info}
+    contains no LVM volume group entry
+
+  """
+  # Check if there is at least one row for non-spindle storage info.
+  no_defaults = (len(space_info) < 1) or \
+      (space_info[0]["type"] == constants.ST_LVM_PV and len(space_info) == 1)
+
+  default_space_info = None
+  if no_defaults:
+    logging.warning("No storage info provided for default storage type.")
+  else:
+    # The first row is taken as the default storage type
+    default_space_info = space_info[0]
+
+  if require_vg_info:
+    # if lvm storage is required, ignore the actual default and look for LVM
+    lvm_info_found = False
+    for space_entry in space_info:
+      if space_entry["type"] == constants.ST_LVM_VG:
+        default_space_info = space_entry
+        lvm_info_found = True
+        # NOTE(review): 'continue' (not 'break') means the scan keeps going,
+        # so the *last* lvm-vg row wins if several exist — confirm intended
+        continue
+    if not lvm_info_found:
+      raise errors.OpExecError("LVM volume group info required, but not"
+                               " provided.")
+
+  if default_space_info:
+    result["name"] = default_space_info["name"]
+    result["storage_free"] = default_space_info["storage_free"]
+    result["storage_size"] = default_space_info["storage_size"]
+
+
+def MakeLegacyNodeInfo(data, require_vg_info=True):
+  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
+
+  Converts the data into a single dictionary. This is fine for most use cases,
+  but some require information from more than one volume group or hypervisor.
+
+  @param data: tuple of C{(bootid, space_info, (hv_info, ))} as unpacked below
+  @param require_vg_info: raise an error if the returned vg_info
+    doesn't have any values
+
+  """
+  (bootid, space_info, (hv_info, )) = data
+
+  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
+
+  _AddSpindlesToLegacyNodeInfo(ret, space_info)
+  _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info,
+                                         require_vg_info=require_vg_info)
+
+  return ret
+
+
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
"""Annotates just DRBD disks layouts.
else:
annotation_fn = _AnnotateDParamsGeneric
- new_disks = []
- for disk in disks:
- new_disks.append(annotation_fn(disk.Copy(), ld_params))
+ return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
+
+
+def _GetExclusiveStorageFlag(cfg, node_uuid):
+  """Reads the exclusive storage flag from a node's ndparams.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: cluster configuration
+  @type node_uuid: string
+  @param node_uuid: UUID of the node to look up
+  @return: the node's C{ND_EXCLUSIVE_STORAGE} ndparam value
+  @raise errors.OpPrereqError: if the node is not found in the configuration
+
+  """
+  ni = cfg.GetNodeInfo(node_uuid)
+  if ni is None:
+    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
+                               errors.ECODE_NOENT)
+  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
+
+
+def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
+  """Adds the exclusive storage flag to lvm units.
+
+  This function creates a copy of the storage_units lists, with the
+  es_flag being added to all lvm storage units.
+
+  @type storage_units: list of pairs (string, string)
+  @param storage_units: list of 'raw' storage units, consisting only of
+    (storage_type, storage_key)
+  @type es_flag: boolean
+  @param es_flag: exclusive storage flag
+  @rtype: list of tuples (string, string, list)
+  @return: list of storage units (storage_type, storage_key, params) with
+    the params containing the es_flag for lvm-vg storage units
+
+  """
+  result = []
+  for (storage_type, storage_key) in storage_units:
+    if storage_type == constants.ST_LVM_VG:
+      # Wrap the flag in a list so every entry carries a uniform params list,
+      # as documented in @rtype above (non-lvm units get an empty list)
+      result.append((storage_type, storage_key, [es_flag]))
+    else:
+      result.append((storage_type, storage_key, []))
+  return result
+
+
+def GetExclusiveStorageForNodes(cfg, node_uuids):
+  """Return the exclusive storage flag for all the given nodes.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: cluster configuration
+  @type node_uuids: list or tuple
+  @param node_uuids: node UUIDs for which to read the flag
+  @rtype: dict
+  @return: mapping from node uuids to exclusive storage flags
+  @raise errors.OpPrereqError: if any given node name has no corresponding
+    node
+
+  """
+  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
+              for node_uuid in node_uuids)
+
+
+def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
+  """Return the lvm storage units for all the given nodes.
+
+  Main purpose of this function is to map the exclusive storage flag, which
+  can be different for each node, to the default LVM storage unit.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: cluster configuration
+  @type storage_units: list of pairs (string, string)
+  @param storage_units: list of 'raw' storage units, e.g. pairs of
+    (storage_type, storage_key)
+  @type node_uuids: list or tuple
+  @param node_uuids: node UUIDs for which to read the flag
+  @rtype: dict
+  @return: mapping from node uuids to a list of storage units which include
+    the exclusive storage flag for lvm storage
+  @raise errors.OpPrereqError: if any given node name has no corresponding
+    node
-  return new_disks
+  """
+  getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits(
+      storage_units, _GetExclusiveStorageFlag(cfg, n))
+  flags = map(getunit, node_uuids)
+  return dict(zip(node_uuids, flags))
#: Generic encoders
rpc_defs.ED_INST_DICT: self._InstDict,
rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
+ rpc_defs.ED_NIC_DICT: self._NicDict,
# Encoders annotating disk parameters
rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
+ rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
# Encoders with special requirements
_generated_rpc.RpcClientDnsOnly.__init__(self)
_generated_rpc.RpcClientDefault.__init__(self)
+  def _NicDict(self, nic):
+    """Convert the given nic to a dict and encapsulate netinfo
+
+    @type nic: L{objects.NIC}
+    @param nic: NIC object to convert
+    @rtype: dict
+    @return: dictionary form of the NIC, with C{netinfo} filled in from the
+      configuration when the NIC is connected to a known network
+
+    """
+    # Work on a deep copy so the netinfo assignment below cannot modify the
+    # caller's NIC object
+    n = copy.deepcopy(nic)
+    if n.network:
+      net_uuid = self._cfg.LookupNetwork(n.network)
+      if net_uuid:
+        nobj = self._cfg.GetNetwork(net_uuid)
+        n.netinfo = objects.Network.ToDict(nobj)
+    return n.ToDict()
+
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
"""Convert the given instance to a dict.
idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
if osp is not None:
idict["osparams"].update(osp)
+ idict["disks"] = self._DisksDictDP((instance.disks, instance))
for nic in idict["nics"]:
nic["nicparams"] = objects.FillDict(
cluster.nicparams[constants.PP_DEFAULT],
nic["nicparams"])
- idict["disks"] = self._DisksDictDP((instance.disks, instance))
+ network = nic.get("network", None)
+ if network:
+ net_uuid = self._cfg.LookupNetwork(network)
+ if net_uuid:
+ nobj = self._cfg.GetNetwork(net_uuid)
+ nic["netinfo"] = objects.Network.ToDict(nobj)
return idict
def _InstDictHvpBepDp(self, (instance, hvp, bep)):
for disk in AnnotateDiskParams(instance.disk_template,
disks, diskparams)]
+  def _MultiDiskDictDP(self, disks_insts):
+    """Wrapper for L{AnnotateDiskParams}.
+
+    Supports a list of (disk, instance) tuples.
+
+    @param disks_insts: list of tuples, each passed on to L{_DisksDictDP}
+    @return: flattened list of the annotated disk dictionaries
+
+    """
+    return [disk for disk_inst in disks_insts
+            for disk in self._DisksDictDP(disk_inst)]
+
def _SingleDiskDictDP(self, (disk, instance)):
"""Wrapper for L{AnnotateDiskParams}.