Extract instance related logical units from cmdlib
[ganeti-local] / lib / cmdlib / common.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Common functions used by multiple logical units."""
23
24 import copy
25 import os
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import hypervisor
31 from ganeti import locking
32 from ganeti import objects
33 from ganeti import opcodes
34 from ganeti import pathutils
35 from ganeti import rpc
36 from ganeti import ssconf
37 from ganeti import utils
38
39
40 # States of instance
41 INSTANCE_DOWN = [constants.ADMINST_DOWN]
42 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
43 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
44
45 #: Instance status in which an instance can be marked as offline/online
46 CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
47   constants.ADMINST_OFFLINE,
48   ]))
49
50
51 def _ExpandItemName(fn, name, kind):
52   """Expand an item name.
53
54   @param fn: the function to use for expansion
55   @param name: requested item name
56   @param kind: text description ('Node' or 'Instance')
57   @return: the resolved (full) name
58   @raise errors.OpPrereqError: if the item is not found
59
60   """
61   full_name = fn(name)
62   if full_name is None:
63     raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
64                                errors.ECODE_NOENT)
65   return full_name
66
67
68 def _ExpandInstanceName(cfg, name):
69   """Wrapper over L{_ExpandItemName} for instance."""
70   return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
71
72
73 def _ExpandNodeName(cfg, name):
74   """Wrapper over L{_ExpandItemName} for nodes."""
75   return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
76
77
78 def _ShareAll():
79   """Returns a dict declaring all lock levels shared.
80
81   """
82   return dict.fromkeys(locking.LEVELS, 1)
83
84
85 def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
86   """Checks if the instances in a node group are still correct.
87
88   @type cfg: L{config.ConfigWriter}
89   @param cfg: The cluster configuration
90   @type group_uuid: string
91   @param group_uuid: Node group UUID
92   @type owned_instances: set or frozenset
93   @param owned_instances: List of currently owned instances
94
95   """
96   wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
97   if owned_instances != wanted_instances:
98     raise errors.OpPrereqError("Instances in node group '%s' changed since"
99                                " locks were acquired, wanted '%s', have '%s';"
100                                " retry the operation" %
101                                (group_uuid,
102                                 utils.CommaJoin(wanted_instances),
103                                 utils.CommaJoin(owned_instances)),
104                                errors.ECODE_STATE)
105
106   return wanted_instances
107
108
109 def _GetWantedNodes(lu, nodes):
110   """Returns list of checked and expanded node names.
111
112   @type lu: L{LogicalUnit}
113   @param lu: the logical unit on whose behalf we execute
114   @type nodes: list
115   @param nodes: list of node names or None for all nodes
116   @rtype: list
117   @return: the list of nodes, sorted
118   @raise errors.ProgrammerError: if the nodes parameter is wrong type
119
120   """
121   if nodes:
122     return [_ExpandNodeName(lu.cfg, name) for name in nodes]
123
124   return utils.NiceSort(lu.cfg.GetNodeList())
125
126
127 def _GetWantedInstances(lu, instances):
128   """Returns list of checked and expanded instance names.
129
130   @type lu: L{LogicalUnit}
131   @param lu: the logical unit on whose behalf we execute
132   @type instances: list
133   @param instances: list of instance names or None for all instances
134   @rtype: list
135   @return: the list of instances, sorted
136   @raise errors.OpPrereqError: if the instances parameter is wrong type
137   @raise errors.OpPrereqError: if any of the passed instances is not found
138
139   """
140   if instances:
141     wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
142   else:
143     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
144   return wanted
145
146
147 def _RunPostHook(lu, node_name):
148   """Runs the post-hook for an opcode on a single node.
149
150   """
151   hm = lu.proc.BuildHooksManager(lu)
152   try:
153     hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
154   except Exception, err: # pylint: disable=W0703
155     lu.LogWarning("Errors occurred running hooks on %s: %s",
156                   node_name, err)
157
158
159 def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
160   """Distribute additional files which are part of the cluster configuration.
161
162   ConfigWriter takes care of distributing the config and ssconf files, but
163   there are more files which should be distributed to all nodes. This function
164   makes sure those are copied.
165
166   @param lu: calling logical unit
167   @param additional_nodes: list of nodes not in the config to distribute to
168   @type additional_vm: boolean
169   @param additional_vm: whether the additional nodes are vm-capable or not
170
171   """
172   # Gather target nodes
173   cluster = lu.cfg.GetClusterInfo()
174   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
175
176   online_nodes = lu.cfg.GetOnlineNodeList()
177   online_set = frozenset(online_nodes)
178   vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
179
180   if additional_nodes is not None:
181     online_nodes.extend(additional_nodes)
182     if additional_vm:
183       vm_nodes.extend(additional_nodes)
184
185   # Never distribute to master node
186   for nodelist in [online_nodes, vm_nodes]:
187     if master_info.name in nodelist:
188       nodelist.remove(master_info.name)
189
190   # Gather file lists
191   (files_all, _, files_mc, files_vm) = \
192     _ComputeAncillaryFiles(cluster, True)
193
194   # Never re-distribute configuration file from here
195   assert not (pathutils.CLUSTER_CONF_FILE in files_all or
196               pathutils.CLUSTER_CONF_FILE in files_vm)
197   assert not files_mc, "Master candidates not handled in this function"
198
199   filemap = [
200     (online_nodes, files_all),
201     (vm_nodes, files_vm),
202     ]
203
204   # Upload the files
205   for (node_list, files) in filemap:
206     for fname in files:
207       _UploadHelper(lu, node_list, fname)
208
209
210 def _ComputeAncillaryFiles(cluster, redist):
211   """Compute files external to Ganeti which need to be consistent.
212
213   @type redist: boolean
214   @param redist: Whether to include files which need to be redistributed
215
216   """
217   # Compute files for all nodes
218   files_all = set([
219     pathutils.SSH_KNOWN_HOSTS_FILE,
220     pathutils.CONFD_HMAC_KEY,
221     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
222     pathutils.SPICE_CERT_FILE,
223     pathutils.SPICE_CACERT_FILE,
224     pathutils.RAPI_USERS_FILE,
225     ])
226
227   if redist:
228     # we need to ship at least the RAPI certificate
229     files_all.add(pathutils.RAPI_CERT_FILE)
230   else:
231     files_all.update(pathutils.ALL_CERT_FILES)
232     files_all.update(ssconf.SimpleStore().GetFileList())
233
234   if cluster.modify_etc_hosts:
235     files_all.add(pathutils.ETC_HOSTS)
236
237   if cluster.use_external_mip_script:
238     files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
239
240   # Files which are optional, these must:
241   # - be present in one other category as well
242   # - either exist or not exist on all nodes of that category (mc, vm all)
243   files_opt = set([
244     pathutils.RAPI_USERS_FILE,
245     ])
246
247   # Files which should only be on master candidates
248   files_mc = set()
249
250   if not redist:
251     files_mc.add(pathutils.CLUSTER_CONF_FILE)
252
253   # File storage
254   if (not redist and (constants.ENABLE_FILE_STORAGE or
255                         constants.ENABLE_SHARED_FILE_STORAGE)):
256     files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
257     files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
258
259   # Files which should only be on VM-capable nodes
260   files_vm = set(
261     filename
262     for hv_name in cluster.enabled_hypervisors
263     for filename in
264     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
265
266   files_opt |= set(
267     filename
268     for hv_name in cluster.enabled_hypervisors
269     for filename in
270     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
271
272   # Filenames in each category must be unique
273   all_files_set = files_all | files_mc | files_vm
274   assert (len(all_files_set) ==
275           sum(map(len, [files_all, files_mc, files_vm]))), \
276     "Found file listed in more than one file list"
277
278   # Optional files must be present in one other category
279   assert all_files_set.issuperset(files_opt), \
280     "Optional file not in a different required list"
281
282   # This one file should never ever be re-distributed via RPC
283   assert not (redist and
284               pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
285
286   return (files_all, files_opt, files_mc, files_vm)
287
288
289 def _UploadHelper(lu, nodes, fname):
290   """Helper for uploading a file and showing warnings.
291
292   """
293   if os.path.exists(fname):
294     result = lu.rpc.call_upload_file(nodes, fname)
295     for to_node, to_result in result.items():
296       msg = to_result.fail_msg
297       if msg:
298         msg = ("Copy of file %s to node %s failed: %s" %
299                (fname, to_node, msg))
300         lu.LogWarning(msg)
301
302
303 def _MergeAndVerifyHvState(op_input, obj_input):
304   """Combines the hv state from an opcode with the one of the object
305
306   @param op_input: The input dict from the opcode
307   @param obj_input: The input dict from the objects
308   @return: The verified and updated dict
309
310   """
311   if op_input:
312     invalid_hvs = set(op_input) - constants.HYPER_TYPES
313     if invalid_hvs:
314       raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
315                                  " %s" % utils.CommaJoin(invalid_hvs),
316                                  errors.ECODE_INVAL)
317     if obj_input is None:
318       obj_input = {}
319     type_check = constants.HVSTS_PARAMETER_TYPES
320     return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
321
322   return None
323
324
325 def _MergeAndVerifyDiskState(op_input, obj_input):
326   """Combines the disk state from an opcode with the one of the object
327
328   @param op_input: The input dict from the opcode
329   @param obj_input: The input dict from the objects
330   @return: The verified and updated dict
331   """
332   if op_input:
333     invalid_dst = set(op_input) - constants.DS_VALID_TYPES
334     if invalid_dst:
335       raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
336                                  utils.CommaJoin(invalid_dst),
337                                  errors.ECODE_INVAL)
338     type_check = constants.DSS_PARAMETER_TYPES
339     if obj_input is None:
340       obj_input = {}
341     return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
342                                               type_check))
343                 for key, value in op_input.items())
344
345   return None
346
347
348 def _CheckOSParams(lu, required, nodenames, osname, osparams):
349   """OS parameters validation.
350
351   @type lu: L{LogicalUnit}
352   @param lu: the logical unit for which we check
353   @type required: boolean
354   @param required: whether the validation should fail if the OS is not
355       found
356   @type nodenames: list
357   @param nodenames: the list of nodes on which we should check
358   @type osname: string
359   @param osname: the name of the hypervisor we should use
360   @type osparams: dict
361   @param osparams: the parameters which we need to check
362   @raise errors.OpPrereqError: if the parameters are not valid
363
364   """
365   nodenames = _FilterVmNodes(lu, nodenames)
366   result = lu.rpc.call_os_validate(nodenames, required, osname,
367                                    [constants.OS_VALIDATE_PARAMETERS],
368                                    osparams)
369   for node, nres in result.items():
370     # we don't check for offline cases since this should be run only
371     # against the master node and/or an instance's nodes
372     nres.Raise("OS Parameters validation failed on node %s" % node)
373     if not nres.payload:
374       lu.LogInfo("OS %s not found on node %s, validation skipped",
375                  osname, node)
376
377
378 def _CheckHVParams(lu, nodenames, hvname, hvparams):
379   """Hypervisor parameter validation.
380
381   This function abstract the hypervisor parameter validation to be
382   used in both instance create and instance modify.
383
384   @type lu: L{LogicalUnit}
385   @param lu: the logical unit for which we check
386   @type nodenames: list
387   @param nodenames: the list of nodes on which we should check
388   @type hvname: string
389   @param hvname: the name of the hypervisor we should use
390   @type hvparams: dict
391   @param hvparams: the parameters which we need to check
392   @raise errors.OpPrereqError: if the parameters are not valid
393
394   """
395   nodenames = _FilterVmNodes(lu, nodenames)
396
397   cluster = lu.cfg.GetClusterInfo()
398   hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
399
400   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
401   for node in nodenames:
402     info = hvinfo[node]
403     if info.offline:
404       continue
405     info.Raise("Hypervisor parameter validation failed on node %s" % node)
406
407
408 def _AdjustCandidatePool(lu, exceptions):
409   """Adjust the candidate pool after node operations.
410
411   """
412   mod_list = lu.cfg.MaintainCandidatePool(exceptions)
413   if mod_list:
414     lu.LogInfo("Promoted nodes to master candidate role: %s",
415                utils.CommaJoin(node.name for node in mod_list))
416     for name in mod_list:
417       lu.context.ReaddNode(name)
418   mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
419   if mc_now > mc_max:
420     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
421                (mc_now, mc_max))
422
423
424 def _CheckNodePVs(nresult, exclusive_storage):
425   """Check node PVs.
426
427   """
428   pvlist_dict = nresult.get(constants.NV_PVLIST, None)
429   if pvlist_dict is None:
430     return (["Can't get PV list from node"], None)
431   pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
432   errlist = []
433   # check that ':' is not present in PV names, since it's a
434   # special character for lvcreate (denotes the range of PEs to
435   # use on the PV)
436   for pv in pvlist:
437     if ":" in pv.name:
438       errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
439                      (pv.name, pv.vg_name))
440   es_pvinfo = None
441   if exclusive_storage:
442     (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
443     errlist.extend(errmsgs)
444     shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
445     if shared_pvs:
446       for (pvname, lvlist) in shared_pvs:
447         # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
448         errlist.append("PV %s is shared among unrelated LVs (%s)" %
449                        (pvname, utils.CommaJoin(lvlist)))
450   return (errlist, es_pvinfo)
451
452
453 def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
454   """Computes if value is in the desired range.
455
456   @param name: name of the parameter for which we perform the check
457   @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
458       not just 'disk')
459   @param ispecs: dictionary containing min and max values
460   @param value: actual value that we want to use
461   @return: None or an error string
462
463   """
464   if value in [None, constants.VALUE_AUTO]:
465     return None
466   max_v = ispecs[constants.ISPECS_MAX].get(name, value)
467   min_v = ispecs[constants.ISPECS_MIN].get(name, value)
468   if value > max_v or min_v > value:
469     if qualifier:
470       fqn = "%s/%s" % (name, qualifier)
471     else:
472       fqn = name
473     return ("%s value %s is not in range [%s, %s]" %
474             (fqn, value, min_v, max_v))
475   return None
476
477
478 def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
479                                  nic_count, disk_sizes, spindle_use,
480                                  disk_template,
481                                  _compute_fn=_ComputeMinMaxSpec):
482   """Verifies ipolicy against provided specs.
483
484   @type ipolicy: dict
485   @param ipolicy: The ipolicy
486   @type mem_size: int
487   @param mem_size: The memory size
488   @type cpu_count: int
489   @param cpu_count: Used cpu cores
490   @type disk_count: int
491   @param disk_count: Number of disks used
492   @type nic_count: int
493   @param nic_count: Number of nics used
494   @type disk_sizes: list of ints
495   @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
496   @type spindle_use: int
497   @param spindle_use: The number of spindles this instance uses
498   @type disk_template: string
499   @param disk_template: The disk template of the instance
500   @param _compute_fn: The compute function (unittest only)
501   @return: A list of violations, or an empty list of no violations are found
502
503   """
504   assert disk_count == len(disk_sizes)
505
506   test_settings = [
507     (constants.ISPEC_MEM_SIZE, "", mem_size),
508     (constants.ISPEC_CPU_COUNT, "", cpu_count),
509     (constants.ISPEC_NIC_COUNT, "", nic_count),
510     (constants.ISPEC_SPINDLE_USE, "", spindle_use),
511     ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
512          for idx, d in enumerate(disk_sizes)]
513   if disk_template != constants.DT_DISKLESS:
514     # This check doesn't make sense for diskless instances
515     test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
516   ret = []
517   allowed_dts = ipolicy[constants.IPOLICY_DTS]
518   if disk_template not in allowed_dts:
519     ret.append("Disk template %s is not allowed (allowed templates: %s)" %
520                (disk_template, utils.CommaJoin(allowed_dts)))
521
522   min_errs = None
523   for minmax in ipolicy[constants.ISPECS_MINMAX]:
524     errs = filter(None,
525                   (_compute_fn(name, qualifier, minmax, value)
526                    for (name, qualifier, value) in test_settings))
527     if min_errs is None or len(errs) < len(min_errs):
528       min_errs = errs
529   assert min_errs is not None
530   return ret + min_errs
531
532
533 def _ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
534                                      _compute_fn=_ComputeIPolicySpecViolation):
535   """Compute if instance meets the specs of ipolicy.
536
537   @type ipolicy: dict
538   @param ipolicy: The ipolicy to verify against
539   @type instance: L{objects.Instance}
540   @param instance: The instance to verify
541   @type cfg: L{config.ConfigWriter}
542   @param cfg: Cluster configuration
543   @param _compute_fn: The function to verify ipolicy (unittest only)
544   @see: L{_ComputeIPolicySpecViolation}
545
546   """
547   be_full = cfg.GetClusterInfo().FillBE(instance)
548   mem_size = be_full[constants.BE_MAXMEM]
549   cpu_count = be_full[constants.BE_VCPUS]
550   spindle_use = be_full[constants.BE_SPINDLE_USE]
551   disk_count = len(instance.disks)
552   disk_sizes = [disk.size for disk in instance.disks]
553   nic_count = len(instance.nics)
554   disk_template = instance.disk_template
555
556   return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
557                      disk_sizes, spindle_use, disk_template)
558
559
560 def _ComputeViolatingInstances(ipolicy, instances, cfg):
561   """Computes a set of instances who violates given ipolicy.
562
563   @param ipolicy: The ipolicy to verify
564   @type instances: L{objects.Instance}
565   @param instances: List of instances to verify
566   @type cfg: L{config.ConfigWriter}
567   @param cfg: Cluster configuration
568   @return: A frozenset of instance names violating the ipolicy
569
570   """
571   return frozenset([inst.name for inst in instances
572                     if _ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
573
574
575 def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
576   """Computes a set of any instances that would violate the new ipolicy.
577
578   @param old_ipolicy: The current (still in-place) ipolicy
579   @param new_ipolicy: The new (to become) ipolicy
580   @param instances: List of instances to verify
581   @type cfg: L{config.ConfigWriter}
582   @param cfg: Cluster configuration
583   @return: A list of instances which violates the new ipolicy but
584       did not before
585
586   """
587   return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
588           _ComputeViolatingInstances(old_ipolicy, instances, cfg))
589
590
591 def _GetUpdatedParams(old_params, update_dict,
592                       use_default=True, use_none=False):
593   """Return the new version of a parameter dictionary.
594
595   @type old_params: dict
596   @param old_params: old parameters
597   @type update_dict: dict
598   @param update_dict: dict containing new parameter values, or
599       constants.VALUE_DEFAULT to reset the parameter to its default
600       value
601   @param use_default: boolean
602   @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
603       values as 'to be deleted' values
604   @param use_none: boolean
605   @type use_none: whether to recognise C{None} values as 'to be
606       deleted' values
607   @rtype: dict
608   @return: the new parameter dictionary
609
610   """
611   params_copy = copy.deepcopy(old_params)
612   for key, val in update_dict.iteritems():
613     if ((use_default and val == constants.VALUE_DEFAULT) or
614           (use_none and val is None)):
615       try:
616         del params_copy[key]
617       except KeyError:
618         pass
619     else:
620       params_copy[key] = val
621   return params_copy
622
623
624 def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
625   """Return the new version of an instance policy.
626
627   @param group_policy: whether this policy applies to a group and thus
628     we should support removal of policy entries
629
630   """
631   ipolicy = copy.deepcopy(old_ipolicy)
632   for key, value in new_ipolicy.items():
633     if key not in constants.IPOLICY_ALL_KEYS:
634       raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
635                                  errors.ECODE_INVAL)
636     if (not value or value == [constants.VALUE_DEFAULT] or
637             value == constants.VALUE_DEFAULT):
638       if group_policy:
639         if key in ipolicy:
640           del ipolicy[key]
641       else:
642         raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
643                                    " on the cluster'" % key,
644                                    errors.ECODE_INVAL)
645     else:
646       if key in constants.IPOLICY_PARAMETERS:
647         # FIXME: we assume all such values are float
648         try:
649           ipolicy[key] = float(value)
650         except (TypeError, ValueError), err:
651           raise errors.OpPrereqError("Invalid value for attribute"
652                                      " '%s': '%s', error: %s" %
653                                      (key, value, err), errors.ECODE_INVAL)
654       elif key == constants.ISPECS_MINMAX:
655         for minmax in value:
656           for k in minmax.keys():
657             utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
658         ipolicy[key] = value
659       elif key == constants.ISPECS_STD:
660         if group_policy:
661           msg = "%s cannot appear in group instance specs" % key
662           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
663         ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
664                                          use_none=False, use_default=False)
665         utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
666       else:
667         # FIXME: we assume all others are lists; this should be redone
668         # in a nicer way
669         ipolicy[key] = list(value)
670   try:
671     objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
672   except errors.ConfigurationError, err:
673     raise errors.OpPrereqError("Invalid instance policy: %s" % err,
674                                errors.ECODE_INVAL)
675   return ipolicy
676
677
678 def _AnnotateDiskParams(instance, devs, cfg):
679   """Little helper wrapper to the rpc annotation method.
680
681   @param instance: The instance object
682   @type devs: List of L{objects.Disk}
683   @param devs: The root devices (not any of its children!)
684   @param cfg: The config object
685   @returns The annotated disk copies
686   @see L{rpc.AnnotateDiskParams}
687
688   """
689   return rpc.AnnotateDiskParams(instance.disk_template, devs,
690                                 cfg.GetInstanceDiskParams(instance))
691
692
693 def _SupportsOob(cfg, node):
694   """Tells if node supports OOB.
695
696   @type cfg: L{config.ConfigWriter}
697   @param cfg: The cluster configuration
698   @type node: L{objects.Node}
699   @param node: The node
700   @return: The OOB script if supported or an empty string otherwise
701
702   """
703   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
704
705
706 def _UpdateAndVerifySubDict(base, updates, type_check):
707   """Updates and verifies a dict with sub dicts of the same type.
708
709   @param base: The dict with the old data
710   @param updates: The dict with the new data
711   @param type_check: Dict suitable to ForceDictType to verify correct types
712   @returns: A new dict with updated and verified values
713
714   """
715   def fn(old, value):
716     new = _GetUpdatedParams(old, value)
717     utils.ForceDictType(new, type_check)
718     return new
719
720   ret = copy.deepcopy(base)
721   ret.update(dict((key, fn(base.get(key, {}), value))
722                   for key, value in updates.items()))
723   return ret
724
725
726 def _FilterVmNodes(lu, nodenames):
727   """Filters out non-vm_capable nodes from a list.
728
729   @type lu: L{LogicalUnit}
730   @param lu: the logical unit for which we check
731   @type nodenames: list
732   @param nodenames: the list of nodes on which we should check
733   @rtype: list
734   @return: the list of vm-capable nodes
735
736   """
737   vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
738   return [name for name in nodenames if name not in vm_nodes]
739
740
741 def _GetDefaultIAllocator(cfg, ialloc):
742   """Decides on which iallocator to use.
743
744   @type cfg: L{config.ConfigWriter}
745   @param cfg: Cluster configuration object
746   @type ialloc: string or None
747   @param ialloc: Iallocator specified in opcode
748   @rtype: string
749   @return: Iallocator name
750
751   """
752   if not ialloc:
753     # Use default iallocator
754     ialloc = cfg.GetDefaultIAllocator()
755
756   if not ialloc:
757     raise errors.OpPrereqError("No iallocator was specified, neither in the"
758                                " opcode nor as a cluster-wide default",
759                                errors.ECODE_INVAL)
760
761   return ialloc
762
763
764 def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
765                               cur_group_uuid):
766   """Checks if node groups for locked instances are still correct.
767
768   @type cfg: L{config.ConfigWriter}
769   @param cfg: Cluster configuration
770   @type instances: dict; string as key, L{objects.Instance} as value
771   @param instances: Dictionary, instance name as key, instance object as value
772   @type owned_groups: iterable of string
773   @param owned_groups: List of owned groups
774   @type owned_nodes: iterable of string
775   @param owned_nodes: List of owned nodes
776   @type cur_group_uuid: string or None
777   @param cur_group_uuid: Optional group UUID to check against instance's groups
778
779   """
780   for (name, inst) in instances.items():
781     assert owned_nodes.issuperset(inst.all_nodes), \
782       "Instance %s's nodes changed while we kept the lock" % name
783
784     inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
785
786     assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
787       "Instance %s has no node in group %s" % (name, cur_group_uuid)
788
789
790 def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
791                              primary_only=False):
792   """Checks if the owned node groups are still correct for an instance.
793
794   @type cfg: L{config.ConfigWriter}
795   @param cfg: The cluster configuration
796   @type instance_name: string
797   @param instance_name: Instance name
798   @type owned_groups: set or frozenset
799   @param owned_groups: List of currently owned node groups
800   @type primary_only: boolean
801   @param primary_only: Whether to check node groups for only the primary node
802
803   """
804   inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
805
806   if not owned_groups.issuperset(inst_groups):
807     raise errors.OpPrereqError("Instance %s's node groups changed since"
808                                " locks were acquired, current groups are"
809                                " are '%s', owning groups '%s'; retry the"
810                                " operation" %
811                                (instance_name,
812                                 utils.CommaJoin(inst_groups),
813                                 utils.CommaJoin(owned_groups)),
814                                errors.ECODE_STATE)
815
816   return inst_groups
817
818
819 def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
820   """Unpacks the result of change-group and node-evacuate iallocator requests.
821
822   Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
823   L{constants.IALLOCATOR_MODE_CHG_GROUP}.
824
825   @type lu: L{LogicalUnit}
826   @param lu: Logical unit instance
827   @type alloc_result: tuple/list
828   @param alloc_result: Result from iallocator
829   @type early_release: bool
830   @param early_release: Whether to release locks early if possible
831   @type use_nodes: bool
832   @param use_nodes: Whether to display node names instead of groups
833
834   """
835   (moved, failed, jobs) = alloc_result
836
837   if failed:
838     failreason = utils.CommaJoin("%s (%s)" % (name, reason)
839                                  for (name, reason) in failed)
840     lu.LogWarning("Unable to evacuate instances %s", failreason)
841     raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
842
843   if moved:
844     lu.LogInfo("Instances to be moved: %s",
845                utils.CommaJoin("%s (to %s)" %
846                                (name, _NodeEvacDest(use_nodes, group, nodes))
847                                for (name, group, nodes) in moved))
848
849   return [map(compat.partial(_SetOpEarlyRelease, early_release),
850               map(opcodes.OpCode.LoadOpCode, ops))
851           for ops in jobs]
852
853
854 def _NodeEvacDest(use_nodes, group, nodes):
855   """Returns group or nodes depending on caller's choice.
856
857   """
858   if use_nodes:
859     return utils.CommaJoin(nodes)
860   else:
861     return group
862
863
864 def _SetOpEarlyRelease(early_release, op):
865   """Sets C{early_release} flag on opcodes if available.
866
867   """
868   try:
869     op.early_release = early_release
870   except AttributeError:
871     assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
872
873   return op
874
875
876 def _MapInstanceDisksToNodes(instances):
877   """Creates a map from (node, volume) to instance name.
878
879   @type instances: list of L{objects.Instance}
880   @rtype: dict; tuple of (node name, volume name) as key, instance name as value
881
882   """
883   return dict(((node, vol), inst.name)
884               for inst in instances
885               for (node, vols) in inst.MapLVsByNode().items()
886               for vol in vols)
887
888
889 def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
890   """Make sure that none of the given paramters is global.
891
892   If a global parameter is found, an L{errors.OpPrereqError} exception is
893   raised. This is used to avoid setting global parameters for individual nodes.
894
895   @type params: dictionary
896   @param params: Parameters to check
897   @type glob_pars: dictionary
898   @param glob_pars: Forbidden parameters
899   @type kind: string
900   @param kind: Kind of parameters (e.g. "node")
901   @type bad_levels: string
902   @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
903       "instance")
904   @type good_levels: strings
905   @param good_levels: Level(s) at which the parameters are allowed (e.g.
906       "cluster or group")
907
908   """
909   used_globals = glob_pars.intersection(params)
910   if used_globals:
911     msg = ("The following %s parameters are global and cannot"
912            " be customized at %s level, please modify them at"
913            " %s level: %s" %
914            (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
915     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
916
917
918 def _IsExclusiveStorageEnabledNode(cfg, node):
919   """Whether exclusive_storage is in effect for the given node.
920
921   @type cfg: L{config.ConfigWriter}
922   @param cfg: The cluster configuration
923   @type node: L{objects.Node}
924   @param node: The node
925   @rtype: bool
926   @return: The effective value of exclusive_storage
927
928   """
929   return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
930
931
932 def _CheckInstanceState(lu, instance, req_states, msg=None):
933   """Ensure that an instance is in one of the required states.
934
935   @param lu: the LU on behalf of which we make the check
936   @param instance: the instance to check
937   @param msg: if passed, should be a message to replace the default one
938   @raise errors.OpPrereqError: if the instance is not in the required state
939
940   """
941   if msg is None:
942     msg = ("can't use instance from outside %s states" %
943            utils.CommaJoin(req_states))
944   if instance.admin_state not in req_states:
945     raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
946                                (instance.name, instance.admin_state, msg),
947                                errors.ECODE_STATE)
948
949   if constants.ADMINST_UP not in req_states:
950     pnode = instance.primary_node
951     if not lu.cfg.GetNodeInfo(pnode).offline:
952       ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
953       ins_l.Raise("Can't contact node %s for instance information" % pnode,
954                   prereq=True, ecode=errors.ECODE_ENVIRON)
955       if instance.name in ins_l.payload:
956         raise errors.OpPrereqError("Instance %s is running, %s" %
957                                    (instance.name, msg), errors.ECODE_STATE)
958     else:
959       lu.LogWarning("Primary node offline, ignoring check that instance"
960                      " is down")
961
962
963 def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
964   """Check the sanity of iallocator and node arguments and use the
965   cluster-wide iallocator if appropriate.
966
967   Check that at most one of (iallocator, node) is specified. If none is
968   specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
969   then the LU's opcode's iallocator slot is filled with the cluster-wide
970   default iallocator.
971
972   @type iallocator_slot: string
973   @param iallocator_slot: the name of the opcode iallocator slot
974   @type node_slot: string
975   @param node_slot: the name of the opcode target node slot
976
977   """
978   node = getattr(lu.op, node_slot, None)
979   ialloc = getattr(lu.op, iallocator_slot, None)
980   if node == []:
981     node = None
982
983   if node is not None and ialloc is not None:
984     raise errors.OpPrereqError("Do not specify both, iallocator and node",
985                                errors.ECODE_INVAL)
986   elif ((node is None and ialloc is None) or
987         ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
988     default_iallocator = lu.cfg.GetDefaultIAllocator()
989     if default_iallocator:
990       setattr(lu.op, iallocator_slot, default_iallocator)
991     else:
992       raise errors.OpPrereqError("No iallocator or node given and no"
993                                  " cluster-wide default iallocator found;"
994                                  " please specify either an iallocator or a"
995                                  " node, or set a cluster-wide default"
996                                  " iallocator", errors.ECODE_INVAL)
997
998
999 def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1000   faulty = []
1001
1002   for dev in instance.disks:
1003     cfg.SetDiskID(dev, node_name)
1004
1005   result = rpc_runner.call_blockdev_getmirrorstatus(node_name,
1006                                                     (instance.disks,
1007                                                      instance))
1008   result.Raise("Failed to get disk status from node %s" % node_name,
1009                prereq=prereq, ecode=errors.ECODE_ENVIRON)
1010
1011   for idx, bdev_status in enumerate(result.payload):
1012     if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1013       faulty.append(idx)
1014
1015   return faulty
1016
1017
1018 def _CheckNodeOnline(lu, node, msg=None):
1019   """Ensure that a given node is online.
1020
1021   @param lu: the LU on behalf of which we make the check
1022   @param node: the node to check
1023   @param msg: if passed, should be a message to replace the default one
1024   @raise errors.OpPrereqError: if the node is offline
1025
1026   """
1027   if msg is None:
1028     msg = "Can't use offline node"
1029   if lu.cfg.GetNodeInfo(node).offline:
1030     raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)