Check real spindles in ipolicies
[ganeti-local] / lib / cmdlib / common.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Common functions used by multiple logical units."""
23
24 import copy
25 import os
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import hypervisor
31 from ganeti import locking
32 from ganeti import objects
33 from ganeti import opcodes
34 from ganeti import pathutils
35 from ganeti import rpc
36 from ganeti import ssconf
37 from ganeti import utils
38
39
40 # States of instance
41 INSTANCE_DOWN = [constants.ADMINST_DOWN]
42 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
43 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
44
45 #: Instance status in which an instance can be marked as offline/online
46 CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
47   constants.ADMINST_OFFLINE,
48   ]))
49
50
51 def _ExpandItemName(fn, name, kind):
52   """Expand an item name.
53
54   @param fn: the function to use for expansion
55   @param name: requested item name
56   @param kind: text description ('Node' or 'Instance')
57   @return: the resolved (full) name
58   @raise errors.OpPrereqError: if the item is not found
59
60   """
61   full_name = fn(name)
62   if full_name is None:
63     raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
64                                errors.ECODE_NOENT)
65   return full_name
66
67
68 def ExpandInstanceName(cfg, name):
69   """Wrapper over L{_ExpandItemName} for instance."""
70   return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
71
72
73 def ExpandNodeName(cfg, name):
74   """Wrapper over L{_ExpandItemName} for nodes."""
75   return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
76
77
78 def ShareAll():
79   """Returns a dict declaring all lock levels shared.
80
81   """
82   return dict.fromkeys(locking.LEVELS, 1)
83
84
85 def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
86   """Checks if the instances in a node group are still correct.
87
88   @type cfg: L{config.ConfigWriter}
89   @param cfg: The cluster configuration
90   @type group_uuid: string
91   @param group_uuid: Node group UUID
92   @type owned_instances: set or frozenset
93   @param owned_instances: List of currently owned instances
94
95   """
96   wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
97   if owned_instances != wanted_instances:
98     raise errors.OpPrereqError("Instances in node group '%s' changed since"
99                                " locks were acquired, wanted '%s', have '%s';"
100                                " retry the operation" %
101                                (group_uuid,
102                                 utils.CommaJoin(wanted_instances),
103                                 utils.CommaJoin(owned_instances)),
104                                errors.ECODE_STATE)
105
106   return wanted_instances
107
108
109 def GetWantedNodes(lu, nodes):
110   """Returns list of checked and expanded node names.
111
112   @type lu: L{LogicalUnit}
113   @param lu: the logical unit on whose behalf we execute
114   @type nodes: list
115   @param nodes: list of node names or None for all nodes
116   @rtype: list
117   @return: the list of nodes, sorted
118   @raise errors.ProgrammerError: if the nodes parameter is wrong type
119
120   """
121   if nodes:
122     return [ExpandNodeName(lu.cfg, name) for name in nodes]
123
124   return utils.NiceSort(lu.cfg.GetNodeList())
125
126
127 def GetWantedInstances(lu, instances):
128   """Returns list of checked and expanded instance names.
129
130   @type lu: L{LogicalUnit}
131   @param lu: the logical unit on whose behalf we execute
132   @type instances: list
133   @param instances: list of instance names or None for all instances
134   @rtype: list
135   @return: the list of instances, sorted
136   @raise errors.OpPrereqError: if the instances parameter is wrong type
137   @raise errors.OpPrereqError: if any of the passed instances is not found
138
139   """
140   if instances:
141     wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
142   else:
143     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
144   return wanted
145
146
147 def RunPostHook(lu, node_name):
148   """Runs the post-hook for an opcode on a single node.
149
150   """
151   hm = lu.proc.BuildHooksManager(lu)
152   try:
153     hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
154   except Exception, err: # pylint: disable=W0703
155     lu.LogWarning("Errors occurred running hooks on %s: %s",
156                   node_name, err)
157
158
159 def RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
160   """Distribute additional files which are part of the cluster configuration.
161
162   ConfigWriter takes care of distributing the config and ssconf files, but
163   there are more files which should be distributed to all nodes. This function
164   makes sure those are copied.
165
166   @param lu: calling logical unit
167   @param additional_nodes: list of nodes not in the config to distribute to
168   @type additional_vm: boolean
169   @param additional_vm: whether the additional nodes are vm-capable or not
170
171   """
172   # Gather target nodes
173   cluster = lu.cfg.GetClusterInfo()
174   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
175
176   online_nodes = lu.cfg.GetOnlineNodeList()
177   online_set = frozenset(online_nodes)
178   vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
179
180   if additional_nodes is not None:
181     online_nodes.extend(additional_nodes)
182     if additional_vm:
183       vm_nodes.extend(additional_nodes)
184
185   # Never distribute to master node
186   for nodelist in [online_nodes, vm_nodes]:
187     if master_info.name in nodelist:
188       nodelist.remove(master_info.name)
189
190   # Gather file lists
191   (files_all, _, files_mc, files_vm) = \
192     ComputeAncillaryFiles(cluster, True)
193
194   # Never re-distribute configuration file from here
195   assert not (pathutils.CLUSTER_CONF_FILE in files_all or
196               pathutils.CLUSTER_CONF_FILE in files_vm)
197   assert not files_mc, "Master candidates not handled in this function"
198
199   filemap = [
200     (online_nodes, files_all),
201     (vm_nodes, files_vm),
202     ]
203
204   # Upload the files
205   for (node_list, files) in filemap:
206     for fname in files:
207       UploadHelper(lu, node_list, fname)
208
209
210 def ComputeAncillaryFiles(cluster, redist):
211   """Compute files external to Ganeti which need to be consistent.
212
213   @type redist: boolean
214   @param redist: Whether to include files which need to be redistributed
215
216   """
217   # Compute files for all nodes
218   files_all = set([
219     pathutils.SSH_KNOWN_HOSTS_FILE,
220     pathutils.CONFD_HMAC_KEY,
221     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
222     pathutils.SPICE_CERT_FILE,
223     pathutils.SPICE_CACERT_FILE,
224     pathutils.RAPI_USERS_FILE,
225     ])
226
227   if redist:
228     # we need to ship at least the RAPI certificate
229     files_all.add(pathutils.RAPI_CERT_FILE)
230   else:
231     files_all.update(pathutils.ALL_CERT_FILES)
232     files_all.update(ssconf.SimpleStore().GetFileList())
233
234   if cluster.modify_etc_hosts:
235     files_all.add(pathutils.ETC_HOSTS)
236
237   if cluster.use_external_mip_script:
238     files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
239
240   # Files which are optional, these must:
241   # - be present in one other category as well
242   # - either exist or not exist on all nodes of that category (mc, vm all)
243   files_opt = set([
244     pathutils.RAPI_USERS_FILE,
245     ])
246
247   # Files which should only be on master candidates
248   files_mc = set()
249
250   if not redist:
251     files_mc.add(pathutils.CLUSTER_CONF_FILE)
252
253   # File storage
254   if (not redist and (constants.ENABLE_FILE_STORAGE or
255                         constants.ENABLE_SHARED_FILE_STORAGE)):
256     files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
257     files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
258
259   # Files which should only be on VM-capable nodes
260   files_vm = set(
261     filename
262     for hv_name in cluster.enabled_hypervisors
263     for filename in
264     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
265
266   files_opt |= set(
267     filename
268     for hv_name in cluster.enabled_hypervisors
269     for filename in
270     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
271
272   # Filenames in each category must be unique
273   all_files_set = files_all | files_mc | files_vm
274   assert (len(all_files_set) ==
275           sum(map(len, [files_all, files_mc, files_vm]))), \
276     "Found file listed in more than one file list"
277
278   # Optional files must be present in one other category
279   assert all_files_set.issuperset(files_opt), \
280     "Optional file not in a different required list"
281
282   # This one file should never ever be re-distributed via RPC
283   assert not (redist and
284               pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
285
286   return (files_all, files_opt, files_mc, files_vm)
287
288
289 def UploadHelper(lu, nodes, fname):
290   """Helper for uploading a file and showing warnings.
291
292   """
293   if os.path.exists(fname):
294     result = lu.rpc.call_upload_file(nodes, fname)
295     for to_node, to_result in result.items():
296       msg = to_result.fail_msg
297       if msg:
298         msg = ("Copy of file %s to node %s failed: %s" %
299                (fname, to_node, msg))
300         lu.LogWarning(msg)
301
302
303 def MergeAndVerifyHvState(op_input, obj_input):
304   """Combines the hv state from an opcode with the one of the object
305
306   @param op_input: The input dict from the opcode
307   @param obj_input: The input dict from the objects
308   @return: The verified and updated dict
309
310   """
311   if op_input:
312     invalid_hvs = set(op_input) - constants.HYPER_TYPES
313     if invalid_hvs:
314       raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
315                                  " %s" % utils.CommaJoin(invalid_hvs),
316                                  errors.ECODE_INVAL)
317     if obj_input is None:
318       obj_input = {}
319     type_check = constants.HVSTS_PARAMETER_TYPES
320     return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
321
322   return None
323
324
325 def MergeAndVerifyDiskState(op_input, obj_input):
326   """Combines the disk state from an opcode with the one of the object
327
328   @param op_input: The input dict from the opcode
329   @param obj_input: The input dict from the objects
330   @return: The verified and updated dict
331   """
332   if op_input:
333     invalid_dst = set(op_input) - constants.DS_VALID_TYPES
334     if invalid_dst:
335       raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
336                                  utils.CommaJoin(invalid_dst),
337                                  errors.ECODE_INVAL)
338     type_check = constants.DSS_PARAMETER_TYPES
339     if obj_input is None:
340       obj_input = {}
341     return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
342                                               type_check))
343                 for key, value in op_input.items())
344
345   return None
346
347
348 def CheckOSParams(lu, required, nodenames, osname, osparams):
349   """OS parameters validation.
350
351   @type lu: L{LogicalUnit}
352   @param lu: the logical unit for which we check
353   @type required: boolean
354   @param required: whether the validation should fail if the OS is not
355       found
356   @type nodenames: list
357   @param nodenames: the list of nodes on which we should check
358   @type osname: string
359   @param osname: the name of the hypervisor we should use
360   @type osparams: dict
361   @param osparams: the parameters which we need to check
362   @raise errors.OpPrereqError: if the parameters are not valid
363
364   """
365   nodenames = _FilterVmNodes(lu, nodenames)
366   result = lu.rpc.call_os_validate(nodenames, required, osname,
367                                    [constants.OS_VALIDATE_PARAMETERS],
368                                    osparams)
369   for node, nres in result.items():
370     # we don't check for offline cases since this should be run only
371     # against the master node and/or an instance's nodes
372     nres.Raise("OS Parameters validation failed on node %s" % node)
373     if not nres.payload:
374       lu.LogInfo("OS %s not found on node %s, validation skipped",
375                  osname, node)
376
377
378 def CheckHVParams(lu, nodenames, hvname, hvparams):
379   """Hypervisor parameter validation.
380
381   This function abstract the hypervisor parameter validation to be
382   used in both instance create and instance modify.
383
384   @type lu: L{LogicalUnit}
385   @param lu: the logical unit for which we check
386   @type nodenames: list
387   @param nodenames: the list of nodes on which we should check
388   @type hvname: string
389   @param hvname: the name of the hypervisor we should use
390   @type hvparams: dict
391   @param hvparams: the parameters which we need to check
392   @raise errors.OpPrereqError: if the parameters are not valid
393
394   """
395   nodenames = _FilterVmNodes(lu, nodenames)
396
397   cluster = lu.cfg.GetClusterInfo()
398   hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
399
400   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
401   for node in nodenames:
402     info = hvinfo[node]
403     if info.offline:
404       continue
405     info.Raise("Hypervisor parameter validation failed on node %s" % node)
406
407
408 def AdjustCandidatePool(lu, exceptions):
409   """Adjust the candidate pool after node operations.
410
411   """
412   mod_list = lu.cfg.MaintainCandidatePool(exceptions)
413   if mod_list:
414     lu.LogInfo("Promoted nodes to master candidate role: %s",
415                utils.CommaJoin(node.name for node in mod_list))
416     for name in mod_list:
417       lu.context.ReaddNode(name)
418   mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
419   if mc_now > mc_max:
420     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
421                (mc_now, mc_max))
422
423
424 def CheckNodePVs(nresult, exclusive_storage):
425   """Check node PVs.
426
427   """
428   pvlist_dict = nresult.get(constants.NV_PVLIST, None)
429   if pvlist_dict is None:
430     return (["Can't get PV list from node"], None)
431   pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
432   errlist = []
433   # check that ':' is not present in PV names, since it's a
434   # special character for lvcreate (denotes the range of PEs to
435   # use on the PV)
436   for pv in pvlist:
437     if ":" in pv.name:
438       errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
439                      (pv.name, pv.vg_name))
440   es_pvinfo = None
441   if exclusive_storage:
442     (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
443     errlist.extend(errmsgs)
444     shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
445     if shared_pvs:
446       for (pvname, lvlist) in shared_pvs:
447         # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
448         errlist.append("PV %s is shared among unrelated LVs (%s)" %
449                        (pvname, utils.CommaJoin(lvlist)))
450   return (errlist, es_pvinfo)
451
452
453 def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
454   """Computes if value is in the desired range.
455
456   @param name: name of the parameter for which we perform the check
457   @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
458       not just 'disk')
459   @param ispecs: dictionary containing min and max values
460   @param value: actual value that we want to use
461   @return: None or an error string
462
463   """
464   if value in [None, constants.VALUE_AUTO]:
465     return None
466   max_v = ispecs[constants.ISPECS_MAX].get(name, value)
467   min_v = ispecs[constants.ISPECS_MIN].get(name, value)
468   if value > max_v or min_v > value:
469     if qualifier:
470       fqn = "%s/%s" % (name, qualifier)
471     else:
472       fqn = name
473     return ("%s value %s is not in range [%s, %s]" %
474             (fqn, value, min_v, max_v))
475   return None
476
477
478 def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
479                                 nic_count, disk_sizes, spindle_use,
480                                 disk_template,
481                                 _compute_fn=_ComputeMinMaxSpec):
482   """Verifies ipolicy against provided specs.
483
484   @type ipolicy: dict
485   @param ipolicy: The ipolicy
486   @type mem_size: int
487   @param mem_size: The memory size
488   @type cpu_count: int
489   @param cpu_count: Used cpu cores
490   @type disk_count: int
491   @param disk_count: Number of disks used
492   @type nic_count: int
493   @param nic_count: Number of nics used
494   @type disk_sizes: list of ints
495   @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
496   @type spindle_use: int
497   @param spindle_use: The number of spindles this instance uses
498   @type disk_template: string
499   @param disk_template: The disk template of the instance
500   @param _compute_fn: The compute function (unittest only)
501   @return: A list of violations, or an empty list of no violations are found
502
503   """
504   assert disk_count == len(disk_sizes)
505
506   test_settings = [
507     (constants.ISPEC_MEM_SIZE, "", mem_size),
508     (constants.ISPEC_CPU_COUNT, "", cpu_count),
509     (constants.ISPEC_NIC_COUNT, "", nic_count),
510     (constants.ISPEC_SPINDLE_USE, "", spindle_use),
511     ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
512          for idx, d in enumerate(disk_sizes)]
513   if disk_template != constants.DT_DISKLESS:
514     # This check doesn't make sense for diskless instances
515     test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
516   ret = []
517   allowed_dts = ipolicy[constants.IPOLICY_DTS]
518   if disk_template not in allowed_dts:
519     ret.append("Disk template %s is not allowed (allowed templates: %s)" %
520                (disk_template, utils.CommaJoin(allowed_dts)))
521
522   min_errs = None
523   for minmax in ipolicy[constants.ISPECS_MINMAX]:
524     errs = filter(None,
525                   (_compute_fn(name, qualifier, minmax, value)
526                    for (name, qualifier, value) in test_settings))
527     if min_errs is None or len(errs) < len(min_errs):
528       min_errs = errs
529   assert min_errs is not None
530   return ret + min_errs
531
532
533 def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
534                                     _compute_fn=ComputeIPolicySpecViolation):
535   """Compute if instance meets the specs of ipolicy.
536
537   @type ipolicy: dict
538   @param ipolicy: The ipolicy to verify against
539   @type instance: L{objects.Instance}
540   @param instance: The instance to verify
541   @type cfg: L{config.ConfigWriter}
542   @param cfg: Cluster configuration
543   @param _compute_fn: The function to verify ipolicy (unittest only)
544   @see: L{ComputeIPolicySpecViolation}
545
546   """
547   ret = []
548   be_full = cfg.GetClusterInfo().FillBE(instance)
549   mem_size = be_full[constants.BE_MAXMEM]
550   cpu_count = be_full[constants.BE_VCPUS]
551   es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, instance.all_nodes)
552   if any(es_flags.values()):
553     # With exclusive storage use the actual spindles
554     try:
555       spindle_use = sum([disk.spindles for disk in instance.disks])
556     except TypeError:
557       ret.append("Number of spindles not configured for disks of instance %s"
558                  " while exclusive storage is enabled, try running gnt-cluster"
559                  " repair-disk-sizes" % instance.name)
560       # _ComputeMinMaxSpec ignores 'None's
561       spindle_use = None
562   else:
563     spindle_use = be_full[constants.BE_SPINDLE_USE]
564   disk_count = len(instance.disks)
565   disk_sizes = [disk.size for disk in instance.disks]
566   nic_count = len(instance.nics)
567   disk_template = instance.disk_template
568
569   return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
570                            disk_sizes, spindle_use, disk_template)
571
572
573 def _ComputeViolatingInstances(ipolicy, instances, cfg):
574   """Computes a set of instances who violates given ipolicy.
575
576   @param ipolicy: The ipolicy to verify
577   @type instances: L{objects.Instance}
578   @param instances: List of instances to verify
579   @type cfg: L{config.ConfigWriter}
580   @param cfg: Cluster configuration
581   @return: A frozenset of instance names violating the ipolicy
582
583   """
584   return frozenset([inst.name for inst in instances
585                     if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
586
587
588 def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
589   """Computes a set of any instances that would violate the new ipolicy.
590
591   @param old_ipolicy: The current (still in-place) ipolicy
592   @param new_ipolicy: The new (to become) ipolicy
593   @param instances: List of instances to verify
594   @type cfg: L{config.ConfigWriter}
595   @param cfg: Cluster configuration
596   @return: A list of instances which violates the new ipolicy but
597       did not before
598
599   """
600   return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
601           _ComputeViolatingInstances(old_ipolicy, instances, cfg))
602
603
604 def GetUpdatedParams(old_params, update_dict,
605                       use_default=True, use_none=False):
606   """Return the new version of a parameter dictionary.
607
608   @type old_params: dict
609   @param old_params: old parameters
610   @type update_dict: dict
611   @param update_dict: dict containing new parameter values, or
612       constants.VALUE_DEFAULT to reset the parameter to its default
613       value
614   @param use_default: boolean
615   @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
616       values as 'to be deleted' values
617   @param use_none: boolean
618   @type use_none: whether to recognise C{None} values as 'to be
619       deleted' values
620   @rtype: dict
621   @return: the new parameter dictionary
622
623   """
624   params_copy = copy.deepcopy(old_params)
625   for key, val in update_dict.iteritems():
626     if ((use_default and val == constants.VALUE_DEFAULT) or
627           (use_none and val is None)):
628       try:
629         del params_copy[key]
630       except KeyError:
631         pass
632     else:
633       params_copy[key] = val
634   return params_copy
635
636
637 def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
638   """Return the new version of an instance policy.
639
640   @param group_policy: whether this policy applies to a group and thus
641     we should support removal of policy entries
642
643   """
644   ipolicy = copy.deepcopy(old_ipolicy)
645   for key, value in new_ipolicy.items():
646     if key not in constants.IPOLICY_ALL_KEYS:
647       raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
648                                  errors.ECODE_INVAL)
649     if (not value or value == [constants.VALUE_DEFAULT] or
650             value == constants.VALUE_DEFAULT):
651       if group_policy:
652         if key in ipolicy:
653           del ipolicy[key]
654       else:
655         raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
656                                    " on the cluster'" % key,
657                                    errors.ECODE_INVAL)
658     else:
659       if key in constants.IPOLICY_PARAMETERS:
660         # FIXME: we assume all such values are float
661         try:
662           ipolicy[key] = float(value)
663         except (TypeError, ValueError), err:
664           raise errors.OpPrereqError("Invalid value for attribute"
665                                      " '%s': '%s', error: %s" %
666                                      (key, value, err), errors.ECODE_INVAL)
667       elif key == constants.ISPECS_MINMAX:
668         for minmax in value:
669           for k in minmax.keys():
670             utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
671         ipolicy[key] = value
672       elif key == constants.ISPECS_STD:
673         if group_policy:
674           msg = "%s cannot appear in group instance specs" % key
675           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
676         ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
677                                         use_none=False, use_default=False)
678         utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
679       else:
680         # FIXME: we assume all others are lists; this should be redone
681         # in a nicer way
682         ipolicy[key] = list(value)
683   try:
684     objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
685   except errors.ConfigurationError, err:
686     raise errors.OpPrereqError("Invalid instance policy: %s" % err,
687                                errors.ECODE_INVAL)
688   return ipolicy
689
690
691 def AnnotateDiskParams(instance, devs, cfg):
692   """Little helper wrapper to the rpc annotation method.
693
694   @param instance: The instance object
695   @type devs: List of L{objects.Disk}
696   @param devs: The root devices (not any of its children!)
697   @param cfg: The config object
698   @returns The annotated disk copies
699   @see L{rpc.AnnotateDiskParams}
700
701   """
702   return rpc.AnnotateDiskParams(instance.disk_template, devs,
703                                 cfg.GetInstanceDiskParams(instance))
704
705
706 def SupportsOob(cfg, node):
707   """Tells if node supports OOB.
708
709   @type cfg: L{config.ConfigWriter}
710   @param cfg: The cluster configuration
711   @type node: L{objects.Node}
712   @param node: The node
713   @return: The OOB script if supported or an empty string otherwise
714
715   """
716   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
717
718
719 def _UpdateAndVerifySubDict(base, updates, type_check):
720   """Updates and verifies a dict with sub dicts of the same type.
721
722   @param base: The dict with the old data
723   @param updates: The dict with the new data
724   @param type_check: Dict suitable to ForceDictType to verify correct types
725   @returns: A new dict with updated and verified values
726
727   """
728   def fn(old, value):
729     new = GetUpdatedParams(old, value)
730     utils.ForceDictType(new, type_check)
731     return new
732
733   ret = copy.deepcopy(base)
734   ret.update(dict((key, fn(base.get(key, {}), value))
735                   for key, value in updates.items()))
736   return ret
737
738
739 def _FilterVmNodes(lu, nodenames):
740   """Filters out non-vm_capable nodes from a list.
741
742   @type lu: L{LogicalUnit}
743   @param lu: the logical unit for which we check
744   @type nodenames: list
745   @param nodenames: the list of nodes on which we should check
746   @rtype: list
747   @return: the list of vm-capable nodes
748
749   """
750   vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
751   return [name for name in nodenames if name not in vm_nodes]
752
753
754 def GetDefaultIAllocator(cfg, ialloc):
755   """Decides on which iallocator to use.
756
757   @type cfg: L{config.ConfigWriter}
758   @param cfg: Cluster configuration object
759   @type ialloc: string or None
760   @param ialloc: Iallocator specified in opcode
761   @rtype: string
762   @return: Iallocator name
763
764   """
765   if not ialloc:
766     # Use default iallocator
767     ialloc = cfg.GetDefaultIAllocator()
768
769   if not ialloc:
770     raise errors.OpPrereqError("No iallocator was specified, neither in the"
771                                " opcode nor as a cluster-wide default",
772                                errors.ECODE_INVAL)
773
774   return ialloc
775
776
777 def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
778                              cur_group_uuid):
779   """Checks if node groups for locked instances are still correct.
780
781   @type cfg: L{config.ConfigWriter}
782   @param cfg: Cluster configuration
783   @type instances: dict; string as key, L{objects.Instance} as value
784   @param instances: Dictionary, instance name as key, instance object as value
785   @type owned_groups: iterable of string
786   @param owned_groups: List of owned groups
787   @type owned_nodes: iterable of string
788   @param owned_nodes: List of owned nodes
789   @type cur_group_uuid: string or None
790   @param cur_group_uuid: Optional group UUID to check against instance's groups
791
792   """
793   for (name, inst) in instances.items():
794     assert owned_nodes.issuperset(inst.all_nodes), \
795       "Instance %s's nodes changed while we kept the lock" % name
796
797     inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)
798
799     assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
800       "Instance %s has no node in group %s" % (name, cur_group_uuid)
801
802
803 def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
804                             primary_only=False):
805   """Checks if the owned node groups are still correct for an instance.
806
807   @type cfg: L{config.ConfigWriter}
808   @param cfg: The cluster configuration
809   @type instance_name: string
810   @param instance_name: Instance name
811   @type owned_groups: set or frozenset
812   @param owned_groups: List of currently owned node groups
813   @type primary_only: boolean
814   @param primary_only: Whether to check node groups for only the primary node
815
816   """
817   inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
818
819   if not owned_groups.issuperset(inst_groups):
820     raise errors.OpPrereqError("Instance %s's node groups changed since"
821                                " locks were acquired, current groups are"
822                                " are '%s', owning groups '%s'; retry the"
823                                " operation" %
824                                (instance_name,
825                                 utils.CommaJoin(inst_groups),
826                                 utils.CommaJoin(owned_groups)),
827                                errors.ECODE_STATE)
828
829   return inst_groups
830
831
832 def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
833   """Unpacks the result of change-group and node-evacuate iallocator requests.
834
835   Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
836   L{constants.IALLOCATOR_MODE_CHG_GROUP}.
837
838   @type lu: L{LogicalUnit}
839   @param lu: Logical unit instance
840   @type alloc_result: tuple/list
841   @param alloc_result: Result from iallocator
842   @type early_release: bool
843   @param early_release: Whether to release locks early if possible
844   @type use_nodes: bool
845   @param use_nodes: Whether to display node names instead of groups
846
847   """
848   (moved, failed, jobs) = alloc_result
849
850   if failed:
851     failreason = utils.CommaJoin("%s (%s)" % (name, reason)
852                                  for (name, reason) in failed)
853     lu.LogWarning("Unable to evacuate instances %s", failreason)
854     raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
855
856   if moved:
857     lu.LogInfo("Instances to be moved: %s",
858                utils.CommaJoin("%s (to %s)" %
859                                (name, _NodeEvacDest(use_nodes, group, nodes))
860                                for (name, group, nodes) in moved))
861
862   return [map(compat.partial(_SetOpEarlyRelease, early_release),
863               map(opcodes.OpCode.LoadOpCode, ops))
864           for ops in jobs]
865
866
867 def _NodeEvacDest(use_nodes, group, nodes):
868   """Returns group or nodes depending on caller's choice.
869
870   """
871   if use_nodes:
872     return utils.CommaJoin(nodes)
873   else:
874     return group
875
876
877 def _SetOpEarlyRelease(early_release, op):
878   """Sets C{early_release} flag on opcodes if available.
879
880   """
881   try:
882     op.early_release = early_release
883   except AttributeError:
884     assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
885
886   return op
887
888
889 def MapInstanceDisksToNodes(instances):
890   """Creates a map from (node, volume) to instance name.
891
892   @type instances: list of L{objects.Instance}
893   @rtype: dict; tuple of (node name, volume name) as key, instance name as value
894
895   """
896   return dict(((node, vol), inst.name)
897               for inst in instances
898               for (node, vols) in inst.MapLVsByNode().items()
899               for vol in vols)
900
901
902 def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
903   """Make sure that none of the given paramters is global.
904
905   If a global parameter is found, an L{errors.OpPrereqError} exception is
906   raised. This is used to avoid setting global parameters for individual nodes.
907
908   @type params: dictionary
909   @param params: Parameters to check
910   @type glob_pars: dictionary
911   @param glob_pars: Forbidden parameters
912   @type kind: string
913   @param kind: Kind of parameters (e.g. "node")
914   @type bad_levels: string
915   @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
916       "instance")
917   @type good_levels: strings
918   @param good_levels: Level(s) at which the parameters are allowed (e.g.
919       "cluster or group")
920
921   """
922   used_globals = glob_pars.intersection(params)
923   if used_globals:
924     msg = ("The following %s parameters are global and cannot"
925            " be customized at %s level, please modify them at"
926            " %s level: %s" %
927            (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
928     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
929
930
931 def IsExclusiveStorageEnabledNode(cfg, node):
932   """Whether exclusive_storage is in effect for the given node.
933
934   @type cfg: L{config.ConfigWriter}
935   @param cfg: The cluster configuration
936   @type node: L{objects.Node}
937   @param node: The node
938   @rtype: bool
939   @return: The effective value of exclusive_storage
940
941   """
942   return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
943
944
945 def CheckInstanceState(lu, instance, req_states, msg=None):
946   """Ensure that an instance is in one of the required states.
947
948   @param lu: the LU on behalf of which we make the check
949   @param instance: the instance to check
950   @param msg: if passed, should be a message to replace the default one
951   @raise errors.OpPrereqError: if the instance is not in the required state
952
953   """
954   if msg is None:
955     msg = ("can't use instance from outside %s states" %
956            utils.CommaJoin(req_states))
957   if instance.admin_state not in req_states:
958     raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
959                                (instance.name, instance.admin_state, msg),
960                                errors.ECODE_STATE)
961
962   if constants.ADMINST_UP not in req_states:
963     pnode = instance.primary_node
964     if not lu.cfg.GetNodeInfo(pnode).offline:
965       ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
966       ins_l.Raise("Can't contact node %s for instance information" % pnode,
967                   prereq=True, ecode=errors.ECODE_ENVIRON)
968       if instance.name in ins_l.payload:
969         raise errors.OpPrereqError("Instance %s is running, %s" %
970                                    (instance.name, msg), errors.ECODE_STATE)
971     else:
972       lu.LogWarning("Primary node offline, ignoring check that instance"
973                      " is down")
974
975
976 def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
977   """Check the sanity of iallocator and node arguments and use the
978   cluster-wide iallocator if appropriate.
979
980   Check that at most one of (iallocator, node) is specified. If none is
981   specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
982   then the LU's opcode's iallocator slot is filled with the cluster-wide
983   default iallocator.
984
985   @type iallocator_slot: string
986   @param iallocator_slot: the name of the opcode iallocator slot
987   @type node_slot: string
988   @param node_slot: the name of the opcode target node slot
989
990   """
991   node = getattr(lu.op, node_slot, None)
992   ialloc = getattr(lu.op, iallocator_slot, None)
993   if node == []:
994     node = None
995
996   if node is not None and ialloc is not None:
997     raise errors.OpPrereqError("Do not specify both, iallocator and node",
998                                errors.ECODE_INVAL)
999   elif ((node is None and ialloc is None) or
1000         ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1001     default_iallocator = lu.cfg.GetDefaultIAllocator()
1002     if default_iallocator:
1003       setattr(lu.op, iallocator_slot, default_iallocator)
1004     else:
1005       raise errors.OpPrereqError("No iallocator or node given and no"
1006                                  " cluster-wide default iallocator found;"
1007                                  " please specify either an iallocator or a"
1008                                  " node, or set a cluster-wide default"
1009                                  " iallocator", errors.ECODE_INVAL)
1010
1011
1012 def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1013   faulty = []
1014
1015   for dev in instance.disks:
1016     cfg.SetDiskID(dev, node_name)
1017
1018   result = rpc_runner.call_blockdev_getmirrorstatus(node_name,
1019                                                     (instance.disks,
1020                                                      instance))
1021   result.Raise("Failed to get disk status from node %s" % node_name,
1022                prereq=prereq, ecode=errors.ECODE_ENVIRON)
1023
1024   for idx, bdev_status in enumerate(result.payload):
1025     if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1026       faulty.append(idx)
1027
1028   return faulty
1029
1030
1031 def CheckNodeOnline(lu, node, msg=None):
1032   """Ensure that a given node is online.
1033
1034   @param lu: the LU on behalf of which we make the check
1035   @param node: the node to check
1036   @param msg: if passed, should be a message to replace the default one
1037   @raise errors.OpPrereqError: if the node is offline
1038
1039   """
1040   if msg is None:
1041     msg = "Can't use offline node"
1042   if lu.cfg.GetNodeInfo(node).offline:
1043     raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)