Merge branch 'next' into branch-2.1
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   @ivar dry_run_result: the value (if any) that will be returned to the caller
60       in dry-run mode (signalled by opcode dry_run parameter)
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overridden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92     # support for dry-run
93     self.dry_run_result = None
94
95     for attr_name in self._OP_REQP:
96       attr_val = getattr(op, attr_name, None)
97       if attr_val is None:
98         raise errors.OpPrereqError("Required parameter '%s' missing" %
99                                    attr_name)
100     self.CheckArguments()
101
102   def __GetSSH(self):
103     """Returns the SshRunner object
104
105     """
106     if not self.__ssh:
107       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108     return self.__ssh
109
110   ssh = property(fget=__GetSSH)
111
112   def CheckArguments(self):
113     """Check syntactic validity for the opcode arguments.
114
115     This method is for doing a simple syntactic check and ensure
116     validity of opcode parameters, without any cluster-related
117     checks. While the same can be accomplished in ExpandNames and/or
118     CheckPrereq, doing these separate is better because:
119
120       - ExpandNames is left as as purely a lock-related function
121       - CheckPrereq is run after we have acquired locks (and possible
122         waited for them)
123
124     The function is allowed to change the self.op attribute so that
125     later methods can no longer worry about missing parameters.
126
127     """
128     pass
129
130   def ExpandNames(self):
131     """Expand names for this LU.
132
133     This method is called before starting to execute the opcode, and it should
134     update all the parameters of the opcode to their canonical form (e.g. a
135     short node name must be fully expanded after this method has successfully
136     completed). This way locking, hooks, logging, ecc. can work correctly.
137
138     LUs which implement this method must also populate the self.needed_locks
139     member, as a dict with lock levels as keys, and a list of needed lock names
140     as values. Rules:
141
142       - use an empty dict if you don't need any lock
143       - if you don't need any lock at a particular level omit that level
144       - don't put anything for the BGL level
145       - if you want all locks at a level use locking.ALL_SET as a value
146
147     If you need to share locks (rather than acquire them exclusively) at one
148     level you can modify self.share_locks, setting a true value (usually 1) for
149     that level. By default locks are not shared.
150
151     Examples::
152
153       # Acquire all nodes and one instance
154       self.needed_locks = {
155         locking.LEVEL_NODE: locking.ALL_SET,
156         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157       }
158       # Acquire just two nodes
159       self.needed_locks = {
160         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161       }
162       # Acquire no locks
163       self.needed_locks = {} # No, you can't leave it to the default value None
164
165     """
166     # The implementation of this method is mandatory only if the new LU is
167     # concurrent, so that old LUs don't need to be changed all at the same
168     # time.
169     if self.REQ_BGL:
170       self.needed_locks = {} # Exclusive LUs don't need locks.
171     else:
172       raise NotImplementedError
173
174   def DeclareLocks(self, level):
175     """Declare LU locking needs for a level
176
177     While most LUs can just declare their locking needs at ExpandNames time,
178     sometimes there's the need to calculate some locks after having acquired
179     the ones before. This function is called just before acquiring locks at a
180     particular level, but after acquiring the ones at lower levels, and permits
181     such calculations. It can be used to modify self.needed_locks, and by
182     default it does nothing.
183
184     This function is only called if you have something already set in
185     self.needed_locks for the level.
186
187     @param level: Locking level which is going to be locked
188     @type level: member of ganeti.locking.LEVELS
189
190     """
191
192   def CheckPrereq(self):
193     """Check prerequisites for this LU.
194
195     This method should check that the prerequisites for the execution
196     of this LU are fulfilled. It can do internode communication, but
197     it should be idempotent - no cluster or system changes are
198     allowed.
199
200     The method should raise errors.OpPrereqError in case something is
201     not fulfilled. Its return value is ignored.
202
203     This method should also update all the parameters of the opcode to
204     their canonical form if it hasn't been done by ExpandNames before.
205
206     """
207     raise NotImplementedError
208
209   def Exec(self, feedback_fn):
210     """Execute the LU.
211
212     This method should implement the actual work. It should raise
213     errors.OpExecError for failures that are somewhat dealt with in
214     code, or expected.
215
216     """
217     raise NotImplementedError
218
219   def BuildHooksEnv(self):
220     """Build hooks environment for this LU.
221
222     This method should return a three-node tuple consisting of: a dict
223     containing the environment that will be used for running the
224     specific hook for this LU, a list of node names on which the hook
225     should run before the execution, and a list of node names on which
226     the hook should run after the execution.
227
228     The keys of the dict must not have 'GANETI_' prefixed as this will
229     be handled in the hooks runner. Also note additional keys will be
230     added by the hooks runner. If the LU doesn't define any
231     environment, an empty dict (and not None) should be returned.
232
233     No nodes should be returned as an empty list (and not None).
234
235     Note that if the HPATH for a LU class is None, this function will
236     not be called.
237
238     """
239     raise NotImplementedError
240
241   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
242     """Notify the LU about the results of its hooks.
243
244     This method is called every time a hooks phase is executed, and notifies
245     the Logical Unit about the hooks' result. The LU can then use it to alter
246     its result based on the hooks.  By default the method does nothing and the
247     previous result is passed back unchanged but any LU can define it if it
248     wants to use the local cluster hook-scripts somehow.
249
250     @param phase: one of L{constants.HOOKS_PHASE_POST} or
251         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
252     @param hook_results: the results of the multi-node hooks rpc call
253     @param feedback_fn: function used send feedback back to the caller
254     @param lu_result: the previous Exec result this LU had, or None
255         in the PRE phase
256     @return: the new Exec result, based on the previous result
257         and hook results
258
259     """
260     return lu_result
261
262   def _ExpandAndLockInstance(self):
263     """Helper function to expand and lock an instance.
264
265     Many LUs that work on an instance take its name in self.op.instance_name
266     and need to expand it and then declare the expanded name for locking. This
267     function does it, and then updates self.op.instance_name to the expanded
268     name. It also initializes needed_locks as a dict, if this hasn't been done
269     before.
270
271     """
272     if self.needed_locks is None:
273       self.needed_locks = {}
274     else:
275       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
276         "_ExpandAndLockInstance called with instance-level locks set"
277     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
278     if expanded_name is None:
279       raise errors.OpPrereqError("Instance '%s' not known" %
280                                   self.op.instance_name)
281     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
282     self.op.instance_name = expanded_name
283
284   def _LockInstancesNodes(self, primary_only=False):
285     """Helper function to declare instances' nodes for locking.
286
287     This function should be called after locking one or more instances to lock
288     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
289     with all primary or secondary nodes for instances already locked and
290     present in self.needed_locks[locking.LEVEL_INSTANCE].
291
292     It should be called from DeclareLocks, and for safety only works if
293     self.recalculate_locks[locking.LEVEL_NODE] is set.
294
295     In the future it may grow parameters to just lock some instance's nodes, or
296     to just lock primaries or secondary nodes, if needed.
297
298     If should be called in DeclareLocks in a way similar to::
299
300       if level == locking.LEVEL_NODE:
301         self._LockInstancesNodes()
302
303     @type primary_only: boolean
304     @param primary_only: only lock primary nodes of locked instances
305
306     """
307     assert locking.LEVEL_NODE in self.recalculate_locks, \
308       "_LockInstancesNodes helper function called with no nodes to recalculate"
309
310     # TODO: check if we're really been called with the instance locks held
311
312     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
313     # future we might want to have different behaviors depending on the value
314     # of self.recalculate_locks[locking.LEVEL_NODE]
315     wanted_nodes = []
316     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
317       instance = self.context.cfg.GetInstanceInfo(instance_name)
318       wanted_nodes.append(instance.primary_node)
319       if not primary_only:
320         wanted_nodes.extend(instance.secondary_nodes)
321
322     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
323       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
324     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
325       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326
327     del self.recalculate_locks[locking.LEVEL_NODE]
328
329
330 class NoHooksLU(LogicalUnit):
331   """Simple LU which runs no hooks.
332
333   This LU is intended as a parent for other LogicalUnits which will
334   run no hooks, in order to reduce duplicate code.
335
336   """
337   HPATH = None
338   HTYPE = None
339
340
341 def _GetWantedNodes(lu, nodes):
342   """Returns list of checked and expanded node names.
343
344   @type lu: L{LogicalUnit}
345   @param lu: the logical unit on whose behalf we execute
346   @type nodes: list
347   @param nodes: list of node names or None for all nodes
348   @rtype: list
349   @return: the list of nodes, sorted
350   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
351
352   """
353   if not isinstance(nodes, list):
354     raise errors.OpPrereqError("Invalid argument type 'nodes'")
355
356   if not nodes:
357     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
358       " non-empty list of nodes whose name is to be expanded.")
359
360   wanted = []
361   for name in nodes:
362     node = lu.cfg.ExpandNodeName(name)
363     if node is None:
364       raise errors.OpPrereqError("No such node name '%s'" % name)
365     wanted.append(node)
366
367   return utils.NiceSort(wanted)
368
369
370 def _GetWantedInstances(lu, instances):
371   """Returns list of checked and expanded instance names.
372
373   @type lu: L{LogicalUnit}
374   @param lu: the logical unit on whose behalf we execute
375   @type instances: list
376   @param instances: list of instance names or None for all instances
377   @rtype: list
378   @return: the list of instances, sorted
379   @raise errors.OpPrereqError: if the instances parameter is wrong type
380   @raise errors.OpPrereqError: if any of the passed instances is not found
381
382   """
383   if not isinstance(instances, list):
384     raise errors.OpPrereqError("Invalid argument type 'instances'")
385
386   if instances:
387     wanted = []
388
389     for name in instances:
390       instance = lu.cfg.ExpandInstanceName(name)
391       if instance is None:
392         raise errors.OpPrereqError("No such instance name '%s'" % name)
393       wanted.append(instance)
394
395   else:
396     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
397   return wanted
398
399
400 def _CheckOutputFields(static, dynamic, selected):
401   """Checks whether all selected fields are valid.
402
403   @type static: L{utils.FieldSet}
404   @param static: static fields set
405   @type dynamic: L{utils.FieldSet}
406   @param dynamic: dynamic fields set
407
408   """
409   f = utils.FieldSet()
410   f.Extend(static)
411   f.Extend(dynamic)
412
413   delta = f.NonMatching(selected)
414   if delta:
415     raise errors.OpPrereqError("Unknown output fields selected: %s"
416                                % ",".join(delta))
417
418
419 def _CheckBooleanOpField(op, name):
420   """Validates boolean opcode parameters.
421
422   This will ensure that an opcode parameter is either a boolean value,
423   or None (but that it always exists).
424
425   """
426   val = getattr(op, name, None)
427   if not (val is None or isinstance(val, bool)):
428     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429                                (name, str(val)))
430   setattr(op, name, val)
431
432
433 def _CheckNodeOnline(lu, node):
434   """Ensure that a given node is online.
435
436   @param lu: the LU on behalf of which we make the check
437   @param node: the node to check
438   @raise errors.OpPrereqError: if the node is offline
439
440   """
441   if lu.cfg.GetNodeInfo(node).offline:
442     raise errors.OpPrereqError("Can't use offline node %s" % node)
443
444
445 def _CheckNodeNotDrained(lu, node):
446   """Ensure that a given node is not drained.
447
448   @param lu: the LU on behalf of which we make the check
449   @param node: the node to check
450   @raise errors.OpPrereqError: if the node is drained
451
452   """
453   if lu.cfg.GetNodeInfo(node).drained:
454     raise errors.OpPrereqError("Can't use drained node %s" % node)
455
456
457 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
458                           memory, vcpus, nics, disk_template, disks,
459                           bep, hvp, hypervisor_name):
460   """Builds instance related env variables for hooks
461
462   This builds the hook environment from individual variables.
463
464   @type name: string
465   @param name: the name of the instance
466   @type primary_node: string
467   @param primary_node: the name of the instance's primary node
468   @type secondary_nodes: list
469   @param secondary_nodes: list of secondary nodes as strings
470   @type os_type: string
471   @param os_type: the name of the instance's OS
472   @type status: boolean
473   @param status: the should_run status of the instance
474   @type memory: string
475   @param memory: the memory size of the instance
476   @type vcpus: string
477   @param vcpus: the count of VCPUs the instance has
478   @type nics: list
479   @param nics: list of tuples (ip, mac, mode, link) representing
480       the NICs the instance has
481   @type disk_template: string
482   @param disk_template: the disk template of the instance
483   @type disks: list
484   @param disks: the list of (size, mode) pairs
485   @type bep: dict
486   @param bep: the backend parameters for the instance
487   @type hvp: dict
488   @param hvp: the hypervisor parameters for the instance
489   @type hypervisor_name: string
490   @param hypervisor_name: the hypervisor for the instance
491   @rtype: dict
492   @return: the hook environment for this instance
493
494   """
495   if status:
496     str_status = "up"
497   else:
498     str_status = "down"
499   env = {
500     "OP_TARGET": name,
501     "INSTANCE_NAME": name,
502     "INSTANCE_PRIMARY": primary_node,
503     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
504     "INSTANCE_OS_TYPE": os_type,
505     "INSTANCE_STATUS": str_status,
506     "INSTANCE_MEMORY": memory,
507     "INSTANCE_VCPUS": vcpus,
508     "INSTANCE_DISK_TEMPLATE": disk_template,
509     "INSTANCE_HYPERVISOR": hypervisor_name,
510   }
511
512   if nics:
513     nic_count = len(nics)
514     for idx, (ip, mac, mode, link) in enumerate(nics):
515       if ip is None:
516         ip = ""
517       env["INSTANCE_NIC%d_IP" % idx] = ip
518       env["INSTANCE_NIC%d_MAC" % idx] = mac
519       env["INSTANCE_NIC%d_MODE" % idx] = mode
520       env["INSTANCE_NIC%d_LINK" % idx] = link
521       if mode == constants.NIC_MODE_BRIDGED:
522         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
523   else:
524     nic_count = 0
525
526   env["INSTANCE_NIC_COUNT"] = nic_count
527
528   if disks:
529     disk_count = len(disks)
530     for idx, (size, mode) in enumerate(disks):
531       env["INSTANCE_DISK%d_SIZE" % idx] = size
532       env["INSTANCE_DISK%d_MODE" % idx] = mode
533   else:
534     disk_count = 0
535
536   env["INSTANCE_DISK_COUNT"] = disk_count
537
538   for source, kind in [(bep, "BE"), (hvp, "HV")]:
539     for key, value in source.items():
540       env["INSTANCE_%s_%s" % (kind, key)] = value
541
542   return env
543
544 def _NICListToTuple(lu, nics):
545   """Build a list of nic information tuples.
546
547   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
548   value in LUQueryInstanceData.
549
550   @type lu:  L{LogicalUnit}
551   @param lu: the logical unit on whose behalf we execute
552   @type nics: list of L{objects.NIC}
553   @param nics: list of nics to convert to hooks tuples
554
555   """
556   hooks_nics = []
557   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
558   for nic in nics:
559     ip = nic.ip
560     mac = nic.mac
561     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
562     mode = filled_params[constants.NIC_MODE]
563     link = filled_params[constants.NIC_LINK]
564     hooks_nics.append((ip, mac, mode, link))
565   return hooks_nics
566
567 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
568   """Builds instance related env variables for hooks from an object.
569
570   @type lu: L{LogicalUnit}
571   @param lu: the logical unit on whose behalf we execute
572   @type instance: L{objects.Instance}
573   @param instance: the instance for which we should build the
574       environment
575   @type override: dict
576   @param override: dictionary with key/values that will override
577       our values
578   @rtype: dict
579   @return: the hook environment dictionary
580
581   """
582   cluster = lu.cfg.GetClusterInfo()
583   bep = cluster.FillBE(instance)
584   hvp = cluster.FillHV(instance)
585   args = {
586     'name': instance.name,
587     'primary_node': instance.primary_node,
588     'secondary_nodes': instance.secondary_nodes,
589     'os_type': instance.os,
590     'status': instance.admin_up,
591     'memory': bep[constants.BE_MEMORY],
592     'vcpus': bep[constants.BE_VCPUS],
593     'nics': _NICListToTuple(lu, instance.nics),
594     'disk_template': instance.disk_template,
595     'disks': [(disk.size, disk.mode) for disk in instance.disks],
596     'bep': bep,
597     'hvp': hvp,
598     'hypervisor': instance.hypervisor,
599   }
600   if override:
601     args.update(override)
602   return _BuildInstanceHookEnv(**args)
603
604
605 def _AdjustCandidatePool(lu):
606   """Adjust the candidate pool after node operations.
607
608   """
609   mod_list = lu.cfg.MaintainCandidatePool()
610   if mod_list:
611     lu.LogInfo("Promoted nodes to master candidate role: %s",
612                ", ".join(node.name for node in mod_list))
613     for name in mod_list:
614       lu.context.ReaddNode(name)
615   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
616   if mc_now > mc_max:
617     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
618                (mc_now, mc_max))
619
620
621 def _CheckNicsBridgesExist(lu, target_nics, target_node,
622                                profile=constants.PP_DEFAULT):
623   """Check that the brigdes needed by a list of nics exist.
624
625   """
626   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
627   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
628                 for nic in target_nics]
629   brlist = [params[constants.NIC_LINK] for params in paramslist
630             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
631   if brlist:
632     result = lu.rpc.call_bridges_exist(target_node, brlist)
633     result.Raise("Error checking bridges on destination node '%s'" %
634                  target_node, prereq=True)
635
636
637 def _CheckInstanceBridgesExist(lu, instance, node=None):
638   """Check that the brigdes needed by an instance exist.
639
640   """
641   if node is None:
642     node = instance.primary_node
643   _CheckNicsBridgesExist(lu, instance.nics, node)
644
645
646 class LUDestroyCluster(NoHooksLU):
647   """Logical unit for destroying the cluster.
648
649   """
650   _OP_REQP = []
651
652   def CheckPrereq(self):
653     """Check prerequisites.
654
655     This checks whether the cluster is empty.
656
657     Any errors are signaled by raising errors.OpPrereqError.
658
659     """
660     master = self.cfg.GetMasterNode()
661
662     nodelist = self.cfg.GetNodeList()
663     if len(nodelist) != 1 or nodelist[0] != master:
664       raise errors.OpPrereqError("There are still %d node(s) in"
665                                  " this cluster." % (len(nodelist) - 1))
666     instancelist = self.cfg.GetInstanceList()
667     if instancelist:
668       raise errors.OpPrereqError("There are still %d instance(s) in"
669                                  " this cluster." % len(instancelist))
670
671   def Exec(self, feedback_fn):
672     """Destroys the cluster.
673
674     """
675     master = self.cfg.GetMasterNode()
676     result = self.rpc.call_node_stop_master(master, False)
677     result.Raise("Could not disable the master role")
678     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679     utils.CreateBackup(priv_key)
680     utils.CreateBackup(pub_key)
681     return master
682
683
684 class LUVerifyCluster(LogicalUnit):
685   """Verifies the cluster status.
686
687   """
688   HPATH = "cluster-verify"
689   HTYPE = constants.HTYPE_CLUSTER
690   _OP_REQP = ["skip_checks"]
691   REQ_BGL = False
692
693   def ExpandNames(self):
694     self.needed_locks = {
695       locking.LEVEL_NODE: locking.ALL_SET,
696       locking.LEVEL_INSTANCE: locking.ALL_SET,
697     }
698     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
699
700   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
701                   node_result, feedback_fn, master_files,
702                   drbd_map, vg_name):
703     """Run multiple tests against a node.
704
705     Test list:
706
707       - compares ganeti version
708       - checks vg existence and size > 20G
709       - checks config file checksum
710       - checks ssh to other nodes
711
712     @type nodeinfo: L{objects.Node}
713     @param nodeinfo: the node to check
714     @param file_list: required list of files
715     @param local_cksum: dictionary of local files and their checksums
716     @param node_result: the results from the node
717     @param feedback_fn: function used to accumulate results
718     @param master_files: list of files that only masters should have
719     @param drbd_map: the useddrbd minors for this node, in
720         form of minor: (instance, must_exist) which correspond to instances
721         and their running status
722     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
723
724     """
725     node = nodeinfo.name
726
727     # main result, node_result should be a non-empty dict
728     if not node_result or not isinstance(node_result, dict):
729       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
730       return True
731
732     # compares ganeti version
733     local_version = constants.PROTOCOL_VERSION
734     remote_version = node_result.get('version', None)
735     if not (remote_version and isinstance(remote_version, (list, tuple)) and
736             len(remote_version) == 2):
737       feedback_fn("  - ERROR: connection to %s failed" % (node))
738       return True
739
740     if local_version != remote_version[0]:
741       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
742                   " node %s %s" % (local_version, node, remote_version[0]))
743       return True
744
745     # node seems compatible, we can actually try to look into its results
746
747     bad = False
748
749     # full package version
750     if constants.RELEASE_VERSION != remote_version[1]:
751       feedback_fn("  - WARNING: software version mismatch: master %s,"
752                   " node %s %s" %
753                   (constants.RELEASE_VERSION, node, remote_version[1]))
754
755     # checks vg existence and size > 20G
756     if vg_name is not None:
757       vglist = node_result.get(constants.NV_VGLIST, None)
758       if not vglist:
759         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
760                         (node,))
761         bad = True
762       else:
763         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
764                                               constants.MIN_VG_SIZE)
765         if vgstatus:
766           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
767           bad = True
768
769     # checks config file checksum
770
771     remote_cksum = node_result.get(constants.NV_FILELIST, None)
772     if not isinstance(remote_cksum, dict):
773       bad = True
774       feedback_fn("  - ERROR: node hasn't returned file checksum data")
775     else:
776       for file_name in file_list:
777         node_is_mc = nodeinfo.master_candidate
778         must_have_file = file_name not in master_files
779         if file_name not in remote_cksum:
780           if node_is_mc or must_have_file:
781             bad = True
782             feedback_fn("  - ERROR: file '%s' missing" % file_name)
783         elif remote_cksum[file_name] != local_cksum[file_name]:
784           if node_is_mc or must_have_file:
785             bad = True
786             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
787           else:
788             # not candidate and this is not a must-have file
789             bad = True
790             feedback_fn("  - ERROR: file '%s' should not exist on non master"
791                         " candidates (and the file is outdated)" % file_name)
792         else:
793           # all good, except non-master/non-must have combination
794           if not node_is_mc and not must_have_file:
795             feedback_fn("  - ERROR: file '%s' should not exist on non master"
796                         " candidates" % file_name)
797
798     # checks ssh to any
799
800     if constants.NV_NODELIST not in node_result:
801       bad = True
802       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
803     else:
804       if node_result[constants.NV_NODELIST]:
805         bad = True
806         for node in node_result[constants.NV_NODELIST]:
807           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
808                           (node, node_result[constants.NV_NODELIST][node]))
809
810     if constants.NV_NODENETTEST not in node_result:
811       bad = True
812       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
813     else:
814       if node_result[constants.NV_NODENETTEST]:
815         bad = True
816         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
817         for node in nlist:
818           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
819                           (node, node_result[constants.NV_NODENETTEST][node]))
820
821     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
822     if isinstance(hyp_result, dict):
823       for hv_name, hv_result in hyp_result.iteritems():
824         if hv_result is not None:
825           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
826                       (hv_name, hv_result))
827
828     # check used drbd list
829     if vg_name is not None:
830       used_minors = node_result.get(constants.NV_DRBDLIST, [])
831       if not isinstance(used_minors, (tuple, list)):
832         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
833                     str(used_minors))
834       else:
835         for minor, (iname, must_exist) in drbd_map.items():
836           if minor not in used_minors and must_exist:
837             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
838                         " not active" % (minor, iname))
839             bad = True
840         for minor in used_minors:
841           if minor not in drbd_map:
842             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
843                         minor)
844             bad = True
845
846     return bad
847
848   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
849                       node_instance, feedback_fn, n_offline):
850     """Verify an instance.
851
852     This function checks to see if the required block devices are
853     available on the instance's node.
854
855     """
856     bad = False
857
858     node_current = instanceconfig.primary_node
859
860     node_vol_should = {}
861     instanceconfig.MapLVsByNode(node_vol_should)
862
863     for node in node_vol_should:
864       if node in n_offline:
865         # ignore missing volumes on offline nodes
866         continue
867       for volume in node_vol_should[node]:
868         if node not in node_vol_is or volume not in node_vol_is[node]:
869           feedback_fn("  - ERROR: volume %s missing on node %s" %
870                           (volume, node))
871           bad = True
872
873     if instanceconfig.admin_up:
874       if ((node_current not in node_instance or
875           not instance in node_instance[node_current]) and
876           node_current not in n_offline):
877         feedback_fn("  - ERROR: instance %s not running on node %s" %
878                         (instance, node_current))
879         bad = True
880
881     for node in node_instance:
882       if (not node == node_current):
883         if instance in node_instance[node]:
884           feedback_fn("  - ERROR: instance %s should not run on node %s" %
885                           (instance, node))
886           bad = True
887
888     return bad
889
890   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
891     """Verify if there are any unknown volumes in the cluster.
892
893     The .os, .swap and backup volumes are ignored. All other volumes are
894     reported as unknown.
895
896     """
897     bad = False
898
899     for node in node_vol_is:
900       for volume in node_vol_is[node]:
901         if node not in node_vol_should or volume not in node_vol_should[node]:
902           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
903                       (volume, node))
904           bad = True
905     return bad
906
907   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
908     """Verify the list of running instances.
909
910     This checks what instances are running but unknown to the cluster.
911
912     """
913     bad = False
914     for node in node_instance:
915       for runninginstance in node_instance[node]:
916         if runninginstance not in instancelist:
917           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
918                           (runninginstance, node))
919           bad = True
920     return bad
921
922   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
923     """Verify N+1 Memory Resilience.
924
925     Check that if one single node dies we can still start all the instances it
926     was primary for.
927
928     """
929     bad = False
930
931     for node, nodeinfo in node_info.iteritems():
932       # This code checks that every node which is now listed as secondary has
933       # enough memory to host all instances it is supposed to should a single
934       # other node in the cluster fail.
935       # FIXME: not ready for failover to an arbitrary node
936       # FIXME: does not support file-backed instances
937       # WARNING: we currently take into account down instances as well as up
938       # ones, considering that even if they're down someone might want to start
939       # them even in the event of a node failure.
940       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
941         needed_mem = 0
942         for instance in instances:
943           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
944           if bep[constants.BE_AUTO_BALANCE]:
945             needed_mem += bep[constants.BE_MEMORY]
946         if nodeinfo['mfree'] < needed_mem:
947           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
948                       " failovers should node %s fail" % (node, prinode))
949           bad = True
950     return bad
951
952   def CheckPrereq(self):
953     """Check prerequisites.
954
955     Transform the list of checks we're going to skip into a set and check that
956     all its members are valid.
957
958     """
959     self.skip_set = frozenset(self.op.skip_checks)
960     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
961       raise errors.OpPrereqError("Invalid checks to be skipped specified")
962
963   def BuildHooksEnv(self):
964     """Build hooks env.
965
966     Cluster-Verify hooks just ran in the post phase and their failure makes
967     the output be logged in the verify output and the verification to fail.
968
969     """
970     all_nodes = self.cfg.GetNodeList()
971     env = {
972       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
973       }
974     for node in self.cfg.GetAllNodesInfo().values():
975       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
976
977     return env, [], all_nodes
978
979   def Exec(self, feedback_fn):
980     """Verify integrity of cluster, performing various test on nodes.
981
982     """
983     bad = False
984     feedback_fn("* Verifying global settings")
985     for msg in self.cfg.VerifyConfig():
986       feedback_fn("  - ERROR: %s" % msg)
987
988     vg_name = self.cfg.GetVGName()
989     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
990     nodelist = utils.NiceSort(self.cfg.GetNodeList())
991     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
992     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
993     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
994                         for iname in instancelist)
995     i_non_redundant = [] # Non redundant instances
996     i_non_a_balanced = [] # Non auto-balanced instances
997     n_offline = [] # List of offline nodes
998     n_drained = [] # List of nodes being drained
999     node_volume = {}
1000     node_instance = {}
1001     node_info = {}
1002     instance_cfg = {}
1003
1004     # FIXME: verify OS list
1005     # do local checksums
1006     master_files = [constants.CLUSTER_CONF_FILE]
1007
1008     file_names = ssconf.SimpleStore().GetFileList()
1009     file_names.append(constants.SSL_CERT_FILE)
1010     file_names.append(constants.RAPI_CERT_FILE)
1011     file_names.extend(master_files)
1012
1013     local_checksums = utils.FingerprintFiles(file_names)
1014
1015     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1016     node_verify_param = {
1017       constants.NV_FILELIST: file_names,
1018       constants.NV_NODELIST: [node.name for node in nodeinfo
1019                               if not node.offline],
1020       constants.NV_HYPERVISOR: hypervisors,
1021       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1022                                   node.secondary_ip) for node in nodeinfo
1023                                  if not node.offline],
1024       constants.NV_INSTANCELIST: hypervisors,
1025       constants.NV_VERSION: None,
1026       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1027       }
1028     if vg_name is not None:
1029       node_verify_param[constants.NV_VGLIST] = None
1030       node_verify_param[constants.NV_LVLIST] = vg_name
1031       node_verify_param[constants.NV_DRBDLIST] = None
1032     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1033                                            self.cfg.GetClusterName())
1034
1035     cluster = self.cfg.GetClusterInfo()
1036     master_node = self.cfg.GetMasterNode()
1037     all_drbd_map = self.cfg.ComputeDRBDMap()
1038
1039     for node_i in nodeinfo:
1040       node = node_i.name
1041
1042       if node_i.offline:
1043         feedback_fn("* Skipping offline node %s" % (node,))
1044         n_offline.append(node)
1045         continue
1046
1047       if node == master_node:
1048         ntype = "master"
1049       elif node_i.master_candidate:
1050         ntype = "master candidate"
1051       elif node_i.drained:
1052         ntype = "drained"
1053         n_drained.append(node)
1054       else:
1055         ntype = "regular"
1056       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1057
1058       msg = all_nvinfo[node].fail_msg
1059       if msg:
1060         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1061         bad = True
1062         continue
1063
1064       nresult = all_nvinfo[node].payload
1065       node_drbd = {}
1066       for minor, instance in all_drbd_map[node].items():
1067         if instance not in instanceinfo:
1068           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1069                       instance)
1070           # ghost instance should not be running, but otherwise we
1071           # don't give double warnings (both ghost instance and
1072           # unallocated minor in use)
1073           node_drbd[minor] = (instance, False)
1074         else:
1075           instance = instanceinfo[instance]
1076           node_drbd[minor] = (instance.name, instance.admin_up)
1077       result = self._VerifyNode(node_i, file_names, local_checksums,
1078                                 nresult, feedback_fn, master_files,
1079                                 node_drbd, vg_name)
1080       bad = bad or result
1081
1082       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1083       if vg_name is None:
1084         node_volume[node] = {}
1085       elif isinstance(lvdata, basestring):
1086         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1087                     (node, utils.SafeEncode(lvdata)))
1088         bad = True
1089         node_volume[node] = {}
1090       elif not isinstance(lvdata, dict):
1091         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1092         bad = True
1093         continue
1094       else:
1095         node_volume[node] = lvdata
1096
1097       # node_instance
1098       idata = nresult.get(constants.NV_INSTANCELIST, None)
1099       if not isinstance(idata, list):
1100         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1101                     (node,))
1102         bad = True
1103         continue
1104
1105       node_instance[node] = idata
1106
1107       # node_info
1108       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1109       if not isinstance(nodeinfo, dict):
1110         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1111         bad = True
1112         continue
1113
1114       try:
1115         node_info[node] = {
1116           "mfree": int(nodeinfo['memory_free']),
1117           "pinst": [],
1118           "sinst": [],
1119           # dictionary holding all instances this node is secondary for,
1120           # grouped by their primary node. Each key is a cluster node, and each
1121           # value is a list of instances which have the key as primary and the
1122           # current node as secondary.  this is handy to calculate N+1 memory
1123           # availability if you can only failover from a primary to its
1124           # secondary.
1125           "sinst-by-pnode": {},
1126         }
1127         # FIXME: devise a free space model for file based instances as well
1128         if vg_name is not None:
1129           if (constants.NV_VGLIST not in nresult or
1130               vg_name not in nresult[constants.NV_VGLIST]):
1131             feedback_fn("  - ERROR: node %s didn't return data for the"
1132                         " volume group '%s' - it is either missing or broken" %
1133                         (node, vg_name))
1134             bad = True
1135             continue
1136           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1137       except (ValueError, KeyError):
1138         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1139                     " from node %s" % (node,))
1140         bad = True
1141         continue
1142
1143     node_vol_should = {}
1144
1145     for instance in instancelist:
1146       feedback_fn("* Verifying instance %s" % instance)
1147       inst_config = instanceinfo[instance]
1148       result =  self._VerifyInstance(instance, inst_config, node_volume,
1149                                      node_instance, feedback_fn, n_offline)
1150       bad = bad or result
1151       inst_nodes_offline = []
1152
1153       inst_config.MapLVsByNode(node_vol_should)
1154
1155       instance_cfg[instance] = inst_config
1156
1157       pnode = inst_config.primary_node
1158       if pnode in node_info:
1159         node_info[pnode]['pinst'].append(instance)
1160       elif pnode not in n_offline:
1161         feedback_fn("  - ERROR: instance %s, connection to primary node"
1162                     " %s failed" % (instance, pnode))
1163         bad = True
1164
1165       if pnode in n_offline:
1166         inst_nodes_offline.append(pnode)
1167
1168       # If the instance is non-redundant we cannot survive losing its primary
1169       # node, so we are not N+1 compliant. On the other hand we have no disk
1170       # templates with more than one secondary so that situation is not well
1171       # supported either.
1172       # FIXME: does not support file-backed instances
1173       if len(inst_config.secondary_nodes) == 0:
1174         i_non_redundant.append(instance)
1175       elif len(inst_config.secondary_nodes) > 1:
1176         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1177                     % instance)
1178
1179       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1180         i_non_a_balanced.append(instance)
1181
1182       for snode in inst_config.secondary_nodes:
1183         if snode in node_info:
1184           node_info[snode]['sinst'].append(instance)
1185           if pnode not in node_info[snode]['sinst-by-pnode']:
1186             node_info[snode]['sinst-by-pnode'][pnode] = []
1187           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1188         elif snode not in n_offline:
1189           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1190                       " %s failed" % (instance, snode))
1191           bad = True
1192         if snode in n_offline:
1193           inst_nodes_offline.append(snode)
1194
1195       if inst_nodes_offline:
1196         # warn that the instance lives on offline nodes, and set bad=True
1197         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1198                     ", ".join(inst_nodes_offline))
1199         bad = True
1200
1201     feedback_fn("* Verifying orphan volumes")
1202     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1203                                        feedback_fn)
1204     bad = bad or result
1205
1206     feedback_fn("* Verifying remaining instances")
1207     result = self._VerifyOrphanInstances(instancelist, node_instance,
1208                                          feedback_fn)
1209     bad = bad or result
1210
1211     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1212       feedback_fn("* Verifying N+1 Memory redundancy")
1213       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1214       bad = bad or result
1215
1216     feedback_fn("* Other Notes")
1217     if i_non_redundant:
1218       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1219                   % len(i_non_redundant))
1220
1221     if i_non_a_balanced:
1222       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1223                   % len(i_non_a_balanced))
1224
1225     if n_offline:
1226       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1227
1228     if n_drained:
1229       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1230
1231     return not bad
1232
1233   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1234     """Analyze the post-hooks' result
1235
1236     This method analyses the hook result, handles it, and sends some
1237     nicely-formatted feedback back to the user.
1238
1239     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1240         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1241     @param hooks_results: the results of the multi-node hooks rpc call
1242     @param feedback_fn: function used send feedback back to the caller
1243     @param lu_result: previous Exec result
1244     @return: the new Exec result, based on the previous result
1245         and hook results
1246
1247     """
1248     # We only really run POST phase hooks, and are only interested in
1249     # their results
1250     if phase == constants.HOOKS_PHASE_POST:
1251       # Used to change hooks' output to proper indentation
1252       indent_re = re.compile('^', re.M)
1253       feedback_fn("* Hooks Results")
1254       if not hooks_results:
1255         feedback_fn("  - ERROR: general communication failure")
1256         lu_result = 1
1257       else:
1258         for node_name in hooks_results:
1259           show_node_header = True
1260           res = hooks_results[node_name]
1261           msg = res.fail_msg
1262           if msg:
1263             if res.offline:
1264               # no need to warn or set fail return value
1265               continue
1266             feedback_fn("    Communication failure in hooks execution: %s" %
1267                         msg)
1268             lu_result = 1
1269             continue
1270           for script, hkr, output in res.payload:
1271             if hkr == constants.HKR_FAIL:
1272               # The node header is only shown once, if there are
1273               # failing hooks on that node
1274               if show_node_header:
1275                 feedback_fn("  Node %s:" % node_name)
1276                 show_node_header = False
1277               feedback_fn("    ERROR: Script %s failed, output:" % script)
1278               output = indent_re.sub('      ', output)
1279               feedback_fn("%s" % output)
1280               lu_result = 1
1281
1282       return lu_result
1283
1284
1285 class LUVerifyDisks(NoHooksLU):
1286   """Verifies the cluster disks status.
1287
1288   """
1289   _OP_REQP = []
1290   REQ_BGL = False
1291
1292   def ExpandNames(self):
1293     self.needed_locks = {
1294       locking.LEVEL_NODE: locking.ALL_SET,
1295       locking.LEVEL_INSTANCE: locking.ALL_SET,
1296     }
1297     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1298
1299   def CheckPrereq(self):
1300     """Check prerequisites.
1301
1302     This has no prerequisites.
1303
1304     """
1305     pass
1306
1307   def Exec(self, feedback_fn):
1308     """Verify integrity of cluster disks.
1309
1310     @rtype: tuple of three items
1311     @return: a tuple of (dict of node-to-node_error, list of instances
1312         which need activate-disks, dict of instance: (node, volume) for
1313         missing volumes
1314
1315     """
1316     result = res_nodes, res_instances, res_missing = {}, [], {}
1317
1318     vg_name = self.cfg.GetVGName()
1319     nodes = utils.NiceSort(self.cfg.GetNodeList())
1320     instances = [self.cfg.GetInstanceInfo(name)
1321                  for name in self.cfg.GetInstanceList()]
1322
1323     nv_dict = {}
1324     for inst in instances:
1325       inst_lvs = {}
1326       if (not inst.admin_up or
1327           inst.disk_template not in constants.DTS_NET_MIRROR):
1328         continue
1329       inst.MapLVsByNode(inst_lvs)
1330       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1331       for node, vol_list in inst_lvs.iteritems():
1332         for vol in vol_list:
1333           nv_dict[(node, vol)] = inst
1334
1335     if not nv_dict:
1336       return result
1337
1338     node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1339
1340     for node in nodes:
1341       # node_volume
1342       node_res = node_lvs[node]
1343       if node_res.offline:
1344         continue
1345       msg = node_res.fail_msg
1346       if msg:
1347         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1348         res_nodes[node] = msg
1349         continue
1350
1351       lvs = node_res.payload
1352       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1353         inst = nv_dict.pop((node, lv_name), None)
1354         if (not lv_online and inst is not None
1355             and inst.name not in res_instances):
1356           res_instances.append(inst.name)
1357
1358     # any leftover items in nv_dict are missing LVs, let's arrange the
1359     # data better
1360     for key, inst in nv_dict.iteritems():
1361       if inst.name not in res_missing:
1362         res_missing[inst.name] = []
1363       res_missing[inst.name].append(key)
1364
1365     return result
1366
1367
1368 class LURenameCluster(LogicalUnit):
1369   """Rename the cluster.
1370
1371   """
1372   HPATH = "cluster-rename"
1373   HTYPE = constants.HTYPE_CLUSTER
1374   _OP_REQP = ["name"]
1375
1376   def BuildHooksEnv(self):
1377     """Build hooks env.
1378
1379     """
1380     env = {
1381       "OP_TARGET": self.cfg.GetClusterName(),
1382       "NEW_NAME": self.op.name,
1383       }
1384     mn = self.cfg.GetMasterNode()
1385     return env, [mn], [mn]
1386
1387   def CheckPrereq(self):
1388     """Verify that the passed name is a valid one.
1389
1390     """
1391     hostname = utils.HostInfo(self.op.name)
1392
1393     new_name = hostname.name
1394     self.ip = new_ip = hostname.ip
1395     old_name = self.cfg.GetClusterName()
1396     old_ip = self.cfg.GetMasterIP()
1397     if new_name == old_name and new_ip == old_ip:
1398       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1399                                  " cluster has changed")
1400     if new_ip != old_ip:
1401       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1402         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1403                                    " reachable on the network. Aborting." %
1404                                    new_ip)
1405
1406     self.op.name = new_name
1407
1408   def Exec(self, feedback_fn):
1409     """Rename the cluster.
1410
1411     """
1412     clustername = self.op.name
1413     ip = self.ip
1414
1415     # shutdown the master IP
1416     master = self.cfg.GetMasterNode()
1417     result = self.rpc.call_node_stop_master(master, False)
1418     result.Raise("Could not disable the master role")
1419
1420     try:
1421       cluster = self.cfg.GetClusterInfo()
1422       cluster.cluster_name = clustername
1423       cluster.master_ip = ip
1424       self.cfg.Update(cluster)
1425
1426       # update the known hosts file
1427       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1428       node_list = self.cfg.GetNodeList()
1429       try:
1430         node_list.remove(master)
1431       except ValueError:
1432         pass
1433       result = self.rpc.call_upload_file(node_list,
1434                                          constants.SSH_KNOWN_HOSTS_FILE)
1435       for to_node, to_result in result.iteritems():
1436         msg = to_result.fail_msg
1437         if msg:
1438           msg = ("Copy of file %s to node %s failed: %s" %
1439                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1440           self.proc.LogWarning(msg)
1441
1442     finally:
1443       result = self.rpc.call_node_start_master(master, False, False)
1444       msg = result.fail_msg
1445       if msg:
1446         self.LogWarning("Could not re-enable the master role on"
1447                         " the master, please restart manually: %s", msg)
1448
1449
1450 def _RecursiveCheckIfLVMBased(disk):
1451   """Check if the given disk or its children are lvm-based.
1452
1453   @type disk: L{objects.Disk}
1454   @param disk: the disk to check
1455   @rtype: boolean
1456   @return: boolean indicating whether a LD_LV dev_type was found or not
1457
1458   """
1459   if disk.children:
1460     for chdisk in disk.children:
1461       if _RecursiveCheckIfLVMBased(chdisk):
1462         return True
1463   return disk.dev_type == constants.LD_LV
1464
1465
1466 class LUSetClusterParams(LogicalUnit):
1467   """Change the parameters of the cluster.
1468
1469   """
1470   HPATH = "cluster-modify"
1471   HTYPE = constants.HTYPE_CLUSTER
1472   _OP_REQP = []
1473   REQ_BGL = False
1474
1475   def CheckArguments(self):
1476     """Check parameters
1477
1478     """
1479     if not hasattr(self.op, "candidate_pool_size"):
1480       self.op.candidate_pool_size = None
1481     if self.op.candidate_pool_size is not None:
1482       try:
1483         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1484       except (ValueError, TypeError), err:
1485         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1486                                    str(err))
1487       if self.op.candidate_pool_size < 1:
1488         raise errors.OpPrereqError("At least one master candidate needed")
1489
1490   def ExpandNames(self):
1491     # FIXME: in the future maybe other cluster params won't require checking on
1492     # all nodes to be modified.
1493     self.needed_locks = {
1494       locking.LEVEL_NODE: locking.ALL_SET,
1495     }
1496     self.share_locks[locking.LEVEL_NODE] = 1
1497
1498   def BuildHooksEnv(self):
1499     """Build hooks env.
1500
1501     """
1502     env = {
1503       "OP_TARGET": self.cfg.GetClusterName(),
1504       "NEW_VG_NAME": self.op.vg_name,
1505       }
1506     mn = self.cfg.GetMasterNode()
1507     return env, [mn], [mn]
1508
1509   def CheckPrereq(self):
1510     """Check prerequisites.
1511
1512     This checks whether the given params don't conflict and
1513     if the given volume group is valid.
1514
1515     """
1516     if self.op.vg_name is not None and not self.op.vg_name:
1517       instances = self.cfg.GetAllInstancesInfo().values()
1518       for inst in instances:
1519         for disk in inst.disks:
1520           if _RecursiveCheckIfLVMBased(disk):
1521             raise errors.OpPrereqError("Cannot disable lvm storage while"
1522                                        " lvm-based instances exist")
1523
1524     node_list = self.acquired_locks[locking.LEVEL_NODE]
1525
1526     # if vg_name not None, checks given volume group on all nodes
1527     if self.op.vg_name:
1528       vglist = self.rpc.call_vg_list(node_list)
1529       for node in node_list:
1530         msg = vglist[node].fail_msg
1531         if msg:
1532           # ignoring down node
1533           self.LogWarning("Error while gathering data on node %s"
1534                           " (ignoring node): %s", node, msg)
1535           continue
1536         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1537                                               self.op.vg_name,
1538                                               constants.MIN_VG_SIZE)
1539         if vgstatus:
1540           raise errors.OpPrereqError("Error on node '%s': %s" %
1541                                      (node, vgstatus))
1542
1543     self.cluster = cluster = self.cfg.GetClusterInfo()
1544     # validate params changes
1545     if self.op.beparams:
1546       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1547       self.new_beparams = objects.FillDict(
1548         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1549
1550     if self.op.nicparams:
1551       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1552       self.new_nicparams = objects.FillDict(
1553         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1554       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1555
1556     # hypervisor list/parameters
1557     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1558     if self.op.hvparams:
1559       if not isinstance(self.op.hvparams, dict):
1560         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1561       for hv_name, hv_dict in self.op.hvparams.items():
1562         if hv_name not in self.new_hvparams:
1563           self.new_hvparams[hv_name] = hv_dict
1564         else:
1565           self.new_hvparams[hv_name].update(hv_dict)
1566
1567     if self.op.enabled_hypervisors is not None:
1568       self.hv_list = self.op.enabled_hypervisors
1569       if not self.hv_list:
1570         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1571                                    " least one member")
1572       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1573       if invalid_hvs:
1574         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1575                                    " entries: %s" % invalid_hvs)
1576     else:
1577       self.hv_list = cluster.enabled_hypervisors
1578
1579     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1580       # either the enabled list has changed, or the parameters have, validate
1581       for hv_name, hv_params in self.new_hvparams.items():
1582         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1583             (self.op.enabled_hypervisors and
1584              hv_name in self.op.enabled_hypervisors)):
1585           # either this is a new hypervisor, or its parameters have changed
1586           hv_class = hypervisor.GetHypervisor(hv_name)
1587           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1588           hv_class.CheckParameterSyntax(hv_params)
1589           _CheckHVParams(self, node_list, hv_name, hv_params)
1590
1591   def Exec(self, feedback_fn):
1592     """Change the parameters of the cluster.
1593
1594     """
1595     if self.op.vg_name is not None:
1596       new_volume = self.op.vg_name
1597       if not new_volume:
1598         new_volume = None
1599       if new_volume != self.cfg.GetVGName():
1600         self.cfg.SetVGName(new_volume)
1601       else:
1602         feedback_fn("Cluster LVM configuration already in desired"
1603                     " state, not changing")
1604     if self.op.hvparams:
1605       self.cluster.hvparams = self.new_hvparams
1606     if self.op.enabled_hypervisors is not None:
1607       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1608     if self.op.beparams:
1609       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1610     if self.op.nicparams:
1611       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1612
1613     if self.op.candidate_pool_size is not None:
1614       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1615       # we need to update the pool size here, otherwise the save will fail
1616       _AdjustCandidatePool(self)
1617
1618     self.cfg.Update(self.cluster)
1619
1620
1621 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1622   """Distribute additional files which are part of the cluster configuration.
1623
1624   ConfigWriter takes care of distributing the config and ssconf files, but
1625   there are more files which should be distributed to all nodes. This function
1626   makes sure those are copied.
1627
1628   @param lu: calling logical unit
1629   @param additional_nodes: list of nodes not in the config to distribute to
1630
1631   """
1632   # 1. Gather target nodes
1633   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1634   dist_nodes = lu.cfg.GetNodeList()
1635   if additional_nodes is not None:
1636     dist_nodes.extend(additional_nodes)
1637   if myself.name in dist_nodes:
1638     dist_nodes.remove(myself.name)
1639   # 2. Gather files to distribute
1640   dist_files = set([constants.ETC_HOSTS,
1641                     constants.SSH_KNOWN_HOSTS_FILE,
1642                     constants.RAPI_CERT_FILE,
1643                     constants.RAPI_USERS_FILE,
1644                    ])
1645
1646   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1647   for hv_name in enabled_hypervisors:
1648     hv_class = hypervisor.GetHypervisor(hv_name)
1649     dist_files.update(hv_class.GetAncillaryFiles())
1650
1651   # 3. Perform the files upload
1652   for fname in dist_files:
1653     if os.path.exists(fname):
1654       result = lu.rpc.call_upload_file(dist_nodes, fname)
1655       for to_node, to_result in result.items():
1656         msg = to_result.fail_msg
1657         if msg:
1658           msg = ("Copy of file %s to node %s failed: %s" %
1659                  (fname, to_node, msg))
1660           lu.proc.LogWarning(msg)
1661
1662
1663 class LURedistributeConfig(NoHooksLU):
1664   """Force the redistribution of cluster configuration.
1665
1666   This is a very simple LU.
1667
1668   """
1669   _OP_REQP = []
1670   REQ_BGL = False
1671
1672   def ExpandNames(self):
1673     self.needed_locks = {
1674       locking.LEVEL_NODE: locking.ALL_SET,
1675     }
1676     self.share_locks[locking.LEVEL_NODE] = 1
1677
1678   def CheckPrereq(self):
1679     """Check prerequisites.
1680
1681     """
1682
1683   def Exec(self, feedback_fn):
1684     """Redistribute the configuration.
1685
1686     """
1687     self.cfg.Update(self.cfg.GetClusterInfo())
1688     _RedistributeAncillaryFiles(self)
1689
1690
1691 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1692   """Sleep and poll for an instance's disk to sync.
1693
1694   """
1695   if not instance.disks:
1696     return True
1697
1698   if not oneshot:
1699     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1700
1701   node = instance.primary_node
1702
1703   for dev in instance.disks:
1704     lu.cfg.SetDiskID(dev, node)
1705
1706   retries = 0
1707   degr_retries = 10 # in seconds, as we sleep 1 second each time
1708   while True:
1709     max_time = 0
1710     done = True
1711     cumul_degraded = False
1712     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1713     msg = rstats.fail_msg
1714     if msg:
1715       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1716       retries += 1
1717       if retries >= 10:
1718         raise errors.RemoteError("Can't contact node %s for mirror data,"
1719                                  " aborting." % node)
1720       time.sleep(6)
1721       continue
1722     rstats = rstats.payload
1723     retries = 0
1724     for i, mstat in enumerate(rstats):
1725       if mstat is None:
1726         lu.LogWarning("Can't compute data for node %s/%s",
1727                            node, instance.disks[i].iv_name)
1728         continue
1729       # we ignore the ldisk parameter
1730       perc_done, est_time, is_degraded, _ = mstat
1731       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1732       if perc_done is not None:
1733         done = False
1734         if est_time is not None:
1735           rem_time = "%d estimated seconds remaining" % est_time
1736           max_time = est_time
1737         else:
1738           rem_time = "no time estimate"
1739         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1740                         (instance.disks[i].iv_name, perc_done, rem_time))
1741
1742     # if we're done but degraded, let's do a few small retries, to
1743     # make sure we see a stable and not transient situation; therefore
1744     # we force restart of the loop
1745     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1746       logging.info("Degraded disks found, %d retries left", degr_retries)
1747       degr_retries -= 1
1748       time.sleep(1)
1749       continue
1750
1751     if done or oneshot:
1752       break
1753
1754     time.sleep(min(60, max_time))
1755
1756   if done:
1757     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1758   return not cumul_degraded
1759
1760
1761 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1762   """Check that mirrors are not degraded.
1763
1764   The ldisk parameter, if True, will change the test from the
1765   is_degraded attribute (which represents overall non-ok status for
1766   the device(s)) to the ldisk (representing the local storage status).
1767
1768   """
1769   lu.cfg.SetDiskID(dev, node)
1770   if ldisk:
1771     idx = 6
1772   else:
1773     idx = 5
1774
1775   result = True
1776   if on_primary or dev.AssembleOnSecondary():
1777     rstats = lu.rpc.call_blockdev_find(node, dev)
1778     msg = rstats.fail_msg
1779     if msg:
1780       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1781       result = False
1782     elif not rstats.payload:
1783       lu.LogWarning("Can't find disk on node %s", node)
1784       result = False
1785     else:
1786       result = result and (not rstats.payload[idx])
1787   if dev.children:
1788     for child in dev.children:
1789       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1790
1791   return result
1792
1793
1794 class LUDiagnoseOS(NoHooksLU):
1795   """Logical unit for OS diagnose/query.
1796
1797   """
1798   _OP_REQP = ["output_fields", "names"]
1799   REQ_BGL = False
1800   _FIELDS_STATIC = utils.FieldSet()
1801   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1802
1803   def ExpandNames(self):
1804     if self.op.names:
1805       raise errors.OpPrereqError("Selective OS query not supported")
1806
1807     _CheckOutputFields(static=self._FIELDS_STATIC,
1808                        dynamic=self._FIELDS_DYNAMIC,
1809                        selected=self.op.output_fields)
1810
1811     # Lock all nodes, in shared mode
1812     # Temporary removal of locks, should be reverted later
1813     # TODO: reintroduce locks when they are lighter-weight
1814     self.needed_locks = {}
1815     #self.share_locks[locking.LEVEL_NODE] = 1
1816     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     """
1822
1823   @staticmethod
1824   def _DiagnoseByOS(node_list, rlist):
1825     """Remaps a per-node return list into an a per-os per-node dictionary
1826
1827     @param node_list: a list with the names of all nodes
1828     @param rlist: a map with node names as keys and OS objects as values
1829
1830     @rtype: dict
1831     @return: a dictionary with osnames as keys and as value another map, with
1832         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1833
1834           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1835                                      (/srv/..., False, "invalid api")],
1836                            "node2": [(/srv/..., True, "")]}
1837           }
1838
1839     """
1840     all_os = {}
1841     # we build here the list of nodes that didn't fail the RPC (at RPC
1842     # level), so that nodes with a non-responding node daemon don't
1843     # make all OSes invalid
1844     good_nodes = [node_name for node_name in rlist
1845                   if not rlist[node_name].fail_msg]
1846     for node_name, nr in rlist.items():
1847       if nr.fail_msg or not nr.payload:
1848         continue
1849       for name, path, status, diagnose in nr.payload:
1850         if name not in all_os:
1851           # build a list of nodes for this os containing empty lists
1852           # for each node in node_list
1853           all_os[name] = {}
1854           for nname in good_nodes:
1855             all_os[name][nname] = []
1856         all_os[name][node_name].append((path, status, diagnose))
1857     return all_os
1858
1859   def Exec(self, feedback_fn):
1860     """Compute the list of OSes.
1861
1862     """
1863     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1864     node_data = self.rpc.call_os_diagnose(valid_nodes)
1865     pol = self._DiagnoseByOS(valid_nodes, node_data)
1866     output = []
1867     for os_name, os_data in pol.items():
1868       row = []
1869       for field in self.op.output_fields:
1870         if field == "name":
1871           val = os_name
1872         elif field == "valid":
1873           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1874         elif field == "node_status":
1875           # this is just a copy of the dict
1876           val = {}
1877           for node_name, nos_list in os_data.items():
1878             val[node_name] = nos_list
1879         else:
1880           raise errors.ParameterError(field)
1881         row.append(val)
1882       output.append(row)
1883
1884     return output
1885
1886
1887 class LURemoveNode(LogicalUnit):
1888   """Logical unit for removing a node.
1889
1890   """
1891   HPATH = "node-remove"
1892   HTYPE = constants.HTYPE_NODE
1893   _OP_REQP = ["node_name"]
1894
1895   def BuildHooksEnv(self):
1896     """Build hooks env.
1897
1898     This doesn't run on the target node in the pre phase as a failed
1899     node would then be impossible to remove.
1900
1901     """
1902     env = {
1903       "OP_TARGET": self.op.node_name,
1904       "NODE_NAME": self.op.node_name,
1905       }
1906     all_nodes = self.cfg.GetNodeList()
1907     all_nodes.remove(self.op.node_name)
1908     return env, all_nodes, all_nodes
1909
1910   def CheckPrereq(self):
1911     """Check prerequisites.
1912
1913     This checks:
1914      - the node exists in the configuration
1915      - it does not have primary or secondary instances
1916      - it's not the master
1917
1918     Any errors are signaled by raising errors.OpPrereqError.
1919
1920     """
1921     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1922     if node is None:
1923       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1924
1925     instance_list = self.cfg.GetInstanceList()
1926
1927     masternode = self.cfg.GetMasterNode()
1928     if node.name == masternode:
1929       raise errors.OpPrereqError("Node is the master node,"
1930                                  " you need to failover first.")
1931
1932     for instance_name in instance_list:
1933       instance = self.cfg.GetInstanceInfo(instance_name)
1934       if node.name in instance.all_nodes:
1935         raise errors.OpPrereqError("Instance %s is still running on the node,"
1936                                    " please remove first." % instance_name)
1937     self.op.node_name = node.name
1938     self.node = node
1939
1940   def Exec(self, feedback_fn):
1941     """Removes the node from the cluster.
1942
1943     """
1944     node = self.node
1945     logging.info("Stopping the node daemon and removing configs from node %s",
1946                  node.name)
1947
1948     self.context.RemoveNode(node.name)
1949
1950     result = self.rpc.call_node_leave_cluster(node.name)
1951     msg = result.fail_msg
1952     if msg:
1953       self.LogWarning("Errors encountered on the remote node while leaving"
1954                       " the cluster: %s", msg)
1955
1956     # Promote nodes to master candidate as needed
1957     _AdjustCandidatePool(self)
1958
1959
1960 class LUQueryNodes(NoHooksLU):
1961   """Logical unit for querying nodes.
1962
1963   """
1964   _OP_REQP = ["output_fields", "names", "use_locking"]
1965   REQ_BGL = False
1966   _FIELDS_DYNAMIC = utils.FieldSet(
1967     "dtotal", "dfree",
1968     "mtotal", "mnode", "mfree",
1969     "bootid",
1970     "ctotal", "cnodes", "csockets",
1971     )
1972
1973   _FIELDS_STATIC = utils.FieldSet(
1974     "name", "pinst_cnt", "sinst_cnt",
1975     "pinst_list", "sinst_list",
1976     "pip", "sip", "tags",
1977     "serial_no",
1978     "master_candidate",
1979     "master",
1980     "offline",
1981     "drained",
1982     "role",
1983     )
1984
1985   def ExpandNames(self):
1986     _CheckOutputFields(static=self._FIELDS_STATIC,
1987                        dynamic=self._FIELDS_DYNAMIC,
1988                        selected=self.op.output_fields)
1989
1990     self.needed_locks = {}
1991     self.share_locks[locking.LEVEL_NODE] = 1
1992
1993     if self.op.names:
1994       self.wanted = _GetWantedNodes(self, self.op.names)
1995     else:
1996       self.wanted = locking.ALL_SET
1997
1998     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1999     self.do_locking = self.do_node_query and self.op.use_locking
2000     if self.do_locking:
2001       # if we don't request only static fields, we need to lock the nodes
2002       self.needed_locks[locking.LEVEL_NODE] = self.wanted
2003
2004
2005   def CheckPrereq(self):
2006     """Check prerequisites.
2007
2008     """
2009     # The validation of the node list is done in the _GetWantedNodes,
2010     # if non empty, and if empty, there's no validation to do
2011     pass
2012
2013   def Exec(self, feedback_fn):
2014     """Computes the list of nodes and their attributes.
2015
2016     """
2017     all_info = self.cfg.GetAllNodesInfo()
2018     if self.do_locking:
2019       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2020     elif self.wanted != locking.ALL_SET:
2021       nodenames = self.wanted
2022       missing = set(nodenames).difference(all_info.keys())
2023       if missing:
2024         raise errors.OpExecError(
2025           "Some nodes were removed before retrieving their data: %s" % missing)
2026     else:
2027       nodenames = all_info.keys()
2028
2029     nodenames = utils.NiceSort(nodenames)
2030     nodelist = [all_info[name] for name in nodenames]
2031
2032     # begin data gathering
2033
2034     if self.do_node_query:
2035       live_data = {}
2036       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2037                                           self.cfg.GetHypervisorType())
2038       for name in nodenames:
2039         nodeinfo = node_data[name]
2040         if not nodeinfo.fail_msg and nodeinfo.payload:
2041           nodeinfo = nodeinfo.payload
2042           fn = utils.TryConvert
2043           live_data[name] = {
2044             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2045             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2046             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2047             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2048             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2049             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2050             "bootid": nodeinfo.get('bootid', None),
2051             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2052             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2053             }
2054         else:
2055           live_data[name] = {}
2056     else:
2057       live_data = dict.fromkeys(nodenames, {})
2058
2059     node_to_primary = dict([(name, set()) for name in nodenames])
2060     node_to_secondary = dict([(name, set()) for name in nodenames])
2061
2062     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2063                              "sinst_cnt", "sinst_list"))
2064     if inst_fields & frozenset(self.op.output_fields):
2065       instancelist = self.cfg.GetInstanceList()
2066
2067       for instance_name in instancelist:
2068         inst = self.cfg.GetInstanceInfo(instance_name)
2069         if inst.primary_node in node_to_primary:
2070           node_to_primary[inst.primary_node].add(inst.name)
2071         for secnode in inst.secondary_nodes:
2072           if secnode in node_to_secondary:
2073             node_to_secondary[secnode].add(inst.name)
2074
2075     master_node = self.cfg.GetMasterNode()
2076
2077     # end data gathering
2078
2079     output = []
2080     for node in nodelist:
2081       node_output = []
2082       for field in self.op.output_fields:
2083         if field == "name":
2084           val = node.name
2085         elif field == "pinst_list":
2086           val = list(node_to_primary[node.name])
2087         elif field == "sinst_list":
2088           val = list(node_to_secondary[node.name])
2089         elif field == "pinst_cnt":
2090           val = len(node_to_primary[node.name])
2091         elif field == "sinst_cnt":
2092           val = len(node_to_secondary[node.name])
2093         elif field == "pip":
2094           val = node.primary_ip
2095         elif field == "sip":
2096           val = node.secondary_ip
2097         elif field == "tags":
2098           val = list(node.GetTags())
2099         elif field == "serial_no":
2100           val = node.serial_no
2101         elif field == "master_candidate":
2102           val = node.master_candidate
2103         elif field == "master":
2104           val = node.name == master_node
2105         elif field == "offline":
2106           val = node.offline
2107         elif field == "drained":
2108           val = node.drained
2109         elif self._FIELDS_DYNAMIC.Matches(field):
2110           val = live_data[node.name].get(field, None)
2111         elif field == "role":
2112           if node.name == master_node:
2113             val = "M"
2114           elif node.master_candidate:
2115             val = "C"
2116           elif node.drained:
2117             val = "D"
2118           elif node.offline:
2119             val = "O"
2120           else:
2121             val = "R"
2122         else:
2123           raise errors.ParameterError(field)
2124         node_output.append(val)
2125       output.append(node_output)
2126
2127     return output
2128
2129
2130 class LUQueryNodeVolumes(NoHooksLU):
2131   """Logical unit for getting volumes on node(s).
2132
2133   """
2134   _OP_REQP = ["nodes", "output_fields"]
2135   REQ_BGL = False
2136   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2137   _FIELDS_STATIC = utils.FieldSet("node")
2138
2139   def ExpandNames(self):
2140     _CheckOutputFields(static=self._FIELDS_STATIC,
2141                        dynamic=self._FIELDS_DYNAMIC,
2142                        selected=self.op.output_fields)
2143
2144     self.needed_locks = {}
2145     self.share_locks[locking.LEVEL_NODE] = 1
2146     if not self.op.nodes:
2147       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2148     else:
2149       self.needed_locks[locking.LEVEL_NODE] = \
2150         _GetWantedNodes(self, self.op.nodes)
2151
2152   def CheckPrereq(self):
2153     """Check prerequisites.
2154
2155     This checks that the fields required are valid output fields.
2156
2157     """
2158     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2159
2160   def Exec(self, feedback_fn):
2161     """Computes the list of nodes and their attributes.
2162
2163     """
2164     nodenames = self.nodes
2165     volumes = self.rpc.call_node_volumes(nodenames)
2166
2167     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2168              in self.cfg.GetInstanceList()]
2169
2170     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2171
2172     output = []
2173     for node in nodenames:
2174       nresult = volumes[node]
2175       if nresult.offline:
2176         continue
2177       msg = nresult.fail_msg
2178       if msg:
2179         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2180         continue
2181
2182       node_vols = nresult.payload[:]
2183       node_vols.sort(key=lambda vol: vol['dev'])
2184
2185       for vol in node_vols:
2186         node_output = []
2187         for field in self.op.output_fields:
2188           if field == "node":
2189             val = node
2190           elif field == "phys":
2191             val = vol['dev']
2192           elif field == "vg":
2193             val = vol['vg']
2194           elif field == "name":
2195             val = vol['name']
2196           elif field == "size":
2197             val = int(float(vol['size']))
2198           elif field == "instance":
2199             for inst in ilist:
2200               if node not in lv_by_node[inst]:
2201                 continue
2202               if vol['name'] in lv_by_node[inst][node]:
2203                 val = inst.name
2204                 break
2205             else:
2206               val = '-'
2207           else:
2208             raise errors.ParameterError(field)
2209           node_output.append(str(val))
2210
2211         output.append(node_output)
2212
2213     return output
2214
2215
2216 class LUAddNode(LogicalUnit):
2217   """Logical unit for adding node to the cluster.
2218
2219   """
2220   HPATH = "node-add"
2221   HTYPE = constants.HTYPE_NODE
2222   _OP_REQP = ["node_name"]
2223
2224   def BuildHooksEnv(self):
2225     """Build hooks env.
2226
2227     This will run on all nodes before, and on all nodes + the new node after.
2228
2229     """
2230     env = {
2231       "OP_TARGET": self.op.node_name,
2232       "NODE_NAME": self.op.node_name,
2233       "NODE_PIP": self.op.primary_ip,
2234       "NODE_SIP": self.op.secondary_ip,
2235       }
2236     nodes_0 = self.cfg.GetNodeList()
2237     nodes_1 = nodes_0 + [self.op.node_name, ]
2238     return env, nodes_0, nodes_1
2239
2240   def CheckPrereq(self):
2241     """Check prerequisites.
2242
2243     This checks:
2244      - the new node is not already in the config
2245      - it is resolvable
2246      - its parameters (single/dual homed) matches the cluster
2247
2248     Any errors are signaled by raising errors.OpPrereqError.
2249
2250     """
2251     node_name = self.op.node_name
2252     cfg = self.cfg
2253
2254     dns_data = utils.HostInfo(node_name)
2255
2256     node = dns_data.name
2257     primary_ip = self.op.primary_ip = dns_data.ip
2258     secondary_ip = getattr(self.op, "secondary_ip", None)
2259     if secondary_ip is None:
2260       secondary_ip = primary_ip
2261     if not utils.IsValidIP(secondary_ip):
2262       raise errors.OpPrereqError("Invalid secondary IP given")
2263     self.op.secondary_ip = secondary_ip
2264
2265     node_list = cfg.GetNodeList()
2266     if not self.op.readd and node in node_list:
2267       raise errors.OpPrereqError("Node %s is already in the configuration" %
2268                                  node)
2269     elif self.op.readd and node not in node_list:
2270       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2271
2272     for existing_node_name in node_list:
2273       existing_node = cfg.GetNodeInfo(existing_node_name)
2274
2275       if self.op.readd and node == existing_node_name:
2276         if (existing_node.primary_ip != primary_ip or
2277             existing_node.secondary_ip != secondary_ip):
2278           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2279                                      " address configuration as before")
2280         continue
2281
2282       if (existing_node.primary_ip == primary_ip or
2283           existing_node.secondary_ip == primary_ip or
2284           existing_node.primary_ip == secondary_ip or
2285           existing_node.secondary_ip == secondary_ip):
2286         raise errors.OpPrereqError("New node ip address(es) conflict with"
2287                                    " existing node %s" % existing_node.name)
2288
2289     # check that the type of the node (single versus dual homed) is the
2290     # same as for the master
2291     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2292     master_singlehomed = myself.secondary_ip == myself.primary_ip
2293     newbie_singlehomed = secondary_ip == primary_ip
2294     if master_singlehomed != newbie_singlehomed:
2295       if master_singlehomed:
2296         raise errors.OpPrereqError("The master has no private ip but the"
2297                                    " new node has one")
2298       else:
2299         raise errors.OpPrereqError("The master has a private ip but the"
2300                                    " new node doesn't have one")
2301
2302     # checks reachability
2303     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2304       raise errors.OpPrereqError("Node not reachable by ping")
2305
2306     if not newbie_singlehomed:
2307       # check reachability from my secondary ip to newbie's secondary ip
2308       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2309                            source=myself.secondary_ip):
2310         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2311                                    " based ping to noded port")
2312
2313     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2314     if self.op.readd:
2315       exceptions = [node]
2316     else:
2317       exceptions = []
2318     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2319     # the new node will increase mc_max with one, so:
2320     mc_max = min(mc_max + 1, cp_size)
2321     self.master_candidate = mc_now < mc_max
2322
2323     if self.op.readd:
2324       self.new_node = self.cfg.GetNodeInfo(node)
2325       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2326     else:
2327       self.new_node = objects.Node(name=node,
2328                                    primary_ip=primary_ip,
2329                                    secondary_ip=secondary_ip,
2330                                    master_candidate=self.master_candidate,
2331                                    offline=False, drained=False)
2332
2333   def Exec(self, feedback_fn):
2334     """Adds the new node to the cluster.
2335
2336     """
2337     new_node = self.new_node
2338     node = new_node.name
2339
2340     # for re-adds, reset the offline/drained/master-candidate flags;
2341     # we need to reset here, otherwise offline would prevent RPC calls
2342     # later in the procedure; this also means that if the re-add
2343     # fails, we are left with a non-offlined, broken node
2344     if self.op.readd:
2345       new_node.drained = new_node.offline = False
2346       self.LogInfo("Readding a node, the offline/drained flags were reset")
2347       # if we demote the node, we do cleanup later in the procedure
2348       new_node.master_candidate = self.master_candidate
2349
2350     # notify the user about any possible mc promotion
2351     if new_node.master_candidate:
2352       self.LogInfo("Node will be a master candidate")
2353
2354     # check connectivity
2355     result = self.rpc.call_version([node])[node]
2356     result.Raise("Can't get version information from node %s" % node)
2357     if constants.PROTOCOL_VERSION == result.payload:
2358       logging.info("Communication to node %s fine, sw version %s match",
2359                    node, result.payload)
2360     else:
2361       raise errors.OpExecError("Version mismatch master version %s,"
2362                                " node version %s" %
2363                                (constants.PROTOCOL_VERSION, result.payload))
2364
2365     # setup ssh on node
2366     logging.info("Copy ssh key to node %s", node)
2367     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2368     keyarray = []
2369     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2370                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2371                 priv_key, pub_key]
2372
2373     for i in keyfiles:
2374       f = open(i, 'r')
2375       try:
2376         keyarray.append(f.read())
2377       finally:
2378         f.close()
2379
2380     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2381                                     keyarray[2],
2382                                     keyarray[3], keyarray[4], keyarray[5])
2383     result.Raise("Cannot transfer ssh keys to the new node")
2384
2385     # Add node to our /etc/hosts, and add key to known_hosts
2386     if self.cfg.GetClusterInfo().modify_etc_hosts:
2387       utils.AddHostToEtcHosts(new_node.name)
2388
2389     if new_node.secondary_ip != new_node.primary_ip:
2390       result = self.rpc.call_node_has_ip_address(new_node.name,
2391                                                  new_node.secondary_ip)
2392       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2393                    prereq=True)
2394       if not result.payload:
2395         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2396                                  " you gave (%s). Please fix and re-run this"
2397                                  " command." % new_node.secondary_ip)
2398
2399     node_verify_list = [self.cfg.GetMasterNode()]
2400     node_verify_param = {
2401       'nodelist': [node],
2402       # TODO: do a node-net-test as well?
2403     }
2404
2405     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2406                                        self.cfg.GetClusterName())
2407     for verifier in node_verify_list:
2408       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2409       nl_payload = result[verifier].payload['nodelist']
2410       if nl_payload:
2411         for failed in nl_payload:
2412           feedback_fn("ssh/hostname verification failed %s -> %s" %
2413                       (verifier, nl_payload[failed]))
2414         raise errors.OpExecError("ssh/hostname verification failed.")
2415
2416     if self.op.readd:
2417       _RedistributeAncillaryFiles(self)
2418       self.context.ReaddNode(new_node)
2419       # make sure we redistribute the config
2420       self.cfg.Update(new_node)
2421       # and make sure the new node will not have old files around
2422       if not new_node.master_candidate:
2423         result = self.rpc.call_node_demote_from_mc(new_node.name)
2424         msg = result.RemoteFailMsg()
2425         if msg:
2426           self.LogWarning("Node failed to demote itself from master"
2427                           " candidate status: %s" % msg)
2428     else:
2429       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2430       self.context.AddNode(new_node)
2431
2432
2433 class LUSetNodeParams(LogicalUnit):
2434   """Modifies the parameters of a node.
2435
2436   """
2437   HPATH = "node-modify"
2438   HTYPE = constants.HTYPE_NODE
2439   _OP_REQP = ["node_name"]
2440   REQ_BGL = False
2441
2442   def CheckArguments(self):
2443     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2444     if node_name is None:
2445       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2446     self.op.node_name = node_name
2447     _CheckBooleanOpField(self.op, 'master_candidate')
2448     _CheckBooleanOpField(self.op, 'offline')
2449     _CheckBooleanOpField(self.op, 'drained')
2450     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2451     if all_mods.count(None) == 3:
2452       raise errors.OpPrereqError("Please pass at least one modification")
2453     if all_mods.count(True) > 1:
2454       raise errors.OpPrereqError("Can't set the node into more than one"
2455                                  " state at the same time")
2456
2457   def ExpandNames(self):
2458     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2459
2460   def BuildHooksEnv(self):
2461     """Build hooks env.
2462
2463     This runs on the master node.
2464
2465     """
2466     env = {
2467       "OP_TARGET": self.op.node_name,
2468       "MASTER_CANDIDATE": str(self.op.master_candidate),
2469       "OFFLINE": str(self.op.offline),
2470       "DRAINED": str(self.op.drained),
2471       }
2472     nl = [self.cfg.GetMasterNode(),
2473           self.op.node_name]
2474     return env, nl, nl
2475
2476   def CheckPrereq(self):
2477     """Check prerequisites.
2478
2479     This only checks the instance list against the existing names.
2480
2481     """
2482     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2483
2484     if ((self.op.master_candidate == False or self.op.offline == True or
2485          self.op.drained == True) and node.master_candidate):
2486       # we will demote the node from master_candidate
2487       if self.op.node_name == self.cfg.GetMasterNode():
2488         raise errors.OpPrereqError("The master node has to be a"
2489                                    " master candidate, online and not drained")
2490       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2491       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2492       if num_candidates <= cp_size:
2493         msg = ("Not enough master candidates (desired"
2494                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2495         if self.op.force:
2496           self.LogWarning(msg)
2497         else:
2498           raise errors.OpPrereqError(msg)
2499
2500     if (self.op.master_candidate == True and
2501         ((node.offline and not self.op.offline == False) or
2502          (node.drained and not self.op.drained == False))):
2503       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2504                                  " to master_candidate" % node.name)
2505
2506     return
2507
2508   def Exec(self, feedback_fn):
2509     """Modifies a node.
2510
2511     """
2512     node = self.node
2513
2514     result = []
2515     changed_mc = False
2516
2517     if self.op.offline is not None:
2518       node.offline = self.op.offline
2519       result.append(("offline", str(self.op.offline)))
2520       if self.op.offline == True:
2521         if node.master_candidate:
2522           node.master_candidate = False
2523           changed_mc = True
2524           result.append(("master_candidate", "auto-demotion due to offline"))
2525         if node.drained:
2526           node.drained = False
2527           result.append(("drained", "clear drained status due to offline"))
2528
2529     if self.op.master_candidate is not None:
2530       node.master_candidate = self.op.master_candidate
2531       changed_mc = True
2532       result.append(("master_candidate", str(self.op.master_candidate)))
2533       if self.op.master_candidate == False:
2534         rrc = self.rpc.call_node_demote_from_mc(node.name)
2535         msg = rrc.fail_msg
2536         if msg:
2537           self.LogWarning("Node failed to demote itself: %s" % msg)
2538
2539     if self.op.drained is not None:
2540       node.drained = self.op.drained
2541       result.append(("drained", str(self.op.drained)))
2542       if self.op.drained == True:
2543         if node.master_candidate:
2544           node.master_candidate = False
2545           changed_mc = True
2546           result.append(("master_candidate", "auto-demotion due to drain"))
2547           rrc = self.rpc.call_node_demote_from_mc(node.name)
2548           msg = rrc.RemoteFailMsg()
2549           if msg:
2550             self.LogWarning("Node failed to demote itself: %s" % msg)
2551         if node.offline:
2552           node.offline = False
2553           result.append(("offline", "clear offline status due to drain"))
2554
2555     # this will trigger configuration file update, if needed
2556     self.cfg.Update(node)
2557     # this will trigger job queue propagation or cleanup
2558     if changed_mc:
2559       self.context.ReaddNode(node)
2560
2561     return result
2562
2563
2564 class LUPowercycleNode(NoHooksLU):
2565   """Powercycles a node.
2566
2567   """
2568   _OP_REQP = ["node_name", "force"]
2569   REQ_BGL = False
2570
2571   def CheckArguments(self):
2572     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2573     if node_name is None:
2574       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2575     self.op.node_name = node_name
2576     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2577       raise errors.OpPrereqError("The node is the master and the force"
2578                                  " parameter was not set")
2579
2580   def ExpandNames(self):
2581     """Locking for PowercycleNode.
2582
2583     This is a last-resource option and shouldn't block on other
2584     jobs. Therefore, we grab no locks.
2585
2586     """
2587     self.needed_locks = {}
2588
2589   def CheckPrereq(self):
2590     """Check prerequisites.
2591
2592     This LU has no prereqs.
2593
2594     """
2595     pass
2596
2597   def Exec(self, feedback_fn):
2598     """Reboots a node.
2599
2600     """
2601     result = self.rpc.call_node_powercycle(self.op.node_name,
2602                                            self.cfg.GetHypervisorType())
2603     result.Raise("Failed to schedule the reboot")
2604     return result.payload
2605
2606
2607 class LUQueryClusterInfo(NoHooksLU):
2608   """Query cluster configuration.
2609
2610   """
2611   _OP_REQP = []
2612   REQ_BGL = False
2613
2614   def ExpandNames(self):
2615     self.needed_locks = {}
2616
2617   def CheckPrereq(self):
2618     """No prerequsites needed for this LU.
2619
2620     """
2621     pass
2622
2623   def Exec(self, feedback_fn):
2624     """Return cluster config.
2625
2626     """
2627     cluster = self.cfg.GetClusterInfo()
2628     result = {
2629       "software_version": constants.RELEASE_VERSION,
2630       "protocol_version": constants.PROTOCOL_VERSION,
2631       "config_version": constants.CONFIG_VERSION,
2632       "os_api_version": max(constants.OS_API_VERSIONS),
2633       "export_version": constants.EXPORT_VERSION,
2634       "architecture": (platform.architecture()[0], platform.machine()),
2635       "name": cluster.cluster_name,
2636       "master": cluster.master_node,
2637       "default_hypervisor": cluster.enabled_hypervisors[0],
2638       "enabled_hypervisors": cluster.enabled_hypervisors,
2639       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2640                         for hypervisor_name in cluster.enabled_hypervisors]),
2641       "beparams": cluster.beparams,
2642       "nicparams": cluster.nicparams,
2643       "candidate_pool_size": cluster.candidate_pool_size,
2644       "master_netdev": cluster.master_netdev,
2645       "volume_group_name": cluster.volume_group_name,
2646       "file_storage_dir": cluster.file_storage_dir,
2647       }
2648
2649     return result
2650
2651
2652 class LUQueryConfigValues(NoHooksLU):
2653   """Return configuration values.
2654
2655   """
2656   _OP_REQP = []
2657   REQ_BGL = False
2658   _FIELDS_DYNAMIC = utils.FieldSet()
2659   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2660
2661   def ExpandNames(self):
2662     self.needed_locks = {}
2663
2664     _CheckOutputFields(static=self._FIELDS_STATIC,
2665                        dynamic=self._FIELDS_DYNAMIC,
2666                        selected=self.op.output_fields)
2667
2668   def CheckPrereq(self):
2669     """No prerequisites.
2670
2671     """
2672     pass
2673
2674   def Exec(self, feedback_fn):
2675     """Dump a representation of the cluster config to the standard output.
2676
2677     """
2678     values = []
2679     for field in self.op.output_fields:
2680       if field == "cluster_name":
2681         entry = self.cfg.GetClusterName()
2682       elif field == "master_node":
2683         entry = self.cfg.GetMasterNode()
2684       elif field == "drain_flag":
2685         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2686       else:
2687         raise errors.ParameterError(field)
2688       values.append(entry)
2689     return values
2690
2691
2692 class LUActivateInstanceDisks(NoHooksLU):
2693   """Bring up an instance's disks.
2694
2695   """
2696   _OP_REQP = ["instance_name"]
2697   REQ_BGL = False
2698
2699   def ExpandNames(self):
2700     self._ExpandAndLockInstance()
2701     self.needed_locks[locking.LEVEL_NODE] = []
2702     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2703
2704   def DeclareLocks(self, level):
2705     if level == locking.LEVEL_NODE:
2706       self._LockInstancesNodes()
2707
2708   def CheckPrereq(self):
2709     """Check prerequisites.
2710
2711     This checks that the instance is in the cluster.
2712
2713     """
2714     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2715     assert self.instance is not None, \
2716       "Cannot retrieve locked instance %s" % self.op.instance_name
2717     _CheckNodeOnline(self, self.instance.primary_node)
2718
2719   def Exec(self, feedback_fn):
2720     """Activate the disks.
2721
2722     """
2723     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2724     if not disks_ok:
2725       raise errors.OpExecError("Cannot activate block devices")
2726
2727     return disks_info
2728
2729
2730 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2731   """Prepare the block devices for an instance.
2732
2733   This sets up the block devices on all nodes.
2734
2735   @type lu: L{LogicalUnit}
2736   @param lu: the logical unit on whose behalf we execute
2737   @type instance: L{objects.Instance}
2738   @param instance: the instance for whose disks we assemble
2739   @type ignore_secondaries: boolean
2740   @param ignore_secondaries: if true, errors on secondary nodes
2741       won't result in an error return from the function
2742   @return: False if the operation failed, otherwise a list of
2743       (host, instance_visible_name, node_visible_name)
2744       with the mapping from node devices to instance devices
2745
2746   """
2747   device_info = []
2748   disks_ok = True
2749   iname = instance.name
2750   # With the two passes mechanism we try to reduce the window of
2751   # opportunity for the race condition of switching DRBD to primary
2752   # before handshaking occured, but we do not eliminate it
2753
2754   # The proper fix would be to wait (with some limits) until the
2755   # connection has been made and drbd transitions from WFConnection
2756   # into any other network-connected state (Connected, SyncTarget,
2757   # SyncSource, etc.)
2758
2759   # 1st pass, assemble on all nodes in secondary mode
2760   for inst_disk in instance.disks:
2761     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2762       lu.cfg.SetDiskID(node_disk, node)
2763       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2764       msg = result.fail_msg
2765       if msg:
2766         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2767                            " (is_primary=False, pass=1): %s",
2768                            inst_disk.iv_name, node, msg)
2769         if not ignore_secondaries:
2770           disks_ok = False
2771
2772   # FIXME: race condition on drbd migration to primary
2773
2774   # 2nd pass, do only the primary node
2775   for inst_disk in instance.disks:
2776     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2777       if node != instance.primary_node:
2778         continue
2779       lu.cfg.SetDiskID(node_disk, node)
2780       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2781       msg = result.fail_msg
2782       if msg:
2783         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2784                            " (is_primary=True, pass=2): %s",
2785                            inst_disk.iv_name, node, msg)
2786         disks_ok = False
2787     device_info.append((instance.primary_node, inst_disk.iv_name,
2788                         result.payload))
2789
2790   # leave the disks configured for the primary node
2791   # this is a workaround that would be fixed better by
2792   # improving the logical/physical id handling
2793   for disk in instance.disks:
2794     lu.cfg.SetDiskID(disk, instance.primary_node)
2795
2796   return disks_ok, device_info
2797
2798
2799 def _StartInstanceDisks(lu, instance, force):
2800   """Start the disks of an instance.
2801
2802   """
2803   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2804                                            ignore_secondaries=force)
2805   if not disks_ok:
2806     _ShutdownInstanceDisks(lu, instance)
2807     if force is not None and not force:
2808       lu.proc.LogWarning("", hint="If the message above refers to a"
2809                          " secondary node,"
2810                          " you can retry the operation using '--force'.")
2811     raise errors.OpExecError("Disk consistency error")
2812
2813
2814 class LUDeactivateInstanceDisks(NoHooksLU):
2815   """Shutdown an instance's disks.
2816
2817   """
2818   _OP_REQP = ["instance_name"]
2819   REQ_BGL = False
2820
2821   def ExpandNames(self):
2822     self._ExpandAndLockInstance()
2823     self.needed_locks[locking.LEVEL_NODE] = []
2824     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2825
2826   def DeclareLocks(self, level):
2827     if level == locking.LEVEL_NODE:
2828       self._LockInstancesNodes()
2829
2830   def CheckPrereq(self):
2831     """Check prerequisites.
2832
2833     This checks that the instance is in the cluster.
2834
2835     """
2836     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2837     assert self.instance is not None, \
2838       "Cannot retrieve locked instance %s" % self.op.instance_name
2839
2840   def Exec(self, feedback_fn):
2841     """Deactivate the disks
2842
2843     """
2844     instance = self.instance
2845     _SafeShutdownInstanceDisks(self, instance)
2846
2847
2848 def _SafeShutdownInstanceDisks(lu, instance):
2849   """Shutdown block devices of an instance.
2850
2851   This function checks if an instance is running, before calling
2852   _ShutdownInstanceDisks.
2853
2854   """
2855   pnode = instance.primary_node
2856   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2857   ins_l.Raise("Can't contact node %s" % pnode)
2858
2859   if instance.name in ins_l.payload:
2860     raise errors.OpExecError("Instance is running, can't shutdown"
2861                              " block devices.")
2862
2863   _ShutdownInstanceDisks(lu, instance)
2864
2865
2866 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2867   """Shutdown block devices of an instance.
2868
2869   This does the shutdown on all nodes of the instance.
2870
2871   If the ignore_primary is false, errors on the primary node are
2872   ignored.
2873
2874   """
2875   all_result = True
2876   for disk in instance.disks:
2877     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2878       lu.cfg.SetDiskID(top_disk, node)
2879       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2880       msg = result.fail_msg
2881       if msg:
2882         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2883                       disk.iv_name, node, msg)
2884         if not ignore_primary or node != instance.primary_node:
2885           all_result = False
2886   return all_result
2887
2888
2889 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2890   """Checks if a node has enough free memory.
2891
2892   This function check if a given node has the needed amount of free
2893   memory. In case the node has less memory or we cannot get the
2894   information from the node, this function raise an OpPrereqError
2895   exception.
2896
2897   @type lu: C{LogicalUnit}
2898   @param lu: a logical unit from which we get configuration data
2899   @type node: C{str}
2900   @param node: the node to check
2901   @type reason: C{str}
2902   @param reason: string to use in the error message
2903   @type requested: C{int}
2904   @param requested: the amount of memory in MiB to check for
2905   @type hypervisor_name: C{str}
2906   @param hypervisor_name: the hypervisor to ask for memory stats
2907   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2908       we cannot check the node
2909
2910   """
2911   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2912   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2913   free_mem = nodeinfo[node].payload.get('memory_free', None)
2914   if not isinstance(free_mem, int):
2915     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2916                                " was '%s'" % (node, free_mem))
2917   if requested > free_mem:
2918     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2919                                " needed %s MiB, available %s MiB" %
2920                                (node, reason, requested, free_mem))
2921
2922
2923 class LUStartupInstance(LogicalUnit):
2924   """Starts an instance.
2925
2926   """
2927   HPATH = "instance-start"
2928   HTYPE = constants.HTYPE_INSTANCE
2929   _OP_REQP = ["instance_name", "force"]
2930   REQ_BGL = False
2931
2932   def ExpandNames(self):
2933     self._ExpandAndLockInstance()
2934
2935   def BuildHooksEnv(self):
2936     """Build hooks env.
2937
2938     This runs on master, primary and secondary nodes of the instance.
2939
2940     """
2941     env = {
2942       "FORCE": self.op.force,
2943       }
2944     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2945     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2946     return env, nl, nl
2947
2948   def CheckPrereq(self):
2949     """Check prerequisites.
2950
2951     This checks that the instance is in the cluster.
2952
2953     """
2954     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2955     assert self.instance is not None, \
2956       "Cannot retrieve locked instance %s" % self.op.instance_name
2957
2958     # extra beparams
2959     self.beparams = getattr(self.op, "beparams", {})
2960     if self.beparams:
2961       if not isinstance(self.beparams, dict):
2962         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2963                                    " dict" % (type(self.beparams), ))
2964       # fill the beparams dict
2965       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2966       self.op.beparams = self.beparams
2967
2968     # extra hvparams
2969     self.hvparams = getattr(self.op, "hvparams", {})
2970     if self.hvparams:
2971       if not isinstance(self.hvparams, dict):
2972         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2973                                    " dict" % (type(self.hvparams), ))
2974
2975       # check hypervisor parameter syntax (locally)
2976       cluster = self.cfg.GetClusterInfo()
2977       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2978       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2979                                     instance.hvparams)
2980       filled_hvp.update(self.hvparams)
2981       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2982       hv_type.CheckParameterSyntax(filled_hvp)
2983       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2984       self.op.hvparams = self.hvparams
2985
2986     _CheckNodeOnline(self, instance.primary_node)
2987
2988     bep = self.cfg.GetClusterInfo().FillBE(instance)
2989     # check bridges existence
2990     _CheckInstanceBridgesExist(self, instance)
2991
2992     remote_info = self.rpc.call_instance_info(instance.primary_node,
2993                                               instance.name,
2994                                               instance.hypervisor)
2995     remote_info.Raise("Error checking node %s" % instance.primary_node,
2996                       prereq=True)
2997     if not remote_info.payload: # not running already
2998       _CheckNodeFreeMemory(self, instance.primary_node,
2999                            "starting instance %s" % instance.name,
3000                            bep[constants.BE_MEMORY], instance.hypervisor)
3001
3002   def Exec(self, feedback_fn):
3003     """Start the instance.
3004
3005     """
3006     instance = self.instance
3007     force = self.op.force
3008
3009     self.cfg.MarkInstanceUp(instance.name)
3010
3011     node_current = instance.primary_node
3012
3013     _StartInstanceDisks(self, instance, force)
3014
3015     result = self.rpc.call_instance_start(node_current, instance,
3016                                           self.hvparams, self.beparams)
3017     msg = result.fail_msg
3018     if msg:
3019       _ShutdownInstanceDisks(self, instance)
3020       raise errors.OpExecError("Could not start instance: %s" % msg)
3021
3022
3023 class LURebootInstance(LogicalUnit):
3024   """Reboot an instance.
3025
3026   """
3027   HPATH = "instance-reboot"
3028   HTYPE = constants.HTYPE_INSTANCE
3029   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3030   REQ_BGL = False
3031
3032   def ExpandNames(self):
3033     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3034                                    constants.INSTANCE_REBOOT_HARD,
3035                                    constants.INSTANCE_REBOOT_FULL]:
3036       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3037                                   (constants.INSTANCE_REBOOT_SOFT,
3038                                    constants.INSTANCE_REBOOT_HARD,
3039                                    constants.INSTANCE_REBOOT_FULL))
3040     self._ExpandAndLockInstance()
3041
3042   def BuildHooksEnv(self):
3043     """Build hooks env.
3044
3045     This runs on master, primary and secondary nodes of the instance.
3046
3047     """
3048     env = {
3049       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3050       "REBOOT_TYPE": self.op.reboot_type,
3051       }
3052     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3053     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3054     return env, nl, nl
3055
3056   def CheckPrereq(self):
3057     """Check prerequisites.
3058
3059     This checks that the instance is in the cluster.
3060
3061     """
3062     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3063     assert self.instance is not None, \
3064       "Cannot retrieve locked instance %s" % self.op.instance_name
3065
3066     _CheckNodeOnline(self, instance.primary_node)
3067
3068     # check bridges existence
3069     _CheckInstanceBridgesExist(self, instance)
3070
3071   def Exec(self, feedback_fn):
3072     """Reboot the instance.
3073
3074     """
3075     instance = self.instance
3076     ignore_secondaries = self.op.ignore_secondaries
3077     reboot_type = self.op.reboot_type
3078
3079     node_current = instance.primary_node
3080
3081     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3082                        constants.INSTANCE_REBOOT_HARD]:
3083       for disk in instance.disks:
3084         self.cfg.SetDiskID(disk, node_current)
3085       result = self.rpc.call_instance_reboot(node_current, instance,
3086                                              reboot_type)
3087       result.Raise("Could not reboot instance")
3088     else:
3089       result = self.rpc.call_instance_shutdown(node_current, instance)
3090       result.Raise("Could not shutdown instance for full reboot")
3091       _ShutdownInstanceDisks(self, instance)
3092       _StartInstanceDisks(self, instance, ignore_secondaries)
3093       result = self.rpc.call_instance_start(node_current, instance, None, None)
3094       msg = result.fail_msg
3095       if msg:
3096         _ShutdownInstanceDisks(self, instance)
3097         raise errors.OpExecError("Could not start instance for"
3098                                  " full reboot: %s" % msg)
3099
3100     self.cfg.MarkInstanceUp(instance.name)
3101
3102
3103 class LUShutdownInstance(LogicalUnit):
3104   """Shutdown an instance.
3105
3106   """
3107   HPATH = "instance-stop"
3108   HTYPE = constants.HTYPE_INSTANCE
3109   _OP_REQP = ["instance_name"]
3110   REQ_BGL = False
3111
3112   def ExpandNames(self):
3113     self._ExpandAndLockInstance()
3114
3115   def BuildHooksEnv(self):
3116     """Build hooks env.
3117
3118     This runs on master, primary and secondary nodes of the instance.
3119
3120     """
3121     env = _BuildInstanceHookEnvByObject(self, self.instance)
3122     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3123     return env, nl, nl
3124
3125   def CheckPrereq(self):
3126     """Check prerequisites.
3127
3128     This checks that the instance is in the cluster.
3129
3130     """
3131     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3132     assert self.instance is not None, \
3133       "Cannot retrieve locked instance %s" % self.op.instance_name
3134     _CheckNodeOnline(self, self.instance.primary_node)
3135
3136   def Exec(self, feedback_fn):
3137     """Shutdown the instance.
3138
3139     """
3140     instance = self.instance
3141     node_current = instance.primary_node
3142     self.cfg.MarkInstanceDown(instance.name)
3143     result = self.rpc.call_instance_shutdown(node_current, instance)
3144     msg = result.fail_msg
3145     if msg:
3146       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3147
3148     _ShutdownInstanceDisks(self, instance)
3149
3150
3151 class LUReinstallInstance(LogicalUnit):
3152   """Reinstall an instance.
3153
3154   """
3155   HPATH = "instance-reinstall"
3156   HTYPE = constants.HTYPE_INSTANCE
3157   _OP_REQP = ["instance_name"]
3158   REQ_BGL = False
3159
3160   def ExpandNames(self):
3161     self._ExpandAndLockInstance()
3162
3163   def BuildHooksEnv(self):
3164     """Build hooks env.
3165
3166     This runs on master, primary and secondary nodes of the instance.
3167
3168     """
3169     env = _BuildInstanceHookEnvByObject(self, self.instance)
3170     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3171     return env, nl, nl
3172
3173   def CheckPrereq(self):
3174     """Check prerequisites.
3175
3176     This checks that the instance is in the cluster and is not running.
3177
3178     """
3179     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3180     assert instance is not None, \
3181       "Cannot retrieve locked instance %s" % self.op.instance_name
3182     _CheckNodeOnline(self, instance.primary_node)
3183
3184     if instance.disk_template == constants.DT_DISKLESS:
3185       raise errors.OpPrereqError("Instance '%s' has no disks" %
3186                                  self.op.instance_name)
3187     if instance.admin_up:
3188       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3189                                  self.op.instance_name)
3190     remote_info = self.rpc.call_instance_info(instance.primary_node,
3191                                               instance.name,
3192                                               instance.hypervisor)
3193     remote_info.Raise("Error checking node %s" % instance.primary_node,
3194                       prereq=True)
3195     if remote_info.payload:
3196       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3197                                  (self.op.instance_name,
3198                                   instance.primary_node))
3199
3200     self.op.os_type = getattr(self.op, "os_type", None)
3201     if self.op.os_type is not None:
3202       # OS verification
3203       pnode = self.cfg.GetNodeInfo(
3204         self.cfg.ExpandNodeName(instance.primary_node))
3205       if pnode is None:
3206         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3207                                    self.op.pnode)
3208       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3209       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3210                    (self.op.os_type, pnode.name), prereq=True)
3211
3212     self.instance = instance
3213
3214   def Exec(self, feedback_fn):
3215     """Reinstall the instance.
3216
3217     """
3218     inst = self.instance
3219
3220     if self.op.os_type is not None:
3221       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3222       inst.os = self.op.os_type
3223       self.cfg.Update(inst)
3224
3225     _StartInstanceDisks(self, inst, None)
3226     try:
3227       feedback_fn("Running the instance OS create scripts...")
3228       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3229       result.Raise("Could not install OS for instance %s on node %s" %
3230                    (inst.name, inst.primary_node))
3231     finally:
3232       _ShutdownInstanceDisks(self, inst)
3233
3234
3235 class LURenameInstance(LogicalUnit):
3236   """Rename an instance.
3237
3238   """
3239   HPATH = "instance-rename"
3240   HTYPE = constants.HTYPE_INSTANCE
3241   _OP_REQP = ["instance_name", "new_name"]
3242
3243   def BuildHooksEnv(self):
3244     """Build hooks env.
3245
3246     This runs on master, primary and secondary nodes of the instance.
3247
3248     """
3249     env = _BuildInstanceHookEnvByObject(self, self.instance)
3250     env["INSTANCE_NEW_NAME"] = self.op.new_name
3251     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3252     return env, nl, nl
3253
3254   def CheckPrereq(self):
3255     """Check prerequisites.
3256
3257     This checks that the instance is in the cluster and is not running.
3258
3259     """
3260     instance = self.cfg.GetInstanceInfo(
3261       self.cfg.ExpandInstanceName(self.op.instance_name))
3262     if instance is None:
3263       raise errors.OpPrereqError("Instance '%s' not known" %
3264                                  self.op.instance_name)
3265     _CheckNodeOnline(self, instance.primary_node)
3266
3267     if instance.admin_up:
3268       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3269                                  self.op.instance_name)
3270     remote_info = self.rpc.call_instance_info(instance.primary_node,
3271                                               instance.name,
3272                                               instance.hypervisor)
3273     remote_info.Raise("Error checking node %s" % instance.primary_node,
3274                       prereq=True)
3275     if remote_info.payload:
3276       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3277                                  (self.op.instance_name,
3278                                   instance.primary_node))
3279     self.instance = instance
3280
3281     # new name verification
3282     name_info = utils.HostInfo(self.op.new_name)
3283
3284     self.op.new_name = new_name = name_info.name
3285     instance_list = self.cfg.GetInstanceList()
3286     if new_name in instance_list:
3287       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3288                                  new_name)
3289
3290     if not getattr(self.op, "ignore_ip", False):
3291       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3292         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3293                                    (name_info.ip, new_name))
3294
3295
3296   def Exec(self, feedback_fn):
3297     """Reinstall the instance.
3298
3299     """
3300     inst = self.instance
3301     old_name = inst.name
3302
3303     if inst.disk_template == constants.DT_FILE:
3304       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3305
3306     self.cfg.RenameInstance(inst.name, self.op.new_name)
3307     # Change the instance lock. This is definitely safe while we hold the BGL
3308     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3309     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3310
3311     # re-read the instance from the configuration after rename
3312     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3313
3314     if inst.disk_template == constants.DT_FILE:
3315       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3316       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3317                                                      old_file_storage_dir,
3318                                                      new_file_storage_dir)
3319       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3320                    " (but the instance has been renamed in Ganeti)" %
3321                    (inst.primary_node, old_file_storage_dir,
3322                     new_file_storage_dir))
3323
3324     _StartInstanceDisks(self, inst, None)
3325     try:
3326       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3327                                                  old_name)
3328       msg = result.fail_msg
3329       if msg:
3330         msg = ("Could not run OS rename script for instance %s on node %s"
3331                " (but the instance has been renamed in Ganeti): %s" %
3332                (inst.name, inst.primary_node, msg))
3333         self.proc.LogWarning(msg)
3334     finally:
3335       _ShutdownInstanceDisks(self, inst)
3336
3337
3338 class LURemoveInstance(LogicalUnit):
3339   """Remove an instance.
3340
3341   """
3342   HPATH = "instance-remove"
3343   HTYPE = constants.HTYPE_INSTANCE
3344   _OP_REQP = ["instance_name", "ignore_failures"]
3345   REQ_BGL = False
3346
3347   def ExpandNames(self):
3348     self._ExpandAndLockInstance()
3349     self.needed_locks[locking.LEVEL_NODE] = []
3350     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3351
3352   def DeclareLocks(self, level):
3353     if level == locking.LEVEL_NODE:
3354       self._LockInstancesNodes()
3355
3356   def BuildHooksEnv(self):
3357     """Build hooks env.
3358
3359     This runs on master, primary and secondary nodes of the instance.
3360
3361     """
3362     env = _BuildInstanceHookEnvByObject(self, self.instance)
3363     nl = [self.cfg.GetMasterNode()]
3364     return env, nl, nl
3365
3366   def CheckPrereq(self):
3367     """Check prerequisites.
3368
3369     This checks that the instance is in the cluster.
3370
3371     """
3372     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3373     assert self.instance is not None, \
3374       "Cannot retrieve locked instance %s" % self.op.instance_name
3375
3376   def Exec(self, feedback_fn):
3377     """Remove the instance.
3378
3379     """
3380     instance = self.instance
3381     logging.info("Shutting down instance %s on node %s",
3382                  instance.name, instance.primary_node)
3383
3384     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3385     msg = result.fail_msg
3386     if msg:
3387       if self.op.ignore_failures:
3388         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3389       else:
3390         raise errors.OpExecError("Could not shutdown instance %s on"
3391                                  " node %s: %s" %
3392                                  (instance.name, instance.primary_node, msg))
3393
3394     logging.info("Removing block devices for instance %s", instance.name)
3395
3396     if not _RemoveDisks(self, instance):
3397       if self.op.ignore_failures:
3398         feedback_fn("Warning: can't remove instance's disks")
3399       else:
3400         raise errors.OpExecError("Can't remove instance's disks")
3401
3402     logging.info("Removing instance %s out of cluster config", instance.name)
3403
3404     self.cfg.RemoveInstance(instance.name)
3405     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3406
3407
3408 class LUQueryInstances(NoHooksLU):
3409   """Logical unit for querying instances.
3410
3411   """
3412   _OP_REQP = ["output_fields", "names", "use_locking"]
3413   REQ_BGL = False
3414   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3415                                     "admin_state",
3416                                     "disk_template", "ip", "mac", "bridge",
3417                                     "nic_mode", "nic_link",
3418                                     "sda_size", "sdb_size", "vcpus", "tags",
3419                                     "network_port", "beparams",
3420                                     r"(disk)\.(size)/([0-9]+)",
3421                                     r"(disk)\.(sizes)", "disk_usage",
3422                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3423                                     r"(nic)\.(bridge)/([0-9]+)",
3424                                     r"(nic)\.(macs|ips|modes|links|bridges)",
3425                                     r"(disk|nic)\.(count)",
3426                                     "serial_no", "hypervisor", "hvparams",] +
3427                                   ["hv/%s" % name
3428                                    for name in constants.HVS_PARAMETERS] +
3429                                   ["be/%s" % name
3430                                    for name in constants.BES_PARAMETERS])
3431   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3432
3433
3434   def ExpandNames(self):
3435     _CheckOutputFields(static=self._FIELDS_STATIC,
3436                        dynamic=self._FIELDS_DYNAMIC,
3437                        selected=self.op.output_fields)
3438
3439     self.needed_locks = {}
3440     self.share_locks[locking.LEVEL_INSTANCE] = 1
3441     self.share_locks[locking.LEVEL_NODE] = 1
3442
3443     if self.op.names:
3444       self.wanted = _GetWantedInstances(self, self.op.names)
3445     else:
3446       self.wanted = locking.ALL_SET
3447
3448     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3449     self.do_locking = self.do_node_query and self.op.use_locking
3450     if self.do_locking:
3451       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3452       self.needed_locks[locking.LEVEL_NODE] = []
3453       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3454
3455   def DeclareLocks(self, level):
3456     if level == locking.LEVEL_NODE and self.do_locking:
3457       self._LockInstancesNodes()
3458
3459   def CheckPrereq(self):
3460     """Check prerequisites.
3461
3462     """
3463     pass
3464
3465   def Exec(self, feedback_fn):
3466     """Computes the list of nodes and their attributes.
3467
3468     """
3469     all_info = self.cfg.GetAllInstancesInfo()
3470     if self.wanted == locking.ALL_SET:
3471       # caller didn't specify instance names, so ordering is not important
3472       if self.do_locking:
3473         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3474       else:
3475         instance_names = all_info.keys()
3476       instance_names = utils.NiceSort(instance_names)
3477     else:
3478       # caller did specify names, so we must keep the ordering
3479       if self.do_locking:
3480         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3481       else:
3482         tgt_set = all_info.keys()
3483       missing = set(self.wanted).difference(tgt_set)
3484       if missing:
3485         raise errors.OpExecError("Some instances were removed before"
3486                                  " retrieving their data: %s" % missing)
3487       instance_names = self.wanted
3488
3489     instance_list = [all_info[iname] for iname in instance_names]
3490
3491     # begin data gathering
3492
3493     nodes = frozenset([inst.primary_node for inst in instance_list])
3494     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3495
3496     bad_nodes = []
3497     off_nodes = []
3498     if self.do_node_query:
3499       live_data = {}
3500       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3501       for name in nodes:
3502         result = node_data[name]
3503         if result.offline:
3504           # offline nodes will be in both lists
3505           off_nodes.append(name)
3506         if result.failed or result.fail_msg:
3507           bad_nodes.append(name)
3508         else:
3509           if result.payload:
3510             live_data.update(result.payload)
3511           # else no instance is alive
3512     else:
3513       live_data = dict([(name, {}) for name in instance_names])
3514
3515     # end data gathering
3516
3517     HVPREFIX = "hv/"
3518     BEPREFIX = "be/"
3519     output = []
3520     cluster = self.cfg.GetClusterInfo()
3521     for instance in instance_list:
3522       iout = []
3523       i_hv = cluster.FillHV(instance)
3524       i_be = cluster.FillBE(instance)
3525       i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3526                                  nic.nicparams) for nic in instance.nics]
3527       for field in self.op.output_fields:
3528         st_match = self._FIELDS_STATIC.Matches(field)
3529         if field == "name":
3530           val = instance.name
3531         elif field == "os":
3532           val = instance.os
3533         elif field == "pnode":
3534           val = instance.primary_node
3535         elif field == "snodes":
3536           val = list(instance.secondary_nodes)
3537         elif field == "admin_state":
3538           val = instance.admin_up
3539         elif field == "oper_state":
3540           if instance.primary_node in bad_nodes:
3541             val = None
3542           else:
3543             val = bool(live_data.get(instance.name))
3544         elif field == "status":
3545           if instance.primary_node in off_nodes:
3546             val = "ERROR_nodeoffline"
3547           elif instance.primary_node in bad_nodes:
3548             val = "ERROR_nodedown"
3549           else:
3550             running = bool(live_data.get(instance.name))
3551             if running:
3552               if instance.admin_up:
3553                 val = "running"
3554               else:
3555                 val = "ERROR_up"
3556             else:
3557               if instance.admin_up:
3558                 val = "ERROR_down"
3559               else:
3560                 val = "ADMIN_down"
3561         elif field == "oper_ram":
3562           if instance.primary_node in bad_nodes:
3563             val = None
3564           elif instance.name in live_data:
3565             val = live_data[instance.name].get("memory", "?")
3566           else:
3567             val = "-"
3568         elif field == "vcpus":
3569           val = i_be[constants.BE_VCPUS]
3570         elif field == "disk_template":
3571           val = instance.disk_template
3572         elif field == "ip":
3573           if instance.nics:
3574             val = instance.nics[0].ip
3575           else:
3576             val = None
3577         elif field == "nic_mode":
3578           if instance.nics:
3579             val = i_nicp[0][constants.NIC_MODE]
3580           else:
3581             val = None
3582         elif field == "nic_link":
3583           if instance.nics:
3584             val = i_nicp[0][constants.NIC_LINK]
3585           else:
3586             val = None
3587         elif field == "bridge":
3588           if (instance.nics and
3589               i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3590             val = i_nicp[0][constants.NIC_LINK]
3591           else:
3592             val = None
3593         elif field == "mac":
3594           if instance.nics:
3595             val = instance.nics[0].mac
3596           else:
3597             val = None
3598         elif field == "sda_size" or field == "sdb_size":
3599           idx = ord(field[2]) - ord('a')
3600           try:
3601             val = instance.FindDisk(idx).size
3602           except errors.OpPrereqError:
3603             val = None
3604         elif field == "disk_usage": # total disk usage per node
3605           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3606           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3607         elif field == "tags":
3608           val = list(instance.GetTags())
3609         elif field == "serial_no":
3610           val = instance.serial_no
3611         elif field == "network_port":
3612           val = instance.network_port
3613         elif field == "hypervisor":
3614           val = instance.hypervisor
3615         elif field == "hvparams":
3616           val = i_hv
3617         elif (field.startswith(HVPREFIX) and
3618               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3619           val = i_hv.get(field[len(HVPREFIX):], None)
3620         elif field == "beparams":
3621           val = i_be
3622         elif (field.startswith(BEPREFIX) and
3623               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3624           val = i_be.get(field[len(BEPREFIX):], None)
3625         elif st_match and st_match.groups():
3626           # matches a variable list
3627           st_groups = st_match.groups()
3628           if st_groups and st_groups[0] == "disk":
3629             if st_groups[1] == "count":
3630               val = len(instance.disks)
3631             elif st_groups[1] == "sizes":
3632               val = [disk.size for disk in instance.disks]
3633             elif st_groups[1] == "size":
3634               try:
3635                 val = instance.FindDisk(st_groups[2]).size
3636               except errors.OpPrereqError:
3637                 val = None
3638             else:
3639               assert False, "Unhandled disk parameter"
3640           elif st_groups[0] == "nic":
3641             if st_groups[1] == "count":
3642               val = len(instance.nics)
3643             elif st_groups[1] == "macs":
3644               val = [nic.mac for nic in instance.nics]
3645             elif st_groups[1] == "ips":
3646               val = [nic.ip for nic in instance.nics]
3647             elif st_groups[1] == "modes":
3648               val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3649             elif st_groups[1] == "links":
3650               val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3651             elif st_groups[1] == "bridges":
3652               val = []
3653               for nicp in i_nicp:
3654                 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3655                   val.append(nicp[constants.NIC_LINK])
3656                 else:
3657                   val.append(None)
3658             else:
3659               # index-based item
3660               nic_idx = int(st_groups[2])
3661               if nic_idx >= len(instance.nics):
3662                 val = None
3663               else:
3664                 if st_groups[1] == "mac":
3665                   val = instance.nics[nic_idx].mac
3666                 elif st_groups[1] == "ip":
3667                   val = instance.nics[nic_idx].ip
3668                 elif st_groups[1] == "mode":
3669                   val = i_nicp[nic_idx][constants.NIC_MODE]
3670                 elif st_groups[1] == "link":
3671                   val = i_nicp[nic_idx][constants.NIC_LINK]
3672                 elif st_groups[1] == "bridge":
3673                   nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3674                   if nic_mode == constants.NIC_MODE_BRIDGED:
3675                     val = i_nicp[nic_idx][constants.NIC_LINK]
3676                   else:
3677                     val = None
3678                 else:
3679                   assert False, "Unhandled NIC parameter"
3680           else:
3681             assert False, ("Declared but unhandled variable parameter '%s'" %
3682                            field)
3683         else:
3684           assert False, "Declared but unhandled parameter '%s'" % field
3685         iout.append(val)
3686       output.append(iout)
3687
3688     return output
3689
3690
3691 class LUFailoverInstance(LogicalUnit):
3692   """Failover an instance.
3693
3694   """
3695   HPATH = "instance-failover"
3696   HTYPE = constants.HTYPE_INSTANCE
3697   _OP_REQP = ["instance_name", "ignore_consistency"]
3698   REQ_BGL = False
3699
3700   def ExpandNames(self):
3701     self._ExpandAndLockInstance()
3702     self.needed_locks[locking.LEVEL_NODE] = []
3703     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3704
3705   def DeclareLocks(self, level):
3706     if level == locking.LEVEL_NODE:
3707       self._LockInstancesNodes()
3708
3709   def BuildHooksEnv(self):
3710     """Build hooks env.
3711
3712     This runs on master, primary and secondary nodes of the instance.
3713
3714     """
3715     env = {
3716       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3717       }
3718     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3719     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3720     return env, nl, nl
3721
3722   def CheckPrereq(self):
3723     """Check prerequisites.
3724
3725     This checks that the instance is in the cluster.
3726
3727     """
3728     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3729     assert self.instance is not None, \
3730       "Cannot retrieve locked instance %s" % self.op.instance_name
3731
3732     bep = self.cfg.GetClusterInfo().FillBE(instance)
3733     if instance.disk_template not in constants.DTS_NET_MIRROR:
3734       raise errors.OpPrereqError("Instance's disk layout is not"
3735                                  " network mirrored, cannot failover.")
3736
3737     secondary_nodes = instance.secondary_nodes
3738     if not secondary_nodes:
3739       raise errors.ProgrammerError("no secondary node but using "
3740                                    "a mirrored disk template")
3741
3742     target_node = secondary_nodes[0]
3743     _CheckNodeOnline(self, target_node)
3744     _CheckNodeNotDrained(self, target_node)
3745     if instance.admin_up:
3746       # check memory requirements on the secondary node
3747       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3748                            instance.name, bep[constants.BE_MEMORY],
3749                            instance.hypervisor)
3750     else:
3751       self.LogInfo("Not checking memory on the secondary node as"
3752                    " instance will not be started")
3753
3754     # check bridge existance
3755     _CheckInstanceBridgesExist(self, instance, node=target_node)
3756
3757   def Exec(self, feedback_fn):
3758     """Failover an instance.
3759
3760     The failover is done by shutting it down on its present node and
3761     starting it on the secondary.
3762
3763     """
3764     instance = self.instance
3765
3766     source_node = instance.primary_node
3767     target_node = instance.secondary_nodes[0]
3768
3769     feedback_fn("* checking disk consistency between source and target")
3770     for dev in instance.disks:
3771       # for drbd, these are drbd over lvm
3772       if not _CheckDiskConsistency(self, dev, target_node, False):
3773         if instance.admin_up and not self.op.ignore_consistency:
3774           raise errors.OpExecError("Disk %s is degraded on target node,"
3775                                    " aborting failover." % dev.iv_name)
3776
3777     feedback_fn("* shutting down instance on source node")
3778     logging.info("Shutting down instance %s on node %s",
3779                  instance.name, source_node)
3780
3781     result = self.rpc.call_instance_shutdown(source_node, instance)
3782     msg = result.fail_msg
3783     if msg:
3784       if self.op.ignore_consistency:
3785         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3786                              " Proceeding anyway. Please make sure node"
3787                              " %s is down. Error details: %s",
3788                              instance.name, source_node, source_node, msg)
3789       else:
3790         raise errors.OpExecError("Could not shutdown instance %s on"
3791                                  " node %s: %s" %
3792                                  (instance.name, source_node, msg))
3793
3794     feedback_fn("* deactivating the instance's disks on source node")
3795     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3796       raise errors.OpExecError("Can't shut down the instance's disks.")
3797
3798     instance.primary_node = target_node
3799     # distribute new instance config to the other nodes
3800     self.cfg.Update(instance)
3801
3802     # Only start the instance if it's marked as up
3803     if instance.admin_up:
3804       feedback_fn("* activating the instance's disks on target node")
3805       logging.info("Starting instance %s on node %s",
3806                    instance.name, target_node)
3807
3808       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3809                                                ignore_secondaries=True)
3810       if not disks_ok:
3811         _ShutdownInstanceDisks(self, instance)
3812         raise errors.OpExecError("Can't activate the instance's disks")
3813
3814       feedback_fn("* starting the instance on the target node")
3815       result = self.rpc.call_instance_start(target_node, instance, None, None)
3816       msg = result.fail_msg
3817       if msg:
3818         _ShutdownInstanceDisks(self, instance)
3819         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3820                                  (instance.name, target_node, msg))
3821
3822
3823 class LUMigrateInstance(LogicalUnit):
3824   """Migrate an instance.
3825
3826   This is migration without shutting down, compared to the failover,
3827   which is done with shutdown.
3828
3829   """
3830   HPATH = "instance-migrate"
3831   HTYPE = constants.HTYPE_INSTANCE
3832   _OP_REQP = ["instance_name", "live", "cleanup"]
3833
3834   REQ_BGL = False
3835
3836   def ExpandNames(self):
3837     self._ExpandAndLockInstance()
3838     self.needed_locks[locking.LEVEL_NODE] = []
3839     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3840
3841   def DeclareLocks(self, level):
3842     if level == locking.LEVEL_NODE:
3843       self._LockInstancesNodes()
3844
3845   def BuildHooksEnv(self):
3846     """Build hooks env.
3847
3848     This runs on master, primary and secondary nodes of the instance.
3849
3850     """
3851     env = _BuildInstanceHookEnvByObject(self, self.instance)
3852     env["MIGRATE_LIVE"] = self.op.live
3853     env["MIGRATE_CLEANUP"] = self.op.cleanup
3854     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3855     return env, nl, nl
3856
3857   def CheckPrereq(self):
3858     """Check prerequisites.
3859
3860     This checks that the instance is in the cluster.
3861
3862     """
3863     instance = self.cfg.GetInstanceInfo(
3864       self.cfg.ExpandInstanceName(self.op.instance_name))
3865     if instance is None:
3866       raise errors.OpPrereqError("Instance '%s' not known" %
3867                                  self.op.instance_name)
3868
3869     if instance.disk_template != constants.DT_DRBD8:
3870       raise errors.OpPrereqError("Instance's disk layout is not"
3871                                  " drbd8, cannot migrate.")
3872
3873     secondary_nodes = instance.secondary_nodes
3874     if not secondary_nodes:
3875       raise errors.ConfigurationError("No secondary node but using"
3876                                       " drbd8 disk template")
3877
3878     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3879
3880     target_node = secondary_nodes[0]
3881     # check memory requirements on the secondary node
3882     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3883                          instance.name, i_be[constants.BE_MEMORY],
3884                          instance.hypervisor)
3885
3886     # check bridge existance
3887     _CheckInstanceBridgesExist(self, instance, node=target_node)
3888
3889     if not self.op.cleanup:
3890       _CheckNodeNotDrained(self, target_node)
3891       result = self.rpc.call_instance_migratable(instance.primary_node,
3892                                                  instance)
3893       result.Raise("Can't migrate, please use failover", prereq=True)
3894
3895     self.instance = instance
3896
3897   def _WaitUntilSync(self):
3898     """Poll with custom rpc for disk sync.
3899
3900     This uses our own step-based rpc call.
3901
3902     """
3903     self.feedback_fn("* wait until resync is done")
3904     all_done = False
3905     while not all_done:
3906       all_done = True
3907       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3908                                             self.nodes_ip,
3909                                             self.instance.disks)
3910       min_percent = 100
3911       for node, nres in result.items():
3912         nres.Raise("Cannot resync disks on node %s" % node)
3913         node_done, node_percent = nres.payload
3914         all_done = all_done and node_done
3915         if node_percent is not None:
3916           min_percent = min(min_percent, node_percent)
3917       if not all_done:
3918         if min_percent < 100:
3919           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3920         time.sleep(2)
3921
3922   def _EnsureSecondary(self, node):
3923     """Demote a node to secondary.
3924
3925     """
3926     self.feedback_fn("* switching node %s to secondary mode" % node)
3927
3928     for dev in self.instance.disks:
3929       self.cfg.SetDiskID(dev, node)
3930
3931     result = self.rpc.call_blockdev_close(node, self.instance.name,
3932                                           self.instance.disks)
3933     result.Raise("Cannot change disk to secondary on node %s" % node)
3934
3935   def _GoStandalone(self):
3936     """Disconnect from the network.
3937
3938     """
3939     self.feedback_fn("* changing into standalone mode")
3940     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3941                                                self.instance.disks)
3942     for node, nres in result.items():
3943       nres.Raise("Cannot disconnect disks node %s" % node)
3944
3945   def _GoReconnect(self, multimaster):
3946     """Reconnect to the network.
3947
3948     """
3949     if multimaster:
3950       msg = "dual-master"
3951     else:
3952       msg = "single-master"
3953     self.feedback_fn("* changing disks into %s mode" % msg)
3954     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3955                                            self.instance.disks,
3956                                            self.instance.name, multimaster)
3957     for node, nres in result.items():
3958       nres.Raise("Cannot change disks config on node %s" % node)
3959
3960   def _ExecCleanup(self):
3961     """Try to cleanup after a failed migration.
3962
3963     The cleanup is done by:
3964       - check that the instance is running only on one node
3965         (and update the config if needed)
3966       - change disks on its secondary node to secondary
3967       - wait until disks are fully synchronized
3968       - disconnect from the network
3969       - change disks into single-master mode
3970       - wait again until disks are fully synchronized
3971
3972     """
3973     instance = self.instance
3974     target_node = self.target_node
3975     source_node = self.source_node
3976
3977     # check running on only one node
3978     self.feedback_fn("* checking where the instance actually runs"
3979                      " (if this hangs, the hypervisor might be in"
3980                      " a bad state)")
3981     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3982     for node, result in ins_l.items():
3983       result.Raise("Can't contact node %s" % node)
3984
3985     runningon_source = instance.name in ins_l[source_node].payload
3986     runningon_target = instance.name in ins_l[target_node].payload
3987
3988     if runningon_source and runningon_target:
3989       raise errors.OpExecError("Instance seems to be running on two nodes,"
3990                                " or the hypervisor is confused. You will have"
3991                                " to ensure manually that it runs only on one"
3992                                " and restart this operation.")
3993
3994     if not (runningon_source or runningon_target):
3995       raise errors.OpExecError("Instance does not seem to be running at all."
3996                                " In this case, it's safer to repair by"
3997                                " running 'gnt-instance stop' to ensure disk"
3998                                " shutdown, and then restarting it.")
3999
4000     if runningon_target:
4001       # the migration has actually succeeded, we need to update the config
4002       self.feedback_fn("* instance running on secondary node (%s),"
4003                        " updating config" % target_node)
4004       instance.primary_node = target_node
4005       self.cfg.Update(instance)
4006       demoted_node = source_node
4007     else:
4008       self.feedback_fn("* instance confirmed to be running on its"
4009                        " primary node (%s)" % source_node)
4010       demoted_node = target_node
4011
4012     self._EnsureSecondary(demoted_node)
4013     try:
4014       self._WaitUntilSync()
4015     except errors.OpExecError:
4016       # we ignore here errors, since if the device is standalone, it
4017       # won't be able to sync
4018       pass
4019     self._GoStandalone()
4020     self._GoReconnect(False)
4021     self._WaitUntilSync()
4022
4023     self.feedback_fn("* done")
4024
4025   def _RevertDiskStatus(self):
4026     """Try to revert the disk status after a failed migration.
4027
4028     """
4029     target_node = self.target_node
4030     try:
4031       self._EnsureSecondary(target_node)
4032       self._GoStandalone()
4033       self._GoReconnect(False)
4034       self._WaitUntilSync()
4035     except errors.OpExecError, err:
4036       self.LogWarning("Migration failed and I can't reconnect the"
4037                       " drives: error '%s'\n"
4038                       "Please look and recover the instance status" %
4039                       str(err))
4040
4041   def _AbortMigration(self):
4042     """Call the hypervisor code to abort a started migration.
4043
4044     """
4045     instance = self.instance
4046     target_node = self.target_node
4047     migration_info = self.migration_info
4048
4049     abort_result = self.rpc.call_finalize_migration(target_node,
4050                                                     instance,
4051                                                     migration_info,
4052                                                     False)
4053     abort_msg = abort_result.fail_msg
4054     if abort_msg:
4055       logging.error("Aborting migration failed on target node %s: %s" %
4056                     (target_node, abort_msg))
4057       # Don't raise an exception here, as we stil have to try to revert the
4058       # disk status, even if this step failed.
4059
4060   def _ExecMigration(self):
4061     """Migrate an instance.
4062
4063     The migrate is done by:
4064       - change the disks into dual-master mode
4065       - wait until disks are fully synchronized again
4066       - migrate the instance
4067       - change disks on the new secondary node (the old primary) to secondary
4068       - wait until disks are fully synchronized
4069       - change disks into single-master mode
4070
4071     """
4072     instance = self.instance
4073     target_node = self.target_node
4074     source_node = self.source_node
4075
4076     self.feedback_fn("* checking disk consistency between source and target")
4077     for dev in instance.disks:
4078       if not _CheckDiskConsistency(self, dev, target_node, False):
4079         raise errors.OpExecError("Disk %s is degraded or not fully"
4080                                  " synchronized on target node,"
4081                                  " aborting migrate." % dev.iv_name)
4082
4083     # First get the migration information from the remote node
4084     result = self.rpc.call_migration_info(source_node, instance)
4085     msg = result.fail_msg
4086     if msg:
4087       log_err = ("Failed fetching source migration information from %s: %s" %
4088                  (source_node, msg))
4089       logging.error(log_err)
4090       raise errors.OpExecError(log_err)
4091
4092     self.migration_info = migration_info = result.payload
4093
4094     # Then switch the disks to master/master mode
4095     self._EnsureSecondary(target_node)
4096     self._GoStandalone()
4097     self._GoReconnect(True)
4098     self._WaitUntilSync()
4099
4100     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4101     result = self.rpc.call_accept_instance(target_node,
4102                                            instance,
4103                                            migration_info,
4104                                            self.nodes_ip[target_node])
4105
4106     msg = result.fail_msg
4107     if msg:
4108       logging.error("Instance pre-migration failed, trying to revert"
4109                     " disk status: %s", msg)
4110       self._AbortMigration()
4111       self._RevertDiskStatus()
4112       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4113                                (instance.name, msg))
4114
4115     self.feedback_fn("* migrating instance to %s" % target_node)
4116     time.sleep(10)
4117     result = self.rpc.call_instance_migrate(source_node, instance,
4118                                             self.nodes_ip[target_node],
4119                                             self.op.live)
4120     msg = result.fail_msg
4121     if msg:
4122       logging.error("Instance migration failed, trying to revert"
4123                     " disk status: %s", msg)
4124       self._AbortMigration()
4125       self._RevertDiskStatus()
4126       raise errors.OpExecError("Could not migrate instance %s: %s" %
4127                                (instance.name, msg))
4128     time.sleep(10)
4129
4130     instance.primary_node = target_node
4131     # distribute new instance config to the other nodes
4132     self.cfg.Update(instance)
4133
4134     result = self.rpc.call_finalize_migration(target_node,
4135                                               instance,
4136                                               migration_info,
4137                                               True)
4138     msg = result.fail_msg
4139     if msg:
4140       logging.error("Instance migration succeeded, but finalization failed:"
4141                     " %s" % msg)
4142       raise errors.OpExecError("Could not finalize instance migration: %s" %
4143                                msg)
4144
4145     self._EnsureSecondary(source_node)
4146     self._WaitUntilSync()
4147     self._GoStandalone()
4148     self._GoReconnect(False)
4149     self._WaitUntilSync()
4150
4151     self.feedback_fn("* done")
4152
4153   def Exec(self, feedback_fn):
4154     """Perform the migration.
4155
4156     """
4157     self.feedback_fn = feedback_fn
4158
4159     self.source_node = self.instance.primary_node
4160     self.target_node = self.instance.secondary_nodes[0]
4161     self.all_nodes = [self.source_node, self.target_node]
4162     self.nodes_ip = {
4163       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4164       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4165       }
4166     if self.op.cleanup:
4167       return self._ExecCleanup()
4168     else:
4169       return self._ExecMigration()
4170
4171
4172 def _CreateBlockDev(lu, node, instance, device, force_create,
4173                     info, force_open):
4174   """Create a tree of block devices on a given node.
4175
4176   If this device type has to be created on secondaries, create it and
4177   all its children.
4178
4179   If not, just recurse to children keeping the same 'force' value.
4180
4181   @param lu: the lu on whose behalf we execute
4182   @param node: the node on which to create the device
4183   @type instance: L{objects.Instance}
4184   @param instance: the instance which owns the device
4185   @type device: L{objects.Disk}
4186   @param device: the device to create
4187   @type force_create: boolean
4188   @param force_create: whether to force creation of this device; this
4189       will be change to True whenever we find a device which has
4190       CreateOnSecondary() attribute
4191   @param info: the extra 'metadata' we should attach to the device
4192       (this will be represented as a LVM tag)
4193   @type force_open: boolean
4194   @param force_open: this parameter will be passes to the
4195       L{backend.BlockdevCreate} function where it specifies
4196       whether we run on primary or not, and it affects both
4197       the child assembly and the device own Open() execution
4198
4199   """
4200   if device.CreateOnSecondary():
4201     force_create = True
4202
4203   if device.children:
4204     for child in device.children:
4205       _CreateBlockDev(lu, node, instance, child, force_create,
4206                       info, force_open)
4207
4208   if not force_create:
4209     return
4210
4211   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4212
4213
4214 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4215   """Create a single block device on a given node.
4216
4217   This will not recurse over children of the device, so they must be
4218   created in advance.
4219
4220   @param lu: the lu on whose behalf we execute
4221   @param node: the node on which to create the device
4222   @type instance: L{objects.Instance}
4223   @param instance: the instance which owns the device
4224   @type device: L{objects.Disk}
4225   @param device: the device to create
4226   @param info: the extra 'metadata' we should attach to the device
4227       (this will be represented as a LVM tag)
4228   @type force_open: boolean
4229   @param force_open: this parameter will be passes to the
4230       L{backend.BlockdevCreate} function where it specifies
4231       whether we run on primary or not, and it affects both
4232       the child assembly and the device own Open() execution
4233
4234   """
4235   lu.cfg.SetDiskID(device, node)
4236   result = lu.rpc.call_blockdev_create(node, device, device.size,
4237                                        instance.name, force_open, info)
4238   result.Raise("Can't create block device %s on"
4239                " node %s for instance %s" % (device, node, instance.name))
4240   if device.physical_id is None:
4241     device.physical_id = result.payload
4242
4243
4244 def _GenerateUniqueNames(lu, exts):
4245   """Generate a suitable LV name.
4246
4247   This will generate a logical volume name for the given instance.
4248
4249   """
4250   results = []
4251   for val in exts:
4252     new_id = lu.cfg.GenerateUniqueID()
4253     results.append("%s%s" % (new_id, val))
4254   return results
4255
4256
4257 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4258                          p_minor, s_minor):
4259   """Generate a drbd8 device complete with its children.
4260
4261   """
4262   port = lu.cfg.AllocatePort()
4263   vgname = lu.cfg.GetVGName()
4264   shared_secret = lu.cfg.GenerateDRBDSecret()
4265   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4266                           logical_id=(vgname, names[0]))
4267   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4268                           logical_id=(vgname, names[1]))
4269   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4270                           logical_id=(primary, secondary, port,
4271                                       p_minor, s_minor,
4272                                       shared_secret),
4273                           children=[dev_data, dev_meta],
4274                           iv_name=iv_name)
4275   return drbd_dev
4276
4277
4278 def _GenerateDiskTemplate(lu, template_name,
4279                           instance_name, primary_node,
4280                           secondary_nodes, disk_info,
4281                           file_storage_dir, file_driver,
4282                           base_index):
4283   """Generate the entire disk layout for a given template type.
4284
4285   """
4286   #TODO: compute space requirements
4287
4288   vgname = lu.cfg.GetVGName()
4289   disk_count = len(disk_info)
4290   disks = []
4291   if template_name == constants.DT_DISKLESS:
4292     pass
4293   elif template_name == constants.DT_PLAIN:
4294     if len(secondary_nodes) != 0:
4295       raise errors.ProgrammerError("Wrong template configuration")
4296
4297     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4298                                       for i in range(disk_count)])
4299     for idx, disk in enumerate(disk_info):
4300       disk_index = idx + base_index
4301       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4302                               logical_id=(vgname, names[idx]),
4303                               iv_name="disk/%d" % disk_index,
4304                               mode=disk["mode"])
4305       disks.append(disk_dev)
4306   elif template_name == constants.DT_DRBD8:
4307     if len(secondary_nodes) != 1:
4308       raise errors.ProgrammerError("Wrong template configuration")
4309     remote_node = secondary_nodes[0]
4310     minors = lu.cfg.AllocateDRBDMinor(
4311       [primary_node, remote_node] * len(disk_info), instance_name)
4312
4313     names = []
4314     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4315                                                for i in range(disk_count)]):
4316       names.append(lv_prefix + "_data")
4317       names.append(lv_prefix + "_meta")
4318     for idx, disk in enumerate(disk_info):
4319       disk_index = idx + base_index
4320       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4321                                       disk["size"], names[idx*2:idx*2+2],
4322                                       "disk/%d" % disk_index,
4323                                       minors[idx*2], minors[idx*2+1])
4324       disk_dev.mode = disk["mode"]
4325       disks.append(disk_dev)
4326   elif template_name == constants.DT_FILE:
4327     if len(secondary_nodes) != 0:
4328       raise errors.ProgrammerError("Wrong template configuration")
4329
4330     for idx, disk in enumerate(disk_info):
4331       disk_index = idx + base_index
4332       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4333                               iv_name="disk/%d" % disk_index,
4334                               logical_id=(file_driver,
4335                                           "%s/disk%d" % (file_storage_dir,
4336                                                          disk_index)),
4337                               mode=disk["mode"])
4338       disks.append(disk_dev)
4339   else:
4340     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4341   return disks
4342
4343
4344 def _GetInstanceInfoText(instance):
4345   """Compute that text that should be added to the disk's metadata.
4346
4347   """
4348   return "originstname+%s" % instance.name
4349
4350
4351 def _CreateDisks(lu, instance):
4352   """Create all disks for an instance.
4353
4354   This abstracts away some work from AddInstance.
4355
4356   @type lu: L{LogicalUnit}
4357   @param lu: the logical unit on whose behalf we execute
4358   @type instance: L{objects.Instance}
4359   @param instance: the instance whose disks we should create
4360   @rtype: boolean
4361   @return: the success of the creation
4362
4363   """
4364   info = _GetInstanceInfoText(instance)
4365   pnode = instance.primary_node
4366
4367   if instance.disk_template == constants.DT_FILE:
4368     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4369     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4370
4371     result.Raise("Failed to create directory '%s' on"
4372                  " node %s: %s" % (file_storage_dir, pnode))
4373
4374   # Note: this needs to be kept in sync with adding of disks in
4375   # LUSetInstanceParams
4376   for device in instance.disks:
4377     logging.info("Creating volume %s for instance %s",
4378                  device.iv_name, instance.name)
4379     #HARDCODE
4380     for node in instance.all_nodes:
4381       f_create = node == pnode
4382       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4383
4384
4385 def _RemoveDisks(lu, instance):
4386   """Remove all disks for an instance.
4387
4388   This abstracts away some work from `AddInstance()` and
4389   `RemoveInstance()`. Note that in case some of the devices couldn't
4390   be removed, the removal will continue with the other ones (compare
4391   with `_CreateDisks()`).
4392
4393   @type lu: L{LogicalUnit}
4394   @param lu: the logical unit on whose behalf we execute
4395   @type instance: L{objects.Instance}
4396   @param instance: the instance whose disks we should remove
4397   @rtype: boolean
4398   @return: the success of the removal
4399
4400   """
4401   logging.info("Removing block devices for instance %s", instance.name)
4402
4403   all_result = True
4404   for device in instance.disks:
4405     for node, disk in device.ComputeNodeTree(instance.primary_node):
4406       lu.cfg.SetDiskID(disk, node)
4407       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4408       if msg:
4409         lu.LogWarning("Could not remove block device %s on node %s,"
4410                       " continuing anyway: %s", device.iv_name, node, msg)
4411         all_result = False
4412
4413   if instance.disk_template == constants.DT_FILE:
4414     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4415     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4416                                                  file_storage_dir)
4417     msg = result.fail_msg
4418     if msg:
4419       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4420                     file_storage_dir, instance.primary_node, msg)
4421       all_result = False
4422
4423   return all_result
4424
4425
4426 def _ComputeDiskSize(disk_template, disks):
4427   """Compute disk size requirements in the volume group
4428
4429   """
4430   # Required free disk space as a function of disk and swap space
4431   req_size_dict = {
4432     constants.DT_DISKLESS: None,
4433     constants.DT_PLAIN: sum(d["size"] for d in disks),
4434     # 128 MB are added for drbd metadata for each disk
4435     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4436     constants.DT_FILE: None,
4437   }
4438
4439   if disk_template not in req_size_dict:
4440     raise errors.ProgrammerError("Disk template '%s' size requirement"
4441                                  " is unknown" %  disk_template)
4442
4443   return req_size_dict[disk_template]
4444
4445
4446 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4447   """Hypervisor parameter validation.
4448
4449   This function abstract the hypervisor parameter validation to be
4450   used in both instance create and instance modify.
4451
4452   @type lu: L{LogicalUnit}
4453   @param lu: the logical unit for which we check
4454   @type nodenames: list
4455   @param nodenames: the list of nodes on which we should check
4456   @type hvname: string
4457   @param hvname: the name of the hypervisor we should use
4458   @type hvparams: dict
4459   @param hvparams: the parameters which we need to check
4460   @raise errors.OpPrereqError: if the parameters are not valid
4461
4462   """
4463   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4464                                                   hvname,
4465                                                   hvparams)
4466   for node in nodenames:
4467     info = hvinfo[node]
4468     if info.offline:
4469       continue
4470     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4471
4472
4473 class LUCreateInstance(LogicalUnit):
4474   """Create an instance.
4475
4476   """
4477   HPATH = "instance-add"
4478   HTYPE = constants.HTYPE_INSTANCE
4479   _OP_REQP = ["instance_name", "disks", "disk_template",
4480               "mode", "start",
4481               "wait_for_sync", "ip_check", "nics",
4482               "hvparams", "beparams"]
4483   REQ_BGL = False
4484
4485   def _ExpandNode(self, node):
4486     """Expands and checks one node name.
4487
4488     """
4489     node_full = self.cfg.ExpandNodeName(node)
4490     if node_full is None:
4491       raise errors.OpPrereqError("Unknown node %s" % node)
4492     return node_full
4493
4494   def ExpandNames(self):
4495     """ExpandNames for CreateInstance.
4496
4497     Figure out the right locks for instance creation.
4498
4499     """
4500     self.needed_locks = {}
4501
4502     # set optional parameters to none if they don't exist
4503     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4504       if not hasattr(self.op, attr):
4505         setattr(self.op, attr, None)
4506
4507     # cheap checks, mostly valid constants given
4508
4509     # verify creation mode
4510     if self.op.mode not in (constants.INSTANCE_CREATE,
4511                             constants.INSTANCE_IMPORT):
4512       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4513                                  self.op.mode)
4514
4515     # disk template and mirror node verification
4516     if self.op.disk_template not in constants.DISK_TEMPLATES:
4517       raise errors.OpPrereqError("Invalid disk template name")
4518
4519     if self.op.hypervisor is None:
4520       self.op.hypervisor = self.cfg.GetHypervisorType()
4521
4522     cluster = self.cfg.GetClusterInfo()
4523     enabled_hvs = cluster.enabled_hypervisors
4524     if self.op.hypervisor not in enabled_hvs:
4525       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4526                                  " cluster (%s)" % (self.op.hypervisor,
4527                                   ",".join(enabled_hvs)))
4528
4529     # check hypervisor parameter syntax (locally)
4530     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4531     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4532                                   self.op.hvparams)
4533     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4534     hv_type.CheckParameterSyntax(filled_hvp)
4535     self.hv_full = filled_hvp
4536
4537     # fill and remember the beparams dict
4538     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4539     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4540                                     self.op.beparams)
4541
4542     #### instance parameters check
4543
4544     # instance name verification
4545     hostname1 = utils.HostInfo(self.op.instance_name)
4546     self.op.instance_name = instance_name = hostname1.name
4547
4548     # this is just a preventive check, but someone might still add this
4549     # instance in the meantime, and creation will fail at lock-add time
4550     if instance_name in self.cfg.GetInstanceList():
4551       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4552                                  instance_name)
4553
4554     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4555
4556     # NIC buildup
4557     self.nics = []
4558     for idx, nic in enumerate(self.op.nics):
4559       nic_mode_req = nic.get("mode", None)
4560       nic_mode = nic_mode_req
4561       if nic_mode is None:
4562         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4563
4564       # in routed mode, for the first nic, the default ip is 'auto'
4565       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4566         default_ip_mode = constants.VALUE_AUTO
4567       else:
4568         default_ip_mode = constants.VALUE_NONE
4569
4570       # ip validity checks
4571       ip = nic.get("ip", default_ip_mode)
4572       if ip is None or ip.lower() == constants.VALUE_NONE:
4573         nic_ip = None
4574       elif ip.lower() == constants.VALUE_AUTO:
4575         nic_ip = hostname1.ip
4576       else:
4577         if not utils.IsValidIP(ip):
4578           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4579                                      " like a valid IP" % ip)
4580         nic_ip = ip
4581
4582       # TODO: check the ip for uniqueness !!
4583       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4584         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4585
4586       # MAC address verification
4587       mac = nic.get("mac", constants.VALUE_AUTO)
4588       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4589         if not utils.IsValidMac(mac.lower()):
4590           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4591                                      mac)
4592       # bridge verification
4593       bridge = nic.get("bridge", None)
4594       link = nic.get("link", None)
4595       if bridge and link:
4596         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4597                                    " at the same time")
4598       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4599         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4600       elif bridge:
4601         link = bridge
4602
4603       nicparams = {}
4604       if nic_mode_req:
4605         nicparams[constants.NIC_MODE] = nic_mode_req
4606       if link:
4607         nicparams[constants.NIC_LINK] = link
4608
4609       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4610                                       nicparams)
4611       objects.NIC.CheckParameterSyntax(check_params)
4612       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4613
4614     # disk checks/pre-build
4615     self.disks = []
4616     for disk in self.op.disks:
4617       mode = disk.get("mode", constants.DISK_RDWR)
4618       if mode not in constants.DISK_ACCESS_SET:
4619         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4620                                    mode)
4621       size = disk.get("size", None)
4622       if size is None:
4623         raise errors.OpPrereqError("Missing disk size")
4624       try:
4625         size = int(size)
4626       except ValueError:
4627         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4628       self.disks.append({"size": size, "mode": mode})
4629
4630     # used in CheckPrereq for ip ping check
4631     self.check_ip = hostname1.ip
4632
4633     # file storage checks
4634     if (self.op.file_driver and
4635         not self.op.file_driver in constants.FILE_DRIVER):
4636       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4637                                  self.op.file_driver)
4638
4639     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4640       raise errors.OpPrereqError("File storage directory path not absolute")
4641
4642     ### Node/iallocator related checks
4643     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4644       raise errors.OpPrereqError("One and only one of iallocator and primary"
4645                                  " node must be given")
4646
4647     if self.op.iallocator:
4648       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4649     else:
4650       self.op.pnode = self._ExpandNode(self.op.pnode)
4651       nodelist = [self.op.pnode]
4652       if self.op.snode is not None:
4653         self.op.snode = self._ExpandNode(self.op.snode)
4654         nodelist.append(self.op.snode)
4655       self.needed_locks[locking.LEVEL_NODE] = nodelist
4656
4657     # in case of import lock the source node too
4658     if self.op.mode == constants.INSTANCE_IMPORT:
4659       src_node = getattr(self.op, "src_node", None)
4660       src_path = getattr(self.op, "src_path", None)
4661
4662       if src_path is None:
4663         self.op.src_path = src_path = self.op.instance_name
4664
4665       if src_node is None:
4666         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4667         self.op.src_node = None
4668         if os.path.isabs(src_path):
4669           raise errors.OpPrereqError("Importing an instance from an absolute"
4670                                      " path requires a source node option.")
4671       else:
4672         self.op.src_node = src_node = self._ExpandNode(src_node)
4673         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4674           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4675         if not os.path.isabs(src_path):
4676           self.op.src_path = src_path = \
4677             os.path.join(constants.EXPORT_DIR, src_path)
4678
4679     else: # INSTANCE_CREATE
4680       if getattr(self.op, "os_type", None) is None:
4681         raise errors.OpPrereqError("No guest OS specified")
4682
4683   def _RunAllocator(self):
4684     """Run the allocator based on input opcode.
4685
4686     """
4687     nics = [n.ToDict() for n in self.nics]
4688     ial = IAllocator(self,
4689                      mode=constants.IALLOCATOR_MODE_ALLOC,
4690                      name=self.op.instance_name,
4691                      disk_template=self.op.disk_template,
4692                      tags=[],
4693                      os=self.op.os_type,
4694                      vcpus=self.be_full[constants.BE_VCPUS],
4695                      mem_size=self.be_full[constants.BE_MEMORY],
4696                      disks=self.disks,
4697                      nics=nics,
4698                      hypervisor=self.op.hypervisor,
4699                      )
4700
4701     ial.Run(self.op.iallocator)
4702
4703     if not ial.success:
4704       raise errors.OpPrereqError("Can't compute nodes using"
4705                                  " iallocator '%s': %s" % (self.op.iallocator,
4706                                                            ial.info))
4707     if len(ial.nodes) != ial.required_nodes:
4708       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4709                                  " of nodes (%s), required %s" %
4710                                  (self.op.iallocator, len(ial.nodes),
4711                                   ial.required_nodes))
4712     self.op.pnode = ial.nodes[0]
4713     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4714                  self.op.instance_name, self.op.iallocator,
4715                  ", ".join(ial.nodes))
4716     if ial.required_nodes == 2:
4717       self.op.snode = ial.nodes[1]
4718
4719   def BuildHooksEnv(self):
4720     """Build hooks env.
4721
4722     This runs on master, primary and secondary nodes of the instance.
4723
4724     """
4725     env = {
4726       "ADD_MODE": self.op.mode,
4727       }
4728     if self.op.mode == constants.INSTANCE_IMPORT:
4729       env["SRC_NODE"] = self.op.src_node
4730       env["SRC_PATH"] = self.op.src_path
4731       env["SRC_IMAGES"] = self.src_images
4732
4733     env.update(_BuildInstanceHookEnv(
4734       name=self.op.instance_name,
4735       primary_node=self.op.pnode,
4736       secondary_nodes=self.secondaries,
4737       status=self.op.start,
4738       os_type=self.op.os_type,
4739       memory=self.be_full[constants.BE_MEMORY],
4740       vcpus=self.be_full[constants.BE_VCPUS],
4741       nics=_NICListToTuple(self, self.nics),
4742       disk_template=self.op.disk_template,
4743       disks=[(d["size"], d["mode"]) for d in self.disks],
4744       bep=self.be_full,
4745       hvp=self.hv_full,
4746       hypervisor_name=self.op.hypervisor,
4747     ))
4748
4749     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4750           self.secondaries)
4751     return env, nl, nl
4752
4753
4754   def CheckPrereq(self):
4755     """Check prerequisites.
4756
4757     """
4758     if (not self.cfg.GetVGName() and
4759         self.op.disk_template not in constants.DTS_NOT_LVM):
4760       raise errors.OpPrereqError("Cluster does not support lvm-based"
4761                                  " instances")
4762
4763     if self.op.mode == constants.INSTANCE_IMPORT:
4764       src_node = self.op.src_node
4765       src_path = self.op.src_path
4766
4767       if src_node is None:
4768         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4769         exp_list = self.rpc.call_export_list(locked_nodes)
4770         found = False
4771         for node in exp_list:
4772           if exp_list[node].fail_msg:
4773             continue
4774           if src_path in exp_list[node].payload:
4775             found = True
4776             self.op.src_node = src_node = node
4777             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4778                                                        src_path)
4779             break
4780         if not found:
4781           raise errors.OpPrereqError("No export found for relative path %s" %
4782                                       src_path)
4783
4784       _CheckNodeOnline(self, src_node)
4785       result = self.rpc.call_export_info(src_node, src_path)
4786       result.Raise("No export or invalid export found in dir %s" % src_path)
4787
4788       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4789       if not export_info.has_section(constants.INISECT_EXP):
4790         raise errors.ProgrammerError("Corrupted export config")
4791
4792       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4793       if (int(ei_version) != constants.EXPORT_VERSION):
4794         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4795                                    (ei_version, constants.EXPORT_VERSION))
4796
4797       # Check that the new instance doesn't have less disks than the export
4798       instance_disks = len(self.disks)
4799       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4800       if instance_disks < export_disks:
4801         raise errors.OpPrereqError("Not enough disks to import."
4802                                    " (instance: %d, export: %d)" %
4803                                    (instance_disks, export_disks))
4804
4805       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4806       disk_images = []
4807       for idx in range(export_disks):
4808         option = 'disk%d_dump' % idx
4809         if export_info.has_option(constants.INISECT_INS, option):
4810           # FIXME: are the old os-es, disk sizes, etc. useful?
4811           export_name = export_info.get(constants.INISECT_INS, option)
4812           image = os.path.join(src_path, export_name)
4813           disk_images.append(image)
4814         else:
4815           disk_images.append(False)
4816
4817       self.src_images = disk_images
4818
4819       old_name = export_info.get(constants.INISECT_INS, 'name')
4820       # FIXME: int() here could throw a ValueError on broken exports
4821       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4822       if self.op.instance_name == old_name:
4823         for idx, nic in enumerate(self.nics):
4824           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4825             nic_mac_ini = 'nic%d_mac' % idx
4826             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4827
4828     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4829     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4830     if self.op.start and not self.op.ip_check:
4831       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4832                                  " adding an instance in start mode")
4833
4834     if self.op.ip_check:
4835       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4836         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4837                                    (self.check_ip, self.op.instance_name))
4838
4839     #### mac address generation
4840     # By generating here the mac address both the allocator and the hooks get
4841     # the real final mac address rather than the 'auto' or 'generate' value.
4842     # There is a race condition between the generation and the instance object
4843     # creation, which means that we know the mac is valid now, but we're not
4844     # sure it will be when we actually add the instance. If things go bad
4845     # adding the instance will abort because of a duplicate mac, and the
4846     # creation job will fail.
4847     for nic in self.nics:
4848       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4849         nic.mac = self.cfg.GenerateMAC()
4850
4851     #### allocator run
4852
4853     if self.op.iallocator is not None:
4854       self._RunAllocator()
4855
4856     #### node related checks
4857
4858     # check primary node
4859     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4860     assert self.pnode is not None, \
4861       "Cannot retrieve locked node %s" % self.op.pnode
4862     if pnode.offline:
4863       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4864                                  pnode.name)
4865     if pnode.drained:
4866       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4867                                  pnode.name)
4868
4869     self.secondaries = []
4870
4871     # mirror node verification
4872     if self.op.disk_template in constants.DTS_NET_MIRROR:
4873       if self.op.snode is None:
4874         raise errors.OpPrereqError("The networked disk templates need"
4875                                    " a mirror node")
4876       if self.op.snode == pnode.name:
4877         raise errors.OpPrereqError("The secondary node cannot be"
4878                                    " the primary node.")
4879       _CheckNodeOnline(self, self.op.snode)
4880       _CheckNodeNotDrained(self, self.op.snode)
4881       self.secondaries.append(self.op.snode)
4882
4883     nodenames = [pnode.name] + self.secondaries
4884
4885     req_size = _ComputeDiskSize(self.op.disk_template,
4886                                 self.disks)
4887
4888     # Check lv size requirements
4889     if req_size is not None:
4890       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4891                                          self.op.hypervisor)
4892       for node in nodenames:
4893         info = nodeinfo[node]
4894         info.Raise("Cannot get current information from node %s" % node)
4895         info = info.payload
4896         vg_free = info.get('vg_free', None)
4897         if not isinstance(vg_free, int):
4898           raise errors.OpPrereqError("Can't compute free disk space on"
4899                                      " node %s" % node)
4900         if req_size > vg_free:
4901           raise errors.OpPrereqError("Not enough disk space on target node %s."
4902                                      " %d MB available, %d MB required" %
4903                                      (node, vg_free, req_size))
4904
4905     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4906
4907     # os verification
4908     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4909     result.Raise("OS '%s' not in supported os list for primary node %s" %
4910                  (self.op.os_type, pnode.name), prereq=True)
4911
4912     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4913
4914     # memory check on primary node
4915     if self.op.start:
4916       _CheckNodeFreeMemory(self, self.pnode.name,
4917                            "creating instance %s" % self.op.instance_name,
4918                            self.be_full[constants.BE_MEMORY],
4919                            self.op.hypervisor)
4920
4921     self.dry_run_result = list(nodenames)
4922
4923   def Exec(self, feedback_fn):
4924     """Create and add the instance to the cluster.
4925
4926     """
4927     instance = self.op.instance_name
4928     pnode_name = self.pnode.name
4929
4930     ht_kind = self.op.hypervisor
4931     if ht_kind in constants.HTS_REQ_PORT:
4932       network_port = self.cfg.AllocatePort()
4933     else:
4934       network_port = None
4935
4936     ##if self.op.vnc_bind_address is None:
4937     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4938
4939     # this is needed because os.path.join does not accept None arguments
4940     if self.op.file_storage_dir is None:
4941       string_file_storage_dir = ""
4942     else:
4943       string_file_storage_dir = self.op.file_storage_dir
4944
4945     # build the full file storage dir path
4946     file_storage_dir = os.path.normpath(os.path.join(
4947                                         self.cfg.GetFileStorageDir(),
4948                                         string_file_storage_dir, instance))
4949
4950
4951     disks = _GenerateDiskTemplate(self,
4952                                   self.op.disk_template,
4953                                   instance, pnode_name,
4954                                   self.secondaries,
4955                                   self.disks,
4956                                   file_storage_dir,
4957                                   self.op.file_driver,
4958                                   0)
4959
4960     iobj = objects.Instance(name=instance, os=self.op.os_type,
4961                             primary_node=pnode_name,
4962                             nics=self.nics, disks=disks,
4963                             disk_template=self.op.disk_template,
4964                             admin_up=False,
4965                             network_port=network_port,
4966                             beparams=self.op.beparams,
4967                             hvparams=self.op.hvparams,
4968                             hypervisor=self.op.hypervisor,
4969                             )
4970
4971     feedback_fn("* creating instance disks...")
4972     try:
4973       _CreateDisks(self, iobj)
4974     except errors.OpExecError:
4975       self.LogWarning("Device creation failed, reverting...")
4976       try:
4977         _RemoveDisks(self, iobj)
4978       finally:
4979         self.cfg.ReleaseDRBDMinors(instance)
4980         raise
4981
4982     feedback_fn("adding instance %s to cluster config" % instance)
4983
4984     self.cfg.AddInstance(iobj)
4985     # Declare that we don't want to remove the instance lock anymore, as we've
4986     # added the instance to the config
4987     del self.remove_locks[locking.LEVEL_INSTANCE]
4988     # Unlock all the nodes
4989     if self.op.mode == constants.INSTANCE_IMPORT:
4990       nodes_keep = [self.op.src_node]
4991       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4992                        if node != self.op.src_node]
4993       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4994       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4995     else:
4996       self.context.glm.release(locking.LEVEL_NODE)
4997       del self.acquired_locks[locking.LEVEL_NODE]
4998
4999     if self.op.wait_for_sync:
5000       disk_abort = not _WaitForSync(self, iobj)
5001     elif iobj.disk_template in constants.DTS_NET_MIRROR:
5002       # make sure the disks are not degraded (still sync-ing is ok)
5003       time.sleep(15)
5004       feedback_fn("* checking mirrors status")
5005       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5006     else:
5007       disk_abort = False
5008
5009     if disk_abort:
5010       _RemoveDisks(self, iobj)
5011       self.cfg.RemoveInstance(iobj.name)
5012       # Make sure the instance lock gets removed
5013       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5014       raise errors.OpExecError("There are some degraded disks for"
5015                                " this instance")
5016
5017     feedback_fn("creating os for instance %s on node %s" %
5018                 (instance, pnode_name))
5019
5020     if iobj.disk_template != constants.DT_DISKLESS:
5021       if self.op.mode == constants.INSTANCE_CREATE:
5022         feedback_fn("* running the instance OS create scripts...")
5023         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5024         result.Raise("Could not add os for instance %s"
5025                      " on node %s" % (instance, pnode_name))
5026
5027       elif self.op.mode == constants.INSTANCE_IMPORT:
5028         feedback_fn("* running the instance OS import scripts...")
5029         src_node = self.op.src_node
5030         src_images = self.src_images
5031         cluster_name = self.cfg.GetClusterName()
5032         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5033                                                          src_node, src_images,
5034                                                          cluster_name)
5035         msg = import_result.fail_msg
5036         if msg:
5037           self.LogWarning("Error while importing the disk images for instance"
5038                           " %s on node %s: %s" % (instance, pnode_name, msg))
5039       else:
5040         # also checked in the prereq part
5041         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5042                                      % self.op.mode)
5043
5044     if self.op.start:
5045       iobj.admin_up = True
5046       self.cfg.Update(iobj)
5047       logging.info("Starting instance %s on node %s", instance, pnode_name)
5048       feedback_fn("* starting instance...")
5049       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5050       result.Raise("Could not start instance")
5051
5052     return list(iobj.all_nodes)
5053
5054
5055 class LUConnectConsole(NoHooksLU):
5056   """Connect to an instance's console.
5057
5058   This is somewhat special in that it returns the command line that
5059   you need to run on the master node in order to connect to the
5060   console.
5061
5062   """
5063   _OP_REQP = ["instance_name"]
5064   REQ_BGL = False
5065
5066   def ExpandNames(self):
5067     self._ExpandAndLockInstance()
5068
5069   def CheckPrereq(self):
5070     """Check prerequisites.
5071
5072     This checks that the instance is in the cluster.
5073
5074     """
5075     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5076     assert self.instance is not None, \
5077       "Cannot retrieve locked instance %s" % self.op.instance_name
5078     _CheckNodeOnline(self, self.instance.primary_node)
5079
5080   def Exec(self, feedback_fn):
5081     """Connect to the console of an instance
5082
5083     """
5084     instance = self.instance
5085     node = instance.primary_node
5086
5087     node_insts = self.rpc.call_instance_list([node],
5088                                              [instance.hypervisor])[node]
5089     node_insts.Raise("Can't get node information from %s" % node)
5090
5091     if instance.name not in node_insts.payload:
5092       raise errors.OpExecError("Instance %s is not running." % instance.name)
5093
5094     logging.debug("Connecting to console of %s on %s", instance.name, node)
5095
5096     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5097     cluster = self.cfg.GetClusterInfo()
5098     # beparams and hvparams are passed separately, to avoid editing the
5099     # instance and then saving the defaults in the instance itself.
5100     hvparams = cluster.FillHV(instance)
5101     beparams = cluster.FillBE(instance)
5102     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5103
5104     # build ssh cmdline
5105     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5106
5107
5108 class LUReplaceDisks(LogicalUnit):
5109   """Replace the disks of an instance.
5110
5111   """
5112   HPATH = "mirrors-replace"
5113   HTYPE = constants.HTYPE_INSTANCE
5114   _OP_REQP = ["instance_name", "mode", "disks"]
5115   REQ_BGL = False
5116
5117   def CheckArguments(self):
5118     if not hasattr(self.op, "remote_node"):
5119       self.op.remote_node = None
5120     if not hasattr(self.op, "iallocator"):
5121       self.op.iallocator = None
5122
5123     # check for valid parameter combination
5124     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5125     if self.op.mode == constants.REPLACE_DISK_CHG:
5126       if cnt == 2:
5127         raise errors.OpPrereqError("When changing the secondary either an"
5128                                    " iallocator script must be used or the"
5129                                    " new node given")
5130       elif cnt == 0:
5131         raise errors.OpPrereqError("Give either the iallocator or the new"
5132                                    " secondary, not both")
5133     else: # not replacing the secondary
5134       if cnt != 2:
5135         raise errors.OpPrereqError("The iallocator and new node options can"
5136                                    " be used only when changing the"
5137                                    " secondary node")
5138
5139   def ExpandNames(self):
5140     self._ExpandAndLockInstance()
5141
5142     if self.op.iallocator is not None:
5143       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5144     elif self.op.remote_node is not None:
5145       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5146       if remote_node is None:
5147         raise errors.OpPrereqError("Node '%s' not known" %
5148                                    self.op.remote_node)
5149       self.op.remote_node = remote_node
5150       # Warning: do not remove the locking of the new secondary here
5151       # unless DRBD8.AddChildren is changed to work in parallel;
5152       # currently it doesn't since parallel invocations of
5153       # FindUnusedMinor will conflict
5154       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5155       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5156     else:
5157       self.needed_locks[locking.LEVEL_NODE] = []
5158       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5159
5160   def DeclareLocks(self, level):
5161     # If we're not already locking all nodes in the set we have to declare the
5162     # instance's primary/secondary nodes.
5163     if (level == locking.LEVEL_NODE and
5164         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5165       self._LockInstancesNodes()
5166
5167   def _RunAllocator(self):
5168     """Compute a new secondary node using an IAllocator.
5169
5170     """
5171     ial = IAllocator(self,
5172                      mode=constants.IALLOCATOR_MODE_RELOC,
5173                      name=self.op.instance_name,
5174                      relocate_from=[self.sec_node])
5175
5176     ial.Run(self.op.iallocator)
5177
5178     if not ial.success:
5179       raise errors.OpPrereqError("Can't compute nodes using"
5180                                  " iallocator '%s': %s" % (self.op.iallocator,
5181                                                            ial.info))
5182     if len(ial.nodes) != ial.required_nodes:
5183       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5184                                  " of nodes (%s), required %s" %
5185                                  (len(ial.nodes), ial.required_nodes))
5186     self.op.remote_node = ial.nodes[0]
5187     self.LogInfo("Selected new secondary for the instance: %s",
5188                  self.op.remote_node)
5189
5190   def BuildHooksEnv(self):
5191     """Build hooks env.
5192
5193     This runs on the master, the primary and all the secondaries.
5194
5195     """
5196     env = {
5197       "MODE": self.op.mode,
5198       "NEW_SECONDARY": self.op.remote_node,
5199       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5200       }
5201     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5202     nl = [
5203       self.cfg.GetMasterNode(),
5204       self.instance.primary_node,
5205       ]
5206     if self.op.remote_node is not None:
5207       nl.append(self.op.remote_node)
5208     return env, nl, nl
5209
5210   def CheckPrereq(self):
5211     """Check prerequisites.
5212
5213     This checks that the instance is in the cluster.
5214
5215     """
5216     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5217     assert instance is not None, \
5218       "Cannot retrieve locked instance %s" % self.op.instance_name
5219     self.instance = instance
5220
5221     if instance.disk_template != constants.DT_DRBD8:
5222       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5223                                  " instances")
5224
5225     if len(instance.secondary_nodes) != 1:
5226       raise errors.OpPrereqError("The instance has a strange layout,"
5227                                  " expected one secondary but found %d" %
5228                                  len(instance.secondary_nodes))
5229
5230     self.sec_node = instance.secondary_nodes[0]
5231
5232     if self.op.iallocator is not None:
5233       self._RunAllocator()
5234
5235     remote_node = self.op.remote_node
5236     if remote_node is not None:
5237       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5238       assert self.remote_node_info is not None, \
5239         "Cannot retrieve locked node %s" % remote_node
5240     else:
5241       self.remote_node_info = None
5242     if remote_node == instance.primary_node:
5243       raise errors.OpPrereqError("The specified node is the primary node of"
5244                                  " the instance.")
5245     elif remote_node == self.sec_node:
5246       raise errors.OpPrereqError("The specified node is already the"
5247                                  " secondary node of the instance.")
5248
5249     if self.op.mode == constants.REPLACE_DISK_PRI:
5250       n1 = self.tgt_node = instance.primary_node
5251       n2 = self.oth_node = self.sec_node
5252     elif self.op.mode == constants.REPLACE_DISK_SEC:
5253       n1 = self.tgt_node = self.sec_node
5254       n2 = self.oth_node = instance.primary_node
5255     elif self.op.mode == constants.REPLACE_DISK_CHG:
5256       n1 = self.new_node = remote_node
5257       n2 = self.oth_node = instance.primary_node
5258       self.tgt_node = self.sec_node
5259       _CheckNodeNotDrained(self, remote_node)
5260     else:
5261       raise errors.ProgrammerError("Unhandled disk replace mode")
5262
5263     _CheckNodeOnline(self, n1)
5264     _CheckNodeOnline(self, n2)
5265
5266     if not self.op.disks:
5267       self.op.disks = range(len(instance.disks))
5268
5269     for disk_idx in self.op.disks:
5270       instance.FindDisk(disk_idx)
5271
5272   def _ExecD8DiskOnly(self, feedback_fn):
5273     """Replace a disk on the primary or secondary for dbrd8.
5274
5275     The algorithm for replace is quite complicated:
5276
5277       1. for each disk to be replaced:
5278
5279         1. create new LVs on the target node with unique names
5280         1. detach old LVs from the drbd device
5281         1. rename old LVs to name_replaced.<time_t>
5282         1. rename new LVs to old LVs
5283         1. attach the new LVs (with the old names now) to the drbd device
5284
5285       1. wait for sync across all devices
5286
5287       1. for each modified disk:
5288
5289         1. remove old LVs (which have the name name_replaces.<time_t>)
5290
5291     Failures are not very well handled.
5292
5293     """
5294     steps_total = 6
5295     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5296     instance = self.instance
5297     iv_names = {}
5298     vgname = self.cfg.GetVGName()
5299     # start of work
5300     cfg = self.cfg
5301     tgt_node = self.tgt_node
5302     oth_node = self.oth_node
5303
5304     # Step: check device activation
5305     self.proc.LogStep(1, steps_total, "check device existence")
5306     info("checking volume groups")
5307     my_vg = cfg.GetVGName()
5308     results = self.rpc.call_vg_list([oth_node, tgt_node])
5309     if not results:
5310       raise errors.OpExecError("Can't list volume groups on the nodes")
5311     for node in oth_node, tgt_node:
5312       res = results[node]
5313       res.Raise("Error checking node %s" % node)
5314       if my_vg not in res.payload:
5315         raise errors.OpExecError("Volume group '%s' not found on %s" %
5316                                  (my_vg, node))
5317     for idx, dev in enumerate(instance.disks):
5318       if idx not in self.op.disks:
5319         continue
5320       for node in tgt_node, oth_node:
5321         info("checking disk/%d on %s" % (idx, node))
5322         cfg.SetDiskID(dev, node)
5323         result = self.rpc.call_blockdev_find(node, dev)
5324         msg = result.fail_msg
5325         if not msg and not result.payload:
5326           msg = "disk not found"
5327         if msg:
5328           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5329                                    (idx, node, msg))
5330
5331     # Step: check other node consistency
5332     self.proc.LogStep(2, steps_total, "check peer consistency")
5333     for idx, dev in enumerate(instance.disks):
5334       if idx not in self.op.disks:
5335         continue
5336       info("checking disk/%d consistency on %s" % (idx, oth_node))
5337       if not _CheckDiskConsistency(self, dev, oth_node,
5338                                    oth_node==instance.primary_node):
5339         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5340                                  " to replace disks on this node (%s)" %
5341                                  (oth_node, tgt_node))
5342
5343     # Step: create new storage
5344     self.proc.LogStep(3, steps_total, "allocate new storage")
5345     for idx, dev in enumerate(instance.disks):
5346       if idx not in self.op.disks:
5347         continue
5348       size = dev.size
5349       cfg.SetDiskID(dev, tgt_node)
5350       lv_names = [".disk%d_%s" % (idx, suf)
5351                   for suf in ["data", "meta"]]
5352       names = _GenerateUniqueNames(self, lv_names)
5353       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5354                              logical_id=(vgname, names[0]))
5355       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5356                              logical_id=(vgname, names[1]))
5357       new_lvs = [lv_data, lv_meta]
5358       old_lvs = dev.children
5359       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5360       info("creating new local storage on %s for %s" %
5361            (tgt_node, dev.iv_name))
5362       # we pass force_create=True to force the LVM creation
5363       for new_lv in new_lvs:
5364         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5365                         _GetInstanceInfoText(instance), False)
5366
5367     # Step: for each lv, detach+rename*2+attach
5368     self.proc.LogStep(4, steps_total, "change drbd configuration")
5369     for dev, old_lvs, new_lvs in iv_names.itervalues():
5370       info("detaching %s drbd from local storage" % dev.iv_name)
5371       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5372       result.Raise("Can't detach drbd from local storage on node"
5373                    " %s for device %s" % (tgt_node, dev.iv_name))
5374       #dev.children = []
5375       #cfg.Update(instance)
5376
5377       # ok, we created the new LVs, so now we know we have the needed
5378       # storage; as such, we proceed on the target node to rename
5379       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5380       # using the assumption that logical_id == physical_id (which in
5381       # turn is the unique_id on that node)
5382
5383       # FIXME(iustin): use a better name for the replaced LVs
5384       temp_suffix = int(time.time())
5385       ren_fn = lambda d, suff: (d.physical_id[0],
5386                                 d.physical_id[1] + "_replaced-%s" % suff)
5387       # build the rename list based on what LVs exist on the node
5388       rlist = []
5389       for to_ren in old_lvs:
5390         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5391         if not result.fail_msg and result.payload:
5392           # device exists
5393           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5394
5395       info("renaming the old LVs on the target node")
5396       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5397       result.Raise("Can't rename old LVs on node %s" % tgt_node)
5398       # now we rename the new LVs to the old LVs
5399       info("renaming the new LVs on the target node")
5400       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5401       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5402       result.Raise("Can't rename new LVs on node %s" % tgt_node)
5403
5404       for old, new in zip(old_lvs, new_lvs):
5405         new.logical_id = old.logical_id
5406         cfg.SetDiskID(new, tgt_node)
5407
5408       for disk in old_lvs:
5409         disk.logical_id = ren_fn(disk, temp_suffix)
5410         cfg.SetDiskID(disk, tgt_node)
5411
5412       # now that the new lvs have the old name, we can add them to the device
5413       info("adding new mirror component on %s" % tgt_node)
5414       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5415       msg = result.fail_msg
5416       if msg:
5417         for new_lv in new_lvs:
5418           msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5419           if msg2:
5420             warning("Can't rollback device %s: %s", dev, msg2,
5421                     hint="cleanup manually the unused logical volumes")
5422         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5423
5424       dev.children = new_lvs
5425       cfg.Update(instance)
5426
5427     # Step: wait for sync
5428
5429     # this can fail as the old devices are degraded and _WaitForSync
5430     # does a combined result over all disks, so we don't check its
5431     # return value
5432     self.proc.LogStep(5, steps_total, "sync devices")
5433     _WaitForSync(self, instance, unlock=True)
5434
5435     # so check manually all the devices
5436     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5437       cfg.SetDiskID(dev, instance.primary_node)
5438       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5439       msg = result.fail_msg
5440       if not msg and not result.payload:
5441         msg = "disk not found"
5442       if msg:
5443         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5444                                  (name, msg))
5445       if result.payload[5]:
5446         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5447
5448     # Step: remove old storage
5449     self.proc.LogStep(6, steps_total, "removing old storage")
5450     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5451       info("remove logical volumes for %s" % name)
5452       for lv in old_lvs:
5453         cfg.SetDiskID(lv, tgt_node)
5454         msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5455         if msg:
5456           warning("Can't remove old LV: %s" % msg,
5457                   hint="manually remove unused LVs")
5458           continue
5459
5460   def _ExecD8Secondary(self, feedback_fn):
5461     """Replace the secondary node for drbd8.
5462
5463     The algorithm for replace is quite complicated:
5464       - for all disks of the instance:
5465         - create new LVs on the new node with same names
5466         - shutdown the drbd device on the old secondary
5467         - disconnect the drbd network on the primary
5468         - create the drbd device on the new secondary
5469         - network attach the drbd on the primary, using an artifice:
5470           the drbd code for Attach() will connect to the network if it
5471           finds a device which is connected to the good local disks but
5472           not network enabled
5473       - wait for sync across all devices
5474       - remove all disks from the old secondary
5475
5476     Failures are not very well handled.
5477
5478     """
5479     steps_total = 6
5480     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5481     instance = self.instance
5482     iv_names = {}
5483     # start of work
5484     cfg = self.cfg
5485     old_node = self.tgt_node
5486     new_node = self.new_node
5487     pri_node = instance.primary_node
5488     nodes_ip = {
5489       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5490       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5491       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5492       }
5493
5494     # Step: check device activation
5495     self.proc.LogStep(1, steps_total, "check device existence")
5496     info("checking volume groups")
5497     my_vg = cfg.GetVGName()
5498     results = self.rpc.call_vg_list([pri_node, new_node])
5499     for node in pri_node, new_node:
5500       res = results[node]
5501       res.Raise("Error checking node %s" % node)
5502       if my_vg not in res.payload:
5503         raise errors.OpExecError("Volume group '%s' not found on %s" %
5504                                  (my_vg, node))
5505     for idx, dev in enumerate(instance.disks):
5506       if idx not in self.op.disks:
5507         continue
5508       info("checking disk/%d on %s" % (idx, pri_node))
5509       cfg.SetDiskID(dev, pri_node)
5510       result = self.rpc.call_blockdev_find(pri_node, dev)
5511       msg = result.fail_msg
5512       if not msg and not result.payload:
5513         msg = "disk not found"
5514       if msg:
5515         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5516                                  (idx, pri_node, msg))
5517
5518     # Step: check other node consistency
5519     self.proc.LogStep(2, steps_total, "check peer consistency")
5520     for idx, dev in enumerate(instance.disks):
5521       if idx not in self.op.disks:
5522         continue
5523       info("checking disk/%d consistency on %s" % (idx, pri_node))
5524       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5525         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5526                                  " unsafe to replace the secondary" %
5527                                  pri_node)
5528
5529     # Step: create new storage
5530     self.proc.LogStep(3, steps_total, "allocate new storage")
5531     for idx, dev in enumerate(instance.disks):
5532       info("adding new local storage on %s for disk/%d" %
5533            (new_node, idx))
5534       # we pass force_create=True to force LVM creation
5535       for new_lv in dev.children:
5536         _CreateBlockDev(self, new_node, instance, new_lv, True,
5537                         _GetInstanceInfoText(instance), False)
5538
5539     # Step 4: dbrd minors and drbd setups changes
5540     # after this, we must manually remove the drbd minors on both the
5541     # error and the success paths
5542     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5543                                    instance.name)
5544     logging.debug("Allocated minors %s" % (minors,))
5545     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5546     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5547       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5548       # create new devices on new_node; note that we create two IDs:
5549       # one without port, so the drbd will be activated without
5550       # networking information on the new node at this stage, and one
5551       # with network, for the latter activation in step 4
5552       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5553       if pri_node == o_node1:
5554         p_minor = o_minor1
5555       else:
5556         p_minor = o_minor2
5557
5558       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5559       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5560
5561       iv_names[idx] = (dev, dev.children, new_net_id)
5562       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5563                     new_net_id)
5564       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5565                               logical_id=new_alone_id,
5566                               children=dev.children,
5567                               size=dev.size)
5568       try:
5569         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5570                               _GetInstanceInfoText(instance), False)
5571       except errors.GenericError:
5572         self.cfg.ReleaseDRBDMinors(instance.name)
5573         raise
5574
5575     for idx, dev in enumerate(instance.disks):
5576       # we have new devices, shutdown the drbd on the old secondary
5577       info("shutting down drbd for disk/%d on old node" % idx)
5578       cfg.SetDiskID(dev, old_node)
5579       msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5580       if msg:
5581         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5582                 (idx, msg),
5583                 hint="Please cleanup this device manually as soon as possible")
5584
5585     info("detaching primary drbds from the network (=> standalone)")
5586     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5587                                                instance.disks)[pri_node]
5588
5589     msg = result.fail_msg
5590     if msg:
5591       # detaches didn't succeed (unlikely)
5592       self.cfg.ReleaseDRBDMinors(instance.name)
5593       raise errors.OpExecError("Can't detach the disks from the network on"
5594                                " old node: %s" % (msg,))
5595
5596     # if we managed to detach at least one, we update all the disks of
5597     # the instance to point to the new secondary
5598     info("updating instance configuration")
5599     for dev, _, new_logical_id in iv_names.itervalues():
5600       dev.logical_id = new_logical_id
5601       cfg.SetDiskID(dev, pri_node)
5602     cfg.Update(instance)
5603
5604     # and now perform the drbd attach
5605     info("attaching primary drbds to new secondary (standalone => connected)")
5606     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5607                                            instance.disks, instance.name,
5608                                            False)
5609     for to_node, to_result in result.items():
5610       msg = to_result.fail_msg
5611       if msg:
5612         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5613                 hint="please do a gnt-instance info to see the"
5614                 " status of disks")
5615
5616     # this can fail as the old devices are degraded and _WaitForSync
5617     # does a combined result over all disks, so we don't check its
5618     # return value
5619     self.proc.LogStep(5, steps_total, "sync devices")
5620     _WaitForSync(self, instance, unlock=True)
5621
5622     # so check manually all the devices
5623     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5624       cfg.SetDiskID(dev, pri_node)
5625       result = self.rpc.call_blockdev_find(pri_node, dev)
5626       msg = result.fail_msg
5627       if not msg and not result.payload:
5628         msg = "disk not found"
5629       if msg:
5630         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5631                                  (idx, msg))
5632       if result.payload[5]:
5633         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5634
5635     self.proc.LogStep(6, steps_total, "removing old storage")
5636     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5637       info("remove logical volumes for disk/%d" % idx)
5638       for lv in old_lvs:
5639         cfg.SetDiskID(lv, old_node)
5640         msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5641         if msg:
5642           warning("Can't remove LV on old secondary: %s", msg,
5643                   hint="Cleanup stale volumes by hand")
5644
5645   def Exec(self, feedback_fn):
5646     """Execute disk replacement.
5647
5648     This dispatches the disk replacement to the appropriate handler.
5649
5650     """
5651     instance = self.instance
5652
5653     # Activate the instance disks if we're replacing them on a down instance
5654     if not instance.admin_up:
5655       _StartInstanceDisks(self, instance, True)
5656
5657     if self.op.mode == constants.REPLACE_DISK_CHG:
5658       fn = self._ExecD8Secondary
5659     else:
5660       fn = self._ExecD8DiskOnly
5661
5662     ret = fn(feedback_fn)
5663
5664     # Deactivate the instance disks if we're replacing them on a down instance
5665     if not instance.admin_up:
5666       _SafeShutdownInstanceDisks(self, instance)
5667
5668     return ret
5669
5670
5671 class LUGrowDisk(LogicalUnit):
5672   """Grow a disk of an instance.
5673
5674   """
5675   HPATH = "disk-grow"
5676   HTYPE = constants.HTYPE_INSTANCE
5677   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5678   REQ_BGL = False
5679
5680   def ExpandNames(self):
5681     self._ExpandAndLockInstance()
5682     self.needed_locks[locking.LEVEL_NODE] = []
5683     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5684
5685   def DeclareLocks(self, level):
5686     if level == locking.LEVEL_NODE:
5687       self._LockInstancesNodes()
5688
5689   def BuildHooksEnv(self):
5690     """Build hooks env.
5691
5692     This runs on the master, the primary and all the secondaries.
5693
5694     """
5695     env = {
5696       "DISK": self.op.disk,
5697       "AMOUNT": self.op.amount,
5698       }
5699     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5700     nl = [
5701       self.cfg.GetMasterNode(),
5702       self.instance.primary_node,
5703       ]
5704     return env, nl, nl
5705
5706   def CheckPrereq(self):
5707     """Check prerequisites.
5708
5709     This checks that the instance is in the cluster.
5710
5711     """
5712     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5713     assert instance is not None, \
5714       "Cannot retrieve locked instance %s" % self.op.instance_name
5715     nodenames = list(instance.all_nodes)
5716     for node in nodenames:
5717       _CheckNodeOnline(self, node)
5718
5719
5720     self.instance = instance
5721
5722     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5723       raise errors.OpPrereqError("Instance's disk layout does not support"
5724                                  " growing.")
5725
5726     self.disk = instance.FindDisk(self.op.disk)
5727
5728     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5729                                        instance.hypervisor)
5730     for node in nodenames:
5731       info = nodeinfo[node]
5732       info.Raise("Cannot get current information from node %s" % node)
5733       vg_free = info.payload.get('vg_free', None)
5734       if not isinstance(vg_free, int):
5735         raise errors.OpPrereqError("Can't compute free disk space on"
5736                                    " node %s" % node)
5737       if self.op.amount > vg_free:
5738         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5739                                    " %d MiB available, %d MiB required" %
5740                                    (node, vg_free, self.op.amount))
5741
5742   def Exec(self, feedback_fn):
5743     """Execute disk grow.
5744
5745     """
5746     instance = self.instance
5747     disk = self.disk
5748     for node in instance.all_nodes:
5749       self.cfg.SetDiskID(disk, node)
5750       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5751       result.Raise("Grow request failed to node %s" % node)
5752     disk.RecordGrow(self.op.amount)
5753     self.cfg.Update(instance)
5754     if self.op.wait_for_sync:
5755       disk_abort = not _WaitForSync(self, instance)
5756       if disk_abort:
5757         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5758                              " status.\nPlease check the instance.")
5759
5760
5761 class LUQueryInstanceData(NoHooksLU):
5762   """Query runtime instance data.
5763
5764   """
5765   _OP_REQP = ["instances", "static"]
5766   REQ_BGL = False
5767
5768   def ExpandNames(self):
5769     self.needed_locks = {}
5770     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
5771
5772     if not isinstance(self.op.instances, list):
5773       raise errors.OpPrereqError("Invalid argument type 'instances'")
5774
5775     if self.op.instances:
5776       self.wanted_names = []
5777       for name in self.op.instances:
5778         full_name = self.cfg.ExpandInstanceName(name)
5779         if full_name is None:
5780           raise errors.OpPrereqError("Instance '%s' not known" % name)
5781         self.wanted_names.append(full_name)
5782       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5783     else:
5784       self.wanted_names = None
5785       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5786
5787     self.needed_locks[locking.LEVEL_NODE] = []
5788     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5789
5790   def DeclareLocks(self, level):
5791     if level == locking.LEVEL_NODE:
5792       self._LockInstancesNodes()
5793
5794   def CheckPrereq(self):
5795     """Check prerequisites.
5796
5797     This only checks the optional instance list against the existing names.
5798
5799     """
5800     if self.wanted_names is None:
5801       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5802
5803     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5804                              in self.wanted_names]
5805     return
5806
5807   def _ComputeDiskStatus(self, instance, snode, dev):
5808     """Compute block device status.
5809
5810     """
5811     static = self.op.static
5812     if not static:
5813       self.cfg.SetDiskID(dev, instance.primary_node)
5814       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5815       if dev_pstatus.offline:
5816         dev_pstatus = None
5817       else:
5818         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5819         dev_pstatus = dev_pstatus.payload
5820     else:
5821       dev_pstatus = None
5822
5823     if dev.dev_type in constants.LDS_DRBD:
5824       # we change the snode then (otherwise we use the one passed in)
5825       if dev.logical_id[0] == instance.primary_node:
5826         snode = dev.logical_id[1]
5827       else:
5828         snode = dev.logical_id[0]
5829
5830     if snode and not static:
5831       self.cfg.SetDiskID(dev, snode)
5832       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5833       if dev_sstatus.offline:
5834         dev_sstatus = None
5835       else:
5836         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5837         dev_sstatus = dev_sstatus.payload
5838     else:
5839       dev_sstatus = None
5840
5841     if dev.children:
5842       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5843                       for child in dev.children]
5844     else:
5845       dev_children = []
5846
5847     data = {
5848       "iv_name": dev.iv_name,
5849       "dev_type": dev.dev_type,
5850       "logical_id": dev.logical_id,
5851       "physical_id": dev.physical_id,
5852       "pstatus": dev_pstatus,
5853       "sstatus": dev_sstatus,
5854       "children": dev_children,
5855       "mode": dev.mode,
5856       "size": dev.size,
5857       }
5858
5859     return data
5860
5861   def Exec(self, feedback_fn):
5862     """Gather and return data"""
5863     result = {}
5864
5865     cluster = self.cfg.GetClusterInfo()
5866
5867     for instance in self.wanted_instances:
5868       if not self.op.static:
5869         remote_info = self.rpc.call_instance_info(instance.primary_node,
5870                                                   instance.name,
5871                                                   instance.hypervisor)
5872         remote_info.Raise("Error checking node %s" % instance.primary_node)
5873         remote_info = remote_info.payload
5874         if remote_info and "state" in remote_info:
5875           remote_state = "up"
5876         else:
5877           remote_state = "down"
5878       else:
5879         remote_state = None
5880       if instance.admin_up:
5881         config_state = "up"
5882       else:
5883         config_state = "down"
5884
5885       disks = [self._ComputeDiskStatus(instance, None, device)
5886                for device in instance.disks]
5887
5888       idict = {
5889         "name": instance.name,
5890         "config_state": config_state,
5891         "run_state": remote_state,
5892         "pnode": instance.primary_node,
5893         "snodes": instance.secondary_nodes,
5894         "os": instance.os,
5895         # this happens to be the same format used for hooks
5896         "nics": _NICListToTuple(self, instance.nics),
5897         "disks": disks,
5898         "hypervisor": instance.hypervisor,
5899         "network_port": instance.network_port,
5900         "hv_instance": instance.hvparams,
5901         "hv_actual": cluster.FillHV(instance),
5902         "be_instance": instance.beparams,
5903         "be_actual": cluster.FillBE(instance),
5904         }
5905
5906       result[instance.name] = idict
5907
5908     return result
5909
5910
5911 class LUSetInstanceParams(LogicalUnit):
5912   """Modifies an instances's parameters.
5913
5914   """
5915   HPATH = "instance-modify"
5916   HTYPE = constants.HTYPE_INSTANCE
5917   _OP_REQP = ["instance_name"]
5918   REQ_BGL = False
5919
5920   def CheckArguments(self):
5921     if not hasattr(self.op, 'nics'):
5922       self.op.nics = []
5923     if not hasattr(self.op, 'disks'):
5924       self.op.disks = []
5925     if not hasattr(self.op, 'beparams'):
5926       self.op.beparams = {}
5927     if not hasattr(self.op, 'hvparams'):
5928       self.op.hvparams = {}
5929     self.op.force = getattr(self.op, "force", False)
5930     if not (self.op.nics or self.op.disks or
5931             self.op.hvparams or self.op.beparams):
5932       raise errors.OpPrereqError("No changes submitted")
5933
5934     # Disk validation
5935     disk_addremove = 0
5936     for disk_op, disk_dict in self.op.disks:
5937       if disk_op == constants.DDM_REMOVE:
5938         disk_addremove += 1
5939         continue
5940       elif disk_op == constants.DDM_ADD:
5941         disk_addremove += 1
5942       else:
5943         if not isinstance(disk_op, int):
5944           raise errors.OpPrereqError("Invalid disk index")
5945         if not isinstance(disk_dict, dict):
5946           msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
5947           raise errors.OpPrereqError(msg)
5948
5949       if disk_op == constants.DDM_ADD:
5950         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5951         if mode not in constants.DISK_ACCESS_SET:
5952           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5953         size = disk_dict.get('size', None)
5954         if size is None:
5955           raise errors.OpPrereqError("Required disk parameter size missing")
5956         try:
5957           size = int(size)
5958         except ValueError, err:
5959           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5960                                      str(err))
5961         disk_dict['size'] = size
5962       else:
5963         # modification of disk
5964         if 'size' in disk_dict:
5965           raise errors.OpPrereqError("Disk size change not possible, use"
5966                                      " grow-disk")
5967
5968     if disk_addremove > 1:
5969       raise errors.OpPrereqError("Only one disk add or remove operation"
5970                                  " supported at a time")
5971
5972     # NIC validation
5973     nic_addremove = 0
5974     for nic_op, nic_dict in self.op.nics:
5975       if nic_op == constants.DDM_REMOVE:
5976         nic_addremove += 1
5977         continue
5978       elif nic_op == constants.DDM_ADD:
5979         nic_addremove += 1
5980       else:
5981         if not isinstance(nic_op, int):
5982           raise errors.OpPrereqError("Invalid nic index")
5983         if not isinstance(nic_dict, dict):
5984           msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
5985           raise errors.OpPrereqError(msg)
5986
5987       # nic_dict should be a dict
5988       nic_ip = nic_dict.get('ip', None)
5989       if nic_ip is not None:
5990         if nic_ip.lower() == constants.VALUE_NONE:
5991           nic_dict['ip'] = None
5992         else:
5993           if not utils.IsValidIP(nic_ip):
5994             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5995
5996       nic_bridge = nic_dict.get('bridge', None)
5997       nic_link = nic_dict.get('link', None)
5998       if nic_bridge and nic_link:
5999         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6000                                    " at the same time")
6001       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6002         nic_dict['bridge'] = None
6003       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6004         nic_dict['link'] = None
6005
6006       if nic_op == constants.DDM_ADD:
6007         nic_mac = nic_dict.get('mac', None)
6008         if nic_mac is None:
6009           nic_dict['mac'] = constants.VALUE_AUTO
6010
6011       if 'mac' in nic_dict:
6012         nic_mac = nic_dict['mac']
6013         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6014           if not utils.IsValidMac(nic_mac):
6015             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6016         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6017           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6018                                      " modifying an existing nic")
6019
6020     if nic_addremove > 1:
6021       raise errors.OpPrereqError("Only one NIC add or remove operation"
6022                                  " supported at a time")
6023
6024   def ExpandNames(self):
6025     self._ExpandAndLockInstance()
6026     self.needed_locks[locking.LEVEL_NODE] = []
6027     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6028
6029   def DeclareLocks(self, level):
6030     if level == locking.LEVEL_NODE:
6031       self._LockInstancesNodes()
6032
6033   def BuildHooksEnv(self):
6034     """Build hooks env.
6035
6036     This runs on the master, primary and secondaries.
6037
6038     """
6039     args = dict()
6040     if constants.BE_MEMORY in self.be_new:
6041       args['memory'] = self.be_new[constants.BE_MEMORY]
6042     if constants.BE_VCPUS in self.be_new:
6043       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6044     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6045     # information at all.
6046     if self.op.nics:
6047       args['nics'] = []
6048       nic_override = dict(self.op.nics)
6049       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6050       for idx, nic in enumerate(self.instance.nics):
6051         if idx in nic_override:
6052           this_nic_override = nic_override[idx]
6053         else:
6054           this_nic_override = {}
6055         if 'ip' in this_nic_override:
6056           ip = this_nic_override['ip']
6057         else:
6058           ip = nic.ip
6059         if 'mac' in this_nic_override:
6060           mac = this_nic_override['mac']
6061         else:
6062           mac = nic.mac
6063         if idx in self.nic_pnew:
6064           nicparams = self.nic_pnew[idx]
6065         else:
6066           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6067         mode = nicparams[constants.NIC_MODE]
6068         link = nicparams[constants.NIC_LINK]
6069         args['nics'].append((ip, mac, mode, link))
6070       if constants.DDM_ADD in nic_override:
6071         ip = nic_override[constants.DDM_ADD].get('ip', None)
6072         mac = nic_override[constants.DDM_ADD]['mac']
6073         nicparams = self.nic_pnew[constants.DDM_ADD]
6074         mode = nicparams[constants.NIC_MODE]
6075         link = nicparams[constants.NIC_LINK]
6076         args['nics'].append((ip, mac, mode, link))
6077       elif constants.DDM_REMOVE in nic_override:
6078         del args['nics'][-1]
6079
6080     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6081     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6082     return env, nl, nl
6083
6084   def _GetUpdatedParams(self, old_params, update_dict,
6085                         default_values, parameter_types):
6086     """Return the new params dict for the given params.
6087
6088     @type old_params: dict
6089     @param old_params: old parameters
6090     @type update_dict: dict
6091     @param update_dict: dict containing new parameter values,
6092                         or constants.VALUE_DEFAULT to reset the
6093                         parameter to its default value
6094     @type default_values: dict
6095     @param default_values: default values for the filled parameters
6096     @type parameter_types: dict
6097     @param parameter_types: dict mapping target dict keys to types
6098                             in constants.ENFORCEABLE_TYPES
6099     @rtype: (dict, dict)
6100     @return: (new_parameters, filled_parameters)
6101
6102     """
6103     params_copy = copy.deepcopy(old_params)
6104     for key, val in update_dict.iteritems():
6105       if val == constants.VALUE_DEFAULT:
6106         try:
6107           del params_copy[key]
6108         except KeyError:
6109           pass
6110       else:
6111         params_copy[key] = val
6112     utils.ForceDictType(params_copy, parameter_types)
6113     params_filled = objects.FillDict(default_values, params_copy)
6114     return (params_copy, params_filled)
6115
6116   def CheckPrereq(self):
6117     """Check prerequisites.
6118
6119     This only checks the instance list against the existing names.
6120
6121     """
6122     self.force = self.op.force
6123
6124     # checking the new params on the primary/secondary nodes
6125
6126     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6127     cluster = self.cluster = self.cfg.GetClusterInfo()
6128     assert self.instance is not None, \
6129       "Cannot retrieve locked instance %s" % self.op.instance_name
6130     pnode = instance.primary_node
6131     nodelist = list(instance.all_nodes)
6132
6133     # hvparams processing
6134     if self.op.hvparams:
6135       i_hvdict, hv_new = self._GetUpdatedParams(
6136                              instance.hvparams, self.op.hvparams,
6137                              cluster.hvparams[instance.hypervisor],
6138                              constants.HVS_PARAMETER_TYPES)
6139       # local check
6140       hypervisor.GetHypervisor(
6141         instance.hypervisor).CheckParameterSyntax(hv_new)
6142       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6143       self.hv_new = hv_new # the new actual values
6144       self.hv_inst = i_hvdict # the new dict (without defaults)
6145     else:
6146       self.hv_new = self.hv_inst = {}
6147
6148     # beparams processing
6149     if self.op.beparams:
6150       i_bedict, be_new = self._GetUpdatedParams(
6151                              instance.beparams, self.op.beparams,
6152                              cluster.beparams[constants.PP_DEFAULT],
6153                              constants.BES_PARAMETER_TYPES)
6154       self.be_new = be_new # the new actual values
6155       self.be_inst = i_bedict # the new dict (without defaults)
6156     else:
6157       self.be_new = self.be_inst = {}
6158
6159     self.warn = []
6160
6161     if constants.BE_MEMORY in self.op.beparams and not self.force:
6162       mem_check_list = [pnode]
6163       if be_new[constants.BE_AUTO_BALANCE]:
6164         # either we changed auto_balance to yes or it was from before
6165         mem_check_list.extend(instance.secondary_nodes)
6166       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6167                                                   instance.hypervisor)
6168       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6169                                          instance.hypervisor)
6170       pninfo = nodeinfo[pnode]
6171       msg = pninfo.fail_msg
6172       if msg:
6173         # Assume the primary node is unreachable and go ahead
6174         self.warn.append("Can't get info from primary node %s: %s" %
6175                          (pnode,  msg))
6176       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6177         self.warn.append("Node data from primary node %s doesn't contain"
6178                          " free memory information" % pnode)
6179       elif instance_info.fail_msg:
6180         self.warn.append("Can't get instance runtime information: %s" %
6181                         instance_info.fail_msg)
6182       else:
6183         if instance_info.payload:
6184           current_mem = int(instance_info.payload['memory'])
6185         else:
6186           # Assume instance not running
6187           # (there is a slight race condition here, but it's not very probable,
6188           # and we have no other way to check)
6189           current_mem = 0
6190         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6191                     pninfo.payload['memory_free'])
6192         if miss_mem > 0:
6193           raise errors.OpPrereqError("This change will prevent the instance"
6194                                      " from starting, due to %d MB of memory"
6195                                      " missing on its primary node" % miss_mem)
6196
6197       if be_new[constants.BE_AUTO_BALANCE]:
6198         for node, nres in nodeinfo.items():
6199           if node not in instance.secondary_nodes:
6200             continue
6201           msg = nres.fail_msg
6202           if msg:
6203             self.warn.append("Can't get info from secondary node %s: %s" %
6204                              (node, msg))
6205           elif not isinstance(nres.payload.get('memory_free', None), int):
6206             self.warn.append("Secondary node %s didn't return free"
6207                              " memory information" % node)
6208           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6209             self.warn.append("Not enough memory to failover instance to"
6210                              " secondary node %s" % node)
6211
6212     # NIC processing
6213     self.nic_pnew = {}
6214     self.nic_pinst = {}
6215     for nic_op, nic_dict in self.op.nics:
6216       if nic_op == constants.DDM_REMOVE:
6217         if not instance.nics:
6218           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6219         continue
6220       if nic_op != constants.DDM_ADD:
6221         # an existing nic
6222         if nic_op < 0 or nic_op >= len(instance.nics):
6223           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6224                                      " are 0 to %d" %
6225                                      (nic_op, len(instance.nics)))
6226         old_nic_params = instance.nics[nic_op].nicparams
6227         old_nic_ip = instance.nics[nic_op].ip
6228       else:
6229         old_nic_params = {}
6230         old_nic_ip = None
6231
6232       update_params_dict = dict([(key, nic_dict[key])
6233                                  for key in constants.NICS_PARAMETERS
6234                                  if key in nic_dict])
6235
6236       if 'bridge' in nic_dict:
6237         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6238
6239       new_nic_params, new_filled_nic_params = \
6240           self._GetUpdatedParams(old_nic_params, update_params_dict,
6241                                  cluster.nicparams[constants.PP_DEFAULT],
6242                                  constants.NICS_PARAMETER_TYPES)
6243       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6244       self.nic_pinst[nic_op] = new_nic_params
6245       self.nic_pnew[nic_op] = new_filled_nic_params
6246       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6247
6248       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6249         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6250         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6251         if msg:
6252           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6253           if self.force:
6254             self.warn.append(msg)
6255           else:
6256             raise errors.OpPrereqError(msg)
6257       if new_nic_mode == constants.NIC_MODE_ROUTED:
6258         if 'ip' in nic_dict:
6259           nic_ip = nic_dict['ip']
6260         else:
6261           nic_ip = old_nic_ip
6262         if nic_ip is None:
6263           raise errors.OpPrereqError('Cannot set the nic ip to None'
6264                                      ' on a routed nic')
6265       if 'mac' in nic_dict:
6266         nic_mac = nic_dict['mac']
6267         if nic_mac is None:
6268           raise errors.OpPrereqError('Cannot set the nic mac to None')
6269         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6270           # otherwise generate the mac
6271           nic_dict['mac'] = self.cfg.GenerateMAC()
6272         else:
6273           # or validate/reserve the current one
6274           if self.cfg.IsMacInUse(nic_mac):
6275             raise errors.OpPrereqError("MAC address %s already in use"
6276                                        " in cluster" % nic_mac)
6277
6278     # DISK processing
6279     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6280       raise errors.OpPrereqError("Disk operations not supported for"
6281                                  " diskless instances")
6282     for disk_op, disk_dict in self.op.disks:
6283       if disk_op == constants.DDM_REMOVE:
6284         if len(instance.disks) == 1:
6285           raise errors.OpPrereqError("Cannot remove the last disk of"
6286                                      " an instance")
6287         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6288         ins_l = ins_l[pnode]
6289         msg = ins_l.fail_msg
6290         if msg:
6291           raise errors.OpPrereqError("Can't contact node %s: %s" %
6292                                      (pnode, msg))
6293         if instance.name in ins_l.payload:
6294           raise errors.OpPrereqError("Instance is running, can't remove"
6295                                      " disks.")
6296
6297       if (disk_op == constants.DDM_ADD and
6298           len(instance.nics) >= constants.MAX_DISKS):
6299         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6300                                    " add more" % constants.MAX_DISKS)
6301       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6302         # an existing disk
6303         if disk_op < 0 or disk_op >= len(instance.disks):
6304           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6305                                      " are 0 to %d" %
6306                                      (disk_op, len(instance.disks)))
6307
6308     return
6309
6310   def Exec(self, feedback_fn):
6311     """Modifies an instance.
6312
6313     All parameters take effect only at the next restart of the instance.
6314
6315     """
6316     # Process here the warnings from CheckPrereq, as we don't have a
6317     # feedback_fn there.
6318     for warn in self.warn:
6319       feedback_fn("WARNING: %s" % warn)
6320
6321     result = []
6322     instance = self.instance
6323     cluster = self.cluster
6324     # disk changes
6325     for disk_op, disk_dict in self.op.disks:
6326       if disk_op == constants.DDM_REMOVE:
6327         # remove the last disk
6328         device = instance.disks.pop()
6329         device_idx = len(instance.disks)
6330         for node, disk in device.ComputeNodeTree(instance.primary_node):
6331           self.cfg.SetDiskID(disk, node)
6332           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6333           if msg:
6334             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6335                             " continuing anyway", device_idx, node, msg)
6336         result.append(("disk/%d" % device_idx, "remove"))
6337       elif disk_op == constants.DDM_ADD:
6338         # add a new disk
6339         if instance.disk_template == constants.DT_FILE:
6340           file_driver, file_path = instance.disks[0].logical_id
6341           file_path = os.path.dirname(file_path)
6342         else:
6343           file_driver = file_path = None
6344         disk_idx_base = len(instance.disks)
6345         new_disk = _GenerateDiskTemplate(self,
6346                                          instance.disk_template,
6347                                          instance.name, instance.primary_node,
6348                                          instance.secondary_nodes,
6349                                          [disk_dict],
6350                                          file_path,
6351                                          file_driver,
6352                                          disk_idx_base)[0]
6353         instance.disks.append(new_disk)
6354         info = _GetInstanceInfoText(instance)
6355
6356         logging.info("Creating volume %s for instance %s",
6357                      new_disk.iv_name, instance.name)
6358         # Note: this needs to be kept in sync with _CreateDisks
6359         #HARDCODE
6360         for node in instance.all_nodes:
6361           f_create = node == instance.primary_node
6362           try:
6363             _CreateBlockDev(self, node, instance, new_disk,
6364                             f_create, info, f_create)
6365           except errors.OpExecError, err:
6366             self.LogWarning("Failed to create volume %s (%s) on"
6367                             " node %s: %s",
6368                             new_disk.iv_name, new_disk, node, err)
6369         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6370                        (new_disk.size, new_disk.mode)))
6371       else:
6372         # change a given disk
6373         instance.disks[disk_op].mode = disk_dict['mode']
6374         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6375     # NIC changes
6376     for nic_op, nic_dict in self.op.nics:
6377       if nic_op == constants.DDM_REMOVE:
6378         # remove the last nic
6379         del instance.nics[-1]
6380         result.append(("nic.%d" % len(instance.nics), "remove"))
6381       elif nic_op == constants.DDM_ADD:
6382         # mac and bridge should be set, by now
6383         mac = nic_dict['mac']
6384         ip = nic_dict.get('ip', None)
6385         nicparams = self.nic_pinst[constants.DDM_ADD]
6386         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6387         instance.nics.append(new_nic)
6388         result.append(("nic.%d" % (len(instance.nics) - 1),
6389                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6390                        (new_nic.mac, new_nic.ip,
6391                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6392                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6393                        )))
6394       else:
6395         for key in 'mac', 'ip':
6396           if key in nic_dict:
6397             setattr(instance.nics[nic_op], key, nic_dict[key])
6398         if nic_op in self.nic_pnew:
6399           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6400         for key, val in nic_dict.iteritems():
6401           result.append(("nic.%s/%d" % (key, nic_op), val))
6402
6403     # hvparams changes
6404     if self.op.hvparams:
6405       instance.hvparams = self.hv_inst
6406       for key, val in self.op.hvparams.iteritems():
6407         result.append(("hv/%s" % key, val))
6408
6409     # beparams changes
6410     if self.op.beparams:
6411       instance.beparams = self.be_inst
6412       for key, val in self.op.beparams.iteritems():
6413         result.append(("be/%s" % key, val))
6414
6415     self.cfg.Update(instance)
6416
6417     return result
6418
6419
6420 class LUQueryExports(NoHooksLU):
6421   """Query the exports list
6422
6423   """
6424   _OP_REQP = ['nodes']
6425   REQ_BGL = False
6426
6427   def ExpandNames(self):
6428     self.needed_locks = {}
6429     self.share_locks[locking.LEVEL_NODE] = 1
6430     if not self.op.nodes:
6431       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6432     else:
6433       self.needed_locks[locking.LEVEL_NODE] = \
6434         _GetWantedNodes(self, self.op.nodes)
6435
6436   def CheckPrereq(self):
6437     """Check prerequisites.
6438
6439     """
6440     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6441
6442   def Exec(self, feedback_fn):
6443     """Compute the list of all the exported system images.
6444
6445     @rtype: dict
6446     @return: a dictionary with the structure node->(export-list)
6447         where export-list is a list of the instances exported on
6448         that node.
6449
6450     """
6451     rpcresult = self.rpc.call_export_list(self.nodes)
6452     result = {}
6453     for node in rpcresult:
6454       if rpcresult[node].fail_msg:
6455         result[node] = False
6456       else:
6457         result[node] = rpcresult[node].payload
6458
6459     return result
6460
6461
6462 class LUExportInstance(LogicalUnit):
6463   """Export an instance to an image in the cluster.
6464
6465   """
6466   HPATH = "instance-export"
6467   HTYPE = constants.HTYPE_INSTANCE
6468   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6469   REQ_BGL = False
6470
6471   def ExpandNames(self):
6472     self._ExpandAndLockInstance()
6473     # FIXME: lock only instance primary and destination node
6474     #
6475     # Sad but true, for now we have do lock all nodes, as we don't know where
6476     # the previous export might be, and and in this LU we search for it and
6477     # remove it from its current node. In the future we could fix this by:
6478     #  - making a tasklet to search (share-lock all), then create the new one,
6479     #    then one to remove, after
6480     #  - removing the removal operation altogether
6481     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6482
6483   def DeclareLocks(self, level):
6484     """Last minute lock declaration."""
6485     # All nodes are locked anyway, so nothing to do here.
6486
6487   def BuildHooksEnv(self):
6488     """Build hooks env.
6489
6490     This will run on the master, primary node and target node.
6491
6492     """
6493     env = {
6494       "EXPORT_NODE": self.op.target_node,
6495       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6496       }
6497     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6498     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6499           self.op.target_node]
6500     return env, nl, nl
6501
6502   def CheckPrereq(self):
6503     """Check prerequisites.
6504
6505     This checks that the instance and node names are valid.
6506
6507     """
6508     instance_name = self.op.instance_name
6509     self.instance = self.cfg.GetInstanceInfo(instance_name)
6510     assert self.instance is not None, \
6511           "Cannot retrieve locked instance %s" % self.op.instance_name
6512     _CheckNodeOnline(self, self.instance.primary_node)
6513
6514     self.dst_node = self.cfg.GetNodeInfo(
6515       self.cfg.ExpandNodeName(self.op.target_node))
6516
6517     if self.dst_node is None:
6518       # This is wrong node name, not a non-locked node
6519       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6520     _CheckNodeOnline(self, self.dst_node.name)
6521     _CheckNodeNotDrained(self, self.dst_node.name)
6522
6523     # instance disk type verification
6524     for disk in self.instance.disks:
6525       if disk.dev_type == constants.LD_FILE:
6526         raise errors.OpPrereqError("Export not supported for instances with"
6527                                    " file-based disks")
6528
6529   def Exec(self, feedback_fn):
6530     """Export an instance to an image in the cluster.
6531
6532     """
6533     instance = self.instance
6534     dst_node = self.dst_node
6535     src_node = instance.primary_node
6536     if self.op.shutdown:
6537       # shutdown the instance, but not the disks
6538       result = self.rpc.call_instance_shutdown(src_node, instance)
6539       result.Raise("Could not shutdown instance %s on"
6540                    " node %s" % (instance.name, src_node))
6541
6542     vgname = self.cfg.GetVGName()
6543
6544     snap_disks = []
6545
6546     # set the disks ID correctly since call_instance_start needs the
6547     # correct drbd minor to create the symlinks
6548     for disk in instance.disks:
6549       self.cfg.SetDiskID(disk, src_node)
6550
6551     try:
6552       for idx, disk in enumerate(instance.disks):
6553         # result.payload will be a snapshot of an lvm leaf of the one we passed
6554         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6555         msg = result.fail_msg
6556         if msg:
6557           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6558                           idx, src_node, msg)
6559           snap_disks.append(False)
6560         else:
6561           disk_id = (vgname, result.payload)
6562           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6563                                  logical_id=disk_id, physical_id=disk_id,
6564                                  iv_name=disk.iv_name)
6565           snap_disks.append(new_dev)
6566
6567     finally:
6568       if self.op.shutdown and instance.admin_up:
6569         result = self.rpc.call_instance_start(src_node, instance, None, None)
6570         msg = result.fail_msg
6571         if msg:
6572           _ShutdownInstanceDisks(self, instance)
6573           raise errors.OpExecError("Could not start instance: %s" % msg)
6574
6575     # TODO: check for size
6576
6577     cluster_name = self.cfg.GetClusterName()
6578     for idx, dev in enumerate(snap_disks):
6579       if dev:
6580         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6581                                                instance, cluster_name, idx)
6582         msg = result.fail_msg
6583         if msg:
6584           self.LogWarning("Could not export disk/%s from node %s to"
6585                           " node %s: %s", idx, src_node, dst_node.name, msg)
6586         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6587         if msg:
6588           self.LogWarning("Could not remove snapshot for disk/%d from node"
6589                           " %s: %s", idx, src_node, msg)
6590
6591     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6592     msg = result.fail_msg
6593     if msg:
6594       self.LogWarning("Could not finalize export for instance %s"
6595                       " on node %s: %s", instance.name, dst_node.name, msg)
6596
6597     nodelist = self.cfg.GetNodeList()
6598     nodelist.remove(dst_node.name)
6599
6600     # on one-node clusters nodelist will be empty after the removal
6601     # if we proceed the backup would be removed because OpQueryExports
6602     # substitutes an empty list with the full cluster node list.
6603     iname = instance.name
6604     if nodelist:
6605       exportlist = self.rpc.call_export_list(nodelist)
6606       for node in exportlist:
6607         if exportlist[node].fail_msg:
6608           continue
6609         if iname in exportlist[node].payload:
6610           msg = self.rpc.call_export_remove(node, iname).fail_msg
6611           if msg:
6612             self.LogWarning("Could not remove older export for instance %s"
6613                             " on node %s: %s", iname, node, msg)
6614
6615
6616 class LURemoveExport(NoHooksLU):
6617   """Remove exports related to the named instance.
6618
6619   """
6620   _OP_REQP = ["instance_name"]
6621   REQ_BGL = False
6622
6623   def ExpandNames(self):
6624     self.needed_locks = {}
6625     # We need all nodes to be locked in order for RemoveExport to work, but we
6626     # don't need to lock the instance itself, as nothing will happen to it (and
6627     # we can remove exports also for a removed instance)
6628     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6629
6630   def CheckPrereq(self):
6631     """Check prerequisites.
6632     """
6633     pass
6634
6635   def Exec(self, feedback_fn):
6636     """Remove any export.
6637
6638     """
6639     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6640     # If the instance was not found we'll try with the name that was passed in.
6641     # This will only work if it was an FQDN, though.
6642     fqdn_warn = False
6643     if not instance_name:
6644       fqdn_warn = True
6645       instance_name = self.op.instance_name
6646
6647     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6648     exportlist = self.rpc.call_export_list(locked_nodes)
6649     found = False
6650     for node in exportlist:
6651       msg = exportlist[node].fail_msg
6652       if msg:
6653         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6654         continue
6655       if instance_name in exportlist[node].payload:
6656         found = True
6657         result = self.rpc.call_export_remove(node, instance_name)
6658         msg = result.fail_msg
6659         if msg:
6660           logging.error("Could not remove export for instance %s"
6661                         " on node %s: %s", instance_name, node, msg)
6662
6663     if fqdn_warn and not found:
6664       feedback_fn("Export not found. If trying to remove an export belonging"
6665                   " to a deleted instance please use its Fully Qualified"
6666                   " Domain Name.")
6667
6668
6669 class TagsLU(NoHooksLU):
6670   """Generic tags LU.
6671
6672   This is an abstract class which is the parent of all the other tags LUs.
6673
6674   """
6675
6676   def ExpandNames(self):
6677     self.needed_locks = {}
6678     if self.op.kind == constants.TAG_NODE:
6679       name = self.cfg.ExpandNodeName(self.op.name)
6680       if name is None:
6681         raise errors.OpPrereqError("Invalid node name (%s)" %
6682                                    (self.op.name,))
6683       self.op.name = name
6684       self.needed_locks[locking.LEVEL_NODE] = name
6685     elif self.op.kind == constants.TAG_INSTANCE:
6686       name = self.cfg.ExpandInstanceName(self.op.name)
6687       if name is None:
6688         raise errors.OpPrereqError("Invalid instance name (%s)" %
6689                                    (self.op.name,))
6690       self.op.name = name
6691       self.needed_locks[locking.LEVEL_INSTANCE] = name
6692
6693   def CheckPrereq(self):
6694     """Check prerequisites.
6695
6696     """
6697     if self.op.kind == constants.TAG_CLUSTER:
6698       self.target = self.cfg.GetClusterInfo()
6699     elif self.op.kind == constants.TAG_NODE:
6700       self.target = self.cfg.GetNodeInfo(self.op.name)
6701     elif self.op.kind == constants.TAG_INSTANCE:
6702       self.target = self.cfg.GetInstanceInfo(self.op.name)
6703     else:
6704       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6705                                  str(self.op.kind))
6706
6707
6708 class LUGetTags(TagsLU):
6709   """Returns the tags of a given object.
6710
6711   """
6712   _OP_REQP = ["kind", "name"]
6713   REQ_BGL = False
6714
6715   def Exec(self, feedback_fn):
6716     """Returns the tag list.
6717
6718     """
6719     return list(self.target.GetTags())
6720
6721
6722 class LUSearchTags(NoHooksLU):
6723   """Searches the tags for a given pattern.
6724
6725   """
6726   _OP_REQP = ["pattern"]
6727   REQ_BGL = False
6728
6729   def ExpandNames(self):
6730     self.needed_locks = {}
6731
6732   def CheckPrereq(self):
6733     """Check prerequisites.
6734
6735     This checks the pattern passed for validity by compiling it.
6736
6737     """
6738     try:
6739       self.re = re.compile(self.op.pattern)
6740     except re.error, err:
6741       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6742                                  (self.op.pattern, err))
6743
6744   def Exec(self, feedback_fn):
6745     """Returns the tag list.
6746
6747     """
6748     cfg = self.cfg
6749     tgts = [("/cluster", cfg.GetClusterInfo())]
6750     ilist = cfg.GetAllInstancesInfo().values()
6751     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6752     nlist = cfg.GetAllNodesInfo().values()
6753     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6754     results = []
6755     for path, target in tgts:
6756       for tag in target.GetTags():
6757         if self.re.search(tag):
6758           results.append((path, tag))
6759     return results
6760
6761
6762 class LUAddTags(TagsLU):
6763   """Sets a tag on a given object.
6764
6765   """
6766   _OP_REQP = ["kind", "name", "tags"]
6767   REQ_BGL = False
6768
6769   def CheckPrereq(self):
6770     """Check prerequisites.
6771
6772     This checks the type and length of the tag name and value.
6773
6774     """
6775     TagsLU.CheckPrereq(self)
6776     for tag in self.op.tags:
6777       objects.TaggableObject.ValidateTag(tag)
6778
6779   def Exec(self, feedback_fn):
6780     """Sets the tag.
6781
6782     """
6783     try:
6784       for tag in self.op.tags:
6785         self.target.AddTag(tag)
6786     except errors.TagError, err:
6787       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6788     try:
6789       self.cfg.Update(self.target)
6790     except errors.ConfigurationError:
6791       raise errors.OpRetryError("There has been a modification to the"
6792                                 " config file and the operation has been"
6793                                 " aborted. Please retry.")
6794
6795
6796 class LUDelTags(TagsLU):
6797   """Delete a list of tags from a given object.
6798
6799   """
6800   _OP_REQP = ["kind", "name", "tags"]
6801   REQ_BGL = False
6802
6803   def CheckPrereq(self):
6804     """Check prerequisites.
6805
6806     This checks that we have the given tag.
6807
6808     """
6809     TagsLU.CheckPrereq(self)
6810     for tag in self.op.tags:
6811       objects.TaggableObject.ValidateTag(tag)
6812     del_tags = frozenset(self.op.tags)
6813     cur_tags = self.target.GetTags()
6814     if not del_tags <= cur_tags:
6815       diff_tags = del_tags - cur_tags
6816       diff_names = ["'%s'" % tag for tag in diff_tags]
6817       diff_names.sort()
6818       raise errors.OpPrereqError("Tag(s) %s not found" %
6819                                  (",".join(diff_names)))
6820
6821   def Exec(self, feedback_fn):
6822     """Remove the tag from the object.
6823
6824     """
6825     for tag in self.op.tags:
6826       self.target.RemoveTag(tag)
6827     try:
6828       self.cfg.Update(self.target)
6829     except errors.ConfigurationError:
6830       raise errors.OpRetryError("There has been a modification to the"
6831                                 " config file and the operation has been"
6832                                 " aborted. Please retry.")
6833
6834
6835 class LUTestDelay(NoHooksLU):
6836   """Sleep for a specified amount of time.
6837
6838   This LU sleeps on the master and/or nodes for a specified amount of
6839   time.
6840
6841   """
6842   _OP_REQP = ["duration", "on_master", "on_nodes"]
6843   REQ_BGL = False
6844
6845   def ExpandNames(self):
6846     """Expand names and set required locks.
6847
6848     This expands the node list, if any.
6849
6850     """
6851     self.needed_locks = {}
6852     if self.op.on_nodes:
6853       # _GetWantedNodes can be used here, but is not always appropriate to use
6854       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6855       # more information.
6856       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6857       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6858
6859   def CheckPrereq(self):
6860     """Check prerequisites.
6861
6862     """
6863
6864   def Exec(self, feedback_fn):
6865     """Do the actual sleep.
6866
6867     """
6868     if self.op.on_master:
6869       if not utils.TestDelay(self.op.duration):
6870         raise errors.OpExecError("Error during master delay test")
6871     if self.op.on_nodes:
6872       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6873       for node, node_result in result.items():
6874         node_result.Raise("Failure during rpc call to node %s" % node)
6875
6876
6877 class IAllocator(object):
6878   """IAllocator framework.
6879
6880   An IAllocator instance has three sets of attributes:
6881     - cfg that is needed to query the cluster
6882     - input data (all members of the _KEYS class attribute are required)
6883     - four buffer attributes (in|out_data|text), that represent the
6884       input (to the external script) in text and data structure format,
6885       and the output from it, again in two formats
6886     - the result variables from the script (success, info, nodes) for
6887       easy usage
6888
6889   """
6890   _ALLO_KEYS = [
6891     "mem_size", "disks", "disk_template",
6892     "os", "tags", "nics", "vcpus", "hypervisor",
6893     ]
6894   _RELO_KEYS = [
6895     "relocate_from",
6896     ]
6897
6898   def __init__(self, lu, mode, name, **kwargs):
6899     self.lu = lu
6900     # init buffer variables
6901     self.in_text = self.out_text = self.in_data = self.out_data = None
6902     # init all input fields so that pylint is happy
6903     self.mode = mode
6904     self.name = name
6905     self.mem_size = self.disks = self.disk_template = None
6906     self.os = self.tags = self.nics = self.vcpus = None
6907     self.hypervisor = None
6908     self.relocate_from = None
6909     # computed fields
6910     self.required_nodes = None
6911     # init result fields
6912     self.success = self.info = self.nodes = None
6913     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6914       keyset = self._ALLO_KEYS
6915     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6916       keyset = self._RELO_KEYS
6917     else:
6918       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6919                                    " IAllocator" % self.mode)
6920     for key in kwargs:
6921       if key not in keyset:
6922         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6923                                      " IAllocator" % key)
6924       setattr(self, key, kwargs[key])
6925     for key in keyset:
6926       if key not in kwargs:
6927         raise errors.ProgrammerError("Missing input parameter '%s' to"
6928                                      " IAllocator" % key)
6929     self._BuildInputData()
6930
6931   def _ComputeClusterData(self):
6932     """Compute the generic allocator input data.
6933
6934     This is the data that is independent of the actual operation.
6935
6936     """
6937     cfg = self.lu.cfg
6938     cluster_info = cfg.GetClusterInfo()
6939     # cluster data
6940     data = {
6941       "version": constants.IALLOCATOR_VERSION,
6942       "cluster_name": cfg.GetClusterName(),
6943       "cluster_tags": list(cluster_info.GetTags()),
6944       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6945       # we don't have job IDs
6946       }
6947     iinfo = cfg.GetAllInstancesInfo().values()
6948     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6949
6950     # node data
6951     node_results = {}
6952     node_list = cfg.GetNodeList()
6953
6954     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6955       hypervisor_name = self.hypervisor
6956     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6957       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6958
6959     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6960                                            hypervisor_name)
6961     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6962                        cluster_info.enabled_hypervisors)
6963     for nname, nresult in node_data.items():
6964       # first fill in static (config-based) values
6965       ninfo = cfg.GetNodeInfo(nname)
6966       pnr = {
6967         "tags": list(ninfo.GetTags()),
6968         "primary_ip": ninfo.primary_ip,
6969         "secondary_ip": ninfo.secondary_ip,
6970         "offline": ninfo.offline,
6971         "drained": ninfo.drained,
6972         "master_candidate": ninfo.master_candidate,
6973         }
6974
6975       if not ninfo.offline:
6976         nresult.Raise("Can't get data for node %s" % nname)
6977         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6978                                 nname)
6979         remote_info = nresult.payload
6980         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6981                      'vg_size', 'vg_free', 'cpu_total']:
6982           if attr not in remote_info:
6983             raise errors.OpExecError("Node '%s' didn't return attribute"
6984                                      " '%s'" % (nname, attr))
6985           if not isinstance(remote_info[attr], int):
6986             raise errors.OpExecError("Node '%s' returned invalid value"
6987                                      " for '%s': %s" %
6988                                      (nname, attr, remote_info[attr]))
6989         # compute memory used by primary instances
6990         i_p_mem = i_p_up_mem = 0
6991         for iinfo, beinfo in i_list:
6992           if iinfo.primary_node == nname:
6993             i_p_mem += beinfo[constants.BE_MEMORY]
6994             if iinfo.name not in node_iinfo[nname].payload:
6995               i_used_mem = 0
6996             else:
6997               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6998             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6999             remote_info['memory_free'] -= max(0, i_mem_diff)
7000
7001             if iinfo.admin_up:
7002               i_p_up_mem += beinfo[constants.BE_MEMORY]
7003
7004         # compute memory used by instances
7005         pnr_dyn = {
7006           "total_memory": remote_info['memory_total'],
7007           "reserved_memory": remote_info['memory_dom0'],
7008           "free_memory": remote_info['memory_free'],
7009           "total_disk": remote_info['vg_size'],
7010           "free_disk": remote_info['vg_free'],
7011           "total_cpus": remote_info['cpu_total'],
7012           "i_pri_memory": i_p_mem,
7013           "i_pri_up_memory": i_p_up_mem,
7014           }
7015         pnr.update(pnr_dyn)
7016
7017       node_results[nname] = pnr
7018     data["nodes"] = node_results
7019
7020     # instance data
7021     instance_data = {}
7022     for iinfo, beinfo in i_list:
7023       nic_data = []
7024       for nic in iinfo.nics:
7025         filled_params = objects.FillDict(
7026             cluster_info.nicparams[constants.PP_DEFAULT],
7027             nic.nicparams)
7028         nic_dict = {"mac": nic.mac,
7029                     "ip": nic.ip,
7030                     "mode": filled_params[constants.NIC_MODE],
7031                     "link": filled_params[constants.NIC_LINK],
7032                    }
7033         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7034           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7035         nic_data.append(nic_dict)
7036       pir = {
7037         "tags": list(iinfo.GetTags()),
7038         "admin_up": iinfo.admin_up,
7039         "vcpus": beinfo[constants.BE_VCPUS],
7040         "memory": beinfo[constants.BE_MEMORY],
7041         "os": iinfo.os,
7042         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7043         "nics": nic_data,
7044         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7045         "disk_template": iinfo.disk_template,
7046         "hypervisor": iinfo.hypervisor,
7047         }
7048       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7049                                                  pir["disks"])
7050       instance_data[iinfo.name] = pir
7051
7052     data["instances"] = instance_data
7053
7054     self.in_data = data
7055
7056   def _AddNewInstance(self):
7057     """Add new instance data to allocator structure.
7058
7059     This in combination with _AllocatorGetClusterData will create the
7060     correct structure needed as input for the allocator.
7061
7062     The checks for the completeness of the opcode must have already been
7063     done.
7064
7065     """
7066     data = self.in_data
7067
7068     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7069
7070     if self.disk_template in constants.DTS_NET_MIRROR:
7071       self.required_nodes = 2
7072     else:
7073       self.required_nodes = 1
7074     request = {
7075       "type": "allocate",
7076       "name": self.name,
7077       "disk_template": self.disk_template,
7078       "tags": self.tags,
7079       "os": self.os,
7080       "vcpus": self.vcpus,
7081       "memory": self.mem_size,
7082       "disks": self.disks,
7083       "disk_space_total": disk_space,
7084       "nics": self.nics,
7085       "required_nodes": self.required_nodes,
7086       }
7087     data["request"] = request
7088
7089   def _AddRelocateInstance(self):
7090     """Add relocate instance data to allocator structure.
7091
7092     This in combination with _IAllocatorGetClusterData will create the
7093     correct structure needed as input for the allocator.
7094
7095     The checks for the completeness of the opcode must have already been
7096     done.
7097
7098     """
7099     instance = self.lu.cfg.GetInstanceInfo(self.name)
7100     if instance is None:
7101       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7102                                    " IAllocator" % self.name)
7103
7104     if instance.disk_template not in constants.DTS_NET_MIRROR:
7105       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7106
7107     if len(instance.secondary_nodes) != 1:
7108       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7109
7110     self.required_nodes = 1
7111     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7112     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7113
7114     request = {
7115       "type": "relocate",
7116       "name": self.name,
7117       "disk_space_total": disk_space,
7118       "required_nodes": self.required_nodes,
7119       "relocate_from": self.relocate_from,
7120       }
7121     self.in_data["request"] = request
7122
7123   def _BuildInputData(self):
7124     """Build input data structures.
7125
7126     """
7127     self._ComputeClusterData()
7128
7129     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7130       self._AddNewInstance()
7131     else:
7132       self._AddRelocateInstance()
7133
7134     self.in_text = serializer.Dump(self.in_data)
7135
7136   def Run(self, name, validate=True, call_fn=None):
7137     """Run an instance allocator and return the results.
7138
7139     """
7140     if call_fn is None:
7141       call_fn = self.lu.rpc.call_iallocator_runner
7142
7143     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7144     result.Raise("Failure while running the iallocator script")
7145
7146     self.out_text = result.payload
7147     if validate:
7148       self._ValidateResult()
7149
7150   def _ValidateResult(self):
7151     """Process the allocator results.
7152
7153     This will process and if successful save the result in
7154     self.out_data and the other parameters.
7155
7156     """
7157     try:
7158       rdict = serializer.Load(self.out_text)
7159     except Exception, err:
7160       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7161
7162     if not isinstance(rdict, dict):
7163       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7164
7165     for key in "success", "info", "nodes":
7166       if key not in rdict:
7167         raise errors.OpExecError("Can't parse iallocator results:"
7168                                  " missing key '%s'" % key)
7169       setattr(self, key, rdict[key])
7170
7171     if not isinstance(rdict["nodes"], list):
7172       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7173                                " is not a list")
7174     self.out_data = rdict
7175
7176
7177 class LUTestAllocator(NoHooksLU):
7178   """Run allocator tests.
7179
7180   This LU runs the allocator tests
7181
7182   """
7183   _OP_REQP = ["direction", "mode", "name"]
7184
7185   def CheckPrereq(self):
7186     """Check prerequisites.
7187
7188     This checks the opcode parameters depending on the director and mode test.
7189
7190     """
7191     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7192       for attr in ["name", "mem_size", "disks", "disk_template",
7193                    "os", "tags", "nics", "vcpus"]:
7194         if not hasattr(self.op, attr):
7195           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7196                                      attr)
7197       iname = self.cfg.ExpandInstanceName(self.op.name)
7198       if iname is not None:
7199         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7200                                    iname)
7201       if not isinstance(self.op.nics, list):
7202         raise errors.OpPrereqError("Invalid parameter 'nics'")
7203       for row in self.op.nics:
7204         if (not isinstance(row, dict) or
7205             "mac" not in row or
7206             "ip" not in row or
7207             "bridge" not in row):
7208           raise errors.OpPrereqError("Invalid contents of the"
7209                                      " 'nics' parameter")
7210       if not isinstance(self.op.disks, list):
7211         raise errors.OpPrereqError("Invalid parameter 'disks'")
7212       for row in self.op.disks:
7213         if (not isinstance(row, dict) or
7214             "size" not in row or
7215             not isinstance(row["size"], int) or
7216             "mode" not in row or
7217             row["mode"] not in ['r', 'w']):
7218           raise errors.OpPrereqError("Invalid contents of the"
7219                                      " 'disks' parameter")
7220       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7221         self.op.hypervisor = self.cfg.GetHypervisorType()
7222     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7223       if not hasattr(self.op, "name"):
7224         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7225       fname = self.cfg.ExpandInstanceName(self.op.name)
7226       if fname is None:
7227         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7228                                    self.op.name)
7229       self.op.name = fname
7230       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7231     else:
7232       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7233                                  self.op.mode)
7234
7235     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7236       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7237         raise errors.OpPrereqError("Missing allocator name")
7238     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7239       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7240                                  self.op.direction)
7241
7242   def Exec(self, feedback_fn):
7243     """Run the allocator test.
7244
7245     """
7246     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7247       ial = IAllocator(self,
7248                        mode=self.op.mode,
7249                        name=self.op.name,
7250                        mem_size=self.op.mem_size,
7251                        disks=self.op.disks,
7252                        disk_template=self.op.disk_template,
7253                        os=self.op.os,
7254                        tags=self.op.tags,
7255                        nics=self.op.nics,
7256                        vcpus=self.op.vcpus,
7257                        hypervisor=self.op.hypervisor,
7258                        )
7259     else:
7260       ial = IAllocator(self,
7261                        mode=self.op.mode,
7262                        name=self.op.name,
7263                        relocate_from=list(self.relocate_from),
7264                        )
7265
7266     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7267       result = ial.in_text
7268     else:
7269       ial.Run(self.op.allocator, validate=False)
7270       result = ial.out_text
7271     return result