Display node name instead of UUID in error message
[ganeti-local] / lib / cmdlib / common.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Common functions used by multiple logical units."""
23
24 import copy
25 import os
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import hypervisor
31 from ganeti import locking
32 from ganeti import objects
33 from ganeti import opcodes
34 from ganeti import pathutils
35 from ganeti import rpc
36 from ganeti import ssconf
37 from ganeti import utils
38
39
40 # States of instance
41 INSTANCE_DOWN = [constants.ADMINST_DOWN]
42 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
43 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
44
45 #: Instance status in which an instance can be marked as offline/online
46 CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
47   constants.ADMINST_OFFLINE,
48   ]))
49
50
51 def _ExpandItemName(expand_fn, name, kind):
52   """Expand an item name.
53
54   @param expand_fn: the function to use for expansion
55   @param name: requested item name
56   @param kind: text description ('Node' or 'Instance')
57   @return: the result of the expand_fn, if successful
58   @raise errors.OpPrereqError: if the item is not found
59
60   """
61   (uuid, full_name) = expand_fn(name)
62   if uuid is None or full_name is None:
63     raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
64                                errors.ECODE_NOENT)
65   return (uuid, full_name)
66
67
68 def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
69   """Wrapper over L{_ExpandItemName} for instance."""
70   (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
71   if expected_uuid is not None and uuid != expected_uuid:
72     raise errors.OpPrereqError(
73       "The instances UUID '%s' does not match the expected UUID '%s' for"
74       " instance '%s'. Maybe the instance changed since you submitted this"
75       " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
76   return (uuid, full_name)
77
78
79 def ExpandNodeUuidAndName(cfg, expected_uuid, name):
80   """Expand a short node name into the node UUID and full name.
81
82   @type cfg: L{config.ConfigWriter}
83   @param cfg: The cluster configuration
84   @type expected_uuid: string
85   @param expected_uuid: expected UUID for the node (or None if there is no
86         expectation). If it does not match, a L{errors.OpPrereqError} is
87         raised.
88   @type name: string
89   @param name: the short node name
90
91   """
92   (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
93   if expected_uuid is not None and uuid != expected_uuid:
94     raise errors.OpPrereqError(
95       "The nodes UUID '%s' does not match the expected UUID '%s' for node"
96       " '%s'. Maybe the node changed since you submitted this job." %
97       (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
98   return (uuid, full_name)
99
100
101 def ShareAll():
102   """Returns a dict declaring all lock levels shared.
103
104   """
105   return dict.fromkeys(locking.LEVELS, 1)
106
107
108 def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
109   """Checks if the instances in a node group are still correct.
110
111   @type cfg: L{config.ConfigWriter}
112   @param cfg: The cluster configuration
113   @type group_uuid: string
114   @param group_uuid: Node group UUID
115   @type owned_instance_names: set or frozenset
116   @param owned_instance_names: List of currently owned instances
117
118   """
119   wanted_instances = frozenset(cfg.GetInstanceNames(
120                                  cfg.GetNodeGroupInstances(group_uuid)))
121   if owned_instance_names != wanted_instances:
122     raise errors.OpPrereqError("Instances in node group '%s' changed since"
123                                " locks were acquired, wanted '%s', have '%s';"
124                                " retry the operation" %
125                                (group_uuid,
126                                 utils.CommaJoin(wanted_instances),
127                                 utils.CommaJoin(owned_instance_names)),
128                                errors.ECODE_STATE)
129
130   return wanted_instances
131
132
133 def GetWantedNodes(lu, short_node_names):
134   """Returns list of checked and expanded node names.
135
136   @type lu: L{LogicalUnit}
137   @param lu: the logical unit on whose behalf we execute
138   @type short_node_names: list
139   @param short_node_names: list of node names or None for all nodes
140   @rtype: tuple of lists
141   @return: tupe with (list of node UUIDs, list of node names)
142   @raise errors.ProgrammerError: if the nodes parameter is wrong type
143
144   """
145   if short_node_names:
146     node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
147                   for name in short_node_names]
148   else:
149     node_uuids = lu.cfg.GetNodeList()
150
151   return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])
152
153
154 def GetWantedInstances(lu, short_inst_names):
155   """Returns list of checked and expanded instance names.
156
157   @type lu: L{LogicalUnit}
158   @param lu: the logical unit on whose behalf we execute
159   @type short_inst_names: list
160   @param short_inst_names: list of instance names or None for all instances
161   @rtype: tuple of lists
162   @return: tuple of (instance UUIDs, instance names)
163   @raise errors.OpPrereqError: if the instances parameter is wrong type
164   @raise errors.OpPrereqError: if any of the passed instances is not found
165
166   """
167   if short_inst_names:
168     inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, name)[0]
169                   for name in short_inst_names]
170   else:
171     inst_uuids = lu.cfg.GetInstanceList()
172   return (inst_uuids, [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids])
173
174
175 def RunPostHook(lu, node_name):
176   """Runs the post-hook for an opcode on a single node.
177
178   """
179   hm = lu.proc.BuildHooksManager(lu)
180   try:
181     hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
182   except Exception, err: # pylint: disable=W0703
183     lu.LogWarning("Errors occurred running hooks on %s: %s",
184                   node_name, err)
185
186
187 def RedistributeAncillaryFiles(lu):
188   """Distribute additional files which are part of the cluster configuration.
189
190   ConfigWriter takes care of distributing the config and ssconf files, but
191   there are more files which should be distributed to all nodes. This function
192   makes sure those are copied.
193
194   """
195   # Gather target nodes
196   cluster = lu.cfg.GetClusterInfo()
197   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
198
199   online_node_uuids = lu.cfg.GetOnlineNodeList()
200   online_node_uuid_set = frozenset(online_node_uuids)
201   vm_node_uuids = list(online_node_uuid_set.intersection(
202                          lu.cfg.GetVmCapableNodeList()))
203
204   # Never distribute to master node
205   for node_uuids in [online_node_uuids, vm_node_uuids]:
206     if master_info.uuid in node_uuids:
207       node_uuids.remove(master_info.uuid)
208
209   # Gather file lists
210   (files_all, _, files_mc, files_vm) = \
211     ComputeAncillaryFiles(cluster, True)
212
213   # Never re-distribute configuration file from here
214   assert not (pathutils.CLUSTER_CONF_FILE in files_all or
215               pathutils.CLUSTER_CONF_FILE in files_vm)
216   assert not files_mc, "Master candidates not handled in this function"
217
218   filemap = [
219     (online_node_uuids, files_all),
220     (vm_node_uuids, files_vm),
221     ]
222
223   # Upload the files
224   for (node_uuids, files) in filemap:
225     for fname in files:
226       UploadHelper(lu, node_uuids, fname)
227
228
229 def ComputeAncillaryFiles(cluster, redist):
230   """Compute files external to Ganeti which need to be consistent.
231
232   @type redist: boolean
233   @param redist: Whether to include files which need to be redistributed
234
235   """
236   # Compute files for all nodes
237   files_all = set([
238     pathutils.SSH_KNOWN_HOSTS_FILE,
239     pathutils.CONFD_HMAC_KEY,
240     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
241     pathutils.SPICE_CERT_FILE,
242     pathutils.SPICE_CACERT_FILE,
243     pathutils.RAPI_USERS_FILE,
244     ])
245
246   if redist:
247     # we need to ship at least the RAPI certificate
248     files_all.add(pathutils.RAPI_CERT_FILE)
249   else:
250     files_all.update(pathutils.ALL_CERT_FILES)
251     files_all.update(ssconf.SimpleStore().GetFileList())
252
253   if cluster.modify_etc_hosts:
254     files_all.add(pathutils.ETC_HOSTS)
255
256   if cluster.use_external_mip_script:
257     files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
258
259   # Files which are optional, these must:
260   # - be present in one other category as well
261   # - either exist or not exist on all nodes of that category (mc, vm all)
262   files_opt = set([
263     pathutils.RAPI_USERS_FILE,
264     ])
265
266   # Files which should only be on master candidates
267   files_mc = set()
268
269   if not redist:
270     files_mc.add(pathutils.CLUSTER_CONF_FILE)
271
272   # File storage
273   if (not redist and (cluster.IsFileStorageEnabled() or
274                         cluster.IsSharedFileStorageEnabled())):
275     files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
276     files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
277
278   # Files which should only be on VM-capable nodes
279   files_vm = set(
280     filename
281     for hv_name in cluster.enabled_hypervisors
282     for filename in
283     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
284
285   files_opt |= set(
286     filename
287     for hv_name in cluster.enabled_hypervisors
288     for filename in
289     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
290
291   # Filenames in each category must be unique
292   all_files_set = files_all | files_mc | files_vm
293   assert (len(all_files_set) ==
294           sum(map(len, [files_all, files_mc, files_vm]))), \
295     "Found file listed in more than one file list"
296
297   # Optional files must be present in one other category
298   assert all_files_set.issuperset(files_opt), \
299     "Optional file not in a different required list"
300
301   # This one file should never ever be re-distributed via RPC
302   assert not (redist and
303               pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
304
305   return (files_all, files_opt, files_mc, files_vm)
306
307
308 def UploadHelper(lu, node_uuids, fname):
309   """Helper for uploading a file and showing warnings.
310
311   """
312   if os.path.exists(fname):
313     result = lu.rpc.call_upload_file(node_uuids, fname)
314     for to_node_uuids, to_result in result.items():
315       msg = to_result.fail_msg
316       if msg:
317         msg = ("Copy of file %s to node %s failed: %s" %
318                (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
319         lu.LogWarning(msg)
320
321
322 def MergeAndVerifyHvState(op_input, obj_input):
323   """Combines the hv state from an opcode with the one of the object
324
325   @param op_input: The input dict from the opcode
326   @param obj_input: The input dict from the objects
327   @return: The verified and updated dict
328
329   """
330   if op_input:
331     invalid_hvs = set(op_input) - constants.HYPER_TYPES
332     if invalid_hvs:
333       raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
334                                  " %s" % utils.CommaJoin(invalid_hvs),
335                                  errors.ECODE_INVAL)
336     if obj_input is None:
337       obj_input = {}
338     type_check = constants.HVSTS_PARAMETER_TYPES
339     return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
340
341   return None
342
343
344 def MergeAndVerifyDiskState(op_input, obj_input):
345   """Combines the disk state from an opcode with the one of the object
346
347   @param op_input: The input dict from the opcode
348   @param obj_input: The input dict from the objects
349   @return: The verified and updated dict
350   """
351   if op_input:
352     invalid_dst = set(op_input) - constants.DS_VALID_TYPES
353     if invalid_dst:
354       raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
355                                  utils.CommaJoin(invalid_dst),
356                                  errors.ECODE_INVAL)
357     type_check = constants.DSS_PARAMETER_TYPES
358     if obj_input is None:
359       obj_input = {}
360     return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
361                                               type_check))
362                 for key, value in op_input.items())
363
364   return None
365
366
367 def CheckOSParams(lu, required, node_uuids, osname, osparams):
368   """OS parameters validation.
369
370   @type lu: L{LogicalUnit}
371   @param lu: the logical unit for which we check
372   @type required: boolean
373   @param required: whether the validation should fail if the OS is not
374       found
375   @type node_uuids: list
376   @param node_uuids: the list of nodes on which we should check
377   @type osname: string
378   @param osname: the name of the hypervisor we should use
379   @type osparams: dict
380   @param osparams: the parameters which we need to check
381   @raise errors.OpPrereqError: if the parameters are not valid
382
383   """
384   node_uuids = _FilterVmNodes(lu, node_uuids)
385   result = lu.rpc.call_os_validate(node_uuids, required, osname,
386                                    [constants.OS_VALIDATE_PARAMETERS],
387                                    osparams)
388   for node_uuid, nres in result.items():
389     # we don't check for offline cases since this should be run only
390     # against the master node and/or an instance's nodes
391     nres.Raise("OS Parameters validation failed on node %s" %
392                lu.cfg.GetNodeName(node_uuid))
393     if not nres.payload:
394       lu.LogInfo("OS %s not found on node %s, validation skipped",
395                  osname, lu.cfg.GetNodeName(node_uuid))
396
397
398 def CheckHVParams(lu, node_uuids, hvname, hvparams):
399   """Hypervisor parameter validation.
400
401   This function abstract the hypervisor parameter validation to be
402   used in both instance create and instance modify.
403
404   @type lu: L{LogicalUnit}
405   @param lu: the logical unit for which we check
406   @type node_uuids: list
407   @param node_uuids: the list of nodes on which we should check
408   @type hvname: string
409   @param hvname: the name of the hypervisor we should use
410   @type hvparams: dict
411   @param hvparams: the parameters which we need to check
412   @raise errors.OpPrereqError: if the parameters are not valid
413
414   """
415   node_uuids = _FilterVmNodes(lu, node_uuids)
416
417   cluster = lu.cfg.GetClusterInfo()
418   hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
419
420   hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
421   for node_uuid in node_uuids:
422     info = hvinfo[node_uuid]
423     if info.offline:
424       continue
425     info.Raise("Hypervisor parameter validation failed on node %s" %
426                lu.cfg.GetNodeName(node_uuid))
427
428
429 def AdjustCandidatePool(lu, exceptions):
430   """Adjust the candidate pool after node operations.
431
432   """
433   mod_list = lu.cfg.MaintainCandidatePool(exceptions)
434   if mod_list:
435     lu.LogInfo("Promoted nodes to master candidate role: %s",
436                utils.CommaJoin(node.name for node in mod_list))
437     for node in mod_list:
438       lu.context.ReaddNode(node)
439   mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
440   if mc_now > mc_max:
441     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
442                (mc_now, mc_max))
443
444
445 def CheckNodePVs(nresult, exclusive_storage):
446   """Check node PVs.
447
448   """
449   pvlist_dict = nresult.get(constants.NV_PVLIST, None)
450   if pvlist_dict is None:
451     return (["Can't get PV list from node"], None)
452   pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
453   errlist = []
454   # check that ':' is not present in PV names, since it's a
455   # special character for lvcreate (denotes the range of PEs to
456   # use on the PV)
457   for pv in pvlist:
458     if ":" in pv.name:
459       errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
460                      (pv.name, pv.vg_name))
461   es_pvinfo = None
462   if exclusive_storage:
463     (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
464     errlist.extend(errmsgs)
465     shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
466     if shared_pvs:
467       for (pvname, lvlist) in shared_pvs:
468         # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
469         errlist.append("PV %s is shared among unrelated LVs (%s)" %
470                        (pvname, utils.CommaJoin(lvlist)))
471   return (errlist, es_pvinfo)
472
473
474 def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
475   """Computes if value is in the desired range.
476
477   @param name: name of the parameter for which we perform the check
478   @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
479       not just 'disk')
480   @param ispecs: dictionary containing min and max values
481   @param value: actual value that we want to use
482   @return: None or an error string
483
484   """
485   if value in [None, constants.VALUE_AUTO]:
486     return None
487   max_v = ispecs[constants.ISPECS_MAX].get(name, value)
488   min_v = ispecs[constants.ISPECS_MIN].get(name, value)
489   if value > max_v or min_v > value:
490     if qualifier:
491       fqn = "%s/%s" % (name, qualifier)
492     else:
493       fqn = name
494     return ("%s value %s is not in range [%s, %s]" %
495             (fqn, value, min_v, max_v))
496   return None
497
498
499 def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
500                                 nic_count, disk_sizes, spindle_use,
501                                 disk_template,
502                                 _compute_fn=_ComputeMinMaxSpec):
503   """Verifies ipolicy against provided specs.
504
505   @type ipolicy: dict
506   @param ipolicy: The ipolicy
507   @type mem_size: int
508   @param mem_size: The memory size
509   @type cpu_count: int
510   @param cpu_count: Used cpu cores
511   @type disk_count: int
512   @param disk_count: Number of disks used
513   @type nic_count: int
514   @param nic_count: Number of nics used
515   @type disk_sizes: list of ints
516   @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
517   @type spindle_use: int
518   @param spindle_use: The number of spindles this instance uses
519   @type disk_template: string
520   @param disk_template: The disk template of the instance
521   @param _compute_fn: The compute function (unittest only)
522   @return: A list of violations, or an empty list of no violations are found
523
524   """
525   assert disk_count == len(disk_sizes)
526
527   test_settings = [
528     (constants.ISPEC_MEM_SIZE, "", mem_size),
529     (constants.ISPEC_CPU_COUNT, "", cpu_count),
530     (constants.ISPEC_NIC_COUNT, "", nic_count),
531     (constants.ISPEC_SPINDLE_USE, "", spindle_use),
532     ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
533          for idx, d in enumerate(disk_sizes)]
534   if disk_template != constants.DT_DISKLESS:
535     # This check doesn't make sense for diskless instances
536     test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
537   ret = []
538   allowed_dts = ipolicy[constants.IPOLICY_DTS]
539   if disk_template not in allowed_dts:
540     ret.append("Disk template %s is not allowed (allowed templates: %s)" %
541                (disk_template, utils.CommaJoin(allowed_dts)))
542
543   min_errs = None
544   for minmax in ipolicy[constants.ISPECS_MINMAX]:
545     errs = filter(None,
546                   (_compute_fn(name, qualifier, minmax, value)
547                    for (name, qualifier, value) in test_settings))
548     if min_errs is None or len(errs) < len(min_errs):
549       min_errs = errs
550   assert min_errs is not None
551   return ret + min_errs
552
553
554 def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
555                                     _compute_fn=ComputeIPolicySpecViolation):
556   """Compute if instance meets the specs of ipolicy.
557
558   @type ipolicy: dict
559   @param ipolicy: The ipolicy to verify against
560   @type instance: L{objects.Instance}
561   @param instance: The instance to verify
562   @type cfg: L{config.ConfigWriter}
563   @param cfg: Cluster configuration
564   @param _compute_fn: The function to verify ipolicy (unittest only)
565   @see: L{ComputeIPolicySpecViolation}
566
567   """
568   ret = []
569   be_full = cfg.GetClusterInfo().FillBE(instance)
570   mem_size = be_full[constants.BE_MAXMEM]
571   cpu_count = be_full[constants.BE_VCPUS]
572   es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
573   if any(es_flags.values()):
574     # With exclusive storage use the actual spindles
575     try:
576       spindle_use = sum([disk.spindles for disk in instance.disks])
577     except TypeError:
578       ret.append("Number of spindles not configured for disks of instance %s"
579                  " while exclusive storage is enabled, try running gnt-cluster"
580                  " repair-disk-sizes" % instance.name)
581       # _ComputeMinMaxSpec ignores 'None's
582       spindle_use = None
583   else:
584     spindle_use = be_full[constants.BE_SPINDLE_USE]
585   disk_count = len(instance.disks)
586   disk_sizes = [disk.size for disk in instance.disks]
587   nic_count = len(instance.nics)
588   disk_template = instance.disk_template
589
590   return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
591                            disk_sizes, spindle_use, disk_template)
592
593
594 def _ComputeViolatingInstances(ipolicy, instances, cfg):
595   """Computes a set of instances who violates given ipolicy.
596
597   @param ipolicy: The ipolicy to verify
598   @type instances: L{objects.Instance}
599   @param instances: List of instances to verify
600   @type cfg: L{config.ConfigWriter}
601   @param cfg: Cluster configuration
602   @return: A frozenset of instance names violating the ipolicy
603
604   """
605   return frozenset([inst.name for inst in instances
606                     if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
607
608
609 def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
610   """Computes a set of any instances that would violate the new ipolicy.
611
612   @param old_ipolicy: The current (still in-place) ipolicy
613   @param new_ipolicy: The new (to become) ipolicy
614   @param instances: List of instances to verify
615   @type cfg: L{config.ConfigWriter}
616   @param cfg: Cluster configuration
617   @return: A list of instances which violates the new ipolicy but
618       did not before
619
620   """
621   return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
622           _ComputeViolatingInstances(old_ipolicy, instances, cfg))
623
624
625 def GetUpdatedParams(old_params, update_dict,
626                       use_default=True, use_none=False):
627   """Return the new version of a parameter dictionary.
628
629   @type old_params: dict
630   @param old_params: old parameters
631   @type update_dict: dict
632   @param update_dict: dict containing new parameter values, or
633       constants.VALUE_DEFAULT to reset the parameter to its default
634       value
635   @param use_default: boolean
636   @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
637       values as 'to be deleted' values
638   @param use_none: boolean
639   @type use_none: whether to recognise C{None} values as 'to be
640       deleted' values
641   @rtype: dict
642   @return: the new parameter dictionary
643
644   """
645   params_copy = copy.deepcopy(old_params)
646   for key, val in update_dict.iteritems():
647     if ((use_default and val == constants.VALUE_DEFAULT) or
648           (use_none and val is None)):
649       try:
650         del params_copy[key]
651       except KeyError:
652         pass
653     else:
654       params_copy[key] = val
655   return params_copy
656
657
658 def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
659   """Return the new version of an instance policy.
660
661   @param group_policy: whether this policy applies to a group and thus
662     we should support removal of policy entries
663
664   """
665   ipolicy = copy.deepcopy(old_ipolicy)
666   for key, value in new_ipolicy.items():
667     if key not in constants.IPOLICY_ALL_KEYS:
668       raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
669                                  errors.ECODE_INVAL)
670     if (not value or value == [constants.VALUE_DEFAULT] or
671             value == constants.VALUE_DEFAULT):
672       if group_policy:
673         if key in ipolicy:
674           del ipolicy[key]
675       else:
676         raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
677                                    " on the cluster'" % key,
678                                    errors.ECODE_INVAL)
679     else:
680       if key in constants.IPOLICY_PARAMETERS:
681         # FIXME: we assume all such values are float
682         try:
683           ipolicy[key] = float(value)
684         except (TypeError, ValueError), err:
685           raise errors.OpPrereqError("Invalid value for attribute"
686                                      " '%s': '%s', error: %s" %
687                                      (key, value, err), errors.ECODE_INVAL)
688       elif key == constants.ISPECS_MINMAX:
689         for minmax in value:
690           for k in minmax.keys():
691             utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
692         ipolicy[key] = value
693       elif key == constants.ISPECS_STD:
694         if group_policy:
695           msg = "%s cannot appear in group instance specs" % key
696           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
697         ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
698                                         use_none=False, use_default=False)
699         utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
700       else:
701         # FIXME: we assume all others are lists; this should be redone
702         # in a nicer way
703         ipolicy[key] = list(value)
704   try:
705     objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
706   except errors.ConfigurationError, err:
707     raise errors.OpPrereqError("Invalid instance policy: %s" % err,
708                                errors.ECODE_INVAL)
709   return ipolicy
710
711
712 def AnnotateDiskParams(instance, devs, cfg):
713   """Little helper wrapper to the rpc annotation method.
714
715   @param instance: The instance object
716   @type devs: List of L{objects.Disk}
717   @param devs: The root devices (not any of its children!)
718   @param cfg: The config object
719   @returns The annotated disk copies
720   @see L{rpc.AnnotateDiskParams}
721
722   """
723   return rpc.AnnotateDiskParams(instance.disk_template, devs,
724                                 cfg.GetInstanceDiskParams(instance))
725
726
727 def SupportsOob(cfg, node):
728   """Tells if node supports OOB.
729
730   @type cfg: L{config.ConfigWriter}
731   @param cfg: The cluster configuration
732   @type node: L{objects.Node}
733   @param node: The node
734   @return: The OOB script if supported or an empty string otherwise
735
736   """
737   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
738
739
740 def _UpdateAndVerifySubDict(base, updates, type_check):
741   """Updates and verifies a dict with sub dicts of the same type.
742
743   @param base: The dict with the old data
744   @param updates: The dict with the new data
745   @param type_check: Dict suitable to ForceDictType to verify correct types
746   @returns: A new dict with updated and verified values
747
748   """
749   def fn(old, value):
750     new = GetUpdatedParams(old, value)
751     utils.ForceDictType(new, type_check)
752     return new
753
754   ret = copy.deepcopy(base)
755   ret.update(dict((key, fn(base.get(key, {}), value))
756                   for key, value in updates.items()))
757   return ret
758
759
760 def _FilterVmNodes(lu, node_uuids):
761   """Filters out non-vm_capable nodes from a list.
762
763   @type lu: L{LogicalUnit}
764   @param lu: the logical unit for which we check
765   @type node_uuids: list
766   @param node_uuids: the list of nodes on which we should check
767   @rtype: list
768   @return: the list of vm-capable nodes
769
770   """
771   vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
772   return [uuid for uuid in node_uuids if uuid not in vm_nodes]
773
774
775 def GetDefaultIAllocator(cfg, ialloc):
776   """Decides on which iallocator to use.
777
778   @type cfg: L{config.ConfigWriter}
779   @param cfg: Cluster configuration object
780   @type ialloc: string or None
781   @param ialloc: Iallocator specified in opcode
782   @rtype: string
783   @return: Iallocator name
784
785   """
786   if not ialloc:
787     # Use default iallocator
788     ialloc = cfg.GetDefaultIAllocator()
789
790   if not ialloc:
791     raise errors.OpPrereqError("No iallocator was specified, neither in the"
792                                " opcode nor as a cluster-wide default",
793                                errors.ECODE_INVAL)
794
795   return ialloc
796
797
798 def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
799                              cur_group_uuid):
800   """Checks if node groups for locked instances are still correct.
801
802   @type cfg: L{config.ConfigWriter}
803   @param cfg: Cluster configuration
804   @type instances: dict; string as key, L{objects.Instance} as value
805   @param instances: Dictionary, instance UUID as key, instance object as value
806   @type owned_groups: iterable of string
807   @param owned_groups: List of owned groups
808   @type owned_node_uuids: iterable of string
809   @param owned_node_uuids: List of owned nodes
810   @type cur_group_uuid: string or None
811   @param cur_group_uuid: Optional group UUID to check against instance's groups
812
813   """
814   for (uuid, inst) in instances.items():
815     assert owned_node_uuids.issuperset(inst.all_nodes), \
816       "Instance %s's nodes changed while we kept the lock" % inst.name
817
818     inst_groups = CheckInstanceNodeGroups(cfg, uuid, owned_groups)
819
820     assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
821       "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)
822
823
824 def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
825   """Checks if the owned node groups are still correct for an instance.
826
827   @type cfg: L{config.ConfigWriter}
828   @param cfg: The cluster configuration
829   @type inst_uuid: string
830   @param inst_uuid: Instance UUID
831   @type owned_groups: set or frozenset
832   @param owned_groups: List of currently owned node groups
833   @type primary_only: boolean
834   @param primary_only: Whether to check node groups for only the primary node
835
836   """
837   inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)
838
839   if not owned_groups.issuperset(inst_groups):
840     raise errors.OpPrereqError("Instance %s's node groups changed since"
841                                " locks were acquired, current groups are"
842                                " are '%s', owning groups '%s'; retry the"
843                                " operation" %
844                                (cfg.GetInstanceName(inst_uuid),
845                                 utils.CommaJoin(inst_groups),
846                                 utils.CommaJoin(owned_groups)),
847                                errors.ECODE_STATE)
848
849   return inst_groups
850
851
852 def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
853   """Unpacks the result of change-group and node-evacuate iallocator requests.
854
855   Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
856   L{constants.IALLOCATOR_MODE_CHG_GROUP}.
857
858   @type lu: L{LogicalUnit}
859   @param lu: Logical unit instance
860   @type alloc_result: tuple/list
861   @param alloc_result: Result from iallocator
862   @type early_release: bool
863   @param early_release: Whether to release locks early if possible
864   @type use_nodes: bool
865   @param use_nodes: Whether to display node names instead of groups
866
867   """
868   (moved, failed, jobs) = alloc_result
869
870   if failed:
871     failreason = utils.CommaJoin("%s (%s)" % (name, reason)
872                                  for (name, reason) in failed)
873     lu.LogWarning("Unable to evacuate instances %s", failreason)
874     raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
875
876   if moved:
877     lu.LogInfo("Instances to be moved: %s",
878                utils.CommaJoin(
879                  "%s (to %s)" %
880                  (name, _NodeEvacDest(use_nodes, group, node_names))
881                  for (name, group, node_names) in moved))
882
883   return [map(compat.partial(_SetOpEarlyRelease, early_release),
884               map(opcodes.OpCode.LoadOpCode, ops))
885           for ops in jobs]
886
887
888 def _NodeEvacDest(use_nodes, group, node_names):
889   """Returns group or nodes depending on caller's choice.
890
891   """
892   if use_nodes:
893     return utils.CommaJoin(node_names)
894   else:
895     return group
896
897
898 def _SetOpEarlyRelease(early_release, op):
899   """Sets C{early_release} flag on opcodes if available.
900
901   """
902   try:
903     op.early_release = early_release
904   except AttributeError:
905     assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
906
907   return op
908
909
910 def MapInstanceLvsToNodes(instances):
911   """Creates a map from (node, volume) to instance name.
912
913   @type instances: list of L{objects.Instance}
914   @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
915           object as value
916
917   """
918   return dict(((node_uuid, vol), inst)
919               for inst in instances
920               for (node_uuid, vols) in inst.MapLVsByNode().items()
921               for vol in vols)
922
923
924 def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
925   """Make sure that none of the given paramters is global.
926
927   If a global parameter is found, an L{errors.OpPrereqError} exception is
928   raised. This is used to avoid setting global parameters for individual nodes.
929
930   @type params: dictionary
931   @param params: Parameters to check
932   @type glob_pars: dictionary
933   @param glob_pars: Forbidden parameters
934   @type kind: string
935   @param kind: Kind of parameters (e.g. "node")
936   @type bad_levels: string
937   @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
938       "instance")
939   @type good_levels: strings
940   @param good_levels: Level(s) at which the parameters are allowed (e.g.
941       "cluster or group")
942
943   """
944   used_globals = glob_pars.intersection(params)
945   if used_globals:
946     msg = ("The following %s parameters are global and cannot"
947            " be customized at %s level, please modify them at"
948            " %s level: %s" %
949            (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
950     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
951
952
953 def IsExclusiveStorageEnabledNode(cfg, node):
954   """Whether exclusive_storage is in effect for the given node.
955
956   @type cfg: L{config.ConfigWriter}
957   @param cfg: The cluster configuration
958   @type node: L{objects.Node}
959   @param node: The node
960   @rtype: bool
961   @return: The effective value of exclusive_storage
962
963   """
964   return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
965
966
967 def CheckInstanceState(lu, instance, req_states, msg=None):
968   """Ensure that an instance is in one of the required states.
969
970   @param lu: the LU on behalf of which we make the check
971   @param instance: the instance to check
972   @param msg: if passed, should be a message to replace the default one
973   @raise errors.OpPrereqError: if the instance is not in the required state
974
975   """
976   if msg is None:
977     msg = ("can't use instance from outside %s states" %
978            utils.CommaJoin(req_states))
979   if instance.admin_state not in req_states:
980     raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
981                                (instance.name, instance.admin_state, msg),
982                                errors.ECODE_STATE)
983
984   if constants.ADMINST_UP not in req_states:
985     pnode_uuid = instance.primary_node
986     if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
987       all_hvparams = lu.cfg.GetClusterInfo().hvparams
988       ins_l = lu.rpc.call_instance_list(
989                 [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
990       ins_l.Raise("Can't contact node %s for instance information" %
991                   lu.cfg.GetNodeName(pnode_uuid),
992                   prereq=True, ecode=errors.ECODE_ENVIRON)
993       if instance.name in ins_l.payload:
994         raise errors.OpPrereqError("Instance %s is running, %s" %
995                                    (instance.name, msg), errors.ECODE_STATE)
996     else:
997       lu.LogWarning("Primary node offline, ignoring check that instance"
998                      " is down")
999
1000
1001 def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1002   """Check the sanity of iallocator and node arguments and use the
1003   cluster-wide iallocator if appropriate.
1004
1005   Check that at most one of (iallocator, node) is specified. If none is
1006   specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
1007   then the LU's opcode's iallocator slot is filled with the cluster-wide
1008   default iallocator.
1009
1010   @type iallocator_slot: string
1011   @param iallocator_slot: the name of the opcode iallocator slot
1012   @type node_slot: string
1013   @param node_slot: the name of the opcode target node slot
1014
1015   """
1016   node = getattr(lu.op, node_slot, None)
1017   ialloc = getattr(lu.op, iallocator_slot, None)
1018   if node == []:
1019     node = None
1020
1021   if node is not None and ialloc is not None:
1022     raise errors.OpPrereqError("Do not specify both, iallocator and node",
1023                                errors.ECODE_INVAL)
1024   elif ((node is None and ialloc is None) or
1025         ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1026     default_iallocator = lu.cfg.GetDefaultIAllocator()
1027     if default_iallocator:
1028       setattr(lu.op, iallocator_slot, default_iallocator)
1029     else:
1030       raise errors.OpPrereqError("No iallocator or node given and no"
1031                                  " cluster-wide default iallocator found;"
1032                                  " please specify either an iallocator or a"
1033                                  " node, or set a cluster-wide default"
1034                                  " iallocator", errors.ECODE_INVAL)
1035
1036
1037 def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
1038   faulty = []
1039
1040   for dev in instance.disks:
1041     cfg.SetDiskID(dev, node_uuid)
1042
1043   result = rpc_runner.call_blockdev_getmirrorstatus(
1044              node_uuid, (instance.disks, instance))
1045   result.Raise("Failed to get disk status from node %s" %
1046                cfg.GetNodeName(node_uuid),
1047                prereq=prereq, ecode=errors.ECODE_ENVIRON)
1048
1049   for idx, bdev_status in enumerate(result.payload):
1050     if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1051       faulty.append(idx)
1052
1053   return faulty
1054
1055
1056 def CheckNodeOnline(lu, node_uuid, msg=None):
1057   """Ensure that a given node is online.
1058
1059   @param lu: the LU on behalf of which we make the check
1060   @param node_uuid: the node to check
1061   @param msg: if passed, should be a message to replace the default one
1062   @raise errors.OpPrereqError: if the node is offline
1063
1064   """
1065   if msg is None:
1066     msg = "Can't use offline node"
1067   if lu.cfg.GetNodeInfo(node_uuid).offline:
1068     raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
1069                                errors.ECODE_STATE)
1070
1071
1072 def CheckDiskTemplateEnabled(cluster, disk_template):
1073   """Helper function to check if a disk template is enabled.
1074
1075   @type cluster: C{objects.Cluster}
1076   @param cluster: the cluster's configuration
1077   @type disk_template: str
1078   @param disk_template: the disk template to be checked
1079
1080   """
1081   assert disk_template is not None
1082   if disk_template not in constants.DISK_TEMPLATES:
1083     raise errors.OpPrereqError("'%s' is not a valid disk template."
1084                                " Valid disk templates are: %s" %
1085                                (disk_template,
1086                                 ",".join(constants.DISK_TEMPLATES)))
1087   if not disk_template in cluster.enabled_disk_templates:
1088     raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster."
1089                                " Enabled disk templates are: %s" %
1090                                (disk_template,
1091                                 ",".join(cluster.enabled_disk_templates)))
1092
1093
1094 def CheckStorageTypeEnabled(cluster, storage_type):
1095   """Helper function to check if a storage type is enabled.
1096
1097   @type cluster: C{objects.Cluster}
1098   @param cluster: the cluster's configuration
1099   @type storage_type: str
1100   @param storage_type: the storage type to be checked
1101
1102   """
1103   assert storage_type is not None
1104   assert storage_type in constants.STORAGE_TYPES
1105   # special case for lvm-pv, because it cannot be enabled
1106   # via disk templates
1107   if storage_type == constants.ST_LVM_PV:
1108     CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG)
1109   else:
1110     possible_disk_templates = \
1111         utils.storage.GetDiskTemplatesOfStorageType(storage_type)
1112     for disk_template in possible_disk_templates:
1113       if disk_template in cluster.enabled_disk_templates:
1114         return
1115     raise errors.OpPrereqError("No disk template of storage type '%s' is"
1116                                " enabled in this cluster. Enabled disk"
1117                                " templates are: %s" % (storage_type,
1118                                ",".join(cluster.enabled_disk_templates)))
1119
1120
1121 def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
1122   """Checks ipolicy disk templates against enabled disk tempaltes.
1123
1124   @type ipolicy: dict
1125   @param ipolicy: the new ipolicy
1126   @type enabled_disk_templates: list of string
1127   @param enabled_disk_templates: list of enabled disk templates on the
1128     cluster
1129   @raises errors.OpPrereqError: if there is at least one allowed disk
1130     template that is not also enabled.
1131
1132   """
1133   assert constants.IPOLICY_DTS in ipolicy
1134   allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
1135   not_enabled = set(allowed_disk_templates) - set(enabled_disk_templates)
1136   if not_enabled:
1137     raise errors.OpPrereqError("The following disk template are allowed"
1138                                " by the ipolicy, but not enabled on the"
1139                                " cluster: %s" % utils.CommaJoin(not_enabled))