Index nodes by their UUID
[ganeti-local] / lib / cmdlib / common.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Common functions used by multiple logical units."""
23
24 import copy
25 import os
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import hypervisor
31 from ganeti import locking
32 from ganeti import objects
33 from ganeti import opcodes
34 from ganeti import pathutils
35 from ganeti import rpc
36 from ganeti import ssconf
37 from ganeti import utils
38
39
40 # States of instance
41 INSTANCE_DOWN = [constants.ADMINST_DOWN]
42 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
43 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
44
45 #: Instance status in which an instance can be marked as offline/online
46 CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
47   constants.ADMINST_OFFLINE,
48   ]))
49
50
51 def _ExpandItemName(expand_fn, name, kind):
52   """Expand an item name.
53
54   @param expand_fn: the function to use for expansion
55   @param name: requested item name
56   @param kind: text description ('Node' or 'Instance')
57   @return: the result of the expand_fn, if successful
58   @raise errors.OpPrereqError: if the item is not found
59
60   """
61   full_name = expand_fn(name)
62   if full_name is None:
63     raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
64                                errors.ECODE_NOENT)
65   return full_name
66
67
68 def ExpandInstanceName(cfg, name):
69   """Wrapper over L{_ExpandItemName} for instance."""
70   return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
71
72
73 def ExpandNodeUuidAndName(cfg, expected_uuid, name):
74   """Expand a short node name into the node UUID and full name.
75
76   @type cfg: L{config.ConfigWriter}
77   @param cfg: The cluster configuration
78   @type expected_uuid: string
79   @param expected_uuid: expected UUID for the node (or None if there is no
80         expectation). If it does not match, a L{errors.OpPrereqError} is
81         raised.
82   @type name: string
83   @param name: the short node name
84
85   """
86   (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
87   if expected_uuid is not None and uuid != expected_uuid:
88     raise errors.OpPrereqError(
89       "The nodes UUID '%s' does not match the expected UUID '%s' for node"
90       " '%s'. Maybe the node changed since you submitted this job." %
91       (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
92   return (uuid, full_name)
93
94
95 def ShareAll():
96   """Returns a dict declaring all lock levels shared.
97
98   """
99   return dict.fromkeys(locking.LEVELS, 1)
100
101
102 def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
103   """Checks if the instances in a node group are still correct.
104
105   @type cfg: L{config.ConfigWriter}
106   @param cfg: The cluster configuration
107   @type group_uuid: string
108   @param group_uuid: Node group UUID
109   @type owned_instances: set or frozenset
110   @param owned_instances: List of currently owned instances
111
112   """
113   wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
114   if owned_instances != wanted_instances:
115     raise errors.OpPrereqError("Instances in node group '%s' changed since"
116                                " locks were acquired, wanted '%s', have '%s';"
117                                " retry the operation" %
118                                (group_uuid,
119                                 utils.CommaJoin(wanted_instances),
120                                 utils.CommaJoin(owned_instances)),
121                                errors.ECODE_STATE)
122
123   return wanted_instances
124
125
126 def GetWantedNodes(lu, short_node_names):
127   """Returns list of checked and expanded node names.
128
129   @type lu: L{LogicalUnit}
130   @param lu: the logical unit on whose behalf we execute
131   @type short_node_names: list
132   @param short_node_names: list of node names or None for all nodes
133   @rtype: tuple of lists
134   @return: tupe with (list of node UUIDs, list of node names)
135   @raise errors.ProgrammerError: if the nodes parameter is wrong type
136
137   """
138   if short_node_names:
139     node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
140                   for name in short_node_names]
141   else:
142     node_uuids = lu.cfg.GetNodeList()
143
144   return (node_uuids, [lu.cfg.GetNodeInfo(uuid).name for uuid in node_uuids])
145
146
147 def GetWantedInstances(lu, instances):
148   """Returns list of checked and expanded instance names.
149
150   @type lu: L{LogicalUnit}
151   @param lu: the logical unit on whose behalf we execute
152   @type instances: list
153   @param instances: list of instance names or None for all instances
154   @rtype: list
155   @return: the list of instances, sorted
156   @raise errors.OpPrereqError: if the instances parameter is wrong type
157   @raise errors.OpPrereqError: if any of the passed instances is not found
158
159   """
160   if instances:
161     wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
162   else:
163     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
164   return wanted
165
166
167 def RunPostHook(lu, node_name):
168   """Runs the post-hook for an opcode on a single node.
169
170   """
171   hm = lu.proc.BuildHooksManager(lu)
172   try:
173     node_names = [node_name]
174     hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=node_names)
175   except Exception, err: # pylint: disable=W0703
176     lu.LogWarning("Errors occurred running hooks on %s: %s",
177                   node_name, err)
178
179
180 def RedistributeAncillaryFiles(lu):
181   """Distribute additional files which are part of the cluster configuration.
182
183   ConfigWriter takes care of distributing the config and ssconf files, but
184   there are more files which should be distributed to all nodes. This function
185   makes sure those are copied.
186
187   """
188   # Gather target nodes
189   cluster = lu.cfg.GetClusterInfo()
190   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
191
192   online_node_uuids = lu.cfg.GetOnlineNodeList()
193   online_node_uuid_set = frozenset(online_node_uuids)
194   vm_node_uuids = list(online_node_uuid_set.intersection(
195                          lu.cfg.GetVmCapableNodeList()))
196
197   # Never distribute to master node
198   for node_uuids in [online_node_uuids, vm_node_uuids]:
199     if master_info.uuid in node_uuids:
200       node_uuids.remove(master_info.uuid)
201
202   # Gather file lists
203   (files_all, _, files_mc, files_vm) = \
204     ComputeAncillaryFiles(cluster, True)
205
206   # Never re-distribute configuration file from here
207   assert not (pathutils.CLUSTER_CONF_FILE in files_all or
208               pathutils.CLUSTER_CONF_FILE in files_vm)
209   assert not files_mc, "Master candidates not handled in this function"
210
211   filemap = [
212     (online_node_uuids, files_all),
213     (vm_node_uuids, files_vm),
214     ]
215
216   # Upload the files
217   for (node_uuids, files) in filemap:
218     for fname in files:
219       UploadHelper(lu, node_uuids, fname)
220
221
222 def ComputeAncillaryFiles(cluster, redist):
223   """Compute files external to Ganeti which need to be consistent.
224
225   @type redist: boolean
226   @param redist: Whether to include files which need to be redistributed
227
228   """
229   # Compute files for all nodes
230   files_all = set([
231     pathutils.SSH_KNOWN_HOSTS_FILE,
232     pathutils.CONFD_HMAC_KEY,
233     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
234     pathutils.SPICE_CERT_FILE,
235     pathutils.SPICE_CACERT_FILE,
236     pathutils.RAPI_USERS_FILE,
237     ])
238
239   if redist:
240     # we need to ship at least the RAPI certificate
241     files_all.add(pathutils.RAPI_CERT_FILE)
242   else:
243     files_all.update(pathutils.ALL_CERT_FILES)
244     files_all.update(ssconf.SimpleStore().GetFileList())
245
246   if cluster.modify_etc_hosts:
247     files_all.add(pathutils.ETC_HOSTS)
248
249   if cluster.use_external_mip_script:
250     files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
251
252   # Files which are optional, these must:
253   # - be present in one other category as well
254   # - either exist or not exist on all nodes of that category (mc, vm all)
255   files_opt = set([
256     pathutils.RAPI_USERS_FILE,
257     ])
258
259   # Files which should only be on master candidates
260   files_mc = set()
261
262   if not redist:
263     files_mc.add(pathutils.CLUSTER_CONF_FILE)
264
265   # File storage
266   if (not redist and (constants.ENABLE_FILE_STORAGE or
267                         constants.ENABLE_SHARED_FILE_STORAGE)):
268     files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
269     files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
270
271   # Files which should only be on VM-capable nodes
272   files_vm = set(
273     filename
274     for hv_name in cluster.enabled_hypervisors
275     for filename in
276     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
277
278   files_opt |= set(
279     filename
280     for hv_name in cluster.enabled_hypervisors
281     for filename in
282     hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
283
284   # Filenames in each category must be unique
285   all_files_set = files_all | files_mc | files_vm
286   assert (len(all_files_set) ==
287           sum(map(len, [files_all, files_mc, files_vm]))), \
288     "Found file listed in more than one file list"
289
290   # Optional files must be present in one other category
291   assert all_files_set.issuperset(files_opt), \
292     "Optional file not in a different required list"
293
294   # This one file should never ever be re-distributed via RPC
295   assert not (redist and
296               pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
297
298   return (files_all, files_opt, files_mc, files_vm)
299
300
301 def UploadHelper(lu, node_uuids, fname):
302   """Helper for uploading a file and showing warnings.
303
304   """
305   if os.path.exists(fname):
306     result = lu.rpc.call_upload_file(node_uuids, fname)
307     for to_node_uuids, to_result in result.items():
308       msg = to_result.fail_msg
309       if msg:
310         msg = ("Copy of file %s to node %s failed: %s" %
311                (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
312         lu.LogWarning(msg)
313
314
315 def MergeAndVerifyHvState(op_input, obj_input):
316   """Combines the hv state from an opcode with the one of the object
317
318   @param op_input: The input dict from the opcode
319   @param obj_input: The input dict from the objects
320   @return: The verified and updated dict
321
322   """
323   if op_input:
324     invalid_hvs = set(op_input) - constants.HYPER_TYPES
325     if invalid_hvs:
326       raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
327                                  " %s" % utils.CommaJoin(invalid_hvs),
328                                  errors.ECODE_INVAL)
329     if obj_input is None:
330       obj_input = {}
331     type_check = constants.HVSTS_PARAMETER_TYPES
332     return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
333
334   return None
335
336
337 def MergeAndVerifyDiskState(op_input, obj_input):
338   """Combines the disk state from an opcode with the one of the object
339
340   @param op_input: The input dict from the opcode
341   @param obj_input: The input dict from the objects
342   @return: The verified and updated dict
343   """
344   if op_input:
345     invalid_dst = set(op_input) - constants.DS_VALID_TYPES
346     if invalid_dst:
347       raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
348                                  utils.CommaJoin(invalid_dst),
349                                  errors.ECODE_INVAL)
350     type_check = constants.DSS_PARAMETER_TYPES
351     if obj_input is None:
352       obj_input = {}
353     return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
354                                               type_check))
355                 for key, value in op_input.items())
356
357   return None
358
359
360 def CheckOSParams(lu, required, node_uuids, osname, osparams):
361   """OS parameters validation.
362
363   @type lu: L{LogicalUnit}
364   @param lu: the logical unit for which we check
365   @type required: boolean
366   @param required: whether the validation should fail if the OS is not
367       found
368   @type node_uuids: list
369   @param node_uuids: the list of nodes on which we should check
370   @type osname: string
371   @param osname: the name of the hypervisor we should use
372   @type osparams: dict
373   @param osparams: the parameters which we need to check
374   @raise errors.OpPrereqError: if the parameters are not valid
375
376   """
377   node_uuids = _FilterVmNodes(lu, node_uuids)
378   result = lu.rpc.call_os_validate(node_uuids, required, osname,
379                                    [constants.OS_VALIDATE_PARAMETERS],
380                                    osparams)
381   for node_uuid, nres in result.items():
382     # we don't check for offline cases since this should be run only
383     # against the master node and/or an instance's nodes
384     nres.Raise("OS Parameters validation failed on node %s" %
385                lu.cfg.GetNodeName(node_uuid))
386     if not nres.payload:
387       lu.LogInfo("OS %s not found on node %s, validation skipped",
388                  osname, lu.cfg.GetNodeName(node_uuid))
389
390
391 def CheckHVParams(lu, node_uuids, hvname, hvparams):
392   """Hypervisor parameter validation.
393
394   This function abstract the hypervisor parameter validation to be
395   used in both instance create and instance modify.
396
397   @type lu: L{LogicalUnit}
398   @param lu: the logical unit for which we check
399   @type node_uuids: list
400   @param node_uuids: the list of nodes on which we should check
401   @type hvname: string
402   @param hvname: the name of the hypervisor we should use
403   @type hvparams: dict
404   @param hvparams: the parameters which we need to check
405   @raise errors.OpPrereqError: if the parameters are not valid
406
407   """
408   node_uuids = _FilterVmNodes(lu, node_uuids)
409
410   cluster = lu.cfg.GetClusterInfo()
411   hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
412
413   hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
414   for node_uuid in node_uuids:
415     info = hvinfo[node_uuid]
416     if info.offline:
417       continue
418     info.Raise("Hypervisor parameter validation failed on node %s" %
419                lu.cfg.GetNodeName(node_uuid))
420
421
422 def AdjustCandidatePool(lu, exceptions):
423   """Adjust the candidate pool after node operations.
424
425   """
426   mod_list = lu.cfg.MaintainCandidatePool(exceptions)
427   if mod_list:
428     lu.LogInfo("Promoted nodes to master candidate role: %s",
429                utils.CommaJoin(node.name for node in mod_list))
430     for node in mod_list:
431       lu.context.ReaddNode(node)
432   mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
433   if mc_now > mc_max:
434     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
435                (mc_now, mc_max))
436
437
438 def CheckNodePVs(nresult, exclusive_storage):
439   """Check node PVs.
440
441   """
442   pvlist_dict = nresult.get(constants.NV_PVLIST, None)
443   if pvlist_dict is None:
444     return (["Can't get PV list from node"], None)
445   pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
446   errlist = []
447   # check that ':' is not present in PV names, since it's a
448   # special character for lvcreate (denotes the range of PEs to
449   # use on the PV)
450   for pv in pvlist:
451     if ":" in pv.name:
452       errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
453                      (pv.name, pv.vg_name))
454   es_pvinfo = None
455   if exclusive_storage:
456     (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
457     errlist.extend(errmsgs)
458     shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
459     if shared_pvs:
460       for (pvname, lvlist) in shared_pvs:
461         # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
462         errlist.append("PV %s is shared among unrelated LVs (%s)" %
463                        (pvname, utils.CommaJoin(lvlist)))
464   return (errlist, es_pvinfo)
465
466
467 def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
468   """Computes if value is in the desired range.
469
470   @param name: name of the parameter for which we perform the check
471   @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
472       not just 'disk')
473   @param ispecs: dictionary containing min and max values
474   @param value: actual value that we want to use
475   @return: None or an error string
476
477   """
478   if value in [None, constants.VALUE_AUTO]:
479     return None
480   max_v = ispecs[constants.ISPECS_MAX].get(name, value)
481   min_v = ispecs[constants.ISPECS_MIN].get(name, value)
482   if value > max_v or min_v > value:
483     if qualifier:
484       fqn = "%s/%s" % (name, qualifier)
485     else:
486       fqn = name
487     return ("%s value %s is not in range [%s, %s]" %
488             (fqn, value, min_v, max_v))
489   return None
490
491
492 def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
493                                 nic_count, disk_sizes, spindle_use,
494                                 disk_template,
495                                 _compute_fn=_ComputeMinMaxSpec):
496   """Verifies ipolicy against provided specs.
497
498   @type ipolicy: dict
499   @param ipolicy: The ipolicy
500   @type mem_size: int
501   @param mem_size: The memory size
502   @type cpu_count: int
503   @param cpu_count: Used cpu cores
504   @type disk_count: int
505   @param disk_count: Number of disks used
506   @type nic_count: int
507   @param nic_count: Number of nics used
508   @type disk_sizes: list of ints
509   @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
510   @type spindle_use: int
511   @param spindle_use: The number of spindles this instance uses
512   @type disk_template: string
513   @param disk_template: The disk template of the instance
514   @param _compute_fn: The compute function (unittest only)
515   @return: A list of violations, or an empty list of no violations are found
516
517   """
518   assert disk_count == len(disk_sizes)
519
520   test_settings = [
521     (constants.ISPEC_MEM_SIZE, "", mem_size),
522     (constants.ISPEC_CPU_COUNT, "", cpu_count),
523     (constants.ISPEC_NIC_COUNT, "", nic_count),
524     (constants.ISPEC_SPINDLE_USE, "", spindle_use),
525     ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
526          for idx, d in enumerate(disk_sizes)]
527   if disk_template != constants.DT_DISKLESS:
528     # This check doesn't make sense for diskless instances
529     test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
530   ret = []
531   allowed_dts = ipolicy[constants.IPOLICY_DTS]
532   if disk_template not in allowed_dts:
533     ret.append("Disk template %s is not allowed (allowed templates: %s)" %
534                (disk_template, utils.CommaJoin(allowed_dts)))
535
536   min_errs = None
537   for minmax in ipolicy[constants.ISPECS_MINMAX]:
538     errs = filter(None,
539                   (_compute_fn(name, qualifier, minmax, value)
540                    for (name, qualifier, value) in test_settings))
541     if min_errs is None or len(errs) < len(min_errs):
542       min_errs = errs
543   assert min_errs is not None
544   return ret + min_errs
545
546
547 def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
548                                     _compute_fn=ComputeIPolicySpecViolation):
549   """Compute if instance meets the specs of ipolicy.
550
551   @type ipolicy: dict
552   @param ipolicy: The ipolicy to verify against
553   @type instance: L{objects.Instance}
554   @param instance: The instance to verify
555   @type cfg: L{config.ConfigWriter}
556   @param cfg: Cluster configuration
557   @param _compute_fn: The function to verify ipolicy (unittest only)
558   @see: L{ComputeIPolicySpecViolation}
559
560   """
561   ret = []
562   be_full = cfg.GetClusterInfo().FillBE(instance)
563   mem_size = be_full[constants.BE_MAXMEM]
564   cpu_count = be_full[constants.BE_VCPUS]
565   es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
566   if any(es_flags.values()):
567     # With exclusive storage use the actual spindles
568     try:
569       spindle_use = sum([disk.spindles for disk in instance.disks])
570     except TypeError:
571       ret.append("Number of spindles not configured for disks of instance %s"
572                  " while exclusive storage is enabled, try running gnt-cluster"
573                  " repair-disk-sizes" % instance.name)
574       # _ComputeMinMaxSpec ignores 'None's
575       spindle_use = None
576   else:
577     spindle_use = be_full[constants.BE_SPINDLE_USE]
578   disk_count = len(instance.disks)
579   disk_sizes = [disk.size for disk in instance.disks]
580   nic_count = len(instance.nics)
581   disk_template = instance.disk_template
582
583   return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
584                            disk_sizes, spindle_use, disk_template)
585
586
587 def _ComputeViolatingInstances(ipolicy, instances, cfg):
588   """Computes a set of instances who violates given ipolicy.
589
590   @param ipolicy: The ipolicy to verify
591   @type instances: L{objects.Instance}
592   @param instances: List of instances to verify
593   @type cfg: L{config.ConfigWriter}
594   @param cfg: Cluster configuration
595   @return: A frozenset of instance names violating the ipolicy
596
597   """
598   return frozenset([inst.name for inst in instances
599                     if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
600
601
602 def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
603   """Computes a set of any instances that would violate the new ipolicy.
604
605   @param old_ipolicy: The current (still in-place) ipolicy
606   @param new_ipolicy: The new (to become) ipolicy
607   @param instances: List of instances to verify
608   @type cfg: L{config.ConfigWriter}
609   @param cfg: Cluster configuration
610   @return: A list of instances which violates the new ipolicy but
611       did not before
612
613   """
614   return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
615           _ComputeViolatingInstances(old_ipolicy, instances, cfg))
616
617
618 def GetUpdatedParams(old_params, update_dict,
619                       use_default=True, use_none=False):
620   """Return the new version of a parameter dictionary.
621
622   @type old_params: dict
623   @param old_params: old parameters
624   @type update_dict: dict
625   @param update_dict: dict containing new parameter values, or
626       constants.VALUE_DEFAULT to reset the parameter to its default
627       value
628   @param use_default: boolean
629   @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
630       values as 'to be deleted' values
631   @param use_none: boolean
632   @type use_none: whether to recognise C{None} values as 'to be
633       deleted' values
634   @rtype: dict
635   @return: the new parameter dictionary
636
637   """
638   params_copy = copy.deepcopy(old_params)
639   for key, val in update_dict.iteritems():
640     if ((use_default and val == constants.VALUE_DEFAULT) or
641           (use_none and val is None)):
642       try:
643         del params_copy[key]
644       except KeyError:
645         pass
646     else:
647       params_copy[key] = val
648   return params_copy
649
650
651 def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
652   """Return the new version of an instance policy.
653
654   @param group_policy: whether this policy applies to a group and thus
655     we should support removal of policy entries
656
657   """
658   ipolicy = copy.deepcopy(old_ipolicy)
659   for key, value in new_ipolicy.items():
660     if key not in constants.IPOLICY_ALL_KEYS:
661       raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
662                                  errors.ECODE_INVAL)
663     if (not value or value == [constants.VALUE_DEFAULT] or
664             value == constants.VALUE_DEFAULT):
665       if group_policy:
666         if key in ipolicy:
667           del ipolicy[key]
668       else:
669         raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
670                                    " on the cluster'" % key,
671                                    errors.ECODE_INVAL)
672     else:
673       if key in constants.IPOLICY_PARAMETERS:
674         # FIXME: we assume all such values are float
675         try:
676           ipolicy[key] = float(value)
677         except (TypeError, ValueError), err:
678           raise errors.OpPrereqError("Invalid value for attribute"
679                                      " '%s': '%s', error: %s" %
680                                      (key, value, err), errors.ECODE_INVAL)
681       elif key == constants.ISPECS_MINMAX:
682         for minmax in value:
683           for k in minmax.keys():
684             utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
685         ipolicy[key] = value
686       elif key == constants.ISPECS_STD:
687         if group_policy:
688           msg = "%s cannot appear in group instance specs" % key
689           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
690         ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
691                                         use_none=False, use_default=False)
692         utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
693       else:
694         # FIXME: we assume all others are lists; this should be redone
695         # in a nicer way
696         ipolicy[key] = list(value)
697   try:
698     objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
699   except errors.ConfigurationError, err:
700     raise errors.OpPrereqError("Invalid instance policy: %s" % err,
701                                errors.ECODE_INVAL)
702   return ipolicy
703
704
705 def AnnotateDiskParams(instance, devs, cfg):
706   """Little helper wrapper to the rpc annotation method.
707
708   @param instance: The instance object
709   @type devs: List of L{objects.Disk}
710   @param devs: The root devices (not any of its children!)
711   @param cfg: The config object
712   @returns The annotated disk copies
713   @see L{rpc.AnnotateDiskParams}
714
715   """
716   return rpc.AnnotateDiskParams(instance.disk_template, devs,
717                                 cfg.GetInstanceDiskParams(instance))
718
719
720 def SupportsOob(cfg, node):
721   """Tells if node supports OOB.
722
723   @type cfg: L{config.ConfigWriter}
724   @param cfg: The cluster configuration
725   @type node: L{objects.Node}
726   @param node: The node
727   @return: The OOB script if supported or an empty string otherwise
728
729   """
730   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
731
732
733 def _UpdateAndVerifySubDict(base, updates, type_check):
734   """Updates and verifies a dict with sub dicts of the same type.
735
736   @param base: The dict with the old data
737   @param updates: The dict with the new data
738   @param type_check: Dict suitable to ForceDictType to verify correct types
739   @returns: A new dict with updated and verified values
740
741   """
742   def fn(old, value):
743     new = GetUpdatedParams(old, value)
744     utils.ForceDictType(new, type_check)
745     return new
746
747   ret = copy.deepcopy(base)
748   ret.update(dict((key, fn(base.get(key, {}), value))
749                   for key, value in updates.items()))
750   return ret
751
752
753 def _FilterVmNodes(lu, node_uuids):
754   """Filters out non-vm_capable nodes from a list.
755
756   @type lu: L{LogicalUnit}
757   @param lu: the logical unit for which we check
758   @type node_uuids: list
759   @param node_uuids: the list of nodes on which we should check
760   @rtype: list
761   @return: the list of vm-capable nodes
762
763   """
764   vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
765   return [uuid for uuid in node_uuids if uuid not in vm_nodes]
766
767
768 def GetDefaultIAllocator(cfg, ialloc):
769   """Decides on which iallocator to use.
770
771   @type cfg: L{config.ConfigWriter}
772   @param cfg: Cluster configuration object
773   @type ialloc: string or None
774   @param ialloc: Iallocator specified in opcode
775   @rtype: string
776   @return: Iallocator name
777
778   """
779   if not ialloc:
780     # Use default iallocator
781     ialloc = cfg.GetDefaultIAllocator()
782
783   if not ialloc:
784     raise errors.OpPrereqError("No iallocator was specified, neither in the"
785                                " opcode nor as a cluster-wide default",
786                                errors.ECODE_INVAL)
787
788   return ialloc
789
790
791 def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
792                              cur_group_uuid):
793   """Checks if node groups for locked instances are still correct.
794
795   @type cfg: L{config.ConfigWriter}
796   @param cfg: Cluster configuration
797   @type instances: dict; string as key, L{objects.Instance} as value
798   @param instances: Dictionary, instance name as key, instance object as value
799   @type owned_groups: iterable of string
800   @param owned_groups: List of owned groups
801   @type owned_node_uuids: iterable of string
802   @param owned_node_uuids: List of owned nodes
803   @type cur_group_uuid: string or None
804   @param cur_group_uuid: Optional group UUID to check against instance's groups
805
806   """
807   for (name, inst) in instances.items():
808     assert owned_node_uuids.issuperset(inst.all_nodes), \
809       "Instance %s's nodes changed while we kept the lock" % name
810
811     inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)
812
813     assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
814       "Instance %s has no node in group %s" % (name, cur_group_uuid)
815
816
817 def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
818                             primary_only=False):
819   """Checks if the owned node groups are still correct for an instance.
820
821   @type cfg: L{config.ConfigWriter}
822   @param cfg: The cluster configuration
823   @type instance_name: string
824   @param instance_name: Instance name
825   @type owned_groups: set or frozenset
826   @param owned_groups: List of currently owned node groups
827   @type primary_only: boolean
828   @param primary_only: Whether to check node groups for only the primary node
829
830   """
831   inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
832
833   if not owned_groups.issuperset(inst_groups):
834     raise errors.OpPrereqError("Instance %s's node groups changed since"
835                                " locks were acquired, current groups are"
836                                " are '%s', owning groups '%s'; retry the"
837                                " operation" %
838                                (instance_name,
839                                 utils.CommaJoin(inst_groups),
840                                 utils.CommaJoin(owned_groups)),
841                                errors.ECODE_STATE)
842
843   return inst_groups
844
845
846 def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
847   """Unpacks the result of change-group and node-evacuate iallocator requests.
848
849   Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
850   L{constants.IALLOCATOR_MODE_CHG_GROUP}.
851
852   @type lu: L{LogicalUnit}
853   @param lu: Logical unit instance
854   @type alloc_result: tuple/list
855   @param alloc_result: Result from iallocator
856   @type early_release: bool
857   @param early_release: Whether to release locks early if possible
858   @type use_nodes: bool
859   @param use_nodes: Whether to display node names instead of groups
860
861   """
862   (moved, failed, jobs) = alloc_result
863
864   if failed:
865     failreason = utils.CommaJoin("%s (%s)" % (name, reason)
866                                  for (name, reason) in failed)
867     lu.LogWarning("Unable to evacuate instances %s", failreason)
868     raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
869
870   if moved:
871     lu.LogInfo("Instances to be moved: %s",
872                utils.CommaJoin(
873                  "%s (to %s)" %
874                  (name, _NodeEvacDest(use_nodes, group, node_names))
875                  for (name, group, node_names) in moved))
876
877   return [map(compat.partial(_SetOpEarlyRelease, early_release),
878               map(opcodes.OpCode.LoadOpCode, ops))
879           for ops in jobs]
880
881
882 def _NodeEvacDest(use_nodes, group, node_names):
883   """Returns group or nodes depending on caller's choice.
884
885   """
886   if use_nodes:
887     return utils.CommaJoin(node_names)
888   else:
889     return group
890
891
892 def _SetOpEarlyRelease(early_release, op):
893   """Sets C{early_release} flag on opcodes if available.
894
895   """
896   try:
897     op.early_release = early_release
898   except AttributeError:
899     assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
900
901   return op
902
903
904 def MapInstanceDisksToNodes(instances):
905   """Creates a map from (node, volume) to instance name.
906
907   @type instances: list of L{objects.Instance}
908   @rtype: dict; tuple of (node uuid, volume name) as key, instance name as value
909
910   """
911   return dict(((node_uuid, vol), inst.name)
912               for inst in instances
913               for (node_uuid, vols) in inst.MapLVsByNode().items()
914               for vol in vols)
915
916
917 def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
918   """Make sure that none of the given paramters is global.
919
920   If a global parameter is found, an L{errors.OpPrereqError} exception is
921   raised. This is used to avoid setting global parameters for individual nodes.
922
923   @type params: dictionary
924   @param params: Parameters to check
925   @type glob_pars: dictionary
926   @param glob_pars: Forbidden parameters
927   @type kind: string
928   @param kind: Kind of parameters (e.g. "node")
929   @type bad_levels: string
930   @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
931       "instance")
932   @type good_levels: strings
933   @param good_levels: Level(s) at which the parameters are allowed (e.g.
934       "cluster or group")
935
936   """
937   used_globals = glob_pars.intersection(params)
938   if used_globals:
939     msg = ("The following %s parameters are global and cannot"
940            " be customized at %s level, please modify them at"
941            " %s level: %s" %
942            (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
943     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
944
945
946 def IsExclusiveStorageEnabledNode(cfg, node):
947   """Whether exclusive_storage is in effect for the given node.
948
949   @type cfg: L{config.ConfigWriter}
950   @param cfg: The cluster configuration
951   @type node: L{objects.Node}
952   @param node: The node
953   @rtype: bool
954   @return: The effective value of exclusive_storage
955
956   """
957   return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
958
959
960 def CheckInstanceState(lu, instance, req_states, msg=None):
961   """Ensure that an instance is in one of the required states.
962
963   @param lu: the LU on behalf of which we make the check
964   @param instance: the instance to check
965   @param msg: if passed, should be a message to replace the default one
966   @raise errors.OpPrereqError: if the instance is not in the required state
967
968   """
969   if msg is None:
970     msg = ("can't use instance from outside %s states" %
971            utils.CommaJoin(req_states))
972   if instance.admin_state not in req_states:
973     raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
974                                (instance.name, instance.admin_state, msg),
975                                errors.ECODE_STATE)
976
977   if constants.ADMINST_UP not in req_states:
978     pnode_uuid = instance.primary_node
979     if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
980       all_hvparams = lu.cfg.GetClusterInfo().hvparams
981       ins_l = lu.rpc.call_instance_list(
982                 [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
983       ins_l.Raise("Can't contact node %s for instance information" %
984                   lu.cfg.GetNodeName(pnode_uuid),
985                   prereq=True, ecode=errors.ECODE_ENVIRON)
986       if instance.name in ins_l.payload:
987         raise errors.OpPrereqError("Instance %s is running, %s" %
988                                    (instance.name, msg), errors.ECODE_STATE)
989     else:
990       lu.LogWarning("Primary node offline, ignoring check that instance"
991                      " is down")
992
993
994 def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
995   """Check the sanity of iallocator and node arguments and use the
996   cluster-wide iallocator if appropriate.
997
998   Check that at most one of (iallocator, node) is specified. If none is
999   specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
1000   then the LU's opcode's iallocator slot is filled with the cluster-wide
1001   default iallocator.
1002
1003   @type iallocator_slot: string
1004   @param iallocator_slot: the name of the opcode iallocator slot
1005   @type node_slot: string
1006   @param node_slot: the name of the opcode target node slot
1007
1008   """
1009   node = getattr(lu.op, node_slot, None)
1010   ialloc = getattr(lu.op, iallocator_slot, None)
1011   if node == []:
1012     node = None
1013
1014   if node is not None and ialloc is not None:
1015     raise errors.OpPrereqError("Do not specify both, iallocator and node",
1016                                errors.ECODE_INVAL)
1017   elif ((node is None and ialloc is None) or
1018         ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1019     default_iallocator = lu.cfg.GetDefaultIAllocator()
1020     if default_iallocator:
1021       setattr(lu.op, iallocator_slot, default_iallocator)
1022     else:
1023       raise errors.OpPrereqError("No iallocator or node given and no"
1024                                  " cluster-wide default iallocator found;"
1025                                  " please specify either an iallocator or a"
1026                                  " node, or set a cluster-wide default"
1027                                  " iallocator", errors.ECODE_INVAL)
1028
1029
1030 def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
1031   faulty = []
1032
1033   for dev in instance.disks:
1034     cfg.SetDiskID(dev, node_uuid)
1035
1036   result = rpc_runner.call_blockdev_getmirrorstatus(
1037              node_uuid, (instance.disks, instance))
1038   result.Raise("Failed to get disk status from node %s" %
1039                cfg.GetNodeName(node_uuid),
1040                prereq=prereq, ecode=errors.ECODE_ENVIRON)
1041
1042   for idx, bdev_status in enumerate(result.payload):
1043     if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1044       faulty.append(idx)
1045
1046   return faulty
1047
1048
1049 def CheckNodeOnline(lu, node_uuid, msg=None):
1050   """Ensure that a given node is online.
1051
1052   @param lu: the LU on behalf of which we make the check
1053   @param node_uuid: the node to check
1054   @param msg: if passed, should be a message to replace the default one
1055   @raise errors.OpPrereqError: if the node is offline
1056
1057   """
1058   if msg is None:
1059     msg = "Can't use offline node"
1060   if lu.cfg.GetNodeInfo(node_uuid).offline:
1061     raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
1062                                errors.ECODE_STATE)