Node-UUID related cleanup
[ganeti-local] / lib / cmdlib / common.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Common functions used by multiple logical units."""
23
24 import copy
25 import os
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import hypervisor
31 from ganeti import locking
32 from ganeti import objects
33 from ganeti import opcodes
34 from ganeti import pathutils
35 from ganeti import rpc
36 from ganeti import ssconf
37 from ganeti import utils
38
39
40 # States of instance
41 INSTANCE_DOWN = [constants.ADMINST_DOWN]
42 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
43 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
44
45 #: Instance status in which an instance can be marked as offline/online
46 CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
47   constants.ADMINST_OFFLINE,
48   ]))
49
50
51 def _ExpandItemName(expand_fn, name, kind):
52   """Expand an item name.
53
54   @param expand_fn: the function to use for expansion
55   @param name: requested item name
56   @param kind: text description ('Node' or 'Instance')
57   @return: the result of the expand_fn, if successful
58   @raise errors.OpPrereqError: if the item is not found
59
60   """
61   full_name = expand_fn(name)
62   if full_name is None:
63     raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
64                                errors.ECODE_NOENT)
65   return full_name
66
67
68 def ExpandInstanceName(cfg, name):
69   """Wrapper over L{_ExpandItemName} for instance."""
70   return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
71
72
73 def ExpandNodeUuidAndName(cfg, expected_uuid, name):
74   """Expand a short node name into the node UUID and full name.
75
76   @type cfg: L{config.ConfigWriter}
77   @param cfg: The cluster configuration
78   @type expected_uuid: string
79   @param expected_uuid: expected UUID for the node (or None if there is no
80         expectation). If it does not match, a L{errors.OpPrereqError} is
81         raised.
82   @type name: string
83   @param name: the short node name
84
85   """
86   (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
87   if expected_uuid is not None and uuid != expected_uuid:
88     raise errors.OpPrereqError(
89       "The nodes UUID '%s' does not match the expected UUID '%s' for node"
90       " '%s'. Maybe the node changed since you submitted this job." %
91       (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
92   return (uuid, full_name)
93
94
95 def ShareAll():
96   """Returns a dict declaring all lock levels shared.
97
98   """
99   return dict.fromkeys(locking.LEVELS, 1)
100
101
102 def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
103   """Checks if the instances in a node group are still correct.
104
105   @type cfg: L{config.ConfigWriter}
106   @param cfg: The cluster configuration
107   @type group_uuid: string
108   @param group_uuid: Node group UUID
109   @type owned_instances: set or frozenset
110   @param owned_instances: List of currently owned instances
111
112   """
113   wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
114   if owned_instances != wanted_instances:
115     raise errors.OpPrereqError("Instances in node group '%s' changed since"
116                                " locks were acquired, wanted '%s', have '%s';"
117                                " retry the operation" %
118                                (group_uuid,
119                                 utils.CommaJoin(wanted_instances),
120                                 utils.CommaJoin(owned_instances)),
121                                errors.ECODE_STATE)
122
123   return wanted_instances
124
125
126 def GetWantedNodes(lu, short_node_names):
127   """Returns list of checked and expanded node names.
128
129   @type lu: L{LogicalUnit}
130   @param lu: the logical unit on whose behalf we execute
131   @type short_node_names: list
132   @param short_node_names: list of node names or None for all nodes
133   @rtype: tuple of lists
134   @return: tupe with (list of node UUIDs, list of node names)
135   @raise errors.ProgrammerError: if the nodes parameter is wrong type
136
137   """
138   if short_node_names:
139     node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
140                   for name in short_node_names]
141   else:
142     node_uuids = lu.cfg.GetNodeList()
143
144   return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])
145
146
147 def GetWantedInstances(lu, instances):
148   """Returns list of checked and expanded instance names.
149
150   @type lu: L{LogicalUnit}
151   @param lu: the logical unit on whose behalf we execute
152   @type instances: list
153   @param instances: list of instance names or None for all instances
154   @rtype: list
155   @return: the list of instances, sorted
156   @raise errors.OpPrereqError: if the instances parameter is wrong type
157   @raise errors.OpPrereqError: if any of the passed instances is not found
158
159   """
160   if instances:
161     wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
162   else:
163     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
164   return wanted
165
166
167 def RunPostHook(lu, node_name):
168   """Runs the post-hook for an opcode on a single node.
169
170   """
171   hm = lu.proc.BuildHooksManager(lu)
172   try:
173     hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
174   except Exception, err: # pylint: disable=W0703
175     lu.LogWarning("Errors occurred running hooks on %s: %s",
176                   node_name, err)
177
178
179 def RedistributeAncillaryFiles(lu):
180   """Distribute additional files which are part of the cluster configuration.
181
182   ConfigWriter takes care of distributing the config and ssconf files, but
183   there are more files which should be distributed to all nodes. This function
184   makes sure those are copied.
185
186   """
187   # Gather target nodes
188   cluster = lu.cfg.GetClusterInfo()
189   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
190
191   online_node_uuids = lu.cfg.GetOnlineNodeList()
192   online_node_uuid_set = frozenset(online_node_uuids)
193   vm_node_uuids = list(online_node_uuid_set.intersection(
194                          lu.cfg.GetVmCapableNodeList()))
195
196   # Never distribute to master node
197   for node_uuids in [online_node_uuids, vm_node_uuids]:
198     if master_info.uuid in node_uuids:
199       node_uuids.remove(master_info.uuid)
200
201   # Gather file lists
202   (files_all, _, files_mc, files_vm) = \
203     ComputeAncillaryFiles(cluster, True)
204
205   # Never re-distribute configuration file from here
206   assert not (pathutils.CLUSTER_CONF_FILE in files_all or
207               pathutils.CLUSTER_CONF_FILE in files_vm)
208   assert not files_mc, "Master candidates not handled in this function"
209
210   filemap = [
211     (online_node_uuids, files_all),
212     (vm_node_uuids, files_vm),
213     ]
214
215   # Upload the files
216   for (node_uuids, files) in filemap:
217     for fname in files:
218       UploadHelper(lu, node_uuids, fname)
219
220
221 def ComputeAncillaryFiles(cluster, redist):
222   """Compute files external to Ganeti which need to be consistent.
223
224   @type redist: boolean
225   @param redist: Whether to include files which need to be redistributed
226
227   """
228   # Compute files for all nodes
229   files_all = set([
230     pathutils.SSH_KNOWN_HOSTS_FILE,
231     pathutils.CONFD_HMAC_KEY,
232     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
233     pathutils.SPICE_CERT_FILE,
234     pathutils.SPICE_CACERT_FILE,
235     pathutils.RAPI_USERS_FILE,
236     ])
237
238   if redist:
239     # we need to ship at least the RAPI certificate
240     files_all.add(pathutils.RAPI_CERT_FILE)
241   else:
242     files_all.update(pathutils.ALL_CERT_FILES)
243     files_all.update(ssconf.SimpleStore().GetFileList())
244
245   if cluster.modify_etc_hosts:
246     files_all.add(pathutils.ETC_HOSTS)
247
248   if cluster.use_external_mip_script:
249     files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
250
251   # Files which are optional, these must:
252   # - be present in one other category as well
253   # - either exist or not exist on all nodes of that category (mc, vm all)
254   files_opt = set([
255     pathutils.RAPI_USERS_FILE,
256     ])
257
258   # Files which should only be on master candidates
259   files_mc = set()
260
261   if not redist:
262     files_mc.add(pathutils.CLUSTER_CONF_FILE)
263
264   # File storage
265   if (not redist and (constants.ENABLE_FILE_STORAGE or
266                         constants.ENABLE_SHARED_FILE_STORAGE)):
267     files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
268     files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
269
270   # Files which should only be on VM-capable nodes
271   files_vm = set(
272     filename
273     for hv_name in cluster.enabled_hypervisors
274     for filename in
275     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
276
277   files_opt |= set(
278     filename
279     for hv_name in cluster.enabled_hypervisors
280     for filename in
281     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
282
283   # Filenames in each category must be unique
284   all_files_set = files_all | files_mc | files_vm
285   assert (len(all_files_set) ==
286           sum(map(len, [files_all, files_mc, files_vm]))), \
287     "Found file listed in more than one file list"
288
289   # Optional files must be present in one other category
290   assert all_files_set.issuperset(files_opt), \
291     "Optional file not in a different required list"
292
293   # This one file should never ever be re-distributed via RPC
294   assert not (redist and
295               pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
296
297   return (files_all, files_opt, files_mc, files_vm)
298
299
300 def UploadHelper(lu, node_uuids, fname):
301   """Helper for uploading a file and showing warnings.
302
303   """
304   if os.path.exists(fname):
305     result = lu.rpc.call_upload_file(node_uuids, fname)
306     for to_node_uuids, to_result in result.items():
307       msg = to_result.fail_msg
308       if msg:
309         msg = ("Copy of file %s to node %s failed: %s" %
310                (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
311         lu.LogWarning(msg)
312
313
314 def MergeAndVerifyHvState(op_input, obj_input):
315   """Combines the hv state from an opcode with the one of the object
316
317   @param op_input: The input dict from the opcode
318   @param obj_input: The input dict from the objects
319   @return: The verified and updated dict
320
321   """
322   if op_input:
323     invalid_hvs = set(op_input) - constants.HYPER_TYPES
324     if invalid_hvs:
325       raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
326                                  " %s" % utils.CommaJoin(invalid_hvs),
327                                  errors.ECODE_INVAL)
328     if obj_input is None:
329       obj_input = {}
330     type_check = constants.HVSTS_PARAMETER_TYPES
331     return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
332
333   return None
334
335
336 def MergeAndVerifyDiskState(op_input, obj_input):
337   """Combines the disk state from an opcode with the one of the object
338
339   @param op_input: The input dict from the opcode
340   @param obj_input: The input dict from the objects
341   @return: The verified and updated dict
342   """
343   if op_input:
344     invalid_dst = set(op_input) - constants.DS_VALID_TYPES
345     if invalid_dst:
346       raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
347                                  utils.CommaJoin(invalid_dst),
348                                  errors.ECODE_INVAL)
349     type_check = constants.DSS_PARAMETER_TYPES
350     if obj_input is None:
351       obj_input = {}
352     return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
353                                               type_check))
354                 for key, value in op_input.items())
355
356   return None
357
358
359 def CheckOSParams(lu, required, node_uuids, osname, osparams):
360   """OS parameters validation.
361
362   @type lu: L{LogicalUnit}
363   @param lu: the logical unit for which we check
364   @type required: boolean
365   @param required: whether the validation should fail if the OS is not
366       found
367   @type node_uuids: list
368   @param node_uuids: the list of nodes on which we should check
369   @type osname: string
370   @param osname: the name of the hypervisor we should use
371   @type osparams: dict
372   @param osparams: the parameters which we need to check
373   @raise errors.OpPrereqError: if the parameters are not valid
374
375   """
376   node_uuids = _FilterVmNodes(lu, node_uuids)
377   result = lu.rpc.call_os_validate(node_uuids, required, osname,
378                                    [constants.OS_VALIDATE_PARAMETERS],
379                                    osparams)
380   for node_uuid, nres in result.items():
381     # we don't check for offline cases since this should be run only
382     # against the master node and/or an instance's nodes
383     nres.Raise("OS Parameters validation failed on node %s" %
384                lu.cfg.GetNodeName(node_uuid))
385     if not nres.payload:
386       lu.LogInfo("OS %s not found on node %s, validation skipped",
387                  osname, lu.cfg.GetNodeName(node_uuid))
388
389
390 def CheckHVParams(lu, node_uuids, hvname, hvparams):
391   """Hypervisor parameter validation.
392
393   This function abstract the hypervisor parameter validation to be
394   used in both instance create and instance modify.
395
396   @type lu: L{LogicalUnit}
397   @param lu: the logical unit for which we check
398   @type node_uuids: list
399   @param node_uuids: the list of nodes on which we should check
400   @type hvname: string
401   @param hvname: the name of the hypervisor we should use
402   @type hvparams: dict
403   @param hvparams: the parameters which we need to check
404   @raise errors.OpPrereqError: if the parameters are not valid
405
406   """
407   node_uuids = _FilterVmNodes(lu, node_uuids)
408
409   cluster = lu.cfg.GetClusterInfo()
410   hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
411
412   hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
413   for node_uuid in node_uuids:
414     info = hvinfo[node_uuid]
415     if info.offline:
416       continue
417     info.Raise("Hypervisor parameter validation failed on node %s" %
418                lu.cfg.GetNodeName(node_uuid))
419
420
421 def AdjustCandidatePool(lu, exceptions):
422   """Adjust the candidate pool after node operations.
423
424   """
425   mod_list = lu.cfg.MaintainCandidatePool(exceptions)
426   if mod_list:
427     lu.LogInfo("Promoted nodes to master candidate role: %s",
428                utils.CommaJoin(node.name for node in mod_list))
429     for node in mod_list:
430       lu.context.ReaddNode(node)
431   mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
432   if mc_now > mc_max:
433     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
434                (mc_now, mc_max))
435
436
437 def CheckNodePVs(nresult, exclusive_storage):
438   """Check node PVs.
439
440   """
441   pvlist_dict = nresult.get(constants.NV_PVLIST, None)
442   if pvlist_dict is None:
443     return (["Can't get PV list from node"], None)
444   pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
445   errlist = []
446   # check that ':' is not present in PV names, since it's a
447   # special character for lvcreate (denotes the range of PEs to
448   # use on the PV)
449   for pv in pvlist:
450     if ":" in pv.name:
451       errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
452                      (pv.name, pv.vg_name))
453   es_pvinfo = None
454   if exclusive_storage:
455     (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
456     errlist.extend(errmsgs)
457     shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
458     if shared_pvs:
459       for (pvname, lvlist) in shared_pvs:
460         # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
461         errlist.append("PV %s is shared among unrelated LVs (%s)" %
462                        (pvname, utils.CommaJoin(lvlist)))
463   return (errlist, es_pvinfo)
464
465
466 def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
467   """Computes if value is in the desired range.
468
469   @param name: name of the parameter for which we perform the check
470   @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
471       not just 'disk')
472   @param ispecs: dictionary containing min and max values
473   @param value: actual value that we want to use
474   @return: None or an error string
475
476   """
477   if value in [None, constants.VALUE_AUTO]:
478     return None
479   max_v = ispecs[constants.ISPECS_MAX].get(name, value)
480   min_v = ispecs[constants.ISPECS_MIN].get(name, value)
481   if value > max_v or min_v > value:
482     if qualifier:
483       fqn = "%s/%s" % (name, qualifier)
484     else:
485       fqn = name
486     return ("%s value %s is not in range [%s, %s]" %
487             (fqn, value, min_v, max_v))
488   return None
489
490
491 def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
492                                 nic_count, disk_sizes, spindle_use,
493                                 disk_template,
494                                 _compute_fn=_ComputeMinMaxSpec):
495   """Verifies ipolicy against provided specs.
496
497   @type ipolicy: dict
498   @param ipolicy: The ipolicy
499   @type mem_size: int
500   @param mem_size: The memory size
501   @type cpu_count: int
502   @param cpu_count: Used cpu cores
503   @type disk_count: int
504   @param disk_count: Number of disks used
505   @type nic_count: int
506   @param nic_count: Number of nics used
507   @type disk_sizes: list of ints
508   @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
509   @type spindle_use: int
510   @param spindle_use: The number of spindles this instance uses
511   @type disk_template: string
512   @param disk_template: The disk template of the instance
513   @param _compute_fn: The compute function (unittest only)
514   @return: A list of violations, or an empty list of no violations are found
515
516   """
517   assert disk_count == len(disk_sizes)
518
519   test_settings = [
520     (constants.ISPEC_MEM_SIZE, "", mem_size),
521     (constants.ISPEC_CPU_COUNT, "", cpu_count),
522     (constants.ISPEC_NIC_COUNT, "", nic_count),
523     (constants.ISPEC_SPINDLE_USE, "", spindle_use),
524     ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
525          for idx, d in enumerate(disk_sizes)]
526   if disk_template != constants.DT_DISKLESS:
527     # This check doesn't make sense for diskless instances
528     test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
529   ret = []
530   allowed_dts = ipolicy[constants.IPOLICY_DTS]
531   if disk_template not in allowed_dts:
532     ret.append("Disk template %s is not allowed (allowed templates: %s)" %
533                (disk_template, utils.CommaJoin(allowed_dts)))
534
535   min_errs = None
536   for minmax in ipolicy[constants.ISPECS_MINMAX]:
537     errs = filter(None,
538                   (_compute_fn(name, qualifier, minmax, value)
539                    for (name, qualifier, value) in test_settings))
540     if min_errs is None or len(errs) < len(min_errs):
541       min_errs = errs
542   assert min_errs is not None
543   return ret + min_errs
544
545
546 def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
547                                     _compute_fn=ComputeIPolicySpecViolation):
548   """Compute if instance meets the specs of ipolicy.
549
550   @type ipolicy: dict
551   @param ipolicy: The ipolicy to verify against
552   @type instance: L{objects.Instance}
553   @param instance: The instance to verify
554   @type cfg: L{config.ConfigWriter}
555   @param cfg: Cluster configuration
556   @param _compute_fn: The function to verify ipolicy (unittest only)
557   @see: L{ComputeIPolicySpecViolation}
558
559   """
560   ret = []
561   be_full = cfg.GetClusterInfo().FillBE(instance)
562   mem_size = be_full[constants.BE_MAXMEM]
563   cpu_count = be_full[constants.BE_VCPUS]
564   es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
565   if any(es_flags.values()):
566     # With exclusive storage use the actual spindles
567     try:
568       spindle_use = sum([disk.spindles for disk in instance.disks])
569     except TypeError:
570       ret.append("Number of spindles not configured for disks of instance %s"
571                  " while exclusive storage is enabled, try running gnt-cluster"
572                  " repair-disk-sizes" % instance.name)
573       # _ComputeMinMaxSpec ignores 'None's
574       spindle_use = None
575   else:
576     spindle_use = be_full[constants.BE_SPINDLE_USE]
577   disk_count = len(instance.disks)
578   disk_sizes = [disk.size for disk in instance.disks]
579   nic_count = len(instance.nics)
580   disk_template = instance.disk_template
581
582   return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
583                            disk_sizes, spindle_use, disk_template)
584
585
586 def _ComputeViolatingInstances(ipolicy, instances, cfg):
587   """Computes a set of instances who violates given ipolicy.
588
589   @param ipolicy: The ipolicy to verify
590   @type instances: L{objects.Instance}
591   @param instances: List of instances to verify
592   @type cfg: L{config.ConfigWriter}
593   @param cfg: Cluster configuration
594   @return: A frozenset of instance names violating the ipolicy
595
596   """
597   return frozenset([inst.name for inst in instances
598                     if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
599
600
601 def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
602   """Computes a set of any instances that would violate the new ipolicy.
603
604   @param old_ipolicy: The current (still in-place) ipolicy
605   @param new_ipolicy: The new (to become) ipolicy
606   @param instances: List of instances to verify
607   @type cfg: L{config.ConfigWriter}
608   @param cfg: Cluster configuration
609   @return: A list of instances which violates the new ipolicy but
610       did not before
611
612   """
613   return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
614           _ComputeViolatingInstances(old_ipolicy, instances, cfg))
615
616
617 def GetUpdatedParams(old_params, update_dict,
618                       use_default=True, use_none=False):
619   """Return the new version of a parameter dictionary.
620
621   @type old_params: dict
622   @param old_params: old parameters
623   @type update_dict: dict
624   @param update_dict: dict containing new parameter values, or
625       constants.VALUE_DEFAULT to reset the parameter to its default
626       value
627   @param use_default: boolean
628   @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
629       values as 'to be deleted' values
630   @param use_none: boolean
631   @type use_none: whether to recognise C{None} values as 'to be
632       deleted' values
633   @rtype: dict
634   @return: the new parameter dictionary
635
636   """
637   params_copy = copy.deepcopy(old_params)
638   for key, val in update_dict.iteritems():
639     if ((use_default and val == constants.VALUE_DEFAULT) or
640           (use_none and val is None)):
641       try:
642         del params_copy[key]
643       except KeyError:
644         pass
645     else:
646       params_copy[key] = val
647   return params_copy
648
649
650 def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
651   """Return the new version of an instance policy.
652
653   @param group_policy: whether this policy applies to a group and thus
654     we should support removal of policy entries
655
656   """
657   ipolicy = copy.deepcopy(old_ipolicy)
658   for key, value in new_ipolicy.items():
659     if key not in constants.IPOLICY_ALL_KEYS:
660       raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
661                                  errors.ECODE_INVAL)
662     if (not value or value == [constants.VALUE_DEFAULT] or
663             value == constants.VALUE_DEFAULT):
664       if group_policy:
665         if key in ipolicy:
666           del ipolicy[key]
667       else:
668         raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
669                                    " on the cluster'" % key,
670                                    errors.ECODE_INVAL)
671     else:
672       if key in constants.IPOLICY_PARAMETERS:
673         # FIXME: we assume all such values are float
674         try:
675           ipolicy[key] = float(value)
676         except (TypeError, ValueError), err:
677           raise errors.OpPrereqError("Invalid value for attribute"
678                                      " '%s': '%s', error: %s" %
679                                      (key, value, err), errors.ECODE_INVAL)
680       elif key == constants.ISPECS_MINMAX:
681         for minmax in value:
682           for k in minmax.keys():
683             utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
684         ipolicy[key] = value
685       elif key == constants.ISPECS_STD:
686         if group_policy:
687           msg = "%s cannot appear in group instance specs" % key
688           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
689         ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
690                                         use_none=False, use_default=False)
691         utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
692       else:
693         # FIXME: we assume all others are lists; this should be redone
694         # in a nicer way
695         ipolicy[key] = list(value)
696   try:
697     objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
698   except errors.ConfigurationError, err:
699     raise errors.OpPrereqError("Invalid instance policy: %s" % err,
700                                errors.ECODE_INVAL)
701   return ipolicy
702
703
704 def AnnotateDiskParams(instance, devs, cfg):
705   """Little helper wrapper to the rpc annotation method.
706
707   @param instance: The instance object
708   @type devs: List of L{objects.Disk}
709   @param devs: The root devices (not any of its children!)
710   @param cfg: The config object
711   @returns The annotated disk copies
712   @see L{rpc.AnnotateDiskParams}
713
714   """
715   return rpc.AnnotateDiskParams(instance.disk_template, devs,
716                                 cfg.GetInstanceDiskParams(instance))
717
718
719 def SupportsOob(cfg, node):
720   """Tells if node supports OOB.
721
722   @type cfg: L{config.ConfigWriter}
723   @param cfg: The cluster configuration
724   @type node: L{objects.Node}
725   @param node: The node
726   @return: The OOB script if supported or an empty string otherwise
727
728   """
729   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
730
731
732 def _UpdateAndVerifySubDict(base, updates, type_check):
733   """Updates and verifies a dict with sub dicts of the same type.
734
735   @param base: The dict with the old data
736   @param updates: The dict with the new data
737   @param type_check: Dict suitable to ForceDictType to verify correct types
738   @returns: A new dict with updated and verified values
739
740   """
741   def fn(old, value):
742     new = GetUpdatedParams(old, value)
743     utils.ForceDictType(new, type_check)
744     return new
745
746   ret = copy.deepcopy(base)
747   ret.update(dict((key, fn(base.get(key, {}), value))
748                   for key, value in updates.items()))
749   return ret
750
751
752 def _FilterVmNodes(lu, node_uuids):
753   """Filters out non-vm_capable nodes from a list.
754
755   @type lu: L{LogicalUnit}
756   @param lu: the logical unit for which we check
757   @type node_uuids: list
758   @param node_uuids: the list of nodes on which we should check
759   @rtype: list
760   @return: the list of vm-capable nodes
761
762   """
763   vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
764   return [uuid for uuid in node_uuids if uuid not in vm_nodes]
765
766
767 def GetDefaultIAllocator(cfg, ialloc):
768   """Decides on which iallocator to use.
769
770   @type cfg: L{config.ConfigWriter}
771   @param cfg: Cluster configuration object
772   @type ialloc: string or None
773   @param ialloc: Iallocator specified in opcode
774   @rtype: string
775   @return: Iallocator name
776
777   """
778   if not ialloc:
779     # Use default iallocator
780     ialloc = cfg.GetDefaultIAllocator()
781
782   if not ialloc:
783     raise errors.OpPrereqError("No iallocator was specified, neither in the"
784                                " opcode nor as a cluster-wide default",
785                                errors.ECODE_INVAL)
786
787   return ialloc
788
789
790 def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
791                              cur_group_uuid):
792   """Checks if node groups for locked instances are still correct.
793
794   @type cfg: L{config.ConfigWriter}
795   @param cfg: Cluster configuration
796   @type instances: dict; string as key, L{objects.Instance} as value
797   @param instances: Dictionary, instance name as key, instance object as value
798   @type owned_groups: iterable of string
799   @param owned_groups: List of owned groups
800   @type owned_node_uuids: iterable of string
801   @param owned_node_uuids: List of owned nodes
802   @type cur_group_uuid: string or None
803   @param cur_group_uuid: Optional group UUID to check against instance's groups
804
805   """
806   for (name, inst) in instances.items():
807     assert owned_node_uuids.issuperset(inst.all_nodes), \
808       "Instance %s's nodes changed while we kept the lock" % name
809
810     inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)
811
812     assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
813       "Instance %s has no node in group %s" % (name, cur_group_uuid)
814
815
816 def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
817                             primary_only=False):
818   """Checks if the owned node groups are still correct for an instance.
819
820   @type cfg: L{config.ConfigWriter}
821   @param cfg: The cluster configuration
822   @type instance_name: string
823   @param instance_name: Instance name
824   @type owned_groups: set or frozenset
825   @param owned_groups: List of currently owned node groups
826   @type primary_only: boolean
827   @param primary_only: Whether to check node groups for only the primary node
828
829   """
830   inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
831
832   if not owned_groups.issuperset(inst_groups):
833     raise errors.OpPrereqError("Instance %s's node groups changed since"
834                                " locks were acquired, current groups are"
835                                " are '%s', owning groups '%s'; retry the"
836                                " operation" %
837                                (instance_name,
838                                 utils.CommaJoin(inst_groups),
839                                 utils.CommaJoin(owned_groups)),
840                                errors.ECODE_STATE)
841
842   return inst_groups
843
844
845 def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
846   """Unpacks the result of change-group and node-evacuate iallocator requests.
847
848   Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
849   L{constants.IALLOCATOR_MODE_CHG_GROUP}.
850
851   @type lu: L{LogicalUnit}
852   @param lu: Logical unit instance
853   @type alloc_result: tuple/list
854   @param alloc_result: Result from iallocator
855   @type early_release: bool
856   @param early_release: Whether to release locks early if possible
857   @type use_nodes: bool
858   @param use_nodes: Whether to display node names instead of groups
859
860   """
861   (moved, failed, jobs) = alloc_result
862
863   if failed:
864     failreason = utils.CommaJoin("%s (%s)" % (name, reason)
865                                  for (name, reason) in failed)
866     lu.LogWarning("Unable to evacuate instances %s", failreason)
867     raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
868
869   if moved:
870     lu.LogInfo("Instances to be moved: %s",
871                utils.CommaJoin(
872                  "%s (to %s)" %
873                  (name, _NodeEvacDest(use_nodes, group, node_names))
874                  for (name, group, node_names) in moved))
875
876   return [map(compat.partial(_SetOpEarlyRelease, early_release),
877               map(opcodes.OpCode.LoadOpCode, ops))
878           for ops in jobs]
879
880
881 def _NodeEvacDest(use_nodes, group, node_names):
882   """Returns group or nodes depending on caller's choice.
883
884   """
885   if use_nodes:
886     return utils.CommaJoin(node_names)
887   else:
888     return group
889
890
891 def _SetOpEarlyRelease(early_release, op):
892   """Sets C{early_release} flag on opcodes if available.
893
894   """
895   try:
896     op.early_release = early_release
897   except AttributeError:
898     assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
899
900   return op
901
902
903 def MapInstanceDisksToNodes(instances):
904   """Creates a map from (node, volume) to instance name.
905
906   @type instances: list of L{objects.Instance}
907   @rtype: dict; tuple of (node uuid, volume name) as key, instance name as value
908
909   """
910   return dict(((node_uuid, vol), inst.name)
911               for inst in instances
912               for (node_uuid, vols) in inst.MapLVsByNode().items()
913               for vol in vols)
914
915
916 def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
917   """Make sure that none of the given paramters is global.
918
919   If a global parameter is found, an L{errors.OpPrereqError} exception is
920   raised. This is used to avoid setting global parameters for individual nodes.
921
922   @type params: dictionary
923   @param params: Parameters to check
924   @type glob_pars: dictionary
925   @param glob_pars: Forbidden parameters
926   @type kind: string
927   @param kind: Kind of parameters (e.g. "node")
928   @type bad_levels: string
929   @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
930       "instance")
931   @type good_levels: strings
932   @param good_levels: Level(s) at which the parameters are allowed (e.g.
933       "cluster or group")
934
935   """
936   used_globals = glob_pars.intersection(params)
937   if used_globals:
938     msg = ("The following %s parameters are global and cannot"
939            " be customized at %s level, please modify them at"
940            " %s level: %s" %
941            (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
942     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
943
944
945 def IsExclusiveStorageEnabledNode(cfg, node):
946   """Whether exclusive_storage is in effect for the given node.
947
948   @type cfg: L{config.ConfigWriter}
949   @param cfg: The cluster configuration
950   @type node: L{objects.Node}
951   @param node: The node
952   @rtype: bool
953   @return: The effective value of exclusive_storage
954
955   """
956   return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
957
958
959 def CheckInstanceState(lu, instance, req_states, msg=None):
960   """Ensure that an instance is in one of the required states.
961
962   @param lu: the LU on behalf of which we make the check
963   @param instance: the instance to check
964   @param msg: if passed, should be a message to replace the default one
965   @raise errors.OpPrereqError: if the instance is not in the required state
966
967   """
968   if msg is None:
969     msg = ("can't use instance from outside %s states" %
970            utils.CommaJoin(req_states))
971   if instance.admin_state not in req_states:
972     raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
973                                (instance.name, instance.admin_state, msg),
974                                errors.ECODE_STATE)
975
976   if constants.ADMINST_UP not in req_states:
977     pnode_uuid = instance.primary_node
978     if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
979       all_hvparams = lu.cfg.GetClusterInfo().hvparams
980       ins_l = lu.rpc.call_instance_list(
981                 [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
982       ins_l.Raise("Can't contact node %s for instance information" %
983                   lu.cfg.GetNodeName(pnode_uuid),
984                   prereq=True, ecode=errors.ECODE_ENVIRON)
985       if instance.name in ins_l.payload:
986         raise errors.OpPrereqError("Instance %s is running, %s" %
987                                    (instance.name, msg), errors.ECODE_STATE)
988     else:
989       lu.LogWarning("Primary node offline, ignoring check that instance"
990                      " is down")
991
992
993 def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
994   """Check the sanity of iallocator and node arguments and use the
995   cluster-wide iallocator if appropriate.
996
997   Check that at most one of (iallocator, node) is specified. If none is
998   specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
999   then the LU's opcode's iallocator slot is filled with the cluster-wide
1000   default iallocator.
1001
1002   @type iallocator_slot: string
1003   @param iallocator_slot: the name of the opcode iallocator slot
1004   @type node_slot: string
1005   @param node_slot: the name of the opcode target node slot
1006
1007   """
1008   node = getattr(lu.op, node_slot, None)
1009   ialloc = getattr(lu.op, iallocator_slot, None)
1010   if node == []:
1011     node = None
1012
1013   if node is not None and ialloc is not None:
1014     raise errors.OpPrereqError("Do not specify both, iallocator and node",
1015                                errors.ECODE_INVAL)
1016   elif ((node is None and ialloc is None) or
1017         ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1018     default_iallocator = lu.cfg.GetDefaultIAllocator()
1019     if default_iallocator:
1020       setattr(lu.op, iallocator_slot, default_iallocator)
1021     else:
1022       raise errors.OpPrereqError("No iallocator or node given and no"
1023                                  " cluster-wide default iallocator found;"
1024                                  " please specify either an iallocator or a"
1025                                  " node, or set a cluster-wide default"
1026                                  " iallocator", errors.ECODE_INVAL)
1027
1028
1029 def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
1030   faulty = []
1031
1032   for dev in instance.disks:
1033     cfg.SetDiskID(dev, node_uuid)
1034
1035   result = rpc_runner.call_blockdev_getmirrorstatus(
1036              node_uuid, (instance.disks, instance))
1037   result.Raise("Failed to get disk status from node %s" %
1038                cfg.GetNodeName(node_uuid),
1039                prereq=prereq, ecode=errors.ECODE_ENVIRON)
1040
1041   for idx, bdev_status in enumerate(result.payload):
1042     if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1043       faulty.append(idx)
1044
1045   return faulty
1046
1047
1048 def CheckNodeOnline(lu, node_uuid, msg=None):
1049   """Ensure that a given node is online.
1050
1051   @param lu: the LU on behalf of which we make the check
1052   @param node_uuid: the node to check
1053   @param msg: if passed, should be a message to replace the default one
1054   @raise errors.OpPrereqError: if the node is offline
1055
1056   """
1057   if msg is None:
1058     msg = "Can't use offline node"
1059   if lu.cfg.GetNodeInfo(node_uuid).offline:
1060     raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
1061                                errors.ECODE_STATE)