Generate a shared HMAC key at cluster init time
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   @ivar dry_run_result: the value (if any) that will be returned to the caller
60       in dry-run mode (signalled by opcode dry_run parameter)
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overridden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92     self.LogStep = processor.LogStep
93     # support for dry-run
94     self.dry_run_result = None
95
96     for attr_name in self._OP_REQP:
97       attr_val = getattr(op, attr_name, None)
98       if attr_val is None:
99         raise errors.OpPrereqError("Required parameter '%s' missing" %
100                                    attr_name)
101     self.CheckArguments()
102
103   def __GetSSH(self):
104     """Returns the SshRunner object
105
106     """
107     if not self.__ssh:
108       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109     return self.__ssh
110
111   ssh = property(fget=__GetSSH)
112
113   def CheckArguments(self):
114     """Check syntactic validity for the opcode arguments.
115
116     This method is for doing a simple syntactic check and ensure
117     validity of opcode parameters, without any cluster-related
118     checks. While the same can be accomplished in ExpandNames and/or
119     CheckPrereq, doing these separate is better because:
120
121       - ExpandNames is left as as purely a lock-related function
122       - CheckPrereq is run after we have acquired locks (and possible
123         waited for them)
124
125     The function is allowed to change the self.op attribute so that
126     later methods can no longer worry about missing parameters.
127
128     """
129     pass
130
131   def ExpandNames(self):
132     """Expand names for this LU.
133
134     This method is called before starting to execute the opcode, and it should
135     update all the parameters of the opcode to their canonical form (e.g. a
136     short node name must be fully expanded after this method has successfully
137     completed). This way locking, hooks, logging, ecc. can work correctly.
138
139     LUs which implement this method must also populate the self.needed_locks
140     member, as a dict with lock levels as keys, and a list of needed lock names
141     as values. Rules:
142
143       - use an empty dict if you don't need any lock
144       - if you don't need any lock at a particular level omit that level
145       - don't put anything for the BGL level
146       - if you want all locks at a level use locking.ALL_SET as a value
147
148     If you need to share locks (rather than acquire them exclusively) at one
149     level you can modify self.share_locks, setting a true value (usually 1) for
150     that level. By default locks are not shared.
151
152     Examples::
153
154       # Acquire all nodes and one instance
155       self.needed_locks = {
156         locking.LEVEL_NODE: locking.ALL_SET,
157         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
158       }
159       # Acquire just two nodes
160       self.needed_locks = {
161         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162       }
163       # Acquire no locks
164       self.needed_locks = {} # No, you can't leave it to the default value None
165
166     """
167     # The implementation of this method is mandatory only if the new LU is
168     # concurrent, so that old LUs don't need to be changed all at the same
169     # time.
170     if self.REQ_BGL:
171       self.needed_locks = {} # Exclusive LUs don't need locks.
172     else:
173       raise NotImplementedError
174
175   def DeclareLocks(self, level):
176     """Declare LU locking needs for a level
177
178     While most LUs can just declare their locking needs at ExpandNames time,
179     sometimes there's the need to calculate some locks after having acquired
180     the ones before. This function is called just before acquiring locks at a
181     particular level, but after acquiring the ones at lower levels, and permits
182     such calculations. It can be used to modify self.needed_locks, and by
183     default it does nothing.
184
185     This function is only called if you have something already set in
186     self.needed_locks for the level.
187
188     @param level: Locking level which is going to be locked
189     @type level: member of ganeti.locking.LEVELS
190
191     """
192
193   def CheckPrereq(self):
194     """Check prerequisites for this LU.
195
196     This method should check that the prerequisites for the execution
197     of this LU are fulfilled. It can do internode communication, but
198     it should be idempotent - no cluster or system changes are
199     allowed.
200
201     The method should raise errors.OpPrereqError in case something is
202     not fulfilled. Its return value is ignored.
203
204     This method should also update all the parameters of the opcode to
205     their canonical form if it hasn't been done by ExpandNames before.
206
207     """
208     raise NotImplementedError
209
210   def Exec(self, feedback_fn):
211     """Execute the LU.
212
213     This method should implement the actual work. It should raise
214     errors.OpExecError for failures that are somewhat dealt with in
215     code, or expected.
216
217     """
218     raise NotImplementedError
219
220   def BuildHooksEnv(self):
221     """Build hooks environment for this LU.
222
223     This method should return a three-node tuple consisting of: a dict
224     containing the environment that will be used for running the
225     specific hook for this LU, a list of node names on which the hook
226     should run before the execution, and a list of node names on which
227     the hook should run after the execution.
228
229     The keys of the dict must not have 'GANETI_' prefixed as this will
230     be handled in the hooks runner. Also note additional keys will be
231     added by the hooks runner. If the LU doesn't define any
232     environment, an empty dict (and not None) should be returned.
233
234     No nodes should be returned as an empty list (and not None).
235
236     Note that if the HPATH for a LU class is None, this function will
237     not be called.
238
239     """
240     raise NotImplementedError
241
242   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
243     """Notify the LU about the results of its hooks.
244
245     This method is called every time a hooks phase is executed, and notifies
246     the Logical Unit about the hooks' result. The LU can then use it to alter
247     its result based on the hooks.  By default the method does nothing and the
248     previous result is passed back unchanged but any LU can define it if it
249     wants to use the local cluster hook-scripts somehow.
250
251     @param phase: one of L{constants.HOOKS_PHASE_POST} or
252         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
253     @param hook_results: the results of the multi-node hooks rpc call
254     @param feedback_fn: function used send feedback back to the caller
255     @param lu_result: the previous Exec result this LU had, or None
256         in the PRE phase
257     @return: the new Exec result, based on the previous result
258         and hook results
259
260     """
261     return lu_result
262
263   def _ExpandAndLockInstance(self):
264     """Helper function to expand and lock an instance.
265
266     Many LUs that work on an instance take its name in self.op.instance_name
267     and need to expand it and then declare the expanded name for locking. This
268     function does it, and then updates self.op.instance_name to the expanded
269     name. It also initializes needed_locks as a dict, if this hasn't been done
270     before.
271
272     """
273     if self.needed_locks is None:
274       self.needed_locks = {}
275     else:
276       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
277         "_ExpandAndLockInstance called with instance-level locks set"
278     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
279     if expanded_name is None:
280       raise errors.OpPrereqError("Instance '%s' not known" %
281                                   self.op.instance_name)
282     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
283     self.op.instance_name = expanded_name
284
285   def _LockInstancesNodes(self, primary_only=False):
286     """Helper function to declare instances' nodes for locking.
287
288     This function should be called after locking one or more instances to lock
289     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
290     with all primary or secondary nodes for instances already locked and
291     present in self.needed_locks[locking.LEVEL_INSTANCE].
292
293     It should be called from DeclareLocks, and for safety only works if
294     self.recalculate_locks[locking.LEVEL_NODE] is set.
295
296     In the future it may grow parameters to just lock some instance's nodes, or
297     to just lock primaries or secondary nodes, if needed.
298
299     If should be called in DeclareLocks in a way similar to::
300
301       if level == locking.LEVEL_NODE:
302         self._LockInstancesNodes()
303
304     @type primary_only: boolean
305     @param primary_only: only lock primary nodes of locked instances
306
307     """
308     assert locking.LEVEL_NODE in self.recalculate_locks, \
309       "_LockInstancesNodes helper function called with no nodes to recalculate"
310
311     # TODO: check if we're really been called with the instance locks held
312
313     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
314     # future we might want to have different behaviors depending on the value
315     # of self.recalculate_locks[locking.LEVEL_NODE]
316     wanted_nodes = []
317     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
318       instance = self.context.cfg.GetInstanceInfo(instance_name)
319       wanted_nodes.append(instance.primary_node)
320       if not primary_only:
321         wanted_nodes.extend(instance.secondary_nodes)
322
323     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
324       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
325     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
326       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
327
328     del self.recalculate_locks[locking.LEVEL_NODE]
329
330
331 class NoHooksLU(LogicalUnit):
332   """Simple LU which runs no hooks.
333
334   This LU is intended as a parent for other LogicalUnits which will
335   run no hooks, in order to reduce duplicate code.
336
337   """
338   HPATH = None
339   HTYPE = None
340
341
342 def _GetWantedNodes(lu, nodes):
343   """Returns list of checked and expanded node names.
344
345   @type lu: L{LogicalUnit}
346   @param lu: the logical unit on whose behalf we execute
347   @type nodes: list
348   @param nodes: list of node names or None for all nodes
349   @rtype: list
350   @return: the list of nodes, sorted
351   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
352
353   """
354   if not isinstance(nodes, list):
355     raise errors.OpPrereqError("Invalid argument type 'nodes'")
356
357   if not nodes:
358     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
359       " non-empty list of nodes whose name is to be expanded.")
360
361   wanted = []
362   for name in nodes:
363     node = lu.cfg.ExpandNodeName(name)
364     if node is None:
365       raise errors.OpPrereqError("No such node name '%s'" % name)
366     wanted.append(node)
367
368   return utils.NiceSort(wanted)
369
370
371 def _GetWantedInstances(lu, instances):
372   """Returns list of checked and expanded instance names.
373
374   @type lu: L{LogicalUnit}
375   @param lu: the logical unit on whose behalf we execute
376   @type instances: list
377   @param instances: list of instance names or None for all instances
378   @rtype: list
379   @return: the list of instances, sorted
380   @raise errors.OpPrereqError: if the instances parameter is wrong type
381   @raise errors.OpPrereqError: if any of the passed instances is not found
382
383   """
384   if not isinstance(instances, list):
385     raise errors.OpPrereqError("Invalid argument type 'instances'")
386
387   if instances:
388     wanted = []
389
390     for name in instances:
391       instance = lu.cfg.ExpandInstanceName(name)
392       if instance is None:
393         raise errors.OpPrereqError("No such instance name '%s'" % name)
394       wanted.append(instance)
395
396   else:
397     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398   return wanted
399
400
401 def _CheckOutputFields(static, dynamic, selected):
402   """Checks whether all selected fields are valid.
403
404   @type static: L{utils.FieldSet}
405   @param static: static fields set
406   @type dynamic: L{utils.FieldSet}
407   @param dynamic: dynamic fields set
408
409   """
410   f = utils.FieldSet()
411   f.Extend(static)
412   f.Extend(dynamic)
413
414   delta = f.NonMatching(selected)
415   if delta:
416     raise errors.OpPrereqError("Unknown output fields selected: %s"
417                                % ",".join(delta))
418
419
420 def _CheckBooleanOpField(op, name):
421   """Validates boolean opcode parameters.
422
423   This will ensure that an opcode parameter is either a boolean value,
424   or None (but that it always exists).
425
426   """
427   val = getattr(op, name, None)
428   if not (val is None or isinstance(val, bool)):
429     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
430                                (name, str(val)))
431   setattr(op, name, val)
432
433
434 def _CheckNodeOnline(lu, node):
435   """Ensure that a given node is online.
436
437   @param lu: the LU on behalf of which we make the check
438   @param node: the node to check
439   @raise errors.OpPrereqError: if the node is offline
440
441   """
442   if lu.cfg.GetNodeInfo(node).offline:
443     raise errors.OpPrereqError("Can't use offline node %s" % node)
444
445
446 def _CheckNodeNotDrained(lu, node):
447   """Ensure that a given node is not drained.
448
449   @param lu: the LU on behalf of which we make the check
450   @param node: the node to check
451   @raise errors.OpPrereqError: if the node is drained
452
453   """
454   if lu.cfg.GetNodeInfo(node).drained:
455     raise errors.OpPrereqError("Can't use drained node %s" % node)
456
457
458 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
459                           memory, vcpus, nics, disk_template, disks,
460                           bep, hvp, hypervisor_name):
461   """Builds instance related env variables for hooks
462
463   This builds the hook environment from individual variables.
464
465   @type name: string
466   @param name: the name of the instance
467   @type primary_node: string
468   @param primary_node: the name of the instance's primary node
469   @type secondary_nodes: list
470   @param secondary_nodes: list of secondary nodes as strings
471   @type os_type: string
472   @param os_type: the name of the instance's OS
473   @type status: boolean
474   @param status: the should_run status of the instance
475   @type memory: string
476   @param memory: the memory size of the instance
477   @type vcpus: string
478   @param vcpus: the count of VCPUs the instance has
479   @type nics: list
480   @param nics: list of tuples (ip, mac, mode, link) representing
481       the NICs the instance has
482   @type disk_template: string
483   @param disk_template: the disk template of the instance
484   @type disks: list
485   @param disks: the list of (size, mode) pairs
486   @type bep: dict
487   @param bep: the backend parameters for the instance
488   @type hvp: dict
489   @param hvp: the hypervisor parameters for the instance
490   @type hypervisor_name: string
491   @param hypervisor_name: the hypervisor for the instance
492   @rtype: dict
493   @return: the hook environment for this instance
494
495   """
496   if status:
497     str_status = "up"
498   else:
499     str_status = "down"
500   env = {
501     "OP_TARGET": name,
502     "INSTANCE_NAME": name,
503     "INSTANCE_PRIMARY": primary_node,
504     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
505     "INSTANCE_OS_TYPE": os_type,
506     "INSTANCE_STATUS": str_status,
507     "INSTANCE_MEMORY": memory,
508     "INSTANCE_VCPUS": vcpus,
509     "INSTANCE_DISK_TEMPLATE": disk_template,
510     "INSTANCE_HYPERVISOR": hypervisor_name,
511   }
512
513   if nics:
514     nic_count = len(nics)
515     for idx, (ip, mac, mode, link) in enumerate(nics):
516       if ip is None:
517         ip = ""
518       env["INSTANCE_NIC%d_IP" % idx] = ip
519       env["INSTANCE_NIC%d_MAC" % idx] = mac
520       env["INSTANCE_NIC%d_MODE" % idx] = mode
521       env["INSTANCE_NIC%d_LINK" % idx] = link
522       if mode == constants.NIC_MODE_BRIDGED:
523         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
524   else:
525     nic_count = 0
526
527   env["INSTANCE_NIC_COUNT"] = nic_count
528
529   if disks:
530     disk_count = len(disks)
531     for idx, (size, mode) in enumerate(disks):
532       env["INSTANCE_DISK%d_SIZE" % idx] = size
533       env["INSTANCE_DISK%d_MODE" % idx] = mode
534   else:
535     disk_count = 0
536
537   env["INSTANCE_DISK_COUNT"] = disk_count
538
539   for source, kind in [(bep, "BE"), (hvp, "HV")]:
540     for key, value in source.items():
541       env["INSTANCE_%s_%s" % (kind, key)] = value
542
543   return env
544
545 def _NICListToTuple(lu, nics):
546   """Build a list of nic information tuples.
547
548   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
549   value in LUQueryInstanceData.
550
551   @type lu:  L{LogicalUnit}
552   @param lu: the logical unit on whose behalf we execute
553   @type nics: list of L{objects.NIC}
554   @param nics: list of nics to convert to hooks tuples
555
556   """
557   hooks_nics = []
558   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
559   for nic in nics:
560     ip = nic.ip
561     mac = nic.mac
562     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
563     mode = filled_params[constants.NIC_MODE]
564     link = filled_params[constants.NIC_LINK]
565     hooks_nics.append((ip, mac, mode, link))
566   return hooks_nics
567
568 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
569   """Builds instance related env variables for hooks from an object.
570
571   @type lu: L{LogicalUnit}
572   @param lu: the logical unit on whose behalf we execute
573   @type instance: L{objects.Instance}
574   @param instance: the instance for which we should build the
575       environment
576   @type override: dict
577   @param override: dictionary with key/values that will override
578       our values
579   @rtype: dict
580   @return: the hook environment dictionary
581
582   """
583   cluster = lu.cfg.GetClusterInfo()
584   bep = cluster.FillBE(instance)
585   hvp = cluster.FillHV(instance)
586   args = {
587     'name': instance.name,
588     'primary_node': instance.primary_node,
589     'secondary_nodes': instance.secondary_nodes,
590     'os_type': instance.os,
591     'status': instance.admin_up,
592     'memory': bep[constants.BE_MEMORY],
593     'vcpus': bep[constants.BE_VCPUS],
594     'nics': _NICListToTuple(lu, instance.nics),
595     'disk_template': instance.disk_template,
596     'disks': [(disk.size, disk.mode) for disk in instance.disks],
597     'bep': bep,
598     'hvp': hvp,
599     'hypervisor_name': instance.hypervisor,
600   }
601   if override:
602     args.update(override)
603   return _BuildInstanceHookEnv(**args)
604
605
606 def _AdjustCandidatePool(lu):
607   """Adjust the candidate pool after node operations.
608
609   """
610   mod_list = lu.cfg.MaintainCandidatePool()
611   if mod_list:
612     lu.LogInfo("Promoted nodes to master candidate role: %s",
613                ", ".join(node.name for node in mod_list))
614     for name in mod_list:
615       lu.context.ReaddNode(name)
616   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
617   if mc_now > mc_max:
618     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
619                (mc_now, mc_max))
620
621
622 def _CheckNicsBridgesExist(lu, target_nics, target_node,
623                                profile=constants.PP_DEFAULT):
624   """Check that the brigdes needed by a list of nics exist.
625
626   """
627   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
628   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
629                 for nic in target_nics]
630   brlist = [params[constants.NIC_LINK] for params in paramslist
631             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
632   if brlist:
633     result = lu.rpc.call_bridges_exist(target_node, brlist)
634     result.Raise("Error checking bridges on destination node '%s'" %
635                  target_node, prereq=True)
636
637
638 def _CheckInstanceBridgesExist(lu, instance, node=None):
639   """Check that the brigdes needed by an instance exist.
640
641   """
642   if node is None:
643     node = instance.primary_node
644   _CheckNicsBridgesExist(lu, instance.nics, node)
645
646
647 class LUDestroyCluster(NoHooksLU):
648   """Logical unit for destroying the cluster.
649
650   """
651   _OP_REQP = []
652
653   def CheckPrereq(self):
654     """Check prerequisites.
655
656     This checks whether the cluster is empty.
657
658     Any errors are signaled by raising errors.OpPrereqError.
659
660     """
661     master = self.cfg.GetMasterNode()
662
663     nodelist = self.cfg.GetNodeList()
664     if len(nodelist) != 1 or nodelist[0] != master:
665       raise errors.OpPrereqError("There are still %d node(s) in"
666                                  " this cluster." % (len(nodelist) - 1))
667     instancelist = self.cfg.GetInstanceList()
668     if instancelist:
669       raise errors.OpPrereqError("There are still %d instance(s) in"
670                                  " this cluster." % len(instancelist))
671
672   def Exec(self, feedback_fn):
673     """Destroys the cluster.
674
675     """
676     master = self.cfg.GetMasterNode()
677     result = self.rpc.call_node_stop_master(master, False)
678     result.Raise("Could not disable the master role")
679     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
680     utils.CreateBackup(priv_key)
681     utils.CreateBackup(pub_key)
682     return master
683
684
685 class LUVerifyCluster(LogicalUnit):
686   """Verifies the cluster status.
687
688   """
689   HPATH = "cluster-verify"
690   HTYPE = constants.HTYPE_CLUSTER
691   _OP_REQP = ["skip_checks"]
692   REQ_BGL = False
693
694   def ExpandNames(self):
695     self.needed_locks = {
696       locking.LEVEL_NODE: locking.ALL_SET,
697       locking.LEVEL_INSTANCE: locking.ALL_SET,
698     }
699     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
700
701   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
702                   node_result, feedback_fn, master_files,
703                   drbd_map, vg_name):
704     """Run multiple tests against a node.
705
706     Test list:
707
708       - compares ganeti version
709       - checks vg existence and size > 20G
710       - checks config file checksum
711       - checks ssh to other nodes
712
713     @type nodeinfo: L{objects.Node}
714     @param nodeinfo: the node to check
715     @param file_list: required list of files
716     @param local_cksum: dictionary of local files and their checksums
717     @param node_result: the results from the node
718     @param feedback_fn: function used to accumulate results
719     @param master_files: list of files that only masters should have
720     @param drbd_map: the useddrbd minors for this node, in
721         form of minor: (instance, must_exist) which correspond to instances
722         and their running status
723     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
724
725     """
726     node = nodeinfo.name
727
728     # main result, node_result should be a non-empty dict
729     if not node_result or not isinstance(node_result, dict):
730       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
731       return True
732
733     # compares ganeti version
734     local_version = constants.PROTOCOL_VERSION
735     remote_version = node_result.get('version', None)
736     if not (remote_version and isinstance(remote_version, (list, tuple)) and
737             len(remote_version) == 2):
738       feedback_fn("  - ERROR: connection to %s failed" % (node))
739       return True
740
741     if local_version != remote_version[0]:
742       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
743                   " node %s %s" % (local_version, node, remote_version[0]))
744       return True
745
746     # node seems compatible, we can actually try to look into its results
747
748     bad = False
749
750     # full package version
751     if constants.RELEASE_VERSION != remote_version[1]:
752       feedback_fn("  - WARNING: software version mismatch: master %s,"
753                   " node %s %s" %
754                   (constants.RELEASE_VERSION, node, remote_version[1]))
755
756     # checks vg existence and size > 20G
757     if vg_name is not None:
758       vglist = node_result.get(constants.NV_VGLIST, None)
759       if not vglist:
760         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
761                         (node,))
762         bad = True
763       else:
764         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
765                                               constants.MIN_VG_SIZE)
766         if vgstatus:
767           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
768           bad = True
769
770     # checks config file checksum
771
772     remote_cksum = node_result.get(constants.NV_FILELIST, None)
773     if not isinstance(remote_cksum, dict):
774       bad = True
775       feedback_fn("  - ERROR: node hasn't returned file checksum data")
776     else:
777       for file_name in file_list:
778         node_is_mc = nodeinfo.master_candidate
779         must_have_file = file_name not in master_files
780         if file_name not in remote_cksum:
781           if node_is_mc or must_have_file:
782             bad = True
783             feedback_fn("  - ERROR: file '%s' missing" % file_name)
784         elif remote_cksum[file_name] != local_cksum[file_name]:
785           if node_is_mc or must_have_file:
786             bad = True
787             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
788           else:
789             # not candidate and this is not a must-have file
790             bad = True
791             feedback_fn("  - ERROR: file '%s' should not exist on non master"
792                         " candidates (and the file is outdated)" % file_name)
793         else:
794           # all good, except non-master/non-must have combination
795           if not node_is_mc and not must_have_file:
796             feedback_fn("  - ERROR: file '%s' should not exist on non master"
797                         " candidates" % file_name)
798
799     # checks ssh to any
800
801     if constants.NV_NODELIST not in node_result:
802       bad = True
803       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
804     else:
805       if node_result[constants.NV_NODELIST]:
806         bad = True
807         for node in node_result[constants.NV_NODELIST]:
808           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
809                           (node, node_result[constants.NV_NODELIST][node]))
810
811     if constants.NV_NODENETTEST not in node_result:
812       bad = True
813       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
814     else:
815       if node_result[constants.NV_NODENETTEST]:
816         bad = True
817         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
818         for node in nlist:
819           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
820                           (node, node_result[constants.NV_NODENETTEST][node]))
821
822     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
823     if isinstance(hyp_result, dict):
824       for hv_name, hv_result in hyp_result.iteritems():
825         if hv_result is not None:
826           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
827                       (hv_name, hv_result))
828
829     # check used drbd list
830     if vg_name is not None:
831       used_minors = node_result.get(constants.NV_DRBDLIST, [])
832       if not isinstance(used_minors, (tuple, list)):
833         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
834                     str(used_minors))
835       else:
836         for minor, (iname, must_exist) in drbd_map.items():
837           if minor not in used_minors and must_exist:
838             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
839                         " not active" % (minor, iname))
840             bad = True
841         for minor in used_minors:
842           if minor not in drbd_map:
843             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
844                         minor)
845             bad = True
846
847     return bad
848
849   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
850                       node_instance, feedback_fn, n_offline):
851     """Verify an instance.
852
853     This function checks to see if the required block devices are
854     available on the instance's node.
855
856     """
857     bad = False
858
859     node_current = instanceconfig.primary_node
860
861     node_vol_should = {}
862     instanceconfig.MapLVsByNode(node_vol_should)
863
864     for node in node_vol_should:
865       if node in n_offline:
866         # ignore missing volumes on offline nodes
867         continue
868       for volume in node_vol_should[node]:
869         if node not in node_vol_is or volume not in node_vol_is[node]:
870           feedback_fn("  - ERROR: volume %s missing on node %s" %
871                           (volume, node))
872           bad = True
873
874     if instanceconfig.admin_up:
875       if ((node_current not in node_instance or
876           not instance in node_instance[node_current]) and
877           node_current not in n_offline):
878         feedback_fn("  - ERROR: instance %s not running on node %s" %
879                         (instance, node_current))
880         bad = True
881
882     for node in node_instance:
883       if (not node == node_current):
884         if instance in node_instance[node]:
885           feedback_fn("  - ERROR: instance %s should not run on node %s" %
886                           (instance, node))
887           bad = True
888
889     return bad
890
891   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
892     """Verify if there are any unknown volumes in the cluster.
893
894     The .os, .swap and backup volumes are ignored. All other volumes are
895     reported as unknown.
896
897     """
898     bad = False
899
900     for node in node_vol_is:
901       for volume in node_vol_is[node]:
902         if node not in node_vol_should or volume not in node_vol_should[node]:
903           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
904                       (volume, node))
905           bad = True
906     return bad
907
908   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
909     """Verify the list of running instances.
910
911     This checks what instances are running but unknown to the cluster.
912
913     """
914     bad = False
915     for node in node_instance:
916       for runninginstance in node_instance[node]:
917         if runninginstance not in instancelist:
918           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
919                           (runninginstance, node))
920           bad = True
921     return bad
922
923   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
924     """Verify N+1 Memory Resilience.
925
926     Check that if one single node dies we can still start all the instances it
927     was primary for.
928
929     """
930     bad = False
931
932     for node, nodeinfo in node_info.iteritems():
933       # This code checks that every node which is now listed as secondary has
934       # enough memory to host all instances it is supposed to should a single
935       # other node in the cluster fail.
936       # FIXME: not ready for failover to an arbitrary node
937       # FIXME: does not support file-backed instances
938       # WARNING: we currently take into account down instances as well as up
939       # ones, considering that even if they're down someone might want to start
940       # them even in the event of a node failure.
941       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
942         needed_mem = 0
943         for instance in instances:
944           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
945           if bep[constants.BE_AUTO_BALANCE]:
946             needed_mem += bep[constants.BE_MEMORY]
947         if nodeinfo['mfree'] < needed_mem:
948           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
949                       " failovers should node %s fail" % (node, prinode))
950           bad = True
951     return bad
952
953   def CheckPrereq(self):
954     """Check prerequisites.
955
956     Transform the list of checks we're going to skip into a set and check that
957     all its members are valid.
958
959     """
960     self.skip_set = frozenset(self.op.skip_checks)
961     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
962       raise errors.OpPrereqError("Invalid checks to be skipped specified")
963
964   def BuildHooksEnv(self):
965     """Build hooks env.
966
967     Cluster-Verify hooks just ran in the post phase and their failure makes
968     the output be logged in the verify output and the verification to fail.
969
970     """
971     all_nodes = self.cfg.GetNodeList()
972     env = {
973       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
974       }
975     for node in self.cfg.GetAllNodesInfo().values():
976       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
977
978     return env, [], all_nodes
979
980   def Exec(self, feedback_fn):
981     """Verify integrity of cluster, performing various test on nodes.
982
983     """
984     bad = False
985     feedback_fn("* Verifying global settings")
986     for msg in self.cfg.VerifyConfig():
987       feedback_fn("  - ERROR: %s" % msg)
988
989     vg_name = self.cfg.GetVGName()
990     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
991     nodelist = utils.NiceSort(self.cfg.GetNodeList())
992     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
993     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
994     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
995                         for iname in instancelist)
996     i_non_redundant = [] # Non redundant instances
997     i_non_a_balanced = [] # Non auto-balanced instances
998     n_offline = [] # List of offline nodes
999     n_drained = [] # List of nodes being drained
1000     node_volume = {}
1001     node_instance = {}
1002     node_info = {}
1003     instance_cfg = {}
1004
1005     # FIXME: verify OS list
1006     # do local checksums
1007     master_files = [constants.CLUSTER_CONF_FILE]
1008
1009     file_names = ssconf.SimpleStore().GetFileList()
1010     file_names.append(constants.SSL_CERT_FILE)
1011     file_names.append(constants.RAPI_CERT_FILE)
1012     file_names.extend(master_files)
1013
1014     local_checksums = utils.FingerprintFiles(file_names)
1015
1016     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1017     node_verify_param = {
1018       constants.NV_FILELIST: file_names,
1019       constants.NV_NODELIST: [node.name for node in nodeinfo
1020                               if not node.offline],
1021       constants.NV_HYPERVISOR: hypervisors,
1022       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1023                                   node.secondary_ip) for node in nodeinfo
1024                                  if not node.offline],
1025       constants.NV_INSTANCELIST: hypervisors,
1026       constants.NV_VERSION: None,
1027       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1028       }
1029     if vg_name is not None:
1030       node_verify_param[constants.NV_VGLIST] = None
1031       node_verify_param[constants.NV_LVLIST] = vg_name
1032       node_verify_param[constants.NV_DRBDLIST] = None
1033     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1034                                            self.cfg.GetClusterName())
1035
1036     cluster = self.cfg.GetClusterInfo()
1037     master_node = self.cfg.GetMasterNode()
1038     all_drbd_map = self.cfg.ComputeDRBDMap()
1039
1040     for node_i in nodeinfo:
1041       node = node_i.name
1042
1043       if node_i.offline:
1044         feedback_fn("* Skipping offline node %s" % (node,))
1045         n_offline.append(node)
1046         continue
1047
1048       if node == master_node:
1049         ntype = "master"
1050       elif node_i.master_candidate:
1051         ntype = "master candidate"
1052       elif node_i.drained:
1053         ntype = "drained"
1054         n_drained.append(node)
1055       else:
1056         ntype = "regular"
1057       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1058
1059       msg = all_nvinfo[node].fail_msg
1060       if msg:
1061         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1062         bad = True
1063         continue
1064
1065       nresult = all_nvinfo[node].payload
1066       node_drbd = {}
1067       for minor, instance in all_drbd_map[node].items():
1068         if instance not in instanceinfo:
1069           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1070                       instance)
1071           # ghost instance should not be running, but otherwise we
1072           # don't give double warnings (both ghost instance and
1073           # unallocated minor in use)
1074           node_drbd[minor] = (instance, False)
1075         else:
1076           instance = instanceinfo[instance]
1077           node_drbd[minor] = (instance.name, instance.admin_up)
1078       result = self._VerifyNode(node_i, file_names, local_checksums,
1079                                 nresult, feedback_fn, master_files,
1080                                 node_drbd, vg_name)
1081       bad = bad or result
1082
1083       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1084       if vg_name is None:
1085         node_volume[node] = {}
1086       elif isinstance(lvdata, basestring):
1087         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1088                     (node, utils.SafeEncode(lvdata)))
1089         bad = True
1090         node_volume[node] = {}
1091       elif not isinstance(lvdata, dict):
1092         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1093         bad = True
1094         continue
1095       else:
1096         node_volume[node] = lvdata
1097
1098       # node_instance
1099       idata = nresult.get(constants.NV_INSTANCELIST, None)
1100       if not isinstance(idata, list):
1101         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1102                     (node,))
1103         bad = True
1104         continue
1105
1106       node_instance[node] = idata
1107
1108       # node_info
1109       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1110       if not isinstance(nodeinfo, dict):
1111         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1112         bad = True
1113         continue
1114
1115       try:
1116         node_info[node] = {
1117           "mfree": int(nodeinfo['memory_free']),
1118           "pinst": [],
1119           "sinst": [],
1120           # dictionary holding all instances this node is secondary for,
1121           # grouped by their primary node. Each key is a cluster node, and each
1122           # value is a list of instances which have the key as primary and the
1123           # current node as secondary.  this is handy to calculate N+1 memory
1124           # availability if you can only failover from a primary to its
1125           # secondary.
1126           "sinst-by-pnode": {},
1127         }
1128         # FIXME: devise a free space model for file based instances as well
1129         if vg_name is not None:
1130           if (constants.NV_VGLIST not in nresult or
1131               vg_name not in nresult[constants.NV_VGLIST]):
1132             feedback_fn("  - ERROR: node %s didn't return data for the"
1133                         " volume group '%s' - it is either missing or broken" %
1134                         (node, vg_name))
1135             bad = True
1136             continue
1137           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1138       except (ValueError, KeyError):
1139         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1140                     " from node %s" % (node,))
1141         bad = True
1142         continue
1143
1144     node_vol_should = {}
1145
1146     for instance in instancelist:
1147       feedback_fn("* Verifying instance %s" % instance)
1148       inst_config = instanceinfo[instance]
1149       result =  self._VerifyInstance(instance, inst_config, node_volume,
1150                                      node_instance, feedback_fn, n_offline)
1151       bad = bad or result
1152       inst_nodes_offline = []
1153
1154       inst_config.MapLVsByNode(node_vol_should)
1155
1156       instance_cfg[instance] = inst_config
1157
1158       pnode = inst_config.primary_node
1159       if pnode in node_info:
1160         node_info[pnode]['pinst'].append(instance)
1161       elif pnode not in n_offline:
1162         feedback_fn("  - ERROR: instance %s, connection to primary node"
1163                     " %s failed" % (instance, pnode))
1164         bad = True
1165
1166       if pnode in n_offline:
1167         inst_nodes_offline.append(pnode)
1168
1169       # If the instance is non-redundant we cannot survive losing its primary
1170       # node, so we are not N+1 compliant. On the other hand we have no disk
1171       # templates with more than one secondary so that situation is not well
1172       # supported either.
1173       # FIXME: does not support file-backed instances
1174       if len(inst_config.secondary_nodes) == 0:
1175         i_non_redundant.append(instance)
1176       elif len(inst_config.secondary_nodes) > 1:
1177         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1178                     % instance)
1179
1180       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1181         i_non_a_balanced.append(instance)
1182
1183       for snode in inst_config.secondary_nodes:
1184         if snode in node_info:
1185           node_info[snode]['sinst'].append(instance)
1186           if pnode not in node_info[snode]['sinst-by-pnode']:
1187             node_info[snode]['sinst-by-pnode'][pnode] = []
1188           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1189         elif snode not in n_offline:
1190           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1191                       " %s failed" % (instance, snode))
1192           bad = True
1193         if snode in n_offline:
1194           inst_nodes_offline.append(snode)
1195
1196       if inst_nodes_offline:
1197         # warn that the instance lives on offline nodes, and set bad=True
1198         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1199                     ", ".join(inst_nodes_offline))
1200         bad = True
1201
1202     feedback_fn("* Verifying orphan volumes")
1203     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1204                                        feedback_fn)
1205     bad = bad or result
1206
1207     feedback_fn("* Verifying remaining instances")
1208     result = self._VerifyOrphanInstances(instancelist, node_instance,
1209                                          feedback_fn)
1210     bad = bad or result
1211
1212     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1213       feedback_fn("* Verifying N+1 Memory redundancy")
1214       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1215       bad = bad or result
1216
1217     feedback_fn("* Other Notes")
1218     if i_non_redundant:
1219       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1220                   % len(i_non_redundant))
1221
1222     if i_non_a_balanced:
1223       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1224                   % len(i_non_a_balanced))
1225
1226     if n_offline:
1227       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1228
1229     if n_drained:
1230       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1231
1232     return not bad
1233
1234   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1235     """Analyze the post-hooks' result
1236
1237     This method analyses the hook result, handles it, and sends some
1238     nicely-formatted feedback back to the user.
1239
1240     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1241         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1242     @param hooks_results: the results of the multi-node hooks rpc call
1243     @param feedback_fn: function used send feedback back to the caller
1244     @param lu_result: previous Exec result
1245     @return: the new Exec result, based on the previous result
1246         and hook results
1247
1248     """
1249     # We only really run POST phase hooks, and are only interested in
1250     # their results
1251     if phase == constants.HOOKS_PHASE_POST:
1252       # Used to change hooks' output to proper indentation
1253       indent_re = re.compile('^', re.M)
1254       feedback_fn("* Hooks Results")
1255       if not hooks_results:
1256         feedback_fn("  - ERROR: general communication failure")
1257         lu_result = 1
1258       else:
1259         for node_name in hooks_results:
1260           show_node_header = True
1261           res = hooks_results[node_name]
1262           msg = res.fail_msg
1263           if msg:
1264             if res.offline:
1265               # no need to warn or set fail return value
1266               continue
1267             feedback_fn("    Communication failure in hooks execution: %s" %
1268                         msg)
1269             lu_result = 1
1270             continue
1271           for script, hkr, output in res.payload:
1272             if hkr == constants.HKR_FAIL:
1273               # The node header is only shown once, if there are
1274               # failing hooks on that node
1275               if show_node_header:
1276                 feedback_fn("  Node %s:" % node_name)
1277                 show_node_header = False
1278               feedback_fn("    ERROR: Script %s failed, output:" % script)
1279               output = indent_re.sub('      ', output)
1280               feedback_fn("%s" % output)
1281               lu_result = 1
1282
1283       return lu_result
1284
1285
1286 class LUVerifyDisks(NoHooksLU):
1287   """Verifies the cluster disks status.
1288
1289   """
1290   _OP_REQP = []
1291   REQ_BGL = False
1292
1293   def ExpandNames(self):
1294     self.needed_locks = {
1295       locking.LEVEL_NODE: locking.ALL_SET,
1296       locking.LEVEL_INSTANCE: locking.ALL_SET,
1297     }
1298     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1299
1300   def CheckPrereq(self):
1301     """Check prerequisites.
1302
1303     This has no prerequisites.
1304
1305     """
1306     pass
1307
1308   def Exec(self, feedback_fn):
1309     """Verify integrity of cluster disks.
1310
1311     @rtype: tuple of three items
1312     @return: a tuple of (dict of node-to-node_error, list of instances
1313         which need activate-disks, dict of instance: (node, volume) for
1314         missing volumes
1315
1316     """
1317     result = res_nodes, res_instances, res_missing = {}, [], {}
1318
1319     vg_name = self.cfg.GetVGName()
1320     nodes = utils.NiceSort(self.cfg.GetNodeList())
1321     instances = [self.cfg.GetInstanceInfo(name)
1322                  for name in self.cfg.GetInstanceList()]
1323
1324     nv_dict = {}
1325     for inst in instances:
1326       inst_lvs = {}
1327       if (not inst.admin_up or
1328           inst.disk_template not in constants.DTS_NET_MIRROR):
1329         continue
1330       inst.MapLVsByNode(inst_lvs)
1331       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1332       for node, vol_list in inst_lvs.iteritems():
1333         for vol in vol_list:
1334           nv_dict[(node, vol)] = inst
1335
1336     if not nv_dict:
1337       return result
1338
1339     node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1340
1341     for node in nodes:
1342       # node_volume
1343       node_res = node_lvs[node]
1344       if node_res.offline:
1345         continue
1346       msg = node_res.fail_msg
1347       if msg:
1348         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1349         res_nodes[node] = msg
1350         continue
1351
1352       lvs = node_res.payload
1353       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1354         inst = nv_dict.pop((node, lv_name), None)
1355         if (not lv_online and inst is not None
1356             and inst.name not in res_instances):
1357           res_instances.append(inst.name)
1358
1359     # any leftover items in nv_dict are missing LVs, let's arrange the
1360     # data better
1361     for key, inst in nv_dict.iteritems():
1362       if inst.name not in res_missing:
1363         res_missing[inst.name] = []
1364       res_missing[inst.name].append(key)
1365
1366     return result
1367
1368
1369 class LURenameCluster(LogicalUnit):
1370   """Rename the cluster.
1371
1372   """
1373   HPATH = "cluster-rename"
1374   HTYPE = constants.HTYPE_CLUSTER
1375   _OP_REQP = ["name"]
1376
1377   def BuildHooksEnv(self):
1378     """Build hooks env.
1379
1380     """
1381     env = {
1382       "OP_TARGET": self.cfg.GetClusterName(),
1383       "NEW_NAME": self.op.name,
1384       }
1385     mn = self.cfg.GetMasterNode()
1386     return env, [mn], [mn]
1387
1388   def CheckPrereq(self):
1389     """Verify that the passed name is a valid one.
1390
1391     """
1392     hostname = utils.HostInfo(self.op.name)
1393
1394     new_name = hostname.name
1395     self.ip = new_ip = hostname.ip
1396     old_name = self.cfg.GetClusterName()
1397     old_ip = self.cfg.GetMasterIP()
1398     if new_name == old_name and new_ip == old_ip:
1399       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1400                                  " cluster has changed")
1401     if new_ip != old_ip:
1402       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1403         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1404                                    " reachable on the network. Aborting." %
1405                                    new_ip)
1406
1407     self.op.name = new_name
1408
1409   def Exec(self, feedback_fn):
1410     """Rename the cluster.
1411
1412     """
1413     clustername = self.op.name
1414     ip = self.ip
1415
1416     # shutdown the master IP
1417     master = self.cfg.GetMasterNode()
1418     result = self.rpc.call_node_stop_master(master, False)
1419     result.Raise("Could not disable the master role")
1420
1421     try:
1422       cluster = self.cfg.GetClusterInfo()
1423       cluster.cluster_name = clustername
1424       cluster.master_ip = ip
1425       self.cfg.Update(cluster)
1426
1427       # update the known hosts file
1428       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1429       node_list = self.cfg.GetNodeList()
1430       try:
1431         node_list.remove(master)
1432       except ValueError:
1433         pass
1434       result = self.rpc.call_upload_file(node_list,
1435                                          constants.SSH_KNOWN_HOSTS_FILE)
1436       for to_node, to_result in result.iteritems():
1437         msg = to_result.fail_msg
1438         if msg:
1439           msg = ("Copy of file %s to node %s failed: %s" %
1440                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1441           self.proc.LogWarning(msg)
1442
1443     finally:
1444       result = self.rpc.call_node_start_master(master, False, False)
1445       msg = result.fail_msg
1446       if msg:
1447         self.LogWarning("Could not re-enable the master role on"
1448                         " the master, please restart manually: %s", msg)
1449
1450
1451 def _RecursiveCheckIfLVMBased(disk):
1452   """Check if the given disk or its children are lvm-based.
1453
1454   @type disk: L{objects.Disk}
1455   @param disk: the disk to check
1456   @rtype: boolean
1457   @return: boolean indicating whether a LD_LV dev_type was found or not
1458
1459   """
1460   if disk.children:
1461     for chdisk in disk.children:
1462       if _RecursiveCheckIfLVMBased(chdisk):
1463         return True
1464   return disk.dev_type == constants.LD_LV
1465
1466
1467 class LUSetClusterParams(LogicalUnit):
1468   """Change the parameters of the cluster.
1469
1470   """
1471   HPATH = "cluster-modify"
1472   HTYPE = constants.HTYPE_CLUSTER
1473   _OP_REQP = []
1474   REQ_BGL = False
1475
1476   def CheckArguments(self):
1477     """Check parameters
1478
1479     """
1480     if not hasattr(self.op, "candidate_pool_size"):
1481       self.op.candidate_pool_size = None
1482     if self.op.candidate_pool_size is not None:
1483       try:
1484         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1485       except (ValueError, TypeError), err:
1486         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1487                                    str(err))
1488       if self.op.candidate_pool_size < 1:
1489         raise errors.OpPrereqError("At least one master candidate needed")
1490
1491   def ExpandNames(self):
1492     # FIXME: in the future maybe other cluster params won't require checking on
1493     # all nodes to be modified.
1494     self.needed_locks = {
1495       locking.LEVEL_NODE: locking.ALL_SET,
1496     }
1497     self.share_locks[locking.LEVEL_NODE] = 1
1498
1499   def BuildHooksEnv(self):
1500     """Build hooks env.
1501
1502     """
1503     env = {
1504       "OP_TARGET": self.cfg.GetClusterName(),
1505       "NEW_VG_NAME": self.op.vg_name,
1506       }
1507     mn = self.cfg.GetMasterNode()
1508     return env, [mn], [mn]
1509
1510   def CheckPrereq(self):
1511     """Check prerequisites.
1512
1513     This checks whether the given params don't conflict and
1514     if the given volume group is valid.
1515
1516     """
1517     if self.op.vg_name is not None and not self.op.vg_name:
1518       instances = self.cfg.GetAllInstancesInfo().values()
1519       for inst in instances:
1520         for disk in inst.disks:
1521           if _RecursiveCheckIfLVMBased(disk):
1522             raise errors.OpPrereqError("Cannot disable lvm storage while"
1523                                        " lvm-based instances exist")
1524
1525     node_list = self.acquired_locks[locking.LEVEL_NODE]
1526
1527     # if vg_name not None, checks given volume group on all nodes
1528     if self.op.vg_name:
1529       vglist = self.rpc.call_vg_list(node_list)
1530       for node in node_list:
1531         msg = vglist[node].fail_msg
1532         if msg:
1533           # ignoring down node
1534           self.LogWarning("Error while gathering data on node %s"
1535                           " (ignoring node): %s", node, msg)
1536           continue
1537         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1538                                               self.op.vg_name,
1539                                               constants.MIN_VG_SIZE)
1540         if vgstatus:
1541           raise errors.OpPrereqError("Error on node '%s': %s" %
1542                                      (node, vgstatus))
1543
1544     self.cluster = cluster = self.cfg.GetClusterInfo()
1545     # validate params changes
1546     if self.op.beparams:
1547       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1548       self.new_beparams = objects.FillDict(
1549         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1550
1551     if self.op.nicparams:
1552       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1553       self.new_nicparams = objects.FillDict(
1554         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1555       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1556
1557     # hypervisor list/parameters
1558     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1559     if self.op.hvparams:
1560       if not isinstance(self.op.hvparams, dict):
1561         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1562       for hv_name, hv_dict in self.op.hvparams.items():
1563         if hv_name not in self.new_hvparams:
1564           self.new_hvparams[hv_name] = hv_dict
1565         else:
1566           self.new_hvparams[hv_name].update(hv_dict)
1567
1568     if self.op.enabled_hypervisors is not None:
1569       self.hv_list = self.op.enabled_hypervisors
1570       if not self.hv_list:
1571         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1572                                    " least one member")
1573       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1574       if invalid_hvs:
1575         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1576                                    " entries: %s" % invalid_hvs)
1577     else:
1578       self.hv_list = cluster.enabled_hypervisors
1579
1580     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1581       # either the enabled list has changed, or the parameters have, validate
1582       for hv_name, hv_params in self.new_hvparams.items():
1583         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1584             (self.op.enabled_hypervisors and
1585              hv_name in self.op.enabled_hypervisors)):
1586           # either this is a new hypervisor, or its parameters have changed
1587           hv_class = hypervisor.GetHypervisor(hv_name)
1588           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1589           hv_class.CheckParameterSyntax(hv_params)
1590           _CheckHVParams(self, node_list, hv_name, hv_params)
1591
1592   def Exec(self, feedback_fn):
1593     """Change the parameters of the cluster.
1594
1595     """
1596     if self.op.vg_name is not None:
1597       new_volume = self.op.vg_name
1598       if not new_volume:
1599         new_volume = None
1600       if new_volume != self.cfg.GetVGName():
1601         self.cfg.SetVGName(new_volume)
1602       else:
1603         feedback_fn("Cluster LVM configuration already in desired"
1604                     " state, not changing")
1605     if self.op.hvparams:
1606       self.cluster.hvparams = self.new_hvparams
1607     if self.op.enabled_hypervisors is not None:
1608       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1609     if self.op.beparams:
1610       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1611     if self.op.nicparams:
1612       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1613
1614     if self.op.candidate_pool_size is not None:
1615       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1616       # we need to update the pool size here, otherwise the save will fail
1617       _AdjustCandidatePool(self)
1618
1619     self.cfg.Update(self.cluster)
1620
1621
1622 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1623   """Distribute additional files which are part of the cluster configuration.
1624
1625   ConfigWriter takes care of distributing the config and ssconf files, but
1626   there are more files which should be distributed to all nodes. This function
1627   makes sure those are copied.
1628
1629   @param lu: calling logical unit
1630   @param additional_nodes: list of nodes not in the config to distribute to
1631
1632   """
1633   # 1. Gather target nodes
1634   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1635   dist_nodes = lu.cfg.GetNodeList()
1636   if additional_nodes is not None:
1637     dist_nodes.extend(additional_nodes)
1638   if myself.name in dist_nodes:
1639     dist_nodes.remove(myself.name)
1640   # 2. Gather files to distribute
1641   dist_files = set([constants.ETC_HOSTS,
1642                     constants.SSH_KNOWN_HOSTS_FILE,
1643                     constants.RAPI_CERT_FILE,
1644                     constants.RAPI_USERS_FILE,
1645                     constants.HMAC_CLUSTER_KEY,
1646                    ])
1647
1648   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1649   for hv_name in enabled_hypervisors:
1650     hv_class = hypervisor.GetHypervisor(hv_name)
1651     dist_files.update(hv_class.GetAncillaryFiles())
1652
1653   # 3. Perform the files upload
1654   for fname in dist_files:
1655     if os.path.exists(fname):
1656       result = lu.rpc.call_upload_file(dist_nodes, fname)
1657       for to_node, to_result in result.items():
1658         msg = to_result.fail_msg
1659         if msg:
1660           msg = ("Copy of file %s to node %s failed: %s" %
1661                  (fname, to_node, msg))
1662           lu.proc.LogWarning(msg)
1663
1664
1665 class LURedistributeConfig(NoHooksLU):
1666   """Force the redistribution of cluster configuration.
1667
1668   This is a very simple LU.
1669
1670   """
1671   _OP_REQP = []
1672   REQ_BGL = False
1673
1674   def ExpandNames(self):
1675     self.needed_locks = {
1676       locking.LEVEL_NODE: locking.ALL_SET,
1677     }
1678     self.share_locks[locking.LEVEL_NODE] = 1
1679
1680   def CheckPrereq(self):
1681     """Check prerequisites.
1682
1683     """
1684
1685   def Exec(self, feedback_fn):
1686     """Redistribute the configuration.
1687
1688     """
1689     self.cfg.Update(self.cfg.GetClusterInfo())
1690     _RedistributeAncillaryFiles(self)
1691
1692
1693 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1694   """Sleep and poll for an instance's disk to sync.
1695
1696   """
1697   if not instance.disks:
1698     return True
1699
1700   if not oneshot:
1701     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1702
1703   node = instance.primary_node
1704
1705   for dev in instance.disks:
1706     lu.cfg.SetDiskID(dev, node)
1707
1708   retries = 0
1709   degr_retries = 10 # in seconds, as we sleep 1 second each time
1710   while True:
1711     max_time = 0
1712     done = True
1713     cumul_degraded = False
1714     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1715     msg = rstats.fail_msg
1716     if msg:
1717       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1718       retries += 1
1719       if retries >= 10:
1720         raise errors.RemoteError("Can't contact node %s for mirror data,"
1721                                  " aborting." % node)
1722       time.sleep(6)
1723       continue
1724     rstats = rstats.payload
1725     retries = 0
1726     for i, mstat in enumerate(rstats):
1727       if mstat is None:
1728         lu.LogWarning("Can't compute data for node %s/%s",
1729                            node, instance.disks[i].iv_name)
1730         continue
1731       # we ignore the ldisk parameter
1732       perc_done, est_time, is_degraded, _ = mstat
1733       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1734       if perc_done is not None:
1735         done = False
1736         if est_time is not None:
1737           rem_time = "%d estimated seconds remaining" % est_time
1738           max_time = est_time
1739         else:
1740           rem_time = "no time estimate"
1741         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1742                         (instance.disks[i].iv_name, perc_done, rem_time))
1743
1744     # if we're done but degraded, let's do a few small retries, to
1745     # make sure we see a stable and not transient situation; therefore
1746     # we force restart of the loop
1747     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1748       logging.info("Degraded disks found, %d retries left", degr_retries)
1749       degr_retries -= 1
1750       time.sleep(1)
1751       continue
1752
1753     if done or oneshot:
1754       break
1755
1756     time.sleep(min(60, max_time))
1757
1758   if done:
1759     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1760   return not cumul_degraded
1761
1762
1763 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1764   """Check that mirrors are not degraded.
1765
1766   The ldisk parameter, if True, will change the test from the
1767   is_degraded attribute (which represents overall non-ok status for
1768   the device(s)) to the ldisk (representing the local storage status).
1769
1770   """
1771   lu.cfg.SetDiskID(dev, node)
1772   if ldisk:
1773     idx = 6
1774   else:
1775     idx = 5
1776
1777   result = True
1778   if on_primary or dev.AssembleOnSecondary():
1779     rstats = lu.rpc.call_blockdev_find(node, dev)
1780     msg = rstats.fail_msg
1781     if msg:
1782       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1783       result = False
1784     elif not rstats.payload:
1785       lu.LogWarning("Can't find disk on node %s", node)
1786       result = False
1787     else:
1788       result = result and (not rstats.payload[idx])
1789   if dev.children:
1790     for child in dev.children:
1791       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1792
1793   return result
1794
1795
1796 class LUDiagnoseOS(NoHooksLU):
1797   """Logical unit for OS diagnose/query.
1798
1799   """
1800   _OP_REQP = ["output_fields", "names"]
1801   REQ_BGL = False
1802   _FIELDS_STATIC = utils.FieldSet()
1803   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1804
1805   def ExpandNames(self):
1806     if self.op.names:
1807       raise errors.OpPrereqError("Selective OS query not supported")
1808
1809     _CheckOutputFields(static=self._FIELDS_STATIC,
1810                        dynamic=self._FIELDS_DYNAMIC,
1811                        selected=self.op.output_fields)
1812
1813     # Lock all nodes, in shared mode
1814     # Temporary removal of locks, should be reverted later
1815     # TODO: reintroduce locks when they are lighter-weight
1816     self.needed_locks = {}
1817     #self.share_locks[locking.LEVEL_NODE] = 1
1818     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1819
1820   def CheckPrereq(self):
1821     """Check prerequisites.
1822
1823     """
1824
1825   @staticmethod
1826   def _DiagnoseByOS(node_list, rlist):
1827     """Remaps a per-node return list into an a per-os per-node dictionary
1828
1829     @param node_list: a list with the names of all nodes
1830     @param rlist: a map with node names as keys and OS objects as values
1831
1832     @rtype: dict
1833     @return: a dictionary with osnames as keys and as value another map, with
1834         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1835
1836           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1837                                      (/srv/..., False, "invalid api")],
1838                            "node2": [(/srv/..., True, "")]}
1839           }
1840
1841     """
1842     all_os = {}
1843     # we build here the list of nodes that didn't fail the RPC (at RPC
1844     # level), so that nodes with a non-responding node daemon don't
1845     # make all OSes invalid
1846     good_nodes = [node_name for node_name in rlist
1847                   if not rlist[node_name].fail_msg]
1848     for node_name, nr in rlist.items():
1849       if nr.fail_msg or not nr.payload:
1850         continue
1851       for name, path, status, diagnose in nr.payload:
1852         if name not in all_os:
1853           # build a list of nodes for this os containing empty lists
1854           # for each node in node_list
1855           all_os[name] = {}
1856           for nname in good_nodes:
1857             all_os[name][nname] = []
1858         all_os[name][node_name].append((path, status, diagnose))
1859     return all_os
1860
1861   def Exec(self, feedback_fn):
1862     """Compute the list of OSes.
1863
1864     """
1865     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1866     node_data = self.rpc.call_os_diagnose(valid_nodes)
1867     pol = self._DiagnoseByOS(valid_nodes, node_data)
1868     output = []
1869     for os_name, os_data in pol.items():
1870       row = []
1871       for field in self.op.output_fields:
1872         if field == "name":
1873           val = os_name
1874         elif field == "valid":
1875           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1876         elif field == "node_status":
1877           # this is just a copy of the dict
1878           val = {}
1879           for node_name, nos_list in os_data.items():
1880             val[node_name] = nos_list
1881         else:
1882           raise errors.ParameterError(field)
1883         row.append(val)
1884       output.append(row)
1885
1886     return output
1887
1888
1889 class LURemoveNode(LogicalUnit):
1890   """Logical unit for removing a node.
1891
1892   """
1893   HPATH = "node-remove"
1894   HTYPE = constants.HTYPE_NODE
1895   _OP_REQP = ["node_name"]
1896
1897   def BuildHooksEnv(self):
1898     """Build hooks env.
1899
1900     This doesn't run on the target node in the pre phase as a failed
1901     node would then be impossible to remove.
1902
1903     """
1904     env = {
1905       "OP_TARGET": self.op.node_name,
1906       "NODE_NAME": self.op.node_name,
1907       }
1908     all_nodes = self.cfg.GetNodeList()
1909     all_nodes.remove(self.op.node_name)
1910     return env, all_nodes, all_nodes
1911
1912   def CheckPrereq(self):
1913     """Check prerequisites.
1914
1915     This checks:
1916      - the node exists in the configuration
1917      - it does not have primary or secondary instances
1918      - it's not the master
1919
1920     Any errors are signaled by raising errors.OpPrereqError.
1921
1922     """
1923     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1924     if node is None:
1925       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1926
1927     instance_list = self.cfg.GetInstanceList()
1928
1929     masternode = self.cfg.GetMasterNode()
1930     if node.name == masternode:
1931       raise errors.OpPrereqError("Node is the master node,"
1932                                  " you need to failover first.")
1933
1934     for instance_name in instance_list:
1935       instance = self.cfg.GetInstanceInfo(instance_name)
1936       if node.name in instance.all_nodes:
1937         raise errors.OpPrereqError("Instance %s is still running on the node,"
1938                                    " please remove first." % instance_name)
1939     self.op.node_name = node.name
1940     self.node = node
1941
1942   def Exec(self, feedback_fn):
1943     """Removes the node from the cluster.
1944
1945     """
1946     node = self.node
1947     logging.info("Stopping the node daemon and removing configs from node %s",
1948                  node.name)
1949
1950     self.context.RemoveNode(node.name)
1951
1952     result = self.rpc.call_node_leave_cluster(node.name)
1953     msg = result.fail_msg
1954     if msg:
1955       self.LogWarning("Errors encountered on the remote node while leaving"
1956                       " the cluster: %s", msg)
1957
1958     # Promote nodes to master candidate as needed
1959     _AdjustCandidatePool(self)
1960
1961
1962 class LUQueryNodes(NoHooksLU):
1963   """Logical unit for querying nodes.
1964
1965   """
1966   _OP_REQP = ["output_fields", "names", "use_locking"]
1967   REQ_BGL = False
1968   _FIELDS_DYNAMIC = utils.FieldSet(
1969     "dtotal", "dfree",
1970     "mtotal", "mnode", "mfree",
1971     "bootid",
1972     "ctotal", "cnodes", "csockets",
1973     )
1974
1975   _FIELDS_STATIC = utils.FieldSet(
1976     "name", "pinst_cnt", "sinst_cnt",
1977     "pinst_list", "sinst_list",
1978     "pip", "sip", "tags",
1979     "serial_no",
1980     "master_candidate",
1981     "master",
1982     "offline",
1983     "drained",
1984     "role",
1985     )
1986
1987   def ExpandNames(self):
1988     _CheckOutputFields(static=self._FIELDS_STATIC,
1989                        dynamic=self._FIELDS_DYNAMIC,
1990                        selected=self.op.output_fields)
1991
1992     self.needed_locks = {}
1993     self.share_locks[locking.LEVEL_NODE] = 1
1994
1995     if self.op.names:
1996       self.wanted = _GetWantedNodes(self, self.op.names)
1997     else:
1998       self.wanted = locking.ALL_SET
1999
2000     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2001     self.do_locking = self.do_node_query and self.op.use_locking
2002     if self.do_locking:
2003       # if we don't request only static fields, we need to lock the nodes
2004       self.needed_locks[locking.LEVEL_NODE] = self.wanted
2005
2006
2007   def CheckPrereq(self):
2008     """Check prerequisites.
2009
2010     """
2011     # The validation of the node list is done in the _GetWantedNodes,
2012     # if non empty, and if empty, there's no validation to do
2013     pass
2014
2015   def Exec(self, feedback_fn):
2016     """Computes the list of nodes and their attributes.
2017
2018     """
2019     all_info = self.cfg.GetAllNodesInfo()
2020     if self.do_locking:
2021       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2022     elif self.wanted != locking.ALL_SET:
2023       nodenames = self.wanted
2024       missing = set(nodenames).difference(all_info.keys())
2025       if missing:
2026         raise errors.OpExecError(
2027           "Some nodes were removed before retrieving their data: %s" % missing)
2028     else:
2029       nodenames = all_info.keys()
2030
2031     nodenames = utils.NiceSort(nodenames)
2032     nodelist = [all_info[name] for name in nodenames]
2033
2034     # begin data gathering
2035
2036     if self.do_node_query:
2037       live_data = {}
2038       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2039                                           self.cfg.GetHypervisorType())
2040       for name in nodenames:
2041         nodeinfo = node_data[name]
2042         if not nodeinfo.fail_msg and nodeinfo.payload:
2043           nodeinfo = nodeinfo.payload
2044           fn = utils.TryConvert
2045           live_data[name] = {
2046             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2047             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2048             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2049             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2050             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2051             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2052             "bootid": nodeinfo.get('bootid', None),
2053             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2054             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2055             }
2056         else:
2057           live_data[name] = {}
2058     else:
2059       live_data = dict.fromkeys(nodenames, {})
2060
2061     node_to_primary = dict([(name, set()) for name in nodenames])
2062     node_to_secondary = dict([(name, set()) for name in nodenames])
2063
2064     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2065                              "sinst_cnt", "sinst_list"))
2066     if inst_fields & frozenset(self.op.output_fields):
2067       instancelist = self.cfg.GetInstanceList()
2068
2069       for instance_name in instancelist:
2070         inst = self.cfg.GetInstanceInfo(instance_name)
2071         if inst.primary_node in node_to_primary:
2072           node_to_primary[inst.primary_node].add(inst.name)
2073         for secnode in inst.secondary_nodes:
2074           if secnode in node_to_secondary:
2075             node_to_secondary[secnode].add(inst.name)
2076
2077     master_node = self.cfg.GetMasterNode()
2078
2079     # end data gathering
2080
2081     output = []
2082     for node in nodelist:
2083       node_output = []
2084       for field in self.op.output_fields:
2085         if field == "name":
2086           val = node.name
2087         elif field == "pinst_list":
2088           val = list(node_to_primary[node.name])
2089         elif field == "sinst_list":
2090           val = list(node_to_secondary[node.name])
2091         elif field == "pinst_cnt":
2092           val = len(node_to_primary[node.name])
2093         elif field == "sinst_cnt":
2094           val = len(node_to_secondary[node.name])
2095         elif field == "pip":
2096           val = node.primary_ip
2097         elif field == "sip":
2098           val = node.secondary_ip
2099         elif field == "tags":
2100           val = list(node.GetTags())
2101         elif field == "serial_no":
2102           val = node.serial_no
2103         elif field == "master_candidate":
2104           val = node.master_candidate
2105         elif field == "master":
2106           val = node.name == master_node
2107         elif field == "offline":
2108           val = node.offline
2109         elif field == "drained":
2110           val = node.drained
2111         elif self._FIELDS_DYNAMIC.Matches(field):
2112           val = live_data[node.name].get(field, None)
2113         elif field == "role":
2114           if node.name == master_node:
2115             val = "M"
2116           elif node.master_candidate:
2117             val = "C"
2118           elif node.drained:
2119             val = "D"
2120           elif node.offline:
2121             val = "O"
2122           else:
2123             val = "R"
2124         else:
2125           raise errors.ParameterError(field)
2126         node_output.append(val)
2127       output.append(node_output)
2128
2129     return output
2130
2131
2132 class LUQueryNodeVolumes(NoHooksLU):
2133   """Logical unit for getting volumes on node(s).
2134
2135   """
2136   _OP_REQP = ["nodes", "output_fields"]
2137   REQ_BGL = False
2138   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2139   _FIELDS_STATIC = utils.FieldSet("node")
2140
2141   def ExpandNames(self):
2142     _CheckOutputFields(static=self._FIELDS_STATIC,
2143                        dynamic=self._FIELDS_DYNAMIC,
2144                        selected=self.op.output_fields)
2145
2146     self.needed_locks = {}
2147     self.share_locks[locking.LEVEL_NODE] = 1
2148     if not self.op.nodes:
2149       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2150     else:
2151       self.needed_locks[locking.LEVEL_NODE] = \
2152         _GetWantedNodes(self, self.op.nodes)
2153
2154   def CheckPrereq(self):
2155     """Check prerequisites.
2156
2157     This checks that the fields required are valid output fields.
2158
2159     """
2160     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2161
2162   def Exec(self, feedback_fn):
2163     """Computes the list of nodes and their attributes.
2164
2165     """
2166     nodenames = self.nodes
2167     volumes = self.rpc.call_node_volumes(nodenames)
2168
2169     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2170              in self.cfg.GetInstanceList()]
2171
2172     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2173
2174     output = []
2175     for node in nodenames:
2176       nresult = volumes[node]
2177       if nresult.offline:
2178         continue
2179       msg = nresult.fail_msg
2180       if msg:
2181         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2182         continue
2183
2184       node_vols = nresult.payload[:]
2185       node_vols.sort(key=lambda vol: vol['dev'])
2186
2187       for vol in node_vols:
2188         node_output = []
2189         for field in self.op.output_fields:
2190           if field == "node":
2191             val = node
2192           elif field == "phys":
2193             val = vol['dev']
2194           elif field == "vg":
2195             val = vol['vg']
2196           elif field == "name":
2197             val = vol['name']
2198           elif field == "size":
2199             val = int(float(vol['size']))
2200           elif field == "instance":
2201             for inst in ilist:
2202               if node not in lv_by_node[inst]:
2203                 continue
2204               if vol['name'] in lv_by_node[inst][node]:
2205                 val = inst.name
2206                 break
2207             else:
2208               val = '-'
2209           else:
2210             raise errors.ParameterError(field)
2211           node_output.append(str(val))
2212
2213         output.append(node_output)
2214
2215     return output
2216
2217
2218 class LUAddNode(LogicalUnit):
2219   """Logical unit for adding node to the cluster.
2220
2221   """
2222   HPATH = "node-add"
2223   HTYPE = constants.HTYPE_NODE
2224   _OP_REQP = ["node_name"]
2225
2226   def BuildHooksEnv(self):
2227     """Build hooks env.
2228
2229     This will run on all nodes before, and on all nodes + the new node after.
2230
2231     """
2232     env = {
2233       "OP_TARGET": self.op.node_name,
2234       "NODE_NAME": self.op.node_name,
2235       "NODE_PIP": self.op.primary_ip,
2236       "NODE_SIP": self.op.secondary_ip,
2237       }
2238     nodes_0 = self.cfg.GetNodeList()
2239     nodes_1 = nodes_0 + [self.op.node_name, ]
2240     return env, nodes_0, nodes_1
2241
2242   def CheckPrereq(self):
2243     """Check prerequisites.
2244
2245     This checks:
2246      - the new node is not already in the config
2247      - it is resolvable
2248      - its parameters (single/dual homed) matches the cluster
2249
2250     Any errors are signaled by raising errors.OpPrereqError.
2251
2252     """
2253     node_name = self.op.node_name
2254     cfg = self.cfg
2255
2256     dns_data = utils.HostInfo(node_name)
2257
2258     node = dns_data.name
2259     primary_ip = self.op.primary_ip = dns_data.ip
2260     secondary_ip = getattr(self.op, "secondary_ip", None)
2261     if secondary_ip is None:
2262       secondary_ip = primary_ip
2263     if not utils.IsValidIP(secondary_ip):
2264       raise errors.OpPrereqError("Invalid secondary IP given")
2265     self.op.secondary_ip = secondary_ip
2266
2267     node_list = cfg.GetNodeList()
2268     if not self.op.readd and node in node_list:
2269       raise errors.OpPrereqError("Node %s is already in the configuration" %
2270                                  node)
2271     elif self.op.readd and node not in node_list:
2272       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2273
2274     for existing_node_name in node_list:
2275       existing_node = cfg.GetNodeInfo(existing_node_name)
2276
2277       if self.op.readd and node == existing_node_name:
2278         if (existing_node.primary_ip != primary_ip or
2279             existing_node.secondary_ip != secondary_ip):
2280           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2281                                      " address configuration as before")
2282         continue
2283
2284       if (existing_node.primary_ip == primary_ip or
2285           existing_node.secondary_ip == primary_ip or
2286           existing_node.primary_ip == secondary_ip or
2287           existing_node.secondary_ip == secondary_ip):
2288         raise errors.OpPrereqError("New node ip address(es) conflict with"
2289                                    " existing node %s" % existing_node.name)
2290
2291     # check that the type of the node (single versus dual homed) is the
2292     # same as for the master
2293     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2294     master_singlehomed = myself.secondary_ip == myself.primary_ip
2295     newbie_singlehomed = secondary_ip == primary_ip
2296     if master_singlehomed != newbie_singlehomed:
2297       if master_singlehomed:
2298         raise errors.OpPrereqError("The master has no private ip but the"
2299                                    " new node has one")
2300       else:
2301         raise errors.OpPrereqError("The master has a private ip but the"
2302                                    " new node doesn't have one")
2303
2304     # checks reachability
2305     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2306       raise errors.OpPrereqError("Node not reachable by ping")
2307
2308     if not newbie_singlehomed:
2309       # check reachability from my secondary ip to newbie's secondary ip
2310       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2311                            source=myself.secondary_ip):
2312         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2313                                    " based ping to noded port")
2314
2315     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2316     if self.op.readd:
2317       exceptions = [node]
2318     else:
2319       exceptions = []
2320     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2321     # the new node will increase mc_max with one, so:
2322     mc_max = min(mc_max + 1, cp_size)
2323     self.master_candidate = mc_now < mc_max
2324
2325     if self.op.readd:
2326       self.new_node = self.cfg.GetNodeInfo(node)
2327       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2328     else:
2329       self.new_node = objects.Node(name=node,
2330                                    primary_ip=primary_ip,
2331                                    secondary_ip=secondary_ip,
2332                                    master_candidate=self.master_candidate,
2333                                    offline=False, drained=False)
2334
2335   def Exec(self, feedback_fn):
2336     """Adds the new node to the cluster.
2337
2338     """
2339     new_node = self.new_node
2340     node = new_node.name
2341
2342     # for re-adds, reset the offline/drained/master-candidate flags;
2343     # we need to reset here, otherwise offline would prevent RPC calls
2344     # later in the procedure; this also means that if the re-add
2345     # fails, we are left with a non-offlined, broken node
2346     if self.op.readd:
2347       new_node.drained = new_node.offline = False
2348       self.LogInfo("Readding a node, the offline/drained flags were reset")
2349       # if we demote the node, we do cleanup later in the procedure
2350       new_node.master_candidate = self.master_candidate
2351
2352     # notify the user about any possible mc promotion
2353     if new_node.master_candidate:
2354       self.LogInfo("Node will be a master candidate")
2355
2356     # check connectivity
2357     result = self.rpc.call_version([node])[node]
2358     result.Raise("Can't get version information from node %s" % node)
2359     if constants.PROTOCOL_VERSION == result.payload:
2360       logging.info("Communication to node %s fine, sw version %s match",
2361                    node, result.payload)
2362     else:
2363       raise errors.OpExecError("Version mismatch master version %s,"
2364                                " node version %s" %
2365                                (constants.PROTOCOL_VERSION, result.payload))
2366
2367     # setup ssh on node
2368     logging.info("Copy ssh key to node %s", node)
2369     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2370     keyarray = []
2371     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2372                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2373                 priv_key, pub_key]
2374
2375     for i in keyfiles:
2376       f = open(i, 'r')
2377       try:
2378         keyarray.append(f.read())
2379       finally:
2380         f.close()
2381
2382     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2383                                     keyarray[2],
2384                                     keyarray[3], keyarray[4], keyarray[5])
2385     result.Raise("Cannot transfer ssh keys to the new node")
2386
2387     # Add node to our /etc/hosts, and add key to known_hosts
2388     if self.cfg.GetClusterInfo().modify_etc_hosts:
2389       utils.AddHostToEtcHosts(new_node.name)
2390
2391     if new_node.secondary_ip != new_node.primary_ip:
2392       result = self.rpc.call_node_has_ip_address(new_node.name,
2393                                                  new_node.secondary_ip)
2394       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2395                    prereq=True)
2396       if not result.payload:
2397         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2398                                  " you gave (%s). Please fix and re-run this"
2399                                  " command." % new_node.secondary_ip)
2400
2401     node_verify_list = [self.cfg.GetMasterNode()]
2402     node_verify_param = {
2403       'nodelist': [node],
2404       # TODO: do a node-net-test as well?
2405     }
2406
2407     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2408                                        self.cfg.GetClusterName())
2409     for verifier in node_verify_list:
2410       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2411       nl_payload = result[verifier].payload['nodelist']
2412       if nl_payload:
2413         for failed in nl_payload:
2414           feedback_fn("ssh/hostname verification failed %s -> %s" %
2415                       (verifier, nl_payload[failed]))
2416         raise errors.OpExecError("ssh/hostname verification failed.")
2417
2418     if self.op.readd:
2419       _RedistributeAncillaryFiles(self)
2420       self.context.ReaddNode(new_node)
2421       # make sure we redistribute the config
2422       self.cfg.Update(new_node)
2423       # and make sure the new node will not have old files around
2424       if not new_node.master_candidate:
2425         result = self.rpc.call_node_demote_from_mc(new_node.name)
2426         msg = result.RemoteFailMsg()
2427         if msg:
2428           self.LogWarning("Node failed to demote itself from master"
2429                           " candidate status: %s" % msg)
2430     else:
2431       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2432       self.context.AddNode(new_node)
2433
2434
2435 class LUSetNodeParams(LogicalUnit):
2436   """Modifies the parameters of a node.
2437
2438   """
2439   HPATH = "node-modify"
2440   HTYPE = constants.HTYPE_NODE
2441   _OP_REQP = ["node_name"]
2442   REQ_BGL = False
2443
2444   def CheckArguments(self):
2445     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2446     if node_name is None:
2447       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2448     self.op.node_name = node_name
2449     _CheckBooleanOpField(self.op, 'master_candidate')
2450     _CheckBooleanOpField(self.op, 'offline')
2451     _CheckBooleanOpField(self.op, 'drained')
2452     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2453     if all_mods.count(None) == 3:
2454       raise errors.OpPrereqError("Please pass at least one modification")
2455     if all_mods.count(True) > 1:
2456       raise errors.OpPrereqError("Can't set the node into more than one"
2457                                  " state at the same time")
2458
2459   def ExpandNames(self):
2460     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2461
2462   def BuildHooksEnv(self):
2463     """Build hooks env.
2464
2465     This runs on the master node.
2466
2467     """
2468     env = {
2469       "OP_TARGET": self.op.node_name,
2470       "MASTER_CANDIDATE": str(self.op.master_candidate),
2471       "OFFLINE": str(self.op.offline),
2472       "DRAINED": str(self.op.drained),
2473       }
2474     nl = [self.cfg.GetMasterNode(),
2475           self.op.node_name]
2476     return env, nl, nl
2477
2478   def CheckPrereq(self):
2479     """Check prerequisites.
2480
2481     This only checks the instance list against the existing names.
2482
2483     """
2484     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2485
2486     if ((self.op.master_candidate == False or self.op.offline == True or
2487          self.op.drained == True) and node.master_candidate):
2488       # we will demote the node from master_candidate
2489       if self.op.node_name == self.cfg.GetMasterNode():
2490         raise errors.OpPrereqError("The master node has to be a"
2491                                    " master candidate, online and not drained")
2492       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2493       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2494       if num_candidates <= cp_size:
2495         msg = ("Not enough master candidates (desired"
2496                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2497         if self.op.force:
2498           self.LogWarning(msg)
2499         else:
2500           raise errors.OpPrereqError(msg)
2501
2502     if (self.op.master_candidate == True and
2503         ((node.offline and not self.op.offline == False) or
2504          (node.drained and not self.op.drained == False))):
2505       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2506                                  " to master_candidate" % node.name)
2507
2508     return
2509
2510   def Exec(self, feedback_fn):
2511     """Modifies a node.
2512
2513     """
2514     node = self.node
2515
2516     result = []
2517     changed_mc = False
2518
2519     if self.op.offline is not None:
2520       node.offline = self.op.offline
2521       result.append(("offline", str(self.op.offline)))
2522       if self.op.offline == True:
2523         if node.master_candidate:
2524           node.master_candidate = False
2525           changed_mc = True
2526           result.append(("master_candidate", "auto-demotion due to offline"))
2527         if node.drained:
2528           node.drained = False
2529           result.append(("drained", "clear drained status due to offline"))
2530
2531     if self.op.master_candidate is not None:
2532       node.master_candidate = self.op.master_candidate
2533       changed_mc = True
2534       result.append(("master_candidate", str(self.op.master_candidate)))
2535       if self.op.master_candidate == False:
2536         rrc = self.rpc.call_node_demote_from_mc(node.name)
2537         msg = rrc.fail_msg
2538         if msg:
2539           self.LogWarning("Node failed to demote itself: %s" % msg)
2540
2541     if self.op.drained is not None:
2542       node.drained = self.op.drained
2543       result.append(("drained", str(self.op.drained)))
2544       if self.op.drained == True:
2545         if node.master_candidate:
2546           node.master_candidate = False
2547           changed_mc = True
2548           result.append(("master_candidate", "auto-demotion due to drain"))
2549           rrc = self.rpc.call_node_demote_from_mc(node.name)
2550           msg = rrc.RemoteFailMsg()
2551           if msg:
2552             self.LogWarning("Node failed to demote itself: %s" % msg)
2553         if node.offline:
2554           node.offline = False
2555           result.append(("offline", "clear offline status due to drain"))
2556
2557     # this will trigger configuration file update, if needed
2558     self.cfg.Update(node)
2559     # this will trigger job queue propagation or cleanup
2560     if changed_mc:
2561       self.context.ReaddNode(node)
2562
2563     return result
2564
2565
2566 class LUPowercycleNode(NoHooksLU):
2567   """Powercycles a node.
2568
2569   """
2570   _OP_REQP = ["node_name", "force"]
2571   REQ_BGL = False
2572
2573   def CheckArguments(self):
2574     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2575     if node_name is None:
2576       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2577     self.op.node_name = node_name
2578     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2579       raise errors.OpPrereqError("The node is the master and the force"
2580                                  " parameter was not set")
2581
2582   def ExpandNames(self):
2583     """Locking for PowercycleNode.
2584
2585     This is a last-resource option and shouldn't block on other
2586     jobs. Therefore, we grab no locks.
2587
2588     """
2589     self.needed_locks = {}
2590
2591   def CheckPrereq(self):
2592     """Check prerequisites.
2593
2594     This LU has no prereqs.
2595
2596     """
2597     pass
2598
2599   def Exec(self, feedback_fn):
2600     """Reboots a node.
2601
2602     """
2603     result = self.rpc.call_node_powercycle(self.op.node_name,
2604                                            self.cfg.GetHypervisorType())
2605     result.Raise("Failed to schedule the reboot")
2606     return result.payload
2607
2608
2609 class LUQueryClusterInfo(NoHooksLU):
2610   """Query cluster configuration.
2611
2612   """
2613   _OP_REQP = []
2614   REQ_BGL = False
2615
2616   def ExpandNames(self):
2617     self.needed_locks = {}
2618
2619   def CheckPrereq(self):
2620     """No prerequsites needed for this LU.
2621
2622     """
2623     pass
2624
2625   def Exec(self, feedback_fn):
2626     """Return cluster config.
2627
2628     """
2629     cluster = self.cfg.GetClusterInfo()
2630     result = {
2631       "software_version": constants.RELEASE_VERSION,
2632       "protocol_version": constants.PROTOCOL_VERSION,
2633       "config_version": constants.CONFIG_VERSION,
2634       "os_api_version": max(constants.OS_API_VERSIONS),
2635       "export_version": constants.EXPORT_VERSION,
2636       "architecture": (platform.architecture()[0], platform.machine()),
2637       "name": cluster.cluster_name,
2638       "master": cluster.master_node,
2639       "default_hypervisor": cluster.enabled_hypervisors[0],
2640       "enabled_hypervisors": cluster.enabled_hypervisors,
2641       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2642                         for hypervisor_name in cluster.enabled_hypervisors]),
2643       "beparams": cluster.beparams,
2644       "nicparams": cluster.nicparams,
2645       "candidate_pool_size": cluster.candidate_pool_size,
2646       "master_netdev": cluster.master_netdev,
2647       "volume_group_name": cluster.volume_group_name,
2648       "file_storage_dir": cluster.file_storage_dir,
2649       }
2650
2651     return result
2652
2653
2654 class LUQueryConfigValues(NoHooksLU):
2655   """Return configuration values.
2656
2657   """
2658   _OP_REQP = []
2659   REQ_BGL = False
2660   _FIELDS_DYNAMIC = utils.FieldSet()
2661   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2662
2663   def ExpandNames(self):
2664     self.needed_locks = {}
2665
2666     _CheckOutputFields(static=self._FIELDS_STATIC,
2667                        dynamic=self._FIELDS_DYNAMIC,
2668                        selected=self.op.output_fields)
2669
2670   def CheckPrereq(self):
2671     """No prerequisites.
2672
2673     """
2674     pass
2675
2676   def Exec(self, feedback_fn):
2677     """Dump a representation of the cluster config to the standard output.
2678
2679     """
2680     values = []
2681     for field in self.op.output_fields:
2682       if field == "cluster_name":
2683         entry = self.cfg.GetClusterName()
2684       elif field == "master_node":
2685         entry = self.cfg.GetMasterNode()
2686       elif field == "drain_flag":
2687         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2688       else:
2689         raise errors.ParameterError(field)
2690       values.append(entry)
2691     return values
2692
2693
2694 class LUActivateInstanceDisks(NoHooksLU):
2695   """Bring up an instance's disks.
2696
2697   """
2698   _OP_REQP = ["instance_name"]
2699   REQ_BGL = False
2700
2701   def ExpandNames(self):
2702     self._ExpandAndLockInstance()
2703     self.needed_locks[locking.LEVEL_NODE] = []
2704     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2705
2706   def DeclareLocks(self, level):
2707     if level == locking.LEVEL_NODE:
2708       self._LockInstancesNodes()
2709
2710   def CheckPrereq(self):
2711     """Check prerequisites.
2712
2713     This checks that the instance is in the cluster.
2714
2715     """
2716     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717     assert self.instance is not None, \
2718       "Cannot retrieve locked instance %s" % self.op.instance_name
2719     _CheckNodeOnline(self, self.instance.primary_node)
2720
2721   def Exec(self, feedback_fn):
2722     """Activate the disks.
2723
2724     """
2725     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2726     if not disks_ok:
2727       raise errors.OpExecError("Cannot activate block devices")
2728
2729     return disks_info
2730
2731
2732 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2733   """Prepare the block devices for an instance.
2734
2735   This sets up the block devices on all nodes.
2736
2737   @type lu: L{LogicalUnit}
2738   @param lu: the logical unit on whose behalf we execute
2739   @type instance: L{objects.Instance}
2740   @param instance: the instance for whose disks we assemble
2741   @type ignore_secondaries: boolean
2742   @param ignore_secondaries: if true, errors on secondary nodes
2743       won't result in an error return from the function
2744   @return: False if the operation failed, otherwise a list of
2745       (host, instance_visible_name, node_visible_name)
2746       with the mapping from node devices to instance devices
2747
2748   """
2749   device_info = []
2750   disks_ok = True
2751   iname = instance.name
2752   # With the two passes mechanism we try to reduce the window of
2753   # opportunity for the race condition of switching DRBD to primary
2754   # before handshaking occured, but we do not eliminate it
2755
2756   # The proper fix would be to wait (with some limits) until the
2757   # connection has been made and drbd transitions from WFConnection
2758   # into any other network-connected state (Connected, SyncTarget,
2759   # SyncSource, etc.)
2760
2761   # 1st pass, assemble on all nodes in secondary mode
2762   for inst_disk in instance.disks:
2763     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2764       lu.cfg.SetDiskID(node_disk, node)
2765       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2766       msg = result.fail_msg
2767       if msg:
2768         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2769                            " (is_primary=False, pass=1): %s",
2770                            inst_disk.iv_name, node, msg)
2771         if not ignore_secondaries:
2772           disks_ok = False
2773
2774   # FIXME: race condition on drbd migration to primary
2775
2776   # 2nd pass, do only the primary node
2777   for inst_disk in instance.disks:
2778     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2779       if node != instance.primary_node:
2780         continue
2781       lu.cfg.SetDiskID(node_disk, node)
2782       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2783       msg = result.fail_msg
2784       if msg:
2785         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2786                            " (is_primary=True, pass=2): %s",
2787                            inst_disk.iv_name, node, msg)
2788         disks_ok = False
2789     device_info.append((instance.primary_node, inst_disk.iv_name,
2790                         result.payload))
2791
2792   # leave the disks configured for the primary node
2793   # this is a workaround that would be fixed better by
2794   # improving the logical/physical id handling
2795   for disk in instance.disks:
2796     lu.cfg.SetDiskID(disk, instance.primary_node)
2797
2798   return disks_ok, device_info
2799
2800
2801 def _StartInstanceDisks(lu, instance, force):
2802   """Start the disks of an instance.
2803
2804   """
2805   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2806                                            ignore_secondaries=force)
2807   if not disks_ok:
2808     _ShutdownInstanceDisks(lu, instance)
2809     if force is not None and not force:
2810       lu.proc.LogWarning("", hint="If the message above refers to a"
2811                          " secondary node,"
2812                          " you can retry the operation using '--force'.")
2813     raise errors.OpExecError("Disk consistency error")
2814
2815
2816 class LUDeactivateInstanceDisks(NoHooksLU):
2817   """Shutdown an instance's disks.
2818
2819   """
2820   _OP_REQP = ["instance_name"]
2821   REQ_BGL = False
2822
2823   def ExpandNames(self):
2824     self._ExpandAndLockInstance()
2825     self.needed_locks[locking.LEVEL_NODE] = []
2826     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2827
2828   def DeclareLocks(self, level):
2829     if level == locking.LEVEL_NODE:
2830       self._LockInstancesNodes()
2831
2832   def CheckPrereq(self):
2833     """Check prerequisites.
2834
2835     This checks that the instance is in the cluster.
2836
2837     """
2838     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2839     assert self.instance is not None, \
2840       "Cannot retrieve locked instance %s" % self.op.instance_name
2841
2842   def Exec(self, feedback_fn):
2843     """Deactivate the disks
2844
2845     """
2846     instance = self.instance
2847     _SafeShutdownInstanceDisks(self, instance)
2848
2849
2850 def _SafeShutdownInstanceDisks(lu, instance):
2851   """Shutdown block devices of an instance.
2852
2853   This function checks if an instance is running, before calling
2854   _ShutdownInstanceDisks.
2855
2856   """
2857   pnode = instance.primary_node
2858   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2859   ins_l.Raise("Can't contact node %s" % pnode)
2860
2861   if instance.name in ins_l.payload:
2862     raise errors.OpExecError("Instance is running, can't shutdown"
2863                              " block devices.")
2864
2865   _ShutdownInstanceDisks(lu, instance)
2866
2867
2868 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2869   """Shutdown block devices of an instance.
2870
2871   This does the shutdown on all nodes of the instance.
2872
2873   If the ignore_primary is false, errors on the primary node are
2874   ignored.
2875
2876   """
2877   all_result = True
2878   for disk in instance.disks:
2879     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2880       lu.cfg.SetDiskID(top_disk, node)
2881       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2882       msg = result.fail_msg
2883       if msg:
2884         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2885                       disk.iv_name, node, msg)
2886         if not ignore_primary or node != instance.primary_node:
2887           all_result = False
2888   return all_result
2889
2890
2891 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2892   """Checks if a node has enough free memory.
2893
2894   This function check if a given node has the needed amount of free
2895   memory. In case the node has less memory or we cannot get the
2896   information from the node, this function raise an OpPrereqError
2897   exception.
2898
2899   @type lu: C{LogicalUnit}
2900   @param lu: a logical unit from which we get configuration data
2901   @type node: C{str}
2902   @param node: the node to check
2903   @type reason: C{str}
2904   @param reason: string to use in the error message
2905   @type requested: C{int}
2906   @param requested: the amount of memory in MiB to check for
2907   @type hypervisor_name: C{str}
2908   @param hypervisor_name: the hypervisor to ask for memory stats
2909   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2910       we cannot check the node
2911
2912   """
2913   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2914   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2915   free_mem = nodeinfo[node].payload.get('memory_free', None)
2916   if not isinstance(free_mem, int):
2917     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2918                                " was '%s'" % (node, free_mem))
2919   if requested > free_mem:
2920     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2921                                " needed %s MiB, available %s MiB" %
2922                                (node, reason, requested, free_mem))
2923
2924
2925 class LUStartupInstance(LogicalUnit):
2926   """Starts an instance.
2927
2928   """
2929   HPATH = "instance-start"
2930   HTYPE = constants.HTYPE_INSTANCE
2931   _OP_REQP = ["instance_name", "force"]
2932   REQ_BGL = False
2933
2934   def ExpandNames(self):
2935     self._ExpandAndLockInstance()
2936
2937   def BuildHooksEnv(self):
2938     """Build hooks env.
2939
2940     This runs on master, primary and secondary nodes of the instance.
2941
2942     """
2943     env = {
2944       "FORCE": self.op.force,
2945       }
2946     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2947     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2948     return env, nl, nl
2949
2950   def CheckPrereq(self):
2951     """Check prerequisites.
2952
2953     This checks that the instance is in the cluster.
2954
2955     """
2956     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2957     assert self.instance is not None, \
2958       "Cannot retrieve locked instance %s" % self.op.instance_name
2959
2960     # extra beparams
2961     self.beparams = getattr(self.op, "beparams", {})
2962     if self.beparams:
2963       if not isinstance(self.beparams, dict):
2964         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2965                                    " dict" % (type(self.beparams), ))
2966       # fill the beparams dict
2967       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2968       self.op.beparams = self.beparams
2969
2970     # extra hvparams
2971     self.hvparams = getattr(self.op, "hvparams", {})
2972     if self.hvparams:
2973       if not isinstance(self.hvparams, dict):
2974         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2975                                    " dict" % (type(self.hvparams), ))
2976
2977       # check hypervisor parameter syntax (locally)
2978       cluster = self.cfg.GetClusterInfo()
2979       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2980       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2981                                     instance.hvparams)
2982       filled_hvp.update(self.hvparams)
2983       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2984       hv_type.CheckParameterSyntax(filled_hvp)
2985       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2986       self.op.hvparams = self.hvparams
2987
2988     _CheckNodeOnline(self, instance.primary_node)
2989
2990     bep = self.cfg.GetClusterInfo().FillBE(instance)
2991     # check bridges existence
2992     _CheckInstanceBridgesExist(self, instance)
2993
2994     remote_info = self.rpc.call_instance_info(instance.primary_node,
2995                                               instance.name,
2996                                               instance.hypervisor)
2997     remote_info.Raise("Error checking node %s" % instance.primary_node,
2998                       prereq=True)
2999     if not remote_info.payload: # not running already
3000       _CheckNodeFreeMemory(self, instance.primary_node,
3001                            "starting instance %s" % instance.name,
3002                            bep[constants.BE_MEMORY], instance.hypervisor)
3003
3004   def Exec(self, feedback_fn):
3005     """Start the instance.
3006
3007     """
3008     instance = self.instance
3009     force = self.op.force
3010
3011     self.cfg.MarkInstanceUp(instance.name)
3012
3013     node_current = instance.primary_node
3014
3015     _StartInstanceDisks(self, instance, force)
3016
3017     result = self.rpc.call_instance_start(node_current, instance,
3018                                           self.hvparams, self.beparams)
3019     msg = result.fail_msg
3020     if msg:
3021       _ShutdownInstanceDisks(self, instance)
3022       raise errors.OpExecError("Could not start instance: %s" % msg)
3023
3024
3025 class LURebootInstance(LogicalUnit):
3026   """Reboot an instance.
3027
3028   """
3029   HPATH = "instance-reboot"
3030   HTYPE = constants.HTYPE_INSTANCE
3031   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3032   REQ_BGL = False
3033
3034   def ExpandNames(self):
3035     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3036                                    constants.INSTANCE_REBOOT_HARD,
3037                                    constants.INSTANCE_REBOOT_FULL]:
3038       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3039                                   (constants.INSTANCE_REBOOT_SOFT,
3040                                    constants.INSTANCE_REBOOT_HARD,
3041                                    constants.INSTANCE_REBOOT_FULL))
3042     self._ExpandAndLockInstance()
3043
3044   def BuildHooksEnv(self):
3045     """Build hooks env.
3046
3047     This runs on master, primary and secondary nodes of the instance.
3048
3049     """
3050     env = {
3051       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3052       "REBOOT_TYPE": self.op.reboot_type,
3053       }
3054     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3055     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3056     return env, nl, nl
3057
3058   def CheckPrereq(self):
3059     """Check prerequisites.
3060
3061     This checks that the instance is in the cluster.
3062
3063     """
3064     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3065     assert self.instance is not None, \
3066       "Cannot retrieve locked instance %s" % self.op.instance_name
3067
3068     _CheckNodeOnline(self, instance.primary_node)
3069
3070     # check bridges existence
3071     _CheckInstanceBridgesExist(self, instance)
3072
3073   def Exec(self, feedback_fn):
3074     """Reboot the instance.
3075
3076     """
3077     instance = self.instance
3078     ignore_secondaries = self.op.ignore_secondaries
3079     reboot_type = self.op.reboot_type
3080
3081     node_current = instance.primary_node
3082
3083     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3084                        constants.INSTANCE_REBOOT_HARD]:
3085       for disk in instance.disks:
3086         self.cfg.SetDiskID(disk, node_current)
3087       result = self.rpc.call_instance_reboot(node_current, instance,
3088                                              reboot_type)
3089       result.Raise("Could not reboot instance")
3090     else:
3091       result = self.rpc.call_instance_shutdown(node_current, instance)
3092       result.Raise("Could not shutdown instance for full reboot")
3093       _ShutdownInstanceDisks(self, instance)
3094       _StartInstanceDisks(self, instance, ignore_secondaries)
3095       result = self.rpc.call_instance_start(node_current, instance, None, None)
3096       msg = result.fail_msg
3097       if msg:
3098         _ShutdownInstanceDisks(self, instance)
3099         raise errors.OpExecError("Could not start instance for"
3100                                  " full reboot: %s" % msg)
3101
3102     self.cfg.MarkInstanceUp(instance.name)
3103
3104
3105 class LUShutdownInstance(LogicalUnit):
3106   """Shutdown an instance.
3107
3108   """
3109   HPATH = "instance-stop"
3110   HTYPE = constants.HTYPE_INSTANCE
3111   _OP_REQP = ["instance_name"]
3112   REQ_BGL = False
3113
3114   def ExpandNames(self):
3115     self._ExpandAndLockInstance()
3116
3117   def BuildHooksEnv(self):
3118     """Build hooks env.
3119
3120     This runs on master, primary and secondary nodes of the instance.
3121
3122     """
3123     env = _BuildInstanceHookEnvByObject(self, self.instance)
3124     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3125     return env, nl, nl
3126
3127   def CheckPrereq(self):
3128     """Check prerequisites.
3129
3130     This checks that the instance is in the cluster.
3131
3132     """
3133     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3134     assert self.instance is not None, \
3135       "Cannot retrieve locked instance %s" % self.op.instance_name
3136     _CheckNodeOnline(self, self.instance.primary_node)
3137
3138   def Exec(self, feedback_fn):
3139     """Shutdown the instance.
3140
3141     """
3142     instance = self.instance
3143     node_current = instance.primary_node
3144     self.cfg.MarkInstanceDown(instance.name)
3145     result = self.rpc.call_instance_shutdown(node_current, instance)
3146     msg = result.fail_msg
3147     if msg:
3148       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3149
3150     _ShutdownInstanceDisks(self, instance)
3151
3152
3153 class LUReinstallInstance(LogicalUnit):
3154   """Reinstall an instance.
3155
3156   """
3157   HPATH = "instance-reinstall"
3158   HTYPE = constants.HTYPE_INSTANCE
3159   _OP_REQP = ["instance_name"]
3160   REQ_BGL = False
3161
3162   def ExpandNames(self):
3163     self._ExpandAndLockInstance()
3164
3165   def BuildHooksEnv(self):
3166     """Build hooks env.
3167
3168     This runs on master, primary and secondary nodes of the instance.
3169
3170     """
3171     env = _BuildInstanceHookEnvByObject(self, self.instance)
3172     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3173     return env, nl, nl
3174
3175   def CheckPrereq(self):
3176     """Check prerequisites.
3177
3178     This checks that the instance is in the cluster and is not running.
3179
3180     """
3181     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3182     assert instance is not None, \
3183       "Cannot retrieve locked instance %s" % self.op.instance_name
3184     _CheckNodeOnline(self, instance.primary_node)
3185
3186     if instance.disk_template == constants.DT_DISKLESS:
3187       raise errors.OpPrereqError("Instance '%s' has no disks" %
3188                                  self.op.instance_name)
3189     if instance.admin_up:
3190       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3191                                  self.op.instance_name)
3192     remote_info = self.rpc.call_instance_info(instance.primary_node,
3193                                               instance.name,
3194                                               instance.hypervisor)
3195     remote_info.Raise("Error checking node %s" % instance.primary_node,
3196                       prereq=True)
3197     if remote_info.payload:
3198       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3199                                  (self.op.instance_name,
3200                                   instance.primary_node))
3201
3202     self.op.os_type = getattr(self.op, "os_type", None)
3203     if self.op.os_type is not None:
3204       # OS verification
3205       pnode = self.cfg.GetNodeInfo(
3206         self.cfg.ExpandNodeName(instance.primary_node))
3207       if pnode is None:
3208         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3209                                    self.op.pnode)
3210       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3211       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3212                    (self.op.os_type, pnode.name), prereq=True)
3213
3214     self.instance = instance
3215
3216   def Exec(self, feedback_fn):
3217     """Reinstall the instance.
3218
3219     """
3220     inst = self.instance
3221
3222     if self.op.os_type is not None:
3223       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3224       inst.os = self.op.os_type
3225       self.cfg.Update(inst)
3226
3227     _StartInstanceDisks(self, inst, None)
3228     try:
3229       feedback_fn("Running the instance OS create scripts...")
3230       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3231       result.Raise("Could not install OS for instance %s on node %s" %
3232                    (inst.name, inst.primary_node))
3233     finally:
3234       _ShutdownInstanceDisks(self, inst)
3235
3236
3237 class LURenameInstance(LogicalUnit):
3238   """Rename an instance.
3239
3240   """
3241   HPATH = "instance-rename"
3242   HTYPE = constants.HTYPE_INSTANCE
3243   _OP_REQP = ["instance_name", "new_name"]
3244
3245   def BuildHooksEnv(self):
3246     """Build hooks env.
3247
3248     This runs on master, primary and secondary nodes of the instance.
3249
3250     """
3251     env = _BuildInstanceHookEnvByObject(self, self.instance)
3252     env["INSTANCE_NEW_NAME"] = self.op.new_name
3253     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3254     return env, nl, nl
3255
3256   def CheckPrereq(self):
3257     """Check prerequisites.
3258
3259     This checks that the instance is in the cluster and is not running.
3260
3261     """
3262     instance = self.cfg.GetInstanceInfo(
3263       self.cfg.ExpandInstanceName(self.op.instance_name))
3264     if instance is None:
3265       raise errors.OpPrereqError("Instance '%s' not known" %
3266                                  self.op.instance_name)
3267     _CheckNodeOnline(self, instance.primary_node)
3268
3269     if instance.admin_up:
3270       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3271                                  self.op.instance_name)
3272     remote_info = self.rpc.call_instance_info(instance.primary_node,
3273                                               instance.name,
3274                                               instance.hypervisor)
3275     remote_info.Raise("Error checking node %s" % instance.primary_node,
3276                       prereq=True)
3277     if remote_info.payload:
3278       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3279                                  (self.op.instance_name,
3280                                   instance.primary_node))
3281     self.instance = instance
3282
3283     # new name verification
3284     name_info = utils.HostInfo(self.op.new_name)
3285
3286     self.op.new_name = new_name = name_info.name
3287     instance_list = self.cfg.GetInstanceList()
3288     if new_name in instance_list:
3289       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3290                                  new_name)
3291
3292     if not getattr(self.op, "ignore_ip", False):
3293       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3294         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3295                                    (name_info.ip, new_name))
3296
3297
3298   def Exec(self, feedback_fn):
3299     """Reinstall the instance.
3300
3301     """
3302     inst = self.instance
3303     old_name = inst.name
3304
3305     if inst.disk_template == constants.DT_FILE:
3306       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3307
3308     self.cfg.RenameInstance(inst.name, self.op.new_name)
3309     # Change the instance lock. This is definitely safe while we hold the BGL
3310     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3311     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3312
3313     # re-read the instance from the configuration after rename
3314     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3315
3316     if inst.disk_template == constants.DT_FILE:
3317       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3318       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3319                                                      old_file_storage_dir,
3320                                                      new_file_storage_dir)
3321       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3322                    " (but the instance has been renamed in Ganeti)" %
3323                    (inst.primary_node, old_file_storage_dir,
3324                     new_file_storage_dir))
3325
3326     _StartInstanceDisks(self, inst, None)
3327     try:
3328       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3329                                                  old_name)
3330       msg = result.fail_msg
3331       if msg:
3332         msg = ("Could not run OS rename script for instance %s on node %s"
3333                " (but the instance has been renamed in Ganeti): %s" %
3334                (inst.name, inst.primary_node, msg))
3335         self.proc.LogWarning(msg)
3336     finally:
3337       _ShutdownInstanceDisks(self, inst)
3338
3339
3340 class LURemoveInstance(LogicalUnit):
3341   """Remove an instance.
3342
3343   """
3344   HPATH = "instance-remove"
3345   HTYPE = constants.HTYPE_INSTANCE
3346   _OP_REQP = ["instance_name", "ignore_failures"]
3347   REQ_BGL = False
3348
3349   def ExpandNames(self):
3350     self._ExpandAndLockInstance()
3351     self.needed_locks[locking.LEVEL_NODE] = []
3352     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3353
3354   def DeclareLocks(self, level):
3355     if level == locking.LEVEL_NODE:
3356       self._LockInstancesNodes()
3357
3358   def BuildHooksEnv(self):
3359     """Build hooks env.
3360
3361     This runs on master, primary and secondary nodes of the instance.
3362
3363     """
3364     env = _BuildInstanceHookEnvByObject(self, self.instance)
3365     nl = [self.cfg.GetMasterNode()]
3366     return env, nl, nl
3367
3368   def CheckPrereq(self):
3369     """Check prerequisites.
3370
3371     This checks that the instance is in the cluster.
3372
3373     """
3374     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3375     assert self.instance is not None, \
3376       "Cannot retrieve locked instance %s" % self.op.instance_name
3377
3378   def Exec(self, feedback_fn):
3379     """Remove the instance.
3380
3381     """
3382     instance = self.instance
3383     logging.info("Shutting down instance %s on node %s",
3384                  instance.name, instance.primary_node)
3385
3386     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3387     msg = result.fail_msg
3388     if msg:
3389       if self.op.ignore_failures:
3390         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3391       else:
3392         raise errors.OpExecError("Could not shutdown instance %s on"
3393                                  " node %s: %s" %
3394                                  (instance.name, instance.primary_node, msg))
3395
3396     logging.info("Removing block devices for instance %s", instance.name)
3397
3398     if not _RemoveDisks(self, instance):
3399       if self.op.ignore_failures:
3400         feedback_fn("Warning: can't remove instance's disks")
3401       else:
3402         raise errors.OpExecError("Can't remove instance's disks")
3403
3404     logging.info("Removing instance %s out of cluster config", instance.name)
3405
3406     self.cfg.RemoveInstance(instance.name)
3407     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3408
3409
3410 class LUQueryInstances(NoHooksLU):
3411   """Logical unit for querying instances.
3412
3413   """
3414   _OP_REQP = ["output_fields", "names", "use_locking"]
3415   REQ_BGL = False
3416   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3417                                     "admin_state",
3418                                     "disk_template", "ip", "mac", "bridge",
3419                                     "nic_mode", "nic_link",
3420                                     "sda_size", "sdb_size", "vcpus", "tags",
3421                                     "network_port", "beparams",
3422                                     r"(disk)\.(size)/([0-9]+)",
3423                                     r"(disk)\.(sizes)", "disk_usage",
3424                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3425                                     r"(nic)\.(bridge)/([0-9]+)",
3426                                     r"(nic)\.(macs|ips|modes|links|bridges)",
3427                                     r"(disk|nic)\.(count)",
3428                                     "serial_no", "hypervisor", "hvparams",] +
3429                                   ["hv/%s" % name
3430                                    for name in constants.HVS_PARAMETERS] +
3431                                   ["be/%s" % name
3432                                    for name in constants.BES_PARAMETERS])
3433   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3434
3435
3436   def ExpandNames(self):
3437     _CheckOutputFields(static=self._FIELDS_STATIC,
3438                        dynamic=self._FIELDS_DYNAMIC,
3439                        selected=self.op.output_fields)
3440
3441     self.needed_locks = {}
3442     self.share_locks[locking.LEVEL_INSTANCE] = 1
3443     self.share_locks[locking.LEVEL_NODE] = 1
3444
3445     if self.op.names:
3446       self.wanted = _GetWantedInstances(self, self.op.names)
3447     else:
3448       self.wanted = locking.ALL_SET
3449
3450     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3451     self.do_locking = self.do_node_query and self.op.use_locking
3452     if self.do_locking:
3453       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3454       self.needed_locks[locking.LEVEL_NODE] = []
3455       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3456
3457   def DeclareLocks(self, level):
3458     if level == locking.LEVEL_NODE and self.do_locking:
3459       self._LockInstancesNodes()
3460
3461   def CheckPrereq(self):
3462     """Check prerequisites.
3463
3464     """
3465     pass
3466
3467   def Exec(self, feedback_fn):
3468     """Computes the list of nodes and their attributes.
3469
3470     """
3471     all_info = self.cfg.GetAllInstancesInfo()
3472     if self.wanted == locking.ALL_SET:
3473       # caller didn't specify instance names, so ordering is not important
3474       if self.do_locking:
3475         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3476       else:
3477         instance_names = all_info.keys()
3478       instance_names = utils.NiceSort(instance_names)
3479     else:
3480       # caller did specify names, so we must keep the ordering
3481       if self.do_locking:
3482         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3483       else:
3484         tgt_set = all_info.keys()
3485       missing = set(self.wanted).difference(tgt_set)
3486       if missing:
3487         raise errors.OpExecError("Some instances were removed before"
3488                                  " retrieving their data: %s" % missing)
3489       instance_names = self.wanted
3490
3491     instance_list = [all_info[iname] for iname in instance_names]
3492
3493     # begin data gathering
3494
3495     nodes = frozenset([inst.primary_node for inst in instance_list])
3496     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3497
3498     bad_nodes = []
3499     off_nodes = []
3500     if self.do_node_query:
3501       live_data = {}
3502       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3503       for name in nodes:
3504         result = node_data[name]
3505         if result.offline:
3506           # offline nodes will be in both lists
3507           off_nodes.append(name)
3508         if result.failed or result.fail_msg:
3509           bad_nodes.append(name)
3510         else:
3511           if result.payload:
3512             live_data.update(result.payload)
3513           # else no instance is alive
3514     else:
3515       live_data = dict([(name, {}) for name in instance_names])
3516
3517     # end data gathering
3518
3519     HVPREFIX = "hv/"
3520     BEPREFIX = "be/"
3521     output = []
3522     cluster = self.cfg.GetClusterInfo()
3523     for instance in instance_list:
3524       iout = []
3525       i_hv = cluster.FillHV(instance)
3526       i_be = cluster.FillBE(instance)
3527       i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3528                                  nic.nicparams) for nic in instance.nics]
3529       for field in self.op.output_fields:
3530         st_match = self._FIELDS_STATIC.Matches(field)
3531         if field == "name":
3532           val = instance.name
3533         elif field == "os":
3534           val = instance.os
3535         elif field == "pnode":
3536           val = instance.primary_node
3537         elif field == "snodes":
3538           val = list(instance.secondary_nodes)
3539         elif field == "admin_state":
3540           val = instance.admin_up
3541         elif field == "oper_state":
3542           if instance.primary_node in bad_nodes:
3543             val = None
3544           else:
3545             val = bool(live_data.get(instance.name))
3546         elif field == "status":
3547           if instance.primary_node in off_nodes:
3548             val = "ERROR_nodeoffline"
3549           elif instance.primary_node in bad_nodes:
3550             val = "ERROR_nodedown"
3551           else:
3552             running = bool(live_data.get(instance.name))
3553             if running:
3554               if instance.admin_up:
3555                 val = "running"
3556               else:
3557                 val = "ERROR_up"
3558             else:
3559               if instance.admin_up:
3560                 val = "ERROR_down"
3561               else:
3562                 val = "ADMIN_down"
3563         elif field == "oper_ram":
3564           if instance.primary_node in bad_nodes:
3565             val = None
3566           elif instance.name in live_data:
3567             val = live_data[instance.name].get("memory", "?")
3568           else:
3569             val = "-"
3570         elif field == "vcpus":
3571           val = i_be[constants.BE_VCPUS]
3572         elif field == "disk_template":
3573           val = instance.disk_template
3574         elif field == "ip":
3575           if instance.nics:
3576             val = instance.nics[0].ip
3577           else:
3578             val = None
3579         elif field == "nic_mode":
3580           if instance.nics:
3581             val = i_nicp[0][constants.NIC_MODE]
3582           else:
3583             val = None
3584         elif field == "nic_link":
3585           if instance.nics:
3586             val = i_nicp[0][constants.NIC_LINK]
3587           else:
3588             val = None
3589         elif field == "bridge":
3590           if (instance.nics and
3591               i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3592             val = i_nicp[0][constants.NIC_LINK]
3593           else:
3594             val = None
3595         elif field == "mac":
3596           if instance.nics:
3597             val = instance.nics[0].mac
3598           else:
3599             val = None
3600         elif field == "sda_size" or field == "sdb_size":
3601           idx = ord(field[2]) - ord('a')
3602           try:
3603             val = instance.FindDisk(idx).size
3604           except errors.OpPrereqError:
3605             val = None
3606         elif field == "disk_usage": # total disk usage per node
3607           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3608           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3609         elif field == "tags":
3610           val = list(instance.GetTags())
3611         elif field == "serial_no":
3612           val = instance.serial_no
3613         elif field == "network_port":
3614           val = instance.network_port
3615         elif field == "hypervisor":
3616           val = instance.hypervisor
3617         elif field == "hvparams":
3618           val = i_hv
3619         elif (field.startswith(HVPREFIX) and
3620               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3621           val = i_hv.get(field[len(HVPREFIX):], None)
3622         elif field == "beparams":
3623           val = i_be
3624         elif (field.startswith(BEPREFIX) and
3625               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3626           val = i_be.get(field[len(BEPREFIX):], None)
3627         elif st_match and st_match.groups():
3628           # matches a variable list
3629           st_groups = st_match.groups()
3630           if st_groups and st_groups[0] == "disk":
3631             if st_groups[1] == "count":
3632               val = len(instance.disks)
3633             elif st_groups[1] == "sizes":
3634               val = [disk.size for disk in instance.disks]
3635             elif st_groups[1] == "size":
3636               try:
3637                 val = instance.FindDisk(st_groups[2]).size
3638               except errors.OpPrereqError:
3639                 val = None
3640             else:
3641               assert False, "Unhandled disk parameter"
3642           elif st_groups[0] == "nic":
3643             if st_groups[1] == "count":
3644               val = len(instance.nics)
3645             elif st_groups[1] == "macs":
3646               val = [nic.mac for nic in instance.nics]
3647             elif st_groups[1] == "ips":
3648               val = [nic.ip for nic in instance.nics]
3649             elif st_groups[1] == "modes":
3650               val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3651             elif st_groups[1] == "links":
3652               val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3653             elif st_groups[1] == "bridges":
3654               val = []
3655               for nicp in i_nicp:
3656                 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3657                   val.append(nicp[constants.NIC_LINK])
3658                 else:
3659                   val.append(None)
3660             else:
3661               # index-based item
3662               nic_idx = int(st_groups[2])
3663               if nic_idx >= len(instance.nics):
3664                 val = None
3665               else:
3666                 if st_groups[1] == "mac":
3667                   val = instance.nics[nic_idx].mac
3668                 elif st_groups[1] == "ip":
3669                   val = instance.nics[nic_idx].ip
3670                 elif st_groups[1] == "mode":
3671                   val = i_nicp[nic_idx][constants.NIC_MODE]
3672                 elif st_groups[1] == "link":
3673                   val = i_nicp[nic_idx][constants.NIC_LINK]
3674                 elif st_groups[1] == "bridge":
3675                   nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3676                   if nic_mode == constants.NIC_MODE_BRIDGED:
3677                     val = i_nicp[nic_idx][constants.NIC_LINK]
3678                   else:
3679                     val = None
3680                 else:
3681                   assert False, "Unhandled NIC parameter"
3682           else:
3683             assert False, ("Declared but unhandled variable parameter '%s'" %
3684                            field)
3685         else:
3686           assert False, "Declared but unhandled parameter '%s'" % field
3687         iout.append(val)
3688       output.append(iout)
3689
3690     return output
3691
3692
3693 class LUFailoverInstance(LogicalUnit):
3694   """Failover an instance.
3695
3696   """
3697   HPATH = "instance-failover"
3698   HTYPE = constants.HTYPE_INSTANCE
3699   _OP_REQP = ["instance_name", "ignore_consistency"]
3700   REQ_BGL = False
3701
3702   def ExpandNames(self):
3703     self._ExpandAndLockInstance()
3704     self.needed_locks[locking.LEVEL_NODE] = []
3705     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3706
3707   def DeclareLocks(self, level):
3708     if level == locking.LEVEL_NODE:
3709       self._LockInstancesNodes()
3710
3711   def BuildHooksEnv(self):
3712     """Build hooks env.
3713
3714     This runs on master, primary and secondary nodes of the instance.
3715
3716     """
3717     env = {
3718       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3719       }
3720     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3721     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3722     return env, nl, nl
3723
3724   def CheckPrereq(self):
3725     """Check prerequisites.
3726
3727     This checks that the instance is in the cluster.
3728
3729     """
3730     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3731     assert self.instance is not None, \
3732       "Cannot retrieve locked instance %s" % self.op.instance_name
3733
3734     bep = self.cfg.GetClusterInfo().FillBE(instance)
3735     if instance.disk_template not in constants.DTS_NET_MIRROR:
3736       raise errors.OpPrereqError("Instance's disk layout is not"
3737                                  " network mirrored, cannot failover.")
3738
3739     secondary_nodes = instance.secondary_nodes
3740     if not secondary_nodes:
3741       raise errors.ProgrammerError("no secondary node but using "
3742                                    "a mirrored disk template")
3743
3744     target_node = secondary_nodes[0]
3745     _CheckNodeOnline(self, target_node)
3746     _CheckNodeNotDrained(self, target_node)
3747     if instance.admin_up:
3748       # check memory requirements on the secondary node
3749       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3750                            instance.name, bep[constants.BE_MEMORY],
3751                            instance.hypervisor)
3752     else:
3753       self.LogInfo("Not checking memory on the secondary node as"
3754                    " instance will not be started")
3755
3756     # check bridge existance
3757     _CheckInstanceBridgesExist(self, instance, node=target_node)
3758
3759   def Exec(self, feedback_fn):
3760     """Failover an instance.
3761
3762     The failover is done by shutting it down on its present node and
3763     starting it on the secondary.
3764
3765     """
3766     instance = self.instance
3767
3768     source_node = instance.primary_node
3769     target_node = instance.secondary_nodes[0]
3770
3771     feedback_fn("* checking disk consistency between source and target")
3772     for dev in instance.disks:
3773       # for drbd, these are drbd over lvm
3774       if not _CheckDiskConsistency(self, dev, target_node, False):
3775         if instance.admin_up and not self.op.ignore_consistency:
3776           raise errors.OpExecError("Disk %s is degraded on target node,"
3777                                    " aborting failover." % dev.iv_name)
3778
3779     feedback_fn("* shutting down instance on source node")
3780     logging.info("Shutting down instance %s on node %s",
3781                  instance.name, source_node)
3782
3783     result = self.rpc.call_instance_shutdown(source_node, instance)
3784     msg = result.fail_msg
3785     if msg:
3786       if self.op.ignore_consistency:
3787         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3788                              " Proceeding anyway. Please make sure node"
3789                              " %s is down. Error details: %s",
3790                              instance.name, source_node, source_node, msg)
3791       else:
3792         raise errors.OpExecError("Could not shutdown instance %s on"
3793                                  " node %s: %s" %
3794                                  (instance.name, source_node, msg))
3795
3796     feedback_fn("* deactivating the instance's disks on source node")
3797     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3798       raise errors.OpExecError("Can't shut down the instance's disks.")
3799
3800     instance.primary_node = target_node
3801     # distribute new instance config to the other nodes
3802     self.cfg.Update(instance)
3803
3804     # Only start the instance if it's marked as up
3805     if instance.admin_up:
3806       feedback_fn("* activating the instance's disks on target node")
3807       logging.info("Starting instance %s on node %s",
3808                    instance.name, target_node)
3809
3810       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3811                                                ignore_secondaries=True)
3812       if not disks_ok:
3813         _ShutdownInstanceDisks(self, instance)
3814         raise errors.OpExecError("Can't activate the instance's disks")
3815
3816       feedback_fn("* starting the instance on the target node")
3817       result = self.rpc.call_instance_start(target_node, instance, None, None)
3818       msg = result.fail_msg
3819       if msg:
3820         _ShutdownInstanceDisks(self, instance)
3821         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3822                                  (instance.name, target_node, msg))
3823
3824
3825 class LUMigrateInstance(LogicalUnit):
3826   """Migrate an instance.
3827
3828   This is migration without shutting down, compared to the failover,
3829   which is done with shutdown.
3830
3831   """
3832   HPATH = "instance-migrate"
3833   HTYPE = constants.HTYPE_INSTANCE
3834   _OP_REQP = ["instance_name", "live", "cleanup"]
3835
3836   REQ_BGL = False
3837
3838   def ExpandNames(self):
3839     self._ExpandAndLockInstance()
3840     self.needed_locks[locking.LEVEL_NODE] = []
3841     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3842
3843   def DeclareLocks(self, level):
3844     if level == locking.LEVEL_NODE:
3845       self._LockInstancesNodes()
3846
3847   def BuildHooksEnv(self):
3848     """Build hooks env.
3849
3850     This runs on master, primary and secondary nodes of the instance.
3851
3852     """
3853     env = _BuildInstanceHookEnvByObject(self, self.instance)
3854     env["MIGRATE_LIVE"] = self.op.live
3855     env["MIGRATE_CLEANUP"] = self.op.cleanup
3856     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3857     return env, nl, nl
3858
3859   def CheckPrereq(self):
3860     """Check prerequisites.
3861
3862     This checks that the instance is in the cluster.
3863
3864     """
3865     instance = self.cfg.GetInstanceInfo(
3866       self.cfg.ExpandInstanceName(self.op.instance_name))
3867     if instance is None:
3868       raise errors.OpPrereqError("Instance '%s' not known" %
3869                                  self.op.instance_name)
3870
3871     if instance.disk_template != constants.DT_DRBD8:
3872       raise errors.OpPrereqError("Instance's disk layout is not"
3873                                  " drbd8, cannot migrate.")
3874
3875     secondary_nodes = instance.secondary_nodes
3876     if not secondary_nodes:
3877       raise errors.ConfigurationError("No secondary node but using"
3878                                       " drbd8 disk template")
3879
3880     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3881
3882     target_node = secondary_nodes[0]
3883     # check memory requirements on the secondary node
3884     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3885                          instance.name, i_be[constants.BE_MEMORY],
3886                          instance.hypervisor)
3887
3888     # check bridge existance
3889     _CheckInstanceBridgesExist(self, instance, node=target_node)
3890
3891     if not self.op.cleanup:
3892       _CheckNodeNotDrained(self, target_node)
3893       result = self.rpc.call_instance_migratable(instance.primary_node,
3894                                                  instance)
3895       result.Raise("Can't migrate, please use failover", prereq=True)
3896
3897     self.instance = instance
3898
3899   def _WaitUntilSync(self):
3900     """Poll with custom rpc for disk sync.
3901
3902     This uses our own step-based rpc call.
3903
3904     """
3905     self.feedback_fn("* wait until resync is done")
3906     all_done = False
3907     while not all_done:
3908       all_done = True
3909       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3910                                             self.nodes_ip,
3911                                             self.instance.disks)
3912       min_percent = 100
3913       for node, nres in result.items():
3914         nres.Raise("Cannot resync disks on node %s" % node)
3915         node_done, node_percent = nres.payload
3916         all_done = all_done and node_done
3917         if node_percent is not None:
3918           min_percent = min(min_percent, node_percent)
3919       if not all_done:
3920         if min_percent < 100:
3921           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3922         time.sleep(2)
3923
3924   def _EnsureSecondary(self, node):
3925     """Demote a node to secondary.
3926
3927     """
3928     self.feedback_fn("* switching node %s to secondary mode" % node)
3929
3930     for dev in self.instance.disks:
3931       self.cfg.SetDiskID(dev, node)
3932
3933     result = self.rpc.call_blockdev_close(node, self.instance.name,
3934                                           self.instance.disks)
3935     result.Raise("Cannot change disk to secondary on node %s" % node)
3936
3937   def _GoStandalone(self):
3938     """Disconnect from the network.
3939
3940     """
3941     self.feedback_fn("* changing into standalone mode")
3942     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3943                                                self.instance.disks)
3944     for node, nres in result.items():
3945       nres.Raise("Cannot disconnect disks node %s" % node)
3946
3947   def _GoReconnect(self, multimaster):
3948     """Reconnect to the network.
3949
3950     """
3951     if multimaster:
3952       msg = "dual-master"
3953     else:
3954       msg = "single-master"
3955     self.feedback_fn("* changing disks into %s mode" % msg)
3956     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3957                                            self.instance.disks,
3958                                            self.instance.name, multimaster)
3959     for node, nres in result.items():
3960       nres.Raise("Cannot change disks config on node %s" % node)
3961
3962   def _ExecCleanup(self):
3963     """Try to cleanup after a failed migration.
3964
3965     The cleanup is done by:
3966       - check that the instance is running only on one node
3967         (and update the config if needed)
3968       - change disks on its secondary node to secondary
3969       - wait until disks are fully synchronized
3970       - disconnect from the network
3971       - change disks into single-master mode
3972       - wait again until disks are fully synchronized
3973
3974     """
3975     instance = self.instance
3976     target_node = self.target_node
3977     source_node = self.source_node
3978
3979     # check running on only one node
3980     self.feedback_fn("* checking where the instance actually runs"
3981                      " (if this hangs, the hypervisor might be in"
3982                      " a bad state)")
3983     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3984     for node, result in ins_l.items():
3985       result.Raise("Can't contact node %s" % node)
3986
3987     runningon_source = instance.name in ins_l[source_node].payload
3988     runningon_target = instance.name in ins_l[target_node].payload
3989
3990     if runningon_source and runningon_target:
3991       raise errors.OpExecError("Instance seems to be running on two nodes,"
3992                                " or the hypervisor is confused. You will have"
3993                                " to ensure manually that it runs only on one"
3994                                " and restart this operation.")
3995
3996     if not (runningon_source or runningon_target):
3997       raise errors.OpExecError("Instance does not seem to be running at all."
3998                                " In this case, it's safer to repair by"
3999                                " running 'gnt-instance stop' to ensure disk"
4000                                " shutdown, and then restarting it.")
4001
4002     if runningon_target:
4003       # the migration has actually succeeded, we need to update the config
4004       self.feedback_fn("* instance running on secondary node (%s),"
4005                        " updating config" % target_node)
4006       instance.primary_node = target_node
4007       self.cfg.Update(instance)
4008       demoted_node = source_node
4009     else:
4010       self.feedback_fn("* instance confirmed to be running on its"
4011                        " primary node (%s)" % source_node)
4012       demoted_node = target_node
4013
4014     self._EnsureSecondary(demoted_node)
4015     try:
4016       self._WaitUntilSync()
4017     except errors.OpExecError:
4018       # we ignore here errors, since if the device is standalone, it
4019       # won't be able to sync
4020       pass
4021     self._GoStandalone()
4022     self._GoReconnect(False)
4023     self._WaitUntilSync()
4024
4025     self.feedback_fn("* done")
4026
4027   def _RevertDiskStatus(self):
4028     """Try to revert the disk status after a failed migration.
4029
4030     """
4031     target_node = self.target_node
4032     try:
4033       self._EnsureSecondary(target_node)
4034       self._GoStandalone()
4035       self._GoReconnect(False)
4036       self._WaitUntilSync()
4037     except errors.OpExecError, err:
4038       self.LogWarning("Migration failed and I can't reconnect the"
4039                       " drives: error '%s'\n"
4040                       "Please look and recover the instance status" %
4041                       str(err))
4042
4043   def _AbortMigration(self):
4044     """Call the hypervisor code to abort a started migration.
4045
4046     """
4047     instance = self.instance
4048     target_node = self.target_node
4049     migration_info = self.migration_info
4050
4051     abort_result = self.rpc.call_finalize_migration(target_node,
4052                                                     instance,
4053                                                     migration_info,
4054                                                     False)
4055     abort_msg = abort_result.fail_msg
4056     if abort_msg:
4057       logging.error("Aborting migration failed on target node %s: %s" %
4058                     (target_node, abort_msg))
4059       # Don't raise an exception here, as we stil have to try to revert the
4060       # disk status, even if this step failed.
4061
4062   def _ExecMigration(self):
4063     """Migrate an instance.
4064
4065     The migrate is done by:
4066       - change the disks into dual-master mode
4067       - wait until disks are fully synchronized again
4068       - migrate the instance
4069       - change disks on the new secondary node (the old primary) to secondary
4070       - wait until disks are fully synchronized
4071       - change disks into single-master mode
4072
4073     """
4074     instance = self.instance
4075     target_node = self.target_node
4076     source_node = self.source_node
4077
4078     self.feedback_fn("* checking disk consistency between source and target")
4079     for dev in instance.disks:
4080       if not _CheckDiskConsistency(self, dev, target_node, False):
4081         raise errors.OpExecError("Disk %s is degraded or not fully"
4082                                  " synchronized on target node,"
4083                                  " aborting migrate." % dev.iv_name)
4084
4085     # First get the migration information from the remote node
4086     result = self.rpc.call_migration_info(source_node, instance)
4087     msg = result.fail_msg
4088     if msg:
4089       log_err = ("Failed fetching source migration information from %s: %s" %
4090                  (source_node, msg))
4091       logging.error(log_err)
4092       raise errors.OpExecError(log_err)
4093
4094     self.migration_info = migration_info = result.payload
4095
4096     # Then switch the disks to master/master mode
4097     self._EnsureSecondary(target_node)
4098     self._GoStandalone()
4099     self._GoReconnect(True)
4100     self._WaitUntilSync()
4101
4102     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4103     result = self.rpc.call_accept_instance(target_node,
4104                                            instance,
4105                                            migration_info,
4106                                            self.nodes_ip[target_node])
4107
4108     msg = result.fail_msg
4109     if msg:
4110       logging.error("Instance pre-migration failed, trying to revert"
4111                     " disk status: %s", msg)
4112       self._AbortMigration()
4113       self._RevertDiskStatus()
4114       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4115                                (instance.name, msg))
4116
4117     self.feedback_fn("* migrating instance to %s" % target_node)
4118     time.sleep(10)
4119     result = self.rpc.call_instance_migrate(source_node, instance,
4120                                             self.nodes_ip[target_node],
4121                                             self.op.live)
4122     msg = result.fail_msg
4123     if msg:
4124       logging.error("Instance migration failed, trying to revert"
4125                     " disk status: %s", msg)
4126       self._AbortMigration()
4127       self._RevertDiskStatus()
4128       raise errors.OpExecError("Could not migrate instance %s: %s" %
4129                                (instance.name, msg))
4130     time.sleep(10)
4131
4132     instance.primary_node = target_node
4133     # distribute new instance config to the other nodes
4134     self.cfg.Update(instance)
4135
4136     result = self.rpc.call_finalize_migration(target_node,
4137                                               instance,
4138                                               migration_info,
4139                                               True)
4140     msg = result.fail_msg
4141     if msg:
4142       logging.error("Instance migration succeeded, but finalization failed:"
4143                     " %s" % msg)
4144       raise errors.OpExecError("Could not finalize instance migration: %s" %
4145                                msg)
4146
4147     self._EnsureSecondary(source_node)
4148     self._WaitUntilSync()
4149     self._GoStandalone()
4150     self._GoReconnect(False)
4151     self._WaitUntilSync()
4152
4153     self.feedback_fn("* done")
4154
4155   def Exec(self, feedback_fn):
4156     """Perform the migration.
4157
4158     """
4159     self.feedback_fn = feedback_fn
4160
4161     self.source_node = self.instance.primary_node
4162     self.target_node = self.instance.secondary_nodes[0]
4163     self.all_nodes = [self.source_node, self.target_node]
4164     self.nodes_ip = {
4165       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4166       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4167       }
4168     if self.op.cleanup:
4169       return self._ExecCleanup()
4170     else:
4171       return self._ExecMigration()
4172
4173
4174 def _CreateBlockDev(lu, node, instance, device, force_create,
4175                     info, force_open):
4176   """Create a tree of block devices on a given node.
4177
4178   If this device type has to be created on secondaries, create it and
4179   all its children.
4180
4181   If not, just recurse to children keeping the same 'force' value.
4182
4183   @param lu: the lu on whose behalf we execute
4184   @param node: the node on which to create the device
4185   @type instance: L{objects.Instance}
4186   @param instance: the instance which owns the device
4187   @type device: L{objects.Disk}
4188   @param device: the device to create
4189   @type force_create: boolean
4190   @param force_create: whether to force creation of this device; this
4191       will be change to True whenever we find a device which has
4192       CreateOnSecondary() attribute
4193   @param info: the extra 'metadata' we should attach to the device
4194       (this will be represented as a LVM tag)
4195   @type force_open: boolean
4196   @param force_open: this parameter will be passes to the
4197       L{backend.BlockdevCreate} function where it specifies
4198       whether we run on primary or not, and it affects both
4199       the child assembly and the device own Open() execution
4200
4201   """
4202   if device.CreateOnSecondary():
4203     force_create = True
4204
4205   if device.children:
4206     for child in device.children:
4207       _CreateBlockDev(lu, node, instance, child, force_create,
4208                       info, force_open)
4209
4210   if not force_create:
4211     return
4212
4213   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4214
4215
4216 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4217   """Create a single block device on a given node.
4218
4219   This will not recurse over children of the device, so they must be
4220   created in advance.
4221
4222   @param lu: the lu on whose behalf we execute
4223   @param node: the node on which to create the device
4224   @type instance: L{objects.Instance}
4225   @param instance: the instance which owns the device
4226   @type device: L{objects.Disk}
4227   @param device: the device to create
4228   @param info: the extra 'metadata' we should attach to the device
4229       (this will be represented as a LVM tag)
4230   @type force_open: boolean
4231   @param force_open: this parameter will be passes to the
4232       L{backend.BlockdevCreate} function where it specifies
4233       whether we run on primary or not, and it affects both
4234       the child assembly and the device own Open() execution
4235
4236   """
4237   lu.cfg.SetDiskID(device, node)
4238   result = lu.rpc.call_blockdev_create(node, device, device.size,
4239                                        instance.name, force_open, info)
4240   result.Raise("Can't create block device %s on"
4241                " node %s for instance %s" % (device, node, instance.name))
4242   if device.physical_id is None:
4243     device.physical_id = result.payload
4244
4245
4246 def _GenerateUniqueNames(lu, exts):
4247   """Generate a suitable LV name.
4248
4249   This will generate a logical volume name for the given instance.
4250
4251   """
4252   results = []
4253   for val in exts:
4254     new_id = lu.cfg.GenerateUniqueID()
4255     results.append("%s%s" % (new_id, val))
4256   return results
4257
4258
4259 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4260                          p_minor, s_minor):
4261   """Generate a drbd8 device complete with its children.
4262
4263   """
4264   port = lu.cfg.AllocatePort()
4265   vgname = lu.cfg.GetVGName()
4266   shared_secret = lu.cfg.GenerateDRBDSecret()
4267   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4268                           logical_id=(vgname, names[0]))
4269   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4270                           logical_id=(vgname, names[1]))
4271   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4272                           logical_id=(primary, secondary, port,
4273                                       p_minor, s_minor,
4274                                       shared_secret),
4275                           children=[dev_data, dev_meta],
4276                           iv_name=iv_name)
4277   return drbd_dev
4278
4279
4280 def _GenerateDiskTemplate(lu, template_name,
4281                           instance_name, primary_node,
4282                           secondary_nodes, disk_info,
4283                           file_storage_dir, file_driver,
4284                           base_index):
4285   """Generate the entire disk layout for a given template type.
4286
4287   """
4288   #TODO: compute space requirements
4289
4290   vgname = lu.cfg.GetVGName()
4291   disk_count = len(disk_info)
4292   disks = []
4293   if template_name == constants.DT_DISKLESS:
4294     pass
4295   elif template_name == constants.DT_PLAIN:
4296     if len(secondary_nodes) != 0:
4297       raise errors.ProgrammerError("Wrong template configuration")
4298
4299     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4300                                       for i in range(disk_count)])
4301     for idx, disk in enumerate(disk_info):
4302       disk_index = idx + base_index
4303       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4304                               logical_id=(vgname, names[idx]),
4305                               iv_name="disk/%d" % disk_index,
4306                               mode=disk["mode"])
4307       disks.append(disk_dev)
4308   elif template_name == constants.DT_DRBD8:
4309     if len(secondary_nodes) != 1:
4310       raise errors.ProgrammerError("Wrong template configuration")
4311     remote_node = secondary_nodes[0]
4312     minors = lu.cfg.AllocateDRBDMinor(
4313       [primary_node, remote_node] * len(disk_info), instance_name)
4314
4315     names = []
4316     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4317                                                for i in range(disk_count)]):
4318       names.append(lv_prefix + "_data")
4319       names.append(lv_prefix + "_meta")
4320     for idx, disk in enumerate(disk_info):
4321       disk_index = idx + base_index
4322       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4323                                       disk["size"], names[idx*2:idx*2+2],
4324                                       "disk/%d" % disk_index,
4325                                       minors[idx*2], minors[idx*2+1])
4326       disk_dev.mode = disk["mode"]
4327       disks.append(disk_dev)
4328   elif template_name == constants.DT_FILE:
4329     if len(secondary_nodes) != 0:
4330       raise errors.ProgrammerError("Wrong template configuration")
4331
4332     for idx, disk in enumerate(disk_info):
4333       disk_index = idx + base_index
4334       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4335                               iv_name="disk/%d" % disk_index,
4336                               logical_id=(file_driver,
4337                                           "%s/disk%d" % (file_storage_dir,
4338                                                          disk_index)),
4339                               mode=disk["mode"])
4340       disks.append(disk_dev)
4341   else:
4342     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4343   return disks
4344
4345
4346 def _GetInstanceInfoText(instance):
4347   """Compute that text that should be added to the disk's metadata.
4348
4349   """
4350   return "originstname+%s" % instance.name
4351
4352
4353 def _CreateDisks(lu, instance):
4354   """Create all disks for an instance.
4355
4356   This abstracts away some work from AddInstance.
4357
4358   @type lu: L{LogicalUnit}
4359   @param lu: the logical unit on whose behalf we execute
4360   @type instance: L{objects.Instance}
4361   @param instance: the instance whose disks we should create
4362   @rtype: boolean
4363   @return: the success of the creation
4364
4365   """
4366   info = _GetInstanceInfoText(instance)
4367   pnode = instance.primary_node
4368
4369   if instance.disk_template == constants.DT_FILE:
4370     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4371     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4372
4373     result.Raise("Failed to create directory '%s' on"
4374                  " node %s: %s" % (file_storage_dir, pnode))
4375
4376   # Note: this needs to be kept in sync with adding of disks in
4377   # LUSetInstanceParams
4378   for device in instance.disks:
4379     logging.info("Creating volume %s for instance %s",
4380                  device.iv_name, instance.name)
4381     #HARDCODE
4382     for node in instance.all_nodes:
4383       f_create = node == pnode
4384       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4385
4386
4387 def _RemoveDisks(lu, instance):
4388   """Remove all disks for an instance.
4389
4390   This abstracts away some work from `AddInstance()` and
4391   `RemoveInstance()`. Note that in case some of the devices couldn't
4392   be removed, the removal will continue with the other ones (compare
4393   with `_CreateDisks()`).
4394
4395   @type lu: L{LogicalUnit}
4396   @param lu: the logical unit on whose behalf we execute
4397   @type instance: L{objects.Instance}
4398   @param instance: the instance whose disks we should remove
4399   @rtype: boolean
4400   @return: the success of the removal
4401
4402   """
4403   logging.info("Removing block devices for instance %s", instance.name)
4404
4405   all_result = True
4406   for device in instance.disks:
4407     for node, disk in device.ComputeNodeTree(instance.primary_node):
4408       lu.cfg.SetDiskID(disk, node)
4409       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4410       if msg:
4411         lu.LogWarning("Could not remove block device %s on node %s,"
4412                       " continuing anyway: %s", device.iv_name, node, msg)
4413         all_result = False
4414
4415   if instance.disk_template == constants.DT_FILE:
4416     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4417     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4418                                                  file_storage_dir)
4419     msg = result.fail_msg
4420     if msg:
4421       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4422                     file_storage_dir, instance.primary_node, msg)
4423       all_result = False
4424
4425   return all_result
4426
4427
4428 def _ComputeDiskSize(disk_template, disks):
4429   """Compute disk size requirements in the volume group
4430
4431   """
4432   # Required free disk space as a function of disk and swap space
4433   req_size_dict = {
4434     constants.DT_DISKLESS: None,
4435     constants.DT_PLAIN: sum(d["size"] for d in disks),
4436     # 128 MB are added for drbd metadata for each disk
4437     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4438     constants.DT_FILE: None,
4439   }
4440
4441   if disk_template not in req_size_dict:
4442     raise errors.ProgrammerError("Disk template '%s' size requirement"
4443                                  " is unknown" %  disk_template)
4444
4445   return req_size_dict[disk_template]
4446
4447
4448 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4449   """Hypervisor parameter validation.
4450
4451   This function abstract the hypervisor parameter validation to be
4452   used in both instance create and instance modify.
4453
4454   @type lu: L{LogicalUnit}
4455   @param lu: the logical unit for which we check
4456   @type nodenames: list
4457   @param nodenames: the list of nodes on which we should check
4458   @type hvname: string
4459   @param hvname: the name of the hypervisor we should use
4460   @type hvparams: dict
4461   @param hvparams: the parameters which we need to check
4462   @raise errors.OpPrereqError: if the parameters are not valid
4463
4464   """
4465   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4466                                                   hvname,
4467                                                   hvparams)
4468   for node in nodenames:
4469     info = hvinfo[node]
4470     if info.offline:
4471       continue
4472     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4473
4474
4475 class LUCreateInstance(LogicalUnit):
4476   """Create an instance.
4477
4478   """
4479   HPATH = "instance-add"
4480   HTYPE = constants.HTYPE_INSTANCE
4481   _OP_REQP = ["instance_name", "disks", "disk_template",
4482               "mode", "start",
4483               "wait_for_sync", "ip_check", "nics",
4484               "hvparams", "beparams"]
4485   REQ_BGL = False
4486
4487   def _ExpandNode(self, node):
4488     """Expands and checks one node name.
4489
4490     """
4491     node_full = self.cfg.ExpandNodeName(node)
4492     if node_full is None:
4493       raise errors.OpPrereqError("Unknown node %s" % node)
4494     return node_full
4495
4496   def ExpandNames(self):
4497     """ExpandNames for CreateInstance.
4498
4499     Figure out the right locks for instance creation.
4500
4501     """
4502     self.needed_locks = {}
4503
4504     # set optional parameters to none if they don't exist
4505     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4506       if not hasattr(self.op, attr):
4507         setattr(self.op, attr, None)
4508
4509     # cheap checks, mostly valid constants given
4510
4511     # verify creation mode
4512     if self.op.mode not in (constants.INSTANCE_CREATE,
4513                             constants.INSTANCE_IMPORT):
4514       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4515                                  self.op.mode)
4516
4517     # disk template and mirror node verification
4518     if self.op.disk_template not in constants.DISK_TEMPLATES:
4519       raise errors.OpPrereqError("Invalid disk template name")
4520
4521     if self.op.hypervisor is None:
4522       self.op.hypervisor = self.cfg.GetHypervisorType()
4523
4524     cluster = self.cfg.GetClusterInfo()
4525     enabled_hvs = cluster.enabled_hypervisors
4526     if self.op.hypervisor not in enabled_hvs:
4527       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4528                                  " cluster (%s)" % (self.op.hypervisor,
4529                                   ",".join(enabled_hvs)))
4530
4531     # check hypervisor parameter syntax (locally)
4532     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4533     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4534                                   self.op.hvparams)
4535     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4536     hv_type.CheckParameterSyntax(filled_hvp)
4537     self.hv_full = filled_hvp
4538
4539     # fill and remember the beparams dict
4540     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4541     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4542                                     self.op.beparams)
4543
4544     #### instance parameters check
4545
4546     # instance name verification
4547     hostname1 = utils.HostInfo(self.op.instance_name)
4548     self.op.instance_name = instance_name = hostname1.name
4549
4550     # this is just a preventive check, but someone might still add this
4551     # instance in the meantime, and creation will fail at lock-add time
4552     if instance_name in self.cfg.GetInstanceList():
4553       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4554                                  instance_name)
4555
4556     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4557
4558     # NIC buildup
4559     self.nics = []
4560     for idx, nic in enumerate(self.op.nics):
4561       nic_mode_req = nic.get("mode", None)
4562       nic_mode = nic_mode_req
4563       if nic_mode is None:
4564         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4565
4566       # in routed mode, for the first nic, the default ip is 'auto'
4567       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4568         default_ip_mode = constants.VALUE_AUTO
4569       else:
4570         default_ip_mode = constants.VALUE_NONE
4571
4572       # ip validity checks
4573       ip = nic.get("ip", default_ip_mode)
4574       if ip is None or ip.lower() == constants.VALUE_NONE:
4575         nic_ip = None
4576       elif ip.lower() == constants.VALUE_AUTO:
4577         nic_ip = hostname1.ip
4578       else:
4579         if not utils.IsValidIP(ip):
4580           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4581                                      " like a valid IP" % ip)
4582         nic_ip = ip
4583
4584       # TODO: check the ip for uniqueness !!
4585       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4586         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4587
4588       # MAC address verification
4589       mac = nic.get("mac", constants.VALUE_AUTO)
4590       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4591         if not utils.IsValidMac(mac.lower()):
4592           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4593                                      mac)
4594       # bridge verification
4595       bridge = nic.get("bridge", None)
4596       link = nic.get("link", None)
4597       if bridge and link:
4598         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4599                                    " at the same time")
4600       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4601         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4602       elif bridge:
4603         link = bridge
4604
4605       nicparams = {}
4606       if nic_mode_req:
4607         nicparams[constants.NIC_MODE] = nic_mode_req
4608       if link:
4609         nicparams[constants.NIC_LINK] = link
4610
4611       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4612                                       nicparams)
4613       objects.NIC.CheckParameterSyntax(check_params)
4614       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4615
4616     # disk checks/pre-build
4617     self.disks = []
4618     for disk in self.op.disks:
4619       mode = disk.get("mode", constants.DISK_RDWR)
4620       if mode not in constants.DISK_ACCESS_SET:
4621         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4622                                    mode)
4623       size = disk.get("size", None)
4624       if size is None:
4625         raise errors.OpPrereqError("Missing disk size")
4626       try:
4627         size = int(size)
4628       except ValueError:
4629         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4630       self.disks.append({"size": size, "mode": mode})
4631
4632     # used in CheckPrereq for ip ping check
4633     self.check_ip = hostname1.ip
4634
4635     # file storage checks
4636     if (self.op.file_driver and
4637         not self.op.file_driver in constants.FILE_DRIVER):
4638       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4639                                  self.op.file_driver)
4640
4641     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4642       raise errors.OpPrereqError("File storage directory path not absolute")
4643
4644     ### Node/iallocator related checks
4645     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4646       raise errors.OpPrereqError("One and only one of iallocator and primary"
4647                                  " node must be given")
4648
4649     if self.op.iallocator:
4650       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4651     else:
4652       self.op.pnode = self._ExpandNode(self.op.pnode)
4653       nodelist = [self.op.pnode]
4654       if self.op.snode is not None:
4655         self.op.snode = self._ExpandNode(self.op.snode)
4656         nodelist.append(self.op.snode)
4657       self.needed_locks[locking.LEVEL_NODE] = nodelist
4658
4659     # in case of import lock the source node too
4660     if self.op.mode == constants.INSTANCE_IMPORT:
4661       src_node = getattr(self.op, "src_node", None)
4662       src_path = getattr(self.op, "src_path", None)
4663
4664       if src_path is None:
4665         self.op.src_path = src_path = self.op.instance_name
4666
4667       if src_node is None:
4668         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4669         self.op.src_node = None
4670         if os.path.isabs(src_path):
4671           raise errors.OpPrereqError("Importing an instance from an absolute"
4672                                      " path requires a source node option.")
4673       else:
4674         self.op.src_node = src_node = self._ExpandNode(src_node)
4675         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4676           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4677         if not os.path.isabs(src_path):
4678           self.op.src_path = src_path = \
4679             os.path.join(constants.EXPORT_DIR, src_path)
4680
4681     else: # INSTANCE_CREATE
4682       if getattr(self.op, "os_type", None) is None:
4683         raise errors.OpPrereqError("No guest OS specified")
4684
4685   def _RunAllocator(self):
4686     """Run the allocator based on input opcode.
4687
4688     """
4689     nics = [n.ToDict() for n in self.nics]
4690     ial = IAllocator(self.cfg, self.rpc,
4691                      mode=constants.IALLOCATOR_MODE_ALLOC,
4692                      name=self.op.instance_name,
4693                      disk_template=self.op.disk_template,
4694                      tags=[],
4695                      os=self.op.os_type,
4696                      vcpus=self.be_full[constants.BE_VCPUS],
4697                      mem_size=self.be_full[constants.BE_MEMORY],
4698                      disks=self.disks,
4699                      nics=nics,
4700                      hypervisor=self.op.hypervisor,
4701                      )
4702
4703     ial.Run(self.op.iallocator)
4704
4705     if not ial.success:
4706       raise errors.OpPrereqError("Can't compute nodes using"
4707                                  " iallocator '%s': %s" % (self.op.iallocator,
4708                                                            ial.info))
4709     if len(ial.nodes) != ial.required_nodes:
4710       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4711                                  " of nodes (%s), required %s" %
4712                                  (self.op.iallocator, len(ial.nodes),
4713                                   ial.required_nodes))
4714     self.op.pnode = ial.nodes[0]
4715     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4716                  self.op.instance_name, self.op.iallocator,
4717                  ", ".join(ial.nodes))
4718     if ial.required_nodes == 2:
4719       self.op.snode = ial.nodes[1]
4720
4721   def BuildHooksEnv(self):
4722     """Build hooks env.
4723
4724     This runs on master, primary and secondary nodes of the instance.
4725
4726     """
4727     env = {
4728       "ADD_MODE": self.op.mode,
4729       }
4730     if self.op.mode == constants.INSTANCE_IMPORT:
4731       env["SRC_NODE"] = self.op.src_node
4732       env["SRC_PATH"] = self.op.src_path
4733       env["SRC_IMAGES"] = self.src_images
4734
4735     env.update(_BuildInstanceHookEnv(
4736       name=self.op.instance_name,
4737       primary_node=self.op.pnode,
4738       secondary_nodes=self.secondaries,
4739       status=self.op.start,
4740       os_type=self.op.os_type,
4741       memory=self.be_full[constants.BE_MEMORY],
4742       vcpus=self.be_full[constants.BE_VCPUS],
4743       nics=_NICListToTuple(self, self.nics),
4744       disk_template=self.op.disk_template,
4745       disks=[(d["size"], d["mode"]) for d in self.disks],
4746       bep=self.be_full,
4747       hvp=self.hv_full,
4748       hypervisor_name=self.op.hypervisor,
4749     ))
4750
4751     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4752           self.secondaries)
4753     return env, nl, nl
4754
4755
4756   def CheckPrereq(self):
4757     """Check prerequisites.
4758
4759     """
4760     if (not self.cfg.GetVGName() and
4761         self.op.disk_template not in constants.DTS_NOT_LVM):
4762       raise errors.OpPrereqError("Cluster does not support lvm-based"
4763                                  " instances")
4764
4765     if self.op.mode == constants.INSTANCE_IMPORT:
4766       src_node = self.op.src_node
4767       src_path = self.op.src_path
4768
4769       if src_node is None:
4770         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4771         exp_list = self.rpc.call_export_list(locked_nodes)
4772         found = False
4773         for node in exp_list:
4774           if exp_list[node].fail_msg:
4775             continue
4776           if src_path in exp_list[node].payload:
4777             found = True
4778             self.op.src_node = src_node = node
4779             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4780                                                        src_path)
4781             break
4782         if not found:
4783           raise errors.OpPrereqError("No export found for relative path %s" %
4784                                       src_path)
4785
4786       _CheckNodeOnline(self, src_node)
4787       result = self.rpc.call_export_info(src_node, src_path)
4788       result.Raise("No export or invalid export found in dir %s" % src_path)
4789
4790       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4791       if not export_info.has_section(constants.INISECT_EXP):
4792         raise errors.ProgrammerError("Corrupted export config")
4793
4794       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4795       if (int(ei_version) != constants.EXPORT_VERSION):
4796         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4797                                    (ei_version, constants.EXPORT_VERSION))
4798
4799       # Check that the new instance doesn't have less disks than the export
4800       instance_disks = len(self.disks)
4801       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4802       if instance_disks < export_disks:
4803         raise errors.OpPrereqError("Not enough disks to import."
4804                                    " (instance: %d, export: %d)" %
4805                                    (instance_disks, export_disks))
4806
4807       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4808       disk_images = []
4809       for idx in range(export_disks):
4810         option = 'disk%d_dump' % idx
4811         if export_info.has_option(constants.INISECT_INS, option):
4812           # FIXME: are the old os-es, disk sizes, etc. useful?
4813           export_name = export_info.get(constants.INISECT_INS, option)
4814           image = os.path.join(src_path, export_name)
4815           disk_images.append(image)
4816         else:
4817           disk_images.append(False)
4818
4819       self.src_images = disk_images
4820
4821       old_name = export_info.get(constants.INISECT_INS, 'name')
4822       # FIXME: int() here could throw a ValueError on broken exports
4823       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4824       if self.op.instance_name == old_name:
4825         for idx, nic in enumerate(self.nics):
4826           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4827             nic_mac_ini = 'nic%d_mac' % idx
4828             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4829
4830     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4831     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4832     if self.op.start and not self.op.ip_check:
4833       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4834                                  " adding an instance in start mode")
4835
4836     if self.op.ip_check:
4837       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4838         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4839                                    (self.check_ip, self.op.instance_name))
4840
4841     #### mac address generation
4842     # By generating here the mac address both the allocator and the hooks get
4843     # the real final mac address rather than the 'auto' or 'generate' value.
4844     # There is a race condition between the generation and the instance object
4845     # creation, which means that we know the mac is valid now, but we're not
4846     # sure it will be when we actually add the instance. If things go bad
4847     # adding the instance will abort because of a duplicate mac, and the
4848     # creation job will fail.
4849     for nic in self.nics:
4850       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4851         nic.mac = self.cfg.GenerateMAC()
4852
4853     #### allocator run
4854
4855     if self.op.iallocator is not None:
4856       self._RunAllocator()
4857
4858     #### node related checks
4859
4860     # check primary node
4861     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4862     assert self.pnode is not None, \
4863       "Cannot retrieve locked node %s" % self.op.pnode
4864     if pnode.offline:
4865       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4866                                  pnode.name)
4867     if pnode.drained:
4868       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4869                                  pnode.name)
4870
4871     self.secondaries = []
4872
4873     # mirror node verification
4874     if self.op.disk_template in constants.DTS_NET_MIRROR:
4875       if self.op.snode is None:
4876         raise errors.OpPrereqError("The networked disk templates need"
4877                                    " a mirror node")
4878       if self.op.snode == pnode.name:
4879         raise errors.OpPrereqError("The secondary node cannot be"
4880                                    " the primary node.")
4881       _CheckNodeOnline(self, self.op.snode)
4882       _CheckNodeNotDrained(self, self.op.snode)
4883       self.secondaries.append(self.op.snode)
4884
4885     nodenames = [pnode.name] + self.secondaries
4886
4887     req_size = _ComputeDiskSize(self.op.disk_template,
4888                                 self.disks)
4889
4890     # Check lv size requirements
4891     if req_size is not None:
4892       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4893                                          self.op.hypervisor)
4894       for node in nodenames:
4895         info = nodeinfo[node]
4896         info.Raise("Cannot get current information from node %s" % node)
4897         info = info.payload
4898         vg_free = info.get('vg_free', None)
4899         if not isinstance(vg_free, int):
4900           raise errors.OpPrereqError("Can't compute free disk space on"
4901                                      " node %s" % node)
4902         if req_size > vg_free:
4903           raise errors.OpPrereqError("Not enough disk space on target node %s."
4904                                      " %d MB available, %d MB required" %
4905                                      (node, vg_free, req_size))
4906
4907     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4908
4909     # os verification
4910     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4911     result.Raise("OS '%s' not in supported os list for primary node %s" %
4912                  (self.op.os_type, pnode.name), prereq=True)
4913
4914     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4915
4916     # memory check on primary node
4917     if self.op.start:
4918       _CheckNodeFreeMemory(self, self.pnode.name,
4919                            "creating instance %s" % self.op.instance_name,
4920                            self.be_full[constants.BE_MEMORY],
4921                            self.op.hypervisor)
4922
4923     self.dry_run_result = list(nodenames)
4924
4925   def Exec(self, feedback_fn):
4926     """Create and add the instance to the cluster.
4927
4928     """
4929     instance = self.op.instance_name
4930     pnode_name = self.pnode.name
4931
4932     ht_kind = self.op.hypervisor
4933     if ht_kind in constants.HTS_REQ_PORT:
4934       network_port = self.cfg.AllocatePort()
4935     else:
4936       network_port = None
4937
4938     ##if self.op.vnc_bind_address is None:
4939     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4940
4941     # this is needed because os.path.join does not accept None arguments
4942     if self.op.file_storage_dir is None:
4943       string_file_storage_dir = ""
4944     else:
4945       string_file_storage_dir = self.op.file_storage_dir
4946
4947     # build the full file storage dir path
4948     file_storage_dir = os.path.normpath(os.path.join(
4949                                         self.cfg.GetFileStorageDir(),
4950                                         string_file_storage_dir, instance))
4951
4952
4953     disks = _GenerateDiskTemplate(self,
4954                                   self.op.disk_template,
4955                                   instance, pnode_name,
4956                                   self.secondaries,
4957                                   self.disks,
4958                                   file_storage_dir,
4959                                   self.op.file_driver,
4960                                   0)
4961
4962     iobj = objects.Instance(name=instance, os=self.op.os_type,
4963                             primary_node=pnode_name,
4964                             nics=self.nics, disks=disks,
4965                             disk_template=self.op.disk_template,
4966                             admin_up=False,
4967                             network_port=network_port,
4968                             beparams=self.op.beparams,
4969                             hvparams=self.op.hvparams,
4970                             hypervisor=self.op.hypervisor,
4971                             )
4972
4973     feedback_fn("* creating instance disks...")
4974     try:
4975       _CreateDisks(self, iobj)
4976     except errors.OpExecError:
4977       self.LogWarning("Device creation failed, reverting...")
4978       try:
4979         _RemoveDisks(self, iobj)
4980       finally:
4981         self.cfg.ReleaseDRBDMinors(instance)
4982         raise
4983
4984     feedback_fn("adding instance %s to cluster config" % instance)
4985
4986     self.cfg.AddInstance(iobj)
4987     # Declare that we don't want to remove the instance lock anymore, as we've
4988     # added the instance to the config
4989     del self.remove_locks[locking.LEVEL_INSTANCE]
4990     # Unlock all the nodes
4991     if self.op.mode == constants.INSTANCE_IMPORT:
4992       nodes_keep = [self.op.src_node]
4993       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4994                        if node != self.op.src_node]
4995       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4996       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4997     else:
4998       self.context.glm.release(locking.LEVEL_NODE)
4999       del self.acquired_locks[locking.LEVEL_NODE]
5000
5001     if self.op.wait_for_sync:
5002       disk_abort = not _WaitForSync(self, iobj)
5003     elif iobj.disk_template in constants.DTS_NET_MIRROR:
5004       # make sure the disks are not degraded (still sync-ing is ok)
5005       time.sleep(15)
5006       feedback_fn("* checking mirrors status")
5007       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5008     else:
5009       disk_abort = False
5010
5011     if disk_abort:
5012       _RemoveDisks(self, iobj)
5013       self.cfg.RemoveInstance(iobj.name)
5014       # Make sure the instance lock gets removed
5015       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5016       raise errors.OpExecError("There are some degraded disks for"
5017                                " this instance")
5018
5019     feedback_fn("creating os for instance %s on node %s" %
5020                 (instance, pnode_name))
5021
5022     if iobj.disk_template != constants.DT_DISKLESS:
5023       if self.op.mode == constants.INSTANCE_CREATE:
5024         feedback_fn("* running the instance OS create scripts...")
5025         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5026         result.Raise("Could not add os for instance %s"
5027                      " on node %s" % (instance, pnode_name))
5028
5029       elif self.op.mode == constants.INSTANCE_IMPORT:
5030         feedback_fn("* running the instance OS import scripts...")
5031         src_node = self.op.src_node
5032         src_images = self.src_images
5033         cluster_name = self.cfg.GetClusterName()
5034         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5035                                                          src_node, src_images,
5036                                                          cluster_name)
5037         msg = import_result.fail_msg
5038         if msg:
5039           self.LogWarning("Error while importing the disk images for instance"
5040                           " %s on node %s: %s" % (instance, pnode_name, msg))
5041       else:
5042         # also checked in the prereq part
5043         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5044                                      % self.op.mode)
5045
5046     if self.op.start:
5047       iobj.admin_up = True
5048       self.cfg.Update(iobj)
5049       logging.info("Starting instance %s on node %s", instance, pnode_name)
5050       feedback_fn("* starting instance...")
5051       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5052       result.Raise("Could not start instance")
5053
5054     return list(iobj.all_nodes)
5055
5056
5057 class LUConnectConsole(NoHooksLU):
5058   """Connect to an instance's console.
5059
5060   This is somewhat special in that it returns the command line that
5061   you need to run on the master node in order to connect to the
5062   console.
5063
5064   """
5065   _OP_REQP = ["instance_name"]
5066   REQ_BGL = False
5067
5068   def ExpandNames(self):
5069     self._ExpandAndLockInstance()
5070
5071   def CheckPrereq(self):
5072     """Check prerequisites.
5073
5074     This checks that the instance is in the cluster.
5075
5076     """
5077     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5078     assert self.instance is not None, \
5079       "Cannot retrieve locked instance %s" % self.op.instance_name
5080     _CheckNodeOnline(self, self.instance.primary_node)
5081
5082   def Exec(self, feedback_fn):
5083     """Connect to the console of an instance
5084
5085     """
5086     instance = self.instance
5087     node = instance.primary_node
5088
5089     node_insts = self.rpc.call_instance_list([node],
5090                                              [instance.hypervisor])[node]
5091     node_insts.Raise("Can't get node information from %s" % node)
5092
5093     if instance.name not in node_insts.payload:
5094       raise errors.OpExecError("Instance %s is not running." % instance.name)
5095
5096     logging.debug("Connecting to console of %s on %s", instance.name, node)
5097
5098     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5099     cluster = self.cfg.GetClusterInfo()
5100     # beparams and hvparams are passed separately, to avoid editing the
5101     # instance and then saving the defaults in the instance itself.
5102     hvparams = cluster.FillHV(instance)
5103     beparams = cluster.FillBE(instance)
5104     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5105
5106     # build ssh cmdline
5107     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5108
5109
5110 class LUReplaceDisks(LogicalUnit):
5111   """Replace the disks of an instance.
5112
5113   """
5114   HPATH = "mirrors-replace"
5115   HTYPE = constants.HTYPE_INSTANCE
5116   _OP_REQP = ["instance_name", "mode", "disks"]
5117   REQ_BGL = False
5118
5119   def CheckArguments(self):
5120     if not hasattr(self.op, "remote_node"):
5121       self.op.remote_node = None
5122     if not hasattr(self.op, "iallocator"):
5123       self.op.iallocator = None
5124
5125     _DiskReplacer.CheckArguments(self.op.mode, self.op.remote_node,
5126                                  self.op.iallocator)
5127
5128   def ExpandNames(self):
5129     self._ExpandAndLockInstance()
5130
5131     if self.op.iallocator is not None:
5132       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5133
5134     elif self.op.remote_node is not None:
5135       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5136       if remote_node is None:
5137         raise errors.OpPrereqError("Node '%s' not known" %
5138                                    self.op.remote_node)
5139
5140       self.op.remote_node = remote_node
5141
5142       # Warning: do not remove the locking of the new secondary here
5143       # unless DRBD8.AddChildren is changed to work in parallel;
5144       # currently it doesn't since parallel invocations of
5145       # FindUnusedMinor will conflict
5146       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5147       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5148
5149     else:
5150       self.needed_locks[locking.LEVEL_NODE] = []
5151       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5152
5153     self.replacer = _DiskReplacer(self, self.op.instance_name, self.op.mode,
5154                                   self.op.iallocator, self.op.remote_node,
5155                                   self.op.disks)
5156
5157   def DeclareLocks(self, level):
5158     # If we're not already locking all nodes in the set we have to declare the
5159     # instance's primary/secondary nodes.
5160     if (level == locking.LEVEL_NODE and
5161         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5162       self._LockInstancesNodes()
5163
5164   def BuildHooksEnv(self):
5165     """Build hooks env.
5166
5167     This runs on the master, the primary and all the secondaries.
5168
5169     """
5170     instance = self.replacer.instance
5171     env = {
5172       "MODE": self.op.mode,
5173       "NEW_SECONDARY": self.op.remote_node,
5174       "OLD_SECONDARY": instance.secondary_nodes[0],
5175       }
5176     env.update(_BuildInstanceHookEnvByObject(self, instance))
5177     nl = [
5178       self.cfg.GetMasterNode(),
5179       instance.primary_node,
5180       ]
5181     if self.op.remote_node is not None:
5182       nl.append(self.op.remote_node)
5183     return env, nl, nl
5184
5185   def CheckPrereq(self):
5186     """Check prerequisites.
5187
5188     This checks that the instance is in the cluster.
5189
5190     """
5191     self.replacer.CheckPrereq()
5192
5193   def Exec(self, feedback_fn):
5194     """Execute disk replacement.
5195
5196     This dispatches the disk replacement to the appropriate handler.
5197
5198     """
5199     self.replacer.Exec()
5200
5201
5202 class _DiskReplacer:
5203   """Replaces disks for an instance.
5204
5205   Note: Locking is not within the scope of this class.
5206
5207   """
5208   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5209                disks):
5210     """Initializes this class.
5211
5212     """
5213     # Parameters
5214     self.lu = lu
5215     self.instance_name = instance_name
5216     self.mode = mode
5217     self.iallocator_name = iallocator_name
5218     self.remote_node = remote_node
5219     self.disks = disks
5220
5221     # Shortcuts
5222     self.cfg = lu.cfg
5223     self.rpc = lu.rpc
5224
5225     # Runtime data
5226     self.instance = None
5227     self.new_node = None
5228     self.target_node = None
5229     self.other_node = None
5230     self.remote_node_info = None
5231     self.node_secondary_ip = None
5232
5233   @staticmethod
5234   def CheckArguments(mode, remote_node, iallocator):
5235     # check for valid parameter combination
5236     cnt = [remote_node, iallocator].count(None)
5237     if mode == constants.REPLACE_DISK_CHG:
5238       if cnt == 2:
5239         raise errors.OpPrereqError("When changing the secondary either an"
5240                                    " iallocator script must be used or the"
5241                                    " new node given")
5242       elif cnt == 0:
5243         raise errors.OpPrereqError("Give either the iallocator or the new"
5244                                    " secondary, not both")
5245     else: # not replacing the secondary
5246       if cnt != 2:
5247         raise errors.OpPrereqError("The iallocator and new node options can"
5248                                    " be used only when changing the"
5249                                    " secondary node")
5250
5251   @staticmethod
5252   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5253     """Compute a new secondary node using an IAllocator.
5254
5255     """
5256     ial = IAllocator(lu.cfg, lu.rpc,
5257                      mode=constants.IALLOCATOR_MODE_RELOC,
5258                      name=instance_name,
5259                      relocate_from=relocate_from)
5260
5261     ial.Run(iallocator_name)
5262
5263     if not ial.success:
5264       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5265                                  " %s" % (iallocator_name, ial.info))
5266
5267     if len(ial.nodes) != ial.required_nodes:
5268       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5269                                  " of nodes (%s), required %s" %
5270                                  (len(ial.nodes), ial.required_nodes))
5271
5272     remote_node_name = ial.nodes[0]
5273
5274     lu.LogInfo("Selected new secondary for instance '%s': %s",
5275                instance_name, remote_node_name)
5276
5277     return remote_node_name
5278
5279   def CheckPrereq(self):
5280     """Check prerequisites.
5281
5282     This checks that the instance is in the cluster.
5283
5284     """
5285     self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5286     assert self.instance is not None, \
5287       "Cannot retrieve locked instance %s" % self.instance_name
5288
5289     if self.instance.disk_template != constants.DT_DRBD8:
5290       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5291                                  " instances")
5292
5293     if len(self.instance.secondary_nodes) != 1:
5294       raise errors.OpPrereqError("The instance has a strange layout,"
5295                                  " expected one secondary but found %d" %
5296                                  len(self.instance.secondary_nodes))
5297
5298     secondary_node = self.instance.secondary_nodes[0]
5299
5300     if self.iallocator_name is None:
5301       remote_node = self.remote_node
5302     else:
5303       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5304                                        self.instance.name, secondary_node)
5305
5306     if remote_node is not None:
5307       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5308       assert self.remote_node_info is not None, \
5309         "Cannot retrieve locked node %s" % remote_node
5310     else:
5311       self.remote_node_info = None
5312
5313     if remote_node == self.instance.primary_node:
5314       raise errors.OpPrereqError("The specified node is the primary node of"
5315                                  " the instance.")
5316
5317     if remote_node == secondary_node:
5318       raise errors.OpPrereqError("The specified node is already the"
5319                                  " secondary node of the instance.")
5320
5321     if self.mode == constants.REPLACE_DISK_PRI:
5322       self.target_node = self.instance.primary_node
5323       self.other_node = secondary_node
5324       check_nodes = [self.target_node, self.other_node]
5325
5326     elif self.mode == constants.REPLACE_DISK_SEC:
5327       self.target_node = secondary_node
5328       self.other_node = self.instance.primary_node
5329       check_nodes = [self.target_node, self.other_node]
5330
5331     elif self.mode == constants.REPLACE_DISK_CHG:
5332       self.new_node = remote_node
5333       self.other_node = self.instance.primary_node
5334       self.target_node = secondary_node
5335       check_nodes = [self.new_node, self.other_node]
5336
5337       _CheckNodeNotDrained(self.lu, remote_node)
5338
5339     else:
5340       raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
5341                                    self.mode)
5342
5343     for node in check_nodes:
5344       _CheckNodeOnline(self.lu, node)
5345
5346     # If not specified all disks should be replaced
5347     if not self.disks:
5348       self.disks = range(len(self.instance.disks))
5349
5350     # Check whether disks are valid
5351     for disk_idx in self.disks:
5352       self.instance.FindDisk(disk_idx)
5353
5354     # Get secondary node IP addresses
5355     node_2nd_ip = {}
5356
5357     for node_name in [self.target_node, self.other_node, self.new_node]:
5358       if node_name is not None:
5359         node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
5360
5361     self.node_secondary_ip = node_2nd_ip
5362
5363   def Exec(self):
5364     """Execute disk replacement.
5365
5366     This dispatches the disk replacement to the appropriate handler.
5367
5368     """
5369     activate_disks = (not self.instance.admin_up)
5370
5371     # Activate the instance disks if we're replacing them on a down instance
5372     if activate_disks:
5373       _StartInstanceDisks(self.lu, self.instance, True)
5374
5375     try:
5376       if self.mode == constants.REPLACE_DISK_CHG:
5377         return self._ExecDrbd8Secondary()
5378       else:
5379         return self._ExecDrbd8DiskOnly()
5380
5381     finally:
5382       # Deactivate the instance disks if we're replacing them on a down instance
5383       if activate_disks:
5384         _SafeShutdownInstanceDisks(self.lu, self.instance)
5385
5386   def _CheckVolumeGroup(self, nodes):
5387     self.lu.LogInfo("Checking volume groups")
5388
5389     vgname = self.cfg.GetVGName()
5390
5391     # Make sure volume group exists on all involved nodes
5392     results = self.rpc.call_vg_list(nodes)
5393     if not results:
5394       raise errors.OpExecError("Can't list volume groups on the nodes")
5395
5396     for node in nodes:
5397       res = results[node]
5398       res.Raise("Error checking node %s" % node)
5399       if vgname not in res.payload:
5400         raise errors.OpExecError("Volume group '%s' not found on node %s" %
5401                                  (vgname, node))
5402
5403   def _CheckDisksExistence(self, nodes):
5404     # Check disk existence
5405     for idx, dev in enumerate(self.instance.disks):
5406       if idx not in self.disks:
5407         continue
5408
5409       for node in nodes:
5410         self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
5411         self.cfg.SetDiskID(dev, node)
5412
5413         result = self.rpc.call_blockdev_find(node, dev)
5414
5415         msg = result.fail_msg
5416         if msg or not result.payload:
5417           if not msg:
5418             msg = "disk not found"
5419           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5420                                    (idx, node, msg))
5421
5422   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
5423     for idx, dev in enumerate(self.instance.disks):
5424       if idx not in self.disks:
5425         continue
5426
5427       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
5428                       (idx, node_name))
5429
5430       if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
5431                                    ldisk=ldisk):
5432         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
5433                                  " replace disks for instance %s" %
5434                                  (node_name, self.instance.name))
5435
5436   def _CreateNewStorage(self, node_name):
5437     vgname = self.cfg.GetVGName()
5438     iv_names = {}
5439
5440     for idx, dev in enumerate(self.instance.disks):
5441       if idx not in self.disks:
5442         continue
5443
5444       self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
5445
5446       self.cfg.SetDiskID(dev, node_name)
5447
5448       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
5449       names = _GenerateUniqueNames(self.lu, lv_names)
5450
5451       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
5452                              logical_id=(vgname, names[0]))
5453       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5454                              logical_id=(vgname, names[1]))
5455
5456       new_lvs = [lv_data, lv_meta]
5457       old_lvs = dev.children
5458       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5459
5460       # we pass force_create=True to force the LVM creation
5461       for new_lv in new_lvs:
5462         _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
5463                         _GetInstanceInfoText(self.instance), False)
5464
5465     return iv_names
5466
5467   def _CheckDevices(self, node_name, iv_names):
5468     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5469       self.cfg.SetDiskID(dev, node_name)
5470
5471       result = self.rpc.call_blockdev_find(node_name, dev)
5472
5473       msg = result.fail_msg
5474       if msg or not result.payload:
5475         if not msg:
5476           msg = "disk not found"
5477         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5478                                  (name, msg))
5479
5480       if result.payload[5]:
5481         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5482
5483   def _RemoveOldStorage(self, node_name, iv_names):
5484     for name, (dev, old_lvs, _) in iv_names.iteritems():
5485       self.lu.LogInfo("Remove logical volumes for %s" % name)
5486
5487       for lv in old_lvs:
5488         self.cfg.SetDiskID(lv, node_name)
5489
5490         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
5491         if msg:
5492           self.lu.LogWarning("Can't remove old LV: %s" % msg,
5493                              hint="remove unused LVs manually")
5494
5495   def _ExecDrbd8DiskOnly(self):
5496     """Replace a disk on the primary or secondary for DRBD 8.
5497
5498     The algorithm for replace is quite complicated:
5499
5500       1. for each disk to be replaced:
5501
5502         1. create new LVs on the target node with unique names
5503         1. detach old LVs from the drbd device
5504         1. rename old LVs to name_replaced.<time_t>
5505         1. rename new LVs to old LVs
5506         1. attach the new LVs (with the old names now) to the drbd device
5507
5508       1. wait for sync across all devices
5509
5510       1. for each modified disk:
5511
5512         1. remove old LVs (which have the name name_replaces.<time_t>)
5513
5514     Failures are not very well handled.
5515
5516     """
5517     steps_total = 6
5518
5519     # Step: check device activation
5520     self.lu.LogStep(1, steps_total, "Check device existence")
5521     self._CheckDisksExistence([self.other_node, self.target_node])
5522     self._CheckVolumeGroup([self.target_node, self.other_node])
5523
5524     # Step: check other node consistency
5525     self.lu.LogStep(2, steps_total, "Check peer consistency")
5526     self._CheckDisksConsistency(self.other_node,
5527                                 self.other_node == self.instance.primary_node,
5528                                 False)
5529
5530     # Step: create new storage
5531     self.lu.LogStep(3, steps_total, "Allocate new storage")
5532     iv_names = self._CreateNewStorage(self.target_node)
5533
5534     # Step: for each lv, detach+rename*2+attach
5535     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5536     for dev, old_lvs, new_lvs in iv_names.itervalues():
5537       self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
5538
5539       result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
5540       result.Raise("Can't detach drbd from local storage on node"
5541                    " %s for device %s" % (self.target_node, dev.iv_name))
5542       #dev.children = []
5543       #cfg.Update(instance)
5544
5545       # ok, we created the new LVs, so now we know we have the needed
5546       # storage; as such, we proceed on the target node to rename
5547       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5548       # using the assumption that logical_id == physical_id (which in
5549       # turn is the unique_id on that node)
5550
5551       # FIXME(iustin): use a better name for the replaced LVs
5552       temp_suffix = int(time.time())
5553       ren_fn = lambda d, suff: (d.physical_id[0],
5554                                 d.physical_id[1] + "_replaced-%s" % suff)
5555
5556       # Build the rename list based on what LVs exist on the node
5557       rename_old_to_new = []
5558       for to_ren in old_lvs:
5559         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
5560         if not result.fail_msg and result.payload:
5561           # device exists
5562           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
5563
5564       self.lu.LogInfo("Renaming the old LVs on the target node")
5565       result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
5566       result.Raise("Can't rename old LVs on node %s" % self.target_node)
5567
5568       # Now we rename the new LVs to the old LVs
5569       self.lu.LogInfo("Renaming the new LVs on the target node")
5570       rename_new_to_old = [(new, old.physical_id)
5571                            for old, new in zip(old_lvs, new_lvs)]
5572       result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
5573       result.Raise("Can't rename new LVs on node %s" % self.target_node)
5574
5575       for old, new in zip(old_lvs, new_lvs):
5576         new.logical_id = old.logical_id
5577         self.cfg.SetDiskID(new, self.target_node)
5578
5579       for disk in old_lvs:
5580         disk.logical_id = ren_fn(disk, temp_suffix)
5581         self.cfg.SetDiskID(disk, self.target_node)
5582
5583       # Now that the new lvs have the old name, we can add them to the device
5584       self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
5585       result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
5586       msg = result.fail_msg
5587       if msg:
5588         for new_lv in new_lvs:
5589           msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
5590           if msg2:
5591             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
5592                                hint=("cleanup manually the unused logical"
5593                                      "volumes"))
5594         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5595
5596       dev.children = new_lvs
5597
5598       self.cfg.Update(self.instance)
5599
5600     # Wait for sync
5601     # This can fail as the old devices are degraded and _WaitForSync
5602     # does a combined result over all disks, so we don't check its return value
5603     self.lu.LogStep(5, steps_total, "Sync devices")
5604     _WaitForSync(self.lu, self.instance, unlock=True)
5605
5606     # Check all devices manually
5607     self._CheckDevices(self.instance.primary_node, iv_names)
5608
5609     # Step: remove old storage
5610     self.lu.LogStep(6, steps_total, "Removing old storage")
5611     self._RemoveOldStorage(self.target_node, iv_names)
5612
5613   def _ExecDrbd8Secondary(self):
5614     """Replace the secondary node for DRBD 8.
5615
5616     The algorithm for replace is quite complicated:
5617       - for all disks of the instance:
5618         - create new LVs on the new node with same names
5619         - shutdown the drbd device on the old secondary
5620         - disconnect the drbd network on the primary
5621         - create the drbd device on the new secondary
5622         - network attach the drbd on the primary, using an artifice:
5623           the drbd code for Attach() will connect to the network if it
5624           finds a device which is connected to the good local disks but
5625           not network enabled
5626       - wait for sync across all devices
5627       - remove all disks from the old secondary
5628
5629     Failures are not very well handled.
5630
5631     """
5632     steps_total = 6
5633
5634     # Step: check device activation
5635     self.lu.LogStep(1, steps_total, "Check device existence")
5636     self._CheckDisksExistence([self.instance.primary_node])
5637     self._CheckVolumeGroup([self.instance.primary_node])
5638
5639     # Step: check other node consistency
5640     self.lu.LogStep(2, steps_total, "Check peer consistency")
5641     self._CheckDisksConsistency(self.instance.primary_node, True, True)
5642
5643     # Step: create new storage
5644     self.lu.LogStep(3, steps_total, "Allocate new storage")
5645     for idx, dev in enumerate(self.instance.disks):
5646       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
5647                       (self.new_node, idx))
5648       # we pass force_create=True to force LVM creation
5649       for new_lv in dev.children:
5650         _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
5651                         _GetInstanceInfoText(self.instance), False)
5652
5653     # Step 4: dbrd minors and drbd setups changes
5654     # after this, we must manually remove the drbd minors on both the
5655     # error and the success paths
5656     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5657     minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
5658                                         self.instance.name)
5659     logging.debug("Allocated minors %r" % (minors,))
5660
5661     iv_names = {}
5662     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
5663       self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
5664       # create new devices on new_node; note that we create two IDs:
5665       # one without port, so the drbd will be activated without
5666       # networking information on the new node at this stage, and one
5667       # with network, for the latter activation in step 4
5668       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5669       if self.instance.primary_node == o_node1:
5670         p_minor = o_minor1
5671       else:
5672         p_minor = o_minor2
5673
5674       new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
5675       new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
5676
5677       iv_names[idx] = (dev, dev.children, new_net_id)
5678       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5679                     new_net_id)
5680       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5681                               logical_id=new_alone_id,
5682                               children=dev.children,
5683                               size=dev.size)
5684       try:
5685         _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
5686                               _GetInstanceInfoText(self.instance), False)
5687       except errors.GenericError:
5688         self.cfg.ReleaseDRBDMinors(self.instance.name)
5689         raise
5690
5691     # We have new devices, shutdown the drbd on the old secondary
5692     for idx, dev in enumerate(self.instance.disks):
5693       self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
5694       self.cfg.SetDiskID(dev, self.target_node)
5695       msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
5696       if msg:
5697         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
5698                            "node: %s" % (idx, msg),
5699                            hint=("Please cleanup this device manually as"
5700                                  " soon as possible"))
5701
5702     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
5703     result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
5704                                                self.instance.disks)[self.instance.primary_node]
5705
5706     msg = result.fail_msg
5707     if msg:
5708       # detaches didn't succeed (unlikely)
5709       self.cfg.ReleaseDRBDMinors(self.instance.name)
5710       raise errors.OpExecError("Can't detach the disks from the network on"
5711                                " old node: %s" % (msg,))
5712
5713     # if we managed to detach at least one, we update all the disks of
5714     # the instance to point to the new secondary
5715     self.lu.LogInfo("Updating instance configuration")
5716     for dev, _, new_logical_id in iv_names.itervalues():
5717       dev.logical_id = new_logical_id
5718       self.cfg.SetDiskID(dev, self.instance.primary_node)
5719
5720     self.cfg.Update(self.instance)
5721
5722     # and now perform the drbd attach
5723     self.lu.LogInfo("Attaching primary drbds to new secondary"
5724                     " (standalone => connected)")
5725     result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
5726                                            self.instance.disks, self.instance.name,
5727                                            False)
5728     for to_node, to_result in result.items():
5729       msg = to_result.fail_msg
5730       if msg:
5731         self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
5732                            hint=("please do a gnt-instance info to see the"
5733                                  " status of disks"))
5734
5735     # Wait for sync
5736     # This can fail as the old devices are degraded and _WaitForSync
5737     # does a combined result over all disks, so we don't check its return value
5738     self.lu.LogStep(5, steps_total, "Sync devices")
5739     _WaitForSync(self.lu, self.instance, unlock=True)
5740
5741     # Check all devices manually
5742     self._CheckDevices(self.instance.primary_node, iv_names)
5743
5744     # Step: remove old storage
5745     self.lu.LogStep(6, steps_total, "Removing old storage")
5746     self._RemoveOldStorage(self.target_node, iv_names)
5747
5748
5749 class LUGrowDisk(LogicalUnit):
5750   """Grow a disk of an instance.
5751
5752   """
5753   HPATH = "disk-grow"
5754   HTYPE = constants.HTYPE_INSTANCE
5755   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5756   REQ_BGL = False
5757
5758   def ExpandNames(self):
5759     self._ExpandAndLockInstance()
5760     self.needed_locks[locking.LEVEL_NODE] = []
5761     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5762
5763   def DeclareLocks(self, level):
5764     if level == locking.LEVEL_NODE:
5765       self._LockInstancesNodes()
5766
5767   def BuildHooksEnv(self):
5768     """Build hooks env.
5769
5770     This runs on the master, the primary and all the secondaries.
5771
5772     """
5773     env = {
5774       "DISK": self.op.disk,
5775       "AMOUNT": self.op.amount,
5776       }
5777     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5778     nl = [
5779       self.cfg.GetMasterNode(),
5780       self.instance.primary_node,
5781       ]
5782     return env, nl, nl
5783
5784   def CheckPrereq(self):
5785     """Check prerequisites.
5786
5787     This checks that the instance is in the cluster.
5788
5789     """
5790     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5791     assert instance is not None, \
5792       "Cannot retrieve locked instance %s" % self.op.instance_name
5793     nodenames = list(instance.all_nodes)
5794     for node in nodenames:
5795       _CheckNodeOnline(self, node)
5796
5797
5798     self.instance = instance
5799
5800     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5801       raise errors.OpPrereqError("Instance's disk layout does not support"
5802                                  " growing.")
5803
5804     self.disk = instance.FindDisk(self.op.disk)
5805
5806     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5807                                        instance.hypervisor)
5808     for node in nodenames:
5809       info = nodeinfo[node]
5810       info.Raise("Cannot get current information from node %s" % node)
5811       vg_free = info.payload.get('vg_free', None)
5812       if not isinstance(vg_free, int):
5813         raise errors.OpPrereqError("Can't compute free disk space on"
5814                                    " node %s" % node)
5815       if self.op.amount > vg_free:
5816         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5817                                    " %d MiB available, %d MiB required" %
5818                                    (node, vg_free, self.op.amount))
5819
5820   def Exec(self, feedback_fn):
5821     """Execute disk grow.
5822
5823     """
5824     instance = self.instance
5825     disk = self.disk
5826     for node in instance.all_nodes:
5827       self.cfg.SetDiskID(disk, node)
5828       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5829       result.Raise("Grow request failed to node %s" % node)
5830     disk.RecordGrow(self.op.amount)
5831     self.cfg.Update(instance)
5832     if self.op.wait_for_sync:
5833       disk_abort = not _WaitForSync(self, instance)
5834       if disk_abort:
5835         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5836                              " status.\nPlease check the instance.")
5837
5838
5839 class LUQueryInstanceData(NoHooksLU):
5840   """Query runtime instance data.
5841
5842   """
5843   _OP_REQP = ["instances", "static"]
5844   REQ_BGL = False
5845
5846   def ExpandNames(self):
5847     self.needed_locks = {}
5848     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
5849
5850     if not isinstance(self.op.instances, list):
5851       raise errors.OpPrereqError("Invalid argument type 'instances'")
5852
5853     if self.op.instances:
5854       self.wanted_names = []
5855       for name in self.op.instances:
5856         full_name = self.cfg.ExpandInstanceName(name)
5857         if full_name is None:
5858           raise errors.OpPrereqError("Instance '%s' not known" % name)
5859         self.wanted_names.append(full_name)
5860       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5861     else:
5862       self.wanted_names = None
5863       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5864
5865     self.needed_locks[locking.LEVEL_NODE] = []
5866     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5867
5868   def DeclareLocks(self, level):
5869     if level == locking.LEVEL_NODE:
5870       self._LockInstancesNodes()
5871
5872   def CheckPrereq(self):
5873     """Check prerequisites.
5874
5875     This only checks the optional instance list against the existing names.
5876
5877     """
5878     if self.wanted_names is None:
5879       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5880
5881     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5882                              in self.wanted_names]
5883     return
5884
5885   def _ComputeDiskStatus(self, instance, snode, dev):
5886     """Compute block device status.
5887
5888     """
5889     static = self.op.static
5890     if not static:
5891       self.cfg.SetDiskID(dev, instance.primary_node)
5892       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5893       if dev_pstatus.offline:
5894         dev_pstatus = None
5895       else:
5896         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5897         dev_pstatus = dev_pstatus.payload
5898     else:
5899       dev_pstatus = None
5900
5901     if dev.dev_type in constants.LDS_DRBD:
5902       # we change the snode then (otherwise we use the one passed in)
5903       if dev.logical_id[0] == instance.primary_node:
5904         snode = dev.logical_id[1]
5905       else:
5906         snode = dev.logical_id[0]
5907
5908     if snode and not static:
5909       self.cfg.SetDiskID(dev, snode)
5910       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5911       if dev_sstatus.offline:
5912         dev_sstatus = None
5913       else:
5914         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5915         dev_sstatus = dev_sstatus.payload
5916     else:
5917       dev_sstatus = None
5918
5919     if dev.children:
5920       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5921                       for child in dev.children]
5922     else:
5923       dev_children = []
5924
5925     data = {
5926       "iv_name": dev.iv_name,
5927       "dev_type": dev.dev_type,
5928       "logical_id": dev.logical_id,
5929       "physical_id": dev.physical_id,
5930       "pstatus": dev_pstatus,
5931       "sstatus": dev_sstatus,
5932       "children": dev_children,
5933       "mode": dev.mode,
5934       "size": dev.size,
5935       }
5936
5937     return data
5938
5939   def Exec(self, feedback_fn):
5940     """Gather and return data"""
5941     result = {}
5942
5943     cluster = self.cfg.GetClusterInfo()
5944
5945     for instance in self.wanted_instances:
5946       if not self.op.static:
5947         remote_info = self.rpc.call_instance_info(instance.primary_node,
5948                                                   instance.name,
5949                                                   instance.hypervisor)
5950         remote_info.Raise("Error checking node %s" % instance.primary_node)
5951         remote_info = remote_info.payload
5952         if remote_info and "state" in remote_info:
5953           remote_state = "up"
5954         else:
5955           remote_state = "down"
5956       else:
5957         remote_state = None
5958       if instance.admin_up:
5959         config_state = "up"
5960       else:
5961         config_state = "down"
5962
5963       disks = [self._ComputeDiskStatus(instance, None, device)
5964                for device in instance.disks]
5965
5966       idict = {
5967         "name": instance.name,
5968         "config_state": config_state,
5969         "run_state": remote_state,
5970         "pnode": instance.primary_node,
5971         "snodes": instance.secondary_nodes,
5972         "os": instance.os,
5973         # this happens to be the same format used for hooks
5974         "nics": _NICListToTuple(self, instance.nics),
5975         "disks": disks,
5976         "hypervisor": instance.hypervisor,
5977         "network_port": instance.network_port,
5978         "hv_instance": instance.hvparams,
5979         "hv_actual": cluster.FillHV(instance),
5980         "be_instance": instance.beparams,
5981         "be_actual": cluster.FillBE(instance),
5982         }
5983
5984       result[instance.name] = idict
5985
5986     return result
5987
5988
5989 class LUSetInstanceParams(LogicalUnit):
5990   """Modifies an instances's parameters.
5991
5992   """
5993   HPATH = "instance-modify"
5994   HTYPE = constants.HTYPE_INSTANCE
5995   _OP_REQP = ["instance_name"]
5996   REQ_BGL = False
5997
5998   def CheckArguments(self):
5999     if not hasattr(self.op, 'nics'):
6000       self.op.nics = []
6001     if not hasattr(self.op, 'disks'):
6002       self.op.disks = []
6003     if not hasattr(self.op, 'beparams'):
6004       self.op.beparams = {}
6005     if not hasattr(self.op, 'hvparams'):
6006       self.op.hvparams = {}
6007     self.op.force = getattr(self.op, "force", False)
6008     if not (self.op.nics or self.op.disks or
6009             self.op.hvparams or self.op.beparams):
6010       raise errors.OpPrereqError("No changes submitted")
6011
6012     # Disk validation
6013     disk_addremove = 0
6014     for disk_op, disk_dict in self.op.disks:
6015       if disk_op == constants.DDM_REMOVE:
6016         disk_addremove += 1
6017         continue
6018       elif disk_op == constants.DDM_ADD:
6019         disk_addremove += 1
6020       else:
6021         if not isinstance(disk_op, int):
6022           raise errors.OpPrereqError("Invalid disk index")
6023         if not isinstance(disk_dict, dict):
6024           msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6025           raise errors.OpPrereqError(msg)
6026
6027       if disk_op == constants.DDM_ADD:
6028         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6029         if mode not in constants.DISK_ACCESS_SET:
6030           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6031         size = disk_dict.get('size', None)
6032         if size is None:
6033           raise errors.OpPrereqError("Required disk parameter size missing")
6034         try:
6035           size = int(size)
6036         except ValueError, err:
6037           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6038                                      str(err))
6039         disk_dict['size'] = size
6040       else:
6041         # modification of disk
6042         if 'size' in disk_dict:
6043           raise errors.OpPrereqError("Disk size change not possible, use"
6044                                      " grow-disk")
6045
6046     if disk_addremove > 1:
6047       raise errors.OpPrereqError("Only one disk add or remove operation"
6048                                  " supported at a time")
6049
6050     # NIC validation
6051     nic_addremove = 0
6052     for nic_op, nic_dict in self.op.nics:
6053       if nic_op == constants.DDM_REMOVE:
6054         nic_addremove += 1
6055         continue
6056       elif nic_op == constants.DDM_ADD:
6057         nic_addremove += 1
6058       else:
6059         if not isinstance(nic_op, int):
6060           raise errors.OpPrereqError("Invalid nic index")
6061         if not isinstance(nic_dict, dict):
6062           msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6063           raise errors.OpPrereqError(msg)
6064
6065       # nic_dict should be a dict
6066       nic_ip = nic_dict.get('ip', None)
6067       if nic_ip is not None:
6068         if nic_ip.lower() == constants.VALUE_NONE:
6069           nic_dict['ip'] = None
6070         else:
6071           if not utils.IsValidIP(nic_ip):
6072             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6073
6074       nic_bridge = nic_dict.get('bridge', None)
6075       nic_link = nic_dict.get('link', None)
6076       if nic_bridge and nic_link:
6077         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6078                                    " at the same time")
6079       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6080         nic_dict['bridge'] = None
6081       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6082         nic_dict['link'] = None
6083
6084       if nic_op == constants.DDM_ADD:
6085         nic_mac = nic_dict.get('mac', None)
6086         if nic_mac is None:
6087           nic_dict['mac'] = constants.VALUE_AUTO
6088
6089       if 'mac' in nic_dict:
6090         nic_mac = nic_dict['mac']
6091         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6092           if not utils.IsValidMac(nic_mac):
6093             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6094         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6095           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6096                                      " modifying an existing nic")
6097
6098     if nic_addremove > 1:
6099       raise errors.OpPrereqError("Only one NIC add or remove operation"
6100                                  " supported at a time")
6101
6102   def ExpandNames(self):
6103     self._ExpandAndLockInstance()
6104     self.needed_locks[locking.LEVEL_NODE] = []
6105     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6106
6107   def DeclareLocks(self, level):
6108     if level == locking.LEVEL_NODE:
6109       self._LockInstancesNodes()
6110
6111   def BuildHooksEnv(self):
6112     """Build hooks env.
6113
6114     This runs on the master, primary and secondaries.
6115
6116     """
6117     args = dict()
6118     if constants.BE_MEMORY in self.be_new:
6119       args['memory'] = self.be_new[constants.BE_MEMORY]
6120     if constants.BE_VCPUS in self.be_new:
6121       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6122     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6123     # information at all.
6124     if self.op.nics:
6125       args['nics'] = []
6126       nic_override = dict(self.op.nics)
6127       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6128       for idx, nic in enumerate(self.instance.nics):
6129         if idx in nic_override:
6130           this_nic_override = nic_override[idx]
6131         else:
6132           this_nic_override = {}
6133         if 'ip' in this_nic_override:
6134           ip = this_nic_override['ip']
6135         else:
6136           ip = nic.ip
6137         if 'mac' in this_nic_override:
6138           mac = this_nic_override['mac']
6139         else:
6140           mac = nic.mac
6141         if idx in self.nic_pnew:
6142           nicparams = self.nic_pnew[idx]
6143         else:
6144           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6145         mode = nicparams[constants.NIC_MODE]
6146         link = nicparams[constants.NIC_LINK]
6147         args['nics'].append((ip, mac, mode, link))
6148       if constants.DDM_ADD in nic_override:
6149         ip = nic_override[constants.DDM_ADD].get('ip', None)
6150         mac = nic_override[constants.DDM_ADD]['mac']
6151         nicparams = self.nic_pnew[constants.DDM_ADD]
6152         mode = nicparams[constants.NIC_MODE]
6153         link = nicparams[constants.NIC_LINK]
6154         args['nics'].append((ip, mac, mode, link))
6155       elif constants.DDM_REMOVE in nic_override:
6156         del args['nics'][-1]
6157
6158     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6159     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6160     return env, nl, nl
6161
6162   def _GetUpdatedParams(self, old_params, update_dict,
6163                         default_values, parameter_types):
6164     """Return the new params dict for the given params.
6165
6166     @type old_params: dict
6167     @param old_params: old parameters
6168     @type update_dict: dict
6169     @param update_dict: dict containing new parameter values,
6170                         or constants.VALUE_DEFAULT to reset the
6171                         parameter to its default value
6172     @type default_values: dict
6173     @param default_values: default values for the filled parameters
6174     @type parameter_types: dict
6175     @param parameter_types: dict mapping target dict keys to types
6176                             in constants.ENFORCEABLE_TYPES
6177     @rtype: (dict, dict)
6178     @return: (new_parameters, filled_parameters)
6179
6180     """
6181     params_copy = copy.deepcopy(old_params)
6182     for key, val in update_dict.iteritems():
6183       if val == constants.VALUE_DEFAULT:
6184         try:
6185           del params_copy[key]
6186         except KeyError:
6187           pass
6188       else:
6189         params_copy[key] = val
6190     utils.ForceDictType(params_copy, parameter_types)
6191     params_filled = objects.FillDict(default_values, params_copy)
6192     return (params_copy, params_filled)
6193
6194   def CheckPrereq(self):
6195     """Check prerequisites.
6196
6197     This only checks the instance list against the existing names.
6198
6199     """
6200     self.force = self.op.force
6201
6202     # checking the new params on the primary/secondary nodes
6203
6204     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6205     cluster = self.cluster = self.cfg.GetClusterInfo()
6206     assert self.instance is not None, \
6207       "Cannot retrieve locked instance %s" % self.op.instance_name
6208     pnode = instance.primary_node
6209     nodelist = list(instance.all_nodes)
6210
6211     # hvparams processing
6212     if self.op.hvparams:
6213       i_hvdict, hv_new = self._GetUpdatedParams(
6214                              instance.hvparams, self.op.hvparams,
6215                              cluster.hvparams[instance.hypervisor],
6216                              constants.HVS_PARAMETER_TYPES)
6217       # local check
6218       hypervisor.GetHypervisor(
6219         instance.hypervisor).CheckParameterSyntax(hv_new)
6220       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6221       self.hv_new = hv_new # the new actual values
6222       self.hv_inst = i_hvdict # the new dict (without defaults)
6223     else:
6224       self.hv_new = self.hv_inst = {}
6225
6226     # beparams processing
6227     if self.op.beparams:
6228       i_bedict, be_new = self._GetUpdatedParams(
6229                              instance.beparams, self.op.beparams,
6230                              cluster.beparams[constants.PP_DEFAULT],
6231                              constants.BES_PARAMETER_TYPES)
6232       self.be_new = be_new # the new actual values
6233       self.be_inst = i_bedict # the new dict (without defaults)
6234     else:
6235       self.be_new = self.be_inst = {}
6236
6237     self.warn = []
6238
6239     if constants.BE_MEMORY in self.op.beparams and not self.force:
6240       mem_check_list = [pnode]
6241       if be_new[constants.BE_AUTO_BALANCE]:
6242         # either we changed auto_balance to yes or it was from before
6243         mem_check_list.extend(instance.secondary_nodes)
6244       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6245                                                   instance.hypervisor)
6246       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6247                                          instance.hypervisor)
6248       pninfo = nodeinfo[pnode]
6249       msg = pninfo.fail_msg
6250       if msg:
6251         # Assume the primary node is unreachable and go ahead
6252         self.warn.append("Can't get info from primary node %s: %s" %
6253                          (pnode,  msg))
6254       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6255         self.warn.append("Node data from primary node %s doesn't contain"
6256                          " free memory information" % pnode)
6257       elif instance_info.fail_msg:
6258         self.warn.append("Can't get instance runtime information: %s" %
6259                         instance_info.fail_msg)
6260       else:
6261         if instance_info.payload:
6262           current_mem = int(instance_info.payload['memory'])
6263         else:
6264           # Assume instance not running
6265           # (there is a slight race condition here, but it's not very probable,
6266           # and we have no other way to check)
6267           current_mem = 0
6268         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6269                     pninfo.payload['memory_free'])
6270         if miss_mem > 0:
6271           raise errors.OpPrereqError("This change will prevent the instance"
6272                                      " from starting, due to %d MB of memory"
6273                                      " missing on its primary node" % miss_mem)
6274
6275       if be_new[constants.BE_AUTO_BALANCE]:
6276         for node, nres in nodeinfo.items():
6277           if node not in instance.secondary_nodes:
6278             continue
6279           msg = nres.fail_msg
6280           if msg:
6281             self.warn.append("Can't get info from secondary node %s: %s" %
6282                              (node, msg))
6283           elif not isinstance(nres.payload.get('memory_free', None), int):
6284             self.warn.append("Secondary node %s didn't return free"
6285                              " memory information" % node)
6286           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6287             self.warn.append("Not enough memory to failover instance to"
6288                              " secondary node %s" % node)
6289
6290     # NIC processing
6291     self.nic_pnew = {}
6292     self.nic_pinst = {}
6293     for nic_op, nic_dict in self.op.nics:
6294       if nic_op == constants.DDM_REMOVE:
6295         if not instance.nics:
6296           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6297         continue
6298       if nic_op != constants.DDM_ADD:
6299         # an existing nic
6300         if nic_op < 0 or nic_op >= len(instance.nics):
6301           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6302                                      " are 0 to %d" %
6303                                      (nic_op, len(instance.nics)))
6304         old_nic_params = instance.nics[nic_op].nicparams
6305         old_nic_ip = instance.nics[nic_op].ip
6306       else:
6307         old_nic_params = {}
6308         old_nic_ip = None
6309
6310       update_params_dict = dict([(key, nic_dict[key])
6311                                  for key in constants.NICS_PARAMETERS
6312                                  if key in nic_dict])
6313
6314       if 'bridge' in nic_dict:
6315         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6316
6317       new_nic_params, new_filled_nic_params = \
6318           self._GetUpdatedParams(old_nic_params, update_params_dict,
6319                                  cluster.nicparams[constants.PP_DEFAULT],
6320                                  constants.NICS_PARAMETER_TYPES)
6321       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6322       self.nic_pinst[nic_op] = new_nic_params
6323       self.nic_pnew[nic_op] = new_filled_nic_params
6324       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6325
6326       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6327         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6328         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6329         if msg:
6330           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6331           if self.force:
6332             self.warn.append(msg)
6333           else:
6334             raise errors.OpPrereqError(msg)
6335       if new_nic_mode == constants.NIC_MODE_ROUTED:
6336         if 'ip' in nic_dict:
6337           nic_ip = nic_dict['ip']
6338         else:
6339           nic_ip = old_nic_ip
6340         if nic_ip is None:
6341           raise errors.OpPrereqError('Cannot set the nic ip to None'
6342                                      ' on a routed nic')
6343       if 'mac' in nic_dict:
6344         nic_mac = nic_dict['mac']
6345         if nic_mac is None:
6346           raise errors.OpPrereqError('Cannot set the nic mac to None')
6347         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6348           # otherwise generate the mac
6349           nic_dict['mac'] = self.cfg.GenerateMAC()
6350         else:
6351           # or validate/reserve the current one
6352           if self.cfg.IsMacInUse(nic_mac):
6353             raise errors.OpPrereqError("MAC address %s already in use"
6354                                        " in cluster" % nic_mac)
6355
6356     # DISK processing
6357     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6358       raise errors.OpPrereqError("Disk operations not supported for"
6359                                  " diskless instances")
6360     for disk_op, disk_dict in self.op.disks:
6361       if disk_op == constants.DDM_REMOVE:
6362         if len(instance.disks) == 1:
6363           raise errors.OpPrereqError("Cannot remove the last disk of"
6364                                      " an instance")
6365         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6366         ins_l = ins_l[pnode]
6367         msg = ins_l.fail_msg
6368         if msg:
6369           raise errors.OpPrereqError("Can't contact node %s: %s" %
6370                                      (pnode, msg))
6371         if instance.name in ins_l.payload:
6372           raise errors.OpPrereqError("Instance is running, can't remove"
6373                                      " disks.")
6374
6375       if (disk_op == constants.DDM_ADD and
6376           len(instance.nics) >= constants.MAX_DISKS):
6377         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6378                                    " add more" % constants.MAX_DISKS)
6379       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6380         # an existing disk
6381         if disk_op < 0 or disk_op >= len(instance.disks):
6382           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6383                                      " are 0 to %d" %
6384                                      (disk_op, len(instance.disks)))
6385
6386     return
6387
6388   def Exec(self, feedback_fn):
6389     """Modifies an instance.
6390
6391     All parameters take effect only at the next restart of the instance.
6392
6393     """
6394     # Process here the warnings from CheckPrereq, as we don't have a
6395     # feedback_fn there.
6396     for warn in self.warn:
6397       feedback_fn("WARNING: %s" % warn)
6398
6399     result = []
6400     instance = self.instance
6401     cluster = self.cluster
6402     # disk changes
6403     for disk_op, disk_dict in self.op.disks:
6404       if disk_op == constants.DDM_REMOVE:
6405         # remove the last disk
6406         device = instance.disks.pop()
6407         device_idx = len(instance.disks)
6408         for node, disk in device.ComputeNodeTree(instance.primary_node):
6409           self.cfg.SetDiskID(disk, node)
6410           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6411           if msg:
6412             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6413                             " continuing anyway", device_idx, node, msg)
6414         result.append(("disk/%d" % device_idx, "remove"))
6415       elif disk_op == constants.DDM_ADD:
6416         # add a new disk
6417         if instance.disk_template == constants.DT_FILE:
6418           file_driver, file_path = instance.disks[0].logical_id
6419           file_path = os.path.dirname(file_path)
6420         else:
6421           file_driver = file_path = None
6422         disk_idx_base = len(instance.disks)
6423         new_disk = _GenerateDiskTemplate(self,
6424                                          instance.disk_template,
6425                                          instance.name, instance.primary_node,
6426                                          instance.secondary_nodes,
6427                                          [disk_dict],
6428                                          file_path,
6429                                          file_driver,
6430                                          disk_idx_base)[0]
6431         instance.disks.append(new_disk)
6432         info = _GetInstanceInfoText(instance)
6433
6434         logging.info("Creating volume %s for instance %s",
6435                      new_disk.iv_name, instance.name)
6436         # Note: this needs to be kept in sync with _CreateDisks
6437         #HARDCODE
6438         for node in instance.all_nodes:
6439           f_create = node == instance.primary_node
6440           try:
6441             _CreateBlockDev(self, node, instance, new_disk,
6442                             f_create, info, f_create)
6443           except errors.OpExecError, err:
6444             self.LogWarning("Failed to create volume %s (%s) on"
6445                             " node %s: %s",
6446                             new_disk.iv_name, new_disk, node, err)
6447         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6448                        (new_disk.size, new_disk.mode)))
6449       else:
6450         # change a given disk
6451         instance.disks[disk_op].mode = disk_dict['mode']
6452         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6453     # NIC changes
6454     for nic_op, nic_dict in self.op.nics:
6455       if nic_op == constants.DDM_REMOVE:
6456         # remove the last nic
6457         del instance.nics[-1]
6458         result.append(("nic.%d" % len(instance.nics), "remove"))
6459       elif nic_op == constants.DDM_ADD:
6460         # mac and bridge should be set, by now
6461         mac = nic_dict['mac']
6462         ip = nic_dict.get('ip', None)
6463         nicparams = self.nic_pinst[constants.DDM_ADD]
6464         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6465         instance.nics.append(new_nic)
6466         result.append(("nic.%d" % (len(instance.nics) - 1),
6467                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6468                        (new_nic.mac, new_nic.ip,
6469                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6470                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6471                        )))
6472       else:
6473         for key in 'mac', 'ip':
6474           if key in nic_dict:
6475             setattr(instance.nics[nic_op], key, nic_dict[key])
6476         if nic_op in self.nic_pnew:
6477           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6478         for key, val in nic_dict.iteritems():
6479           result.append(("nic.%s/%d" % (key, nic_op), val))
6480
6481     # hvparams changes
6482     if self.op.hvparams:
6483       instance.hvparams = self.hv_inst
6484       for key, val in self.op.hvparams.iteritems():
6485         result.append(("hv/%s" % key, val))
6486
6487     # beparams changes
6488     if self.op.beparams:
6489       instance.beparams = self.be_inst
6490       for key, val in self.op.beparams.iteritems():
6491         result.append(("be/%s" % key, val))
6492
6493     self.cfg.Update(instance)
6494
6495     return result
6496
6497
6498 class LUQueryExports(NoHooksLU):
6499   """Query the exports list
6500
6501   """
6502   _OP_REQP = ['nodes']
6503   REQ_BGL = False
6504
6505   def ExpandNames(self):
6506     self.needed_locks = {}
6507     self.share_locks[locking.LEVEL_NODE] = 1
6508     if not self.op.nodes:
6509       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6510     else:
6511       self.needed_locks[locking.LEVEL_NODE] = \
6512         _GetWantedNodes(self, self.op.nodes)
6513
6514   def CheckPrereq(self):
6515     """Check prerequisites.
6516
6517     """
6518     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6519
6520   def Exec(self, feedback_fn):
6521     """Compute the list of all the exported system images.
6522
6523     @rtype: dict
6524     @return: a dictionary with the structure node->(export-list)
6525         where export-list is a list of the instances exported on
6526         that node.
6527
6528     """
6529     rpcresult = self.rpc.call_export_list(self.nodes)
6530     result = {}
6531     for node in rpcresult:
6532       if rpcresult[node].fail_msg:
6533         result[node] = False
6534       else:
6535         result[node] = rpcresult[node].payload
6536
6537     return result
6538
6539
6540 class LUExportInstance(LogicalUnit):
6541   """Export an instance to an image in the cluster.
6542
6543   """
6544   HPATH = "instance-export"
6545   HTYPE = constants.HTYPE_INSTANCE
6546   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6547   REQ_BGL = False
6548
6549   def ExpandNames(self):
6550     self._ExpandAndLockInstance()
6551     # FIXME: lock only instance primary and destination node
6552     #
6553     # Sad but true, for now we have do lock all nodes, as we don't know where
6554     # the previous export might be, and and in this LU we search for it and
6555     # remove it from its current node. In the future we could fix this by:
6556     #  - making a tasklet to search (share-lock all), then create the new one,
6557     #    then one to remove, after
6558     #  - removing the removal operation altogether
6559     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6560
6561   def DeclareLocks(self, level):
6562     """Last minute lock declaration."""
6563     # All nodes are locked anyway, so nothing to do here.
6564
6565   def BuildHooksEnv(self):
6566     """Build hooks env.
6567
6568     This will run on the master, primary node and target node.
6569
6570     """
6571     env = {
6572       "EXPORT_NODE": self.op.target_node,
6573       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6574       }
6575     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6576     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6577           self.op.target_node]
6578     return env, nl, nl
6579
6580   def CheckPrereq(self):
6581     """Check prerequisites.
6582
6583     This checks that the instance and node names are valid.
6584
6585     """
6586     instance_name = self.op.instance_name
6587     self.instance = self.cfg.GetInstanceInfo(instance_name)
6588     assert self.instance is not None, \
6589           "Cannot retrieve locked instance %s" % self.op.instance_name
6590     _CheckNodeOnline(self, self.instance.primary_node)
6591
6592     self.dst_node = self.cfg.GetNodeInfo(
6593       self.cfg.ExpandNodeName(self.op.target_node))
6594
6595     if self.dst_node is None:
6596       # This is wrong node name, not a non-locked node
6597       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6598     _CheckNodeOnline(self, self.dst_node.name)
6599     _CheckNodeNotDrained(self, self.dst_node.name)
6600
6601     # instance disk type verification
6602     for disk in self.instance.disks:
6603       if disk.dev_type == constants.LD_FILE:
6604         raise errors.OpPrereqError("Export not supported for instances with"
6605                                    " file-based disks")
6606
6607   def Exec(self, feedback_fn):
6608     """Export an instance to an image in the cluster.
6609
6610     """
6611     instance = self.instance
6612     dst_node = self.dst_node
6613     src_node = instance.primary_node
6614     if self.op.shutdown:
6615       # shutdown the instance, but not the disks
6616       result = self.rpc.call_instance_shutdown(src_node, instance)
6617       result.Raise("Could not shutdown instance %s on"
6618                    " node %s" % (instance.name, src_node))
6619
6620     vgname = self.cfg.GetVGName()
6621
6622     snap_disks = []
6623
6624     # set the disks ID correctly since call_instance_start needs the
6625     # correct drbd minor to create the symlinks
6626     for disk in instance.disks:
6627       self.cfg.SetDiskID(disk, src_node)
6628
6629     try:
6630       for idx, disk in enumerate(instance.disks):
6631         # result.payload will be a snapshot of an lvm leaf of the one we passed
6632         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6633         msg = result.fail_msg
6634         if msg:
6635           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6636                           idx, src_node, msg)
6637           snap_disks.append(False)
6638         else:
6639           disk_id = (vgname, result.payload)
6640           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6641                                  logical_id=disk_id, physical_id=disk_id,
6642                                  iv_name=disk.iv_name)
6643           snap_disks.append(new_dev)
6644
6645     finally:
6646       if self.op.shutdown and instance.admin_up:
6647         result = self.rpc.call_instance_start(src_node, instance, None, None)
6648         msg = result.fail_msg
6649         if msg:
6650           _ShutdownInstanceDisks(self, instance)
6651           raise errors.OpExecError("Could not start instance: %s" % msg)
6652
6653     # TODO: check for size
6654
6655     cluster_name = self.cfg.GetClusterName()
6656     for idx, dev in enumerate(snap_disks):
6657       if dev:
6658         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6659                                                instance, cluster_name, idx)
6660         msg = result.fail_msg
6661         if msg:
6662           self.LogWarning("Could not export disk/%s from node %s to"
6663                           " node %s: %s", idx, src_node, dst_node.name, msg)
6664         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6665         if msg:
6666           self.LogWarning("Could not remove snapshot for disk/%d from node"
6667                           " %s: %s", idx, src_node, msg)
6668
6669     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6670     msg = result.fail_msg
6671     if msg:
6672       self.LogWarning("Could not finalize export for instance %s"
6673                       " on node %s: %s", instance.name, dst_node.name, msg)
6674
6675     nodelist = self.cfg.GetNodeList()
6676     nodelist.remove(dst_node.name)
6677
6678     # on one-node clusters nodelist will be empty after the removal
6679     # if we proceed the backup would be removed because OpQueryExports
6680     # substitutes an empty list with the full cluster node list.
6681     iname = instance.name
6682     if nodelist:
6683       exportlist = self.rpc.call_export_list(nodelist)
6684       for node in exportlist:
6685         if exportlist[node].fail_msg:
6686           continue
6687         if iname in exportlist[node].payload:
6688           msg = self.rpc.call_export_remove(node, iname).fail_msg
6689           if msg:
6690             self.LogWarning("Could not remove older export for instance %s"
6691                             " on node %s: %s", iname, node, msg)
6692
6693
6694 class LURemoveExport(NoHooksLU):
6695   """Remove exports related to the named instance.
6696
6697   """
6698   _OP_REQP = ["instance_name"]
6699   REQ_BGL = False
6700
6701   def ExpandNames(self):
6702     self.needed_locks = {}
6703     # We need all nodes to be locked in order for RemoveExport to work, but we
6704     # don't need to lock the instance itself, as nothing will happen to it (and
6705     # we can remove exports also for a removed instance)
6706     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6707
6708   def CheckPrereq(self):
6709     """Check prerequisites.
6710     """
6711     pass
6712
6713   def Exec(self, feedback_fn):
6714     """Remove any export.
6715
6716     """
6717     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6718     # If the instance was not found we'll try with the name that was passed in.
6719     # This will only work if it was an FQDN, though.
6720     fqdn_warn = False
6721     if not instance_name:
6722       fqdn_warn = True
6723       instance_name = self.op.instance_name
6724
6725     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6726     exportlist = self.rpc.call_export_list(locked_nodes)
6727     found = False
6728     for node in exportlist:
6729       msg = exportlist[node].fail_msg
6730       if msg:
6731         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6732         continue
6733       if instance_name in exportlist[node].payload:
6734         found = True
6735         result = self.rpc.call_export_remove(node, instance_name)
6736         msg = result.fail_msg
6737         if msg:
6738           logging.error("Could not remove export for instance %s"
6739                         " on node %s: %s", instance_name, node, msg)
6740
6741     if fqdn_warn and not found:
6742       feedback_fn("Export not found. If trying to remove an export belonging"
6743                   " to a deleted instance please use its Fully Qualified"
6744                   " Domain Name.")
6745
6746
6747 class TagsLU(NoHooksLU):
6748   """Generic tags LU.
6749
6750   This is an abstract class which is the parent of all the other tags LUs.
6751
6752   """
6753
6754   def ExpandNames(self):
6755     self.needed_locks = {}
6756     if self.op.kind == constants.TAG_NODE:
6757       name = self.cfg.ExpandNodeName(self.op.name)
6758       if name is None:
6759         raise errors.OpPrereqError("Invalid node name (%s)" %
6760                                    (self.op.name,))
6761       self.op.name = name
6762       self.needed_locks[locking.LEVEL_NODE] = name
6763     elif self.op.kind == constants.TAG_INSTANCE:
6764       name = self.cfg.ExpandInstanceName(self.op.name)
6765       if name is None:
6766         raise errors.OpPrereqError("Invalid instance name (%s)" %
6767                                    (self.op.name,))
6768       self.op.name = name
6769       self.needed_locks[locking.LEVEL_INSTANCE] = name
6770
6771   def CheckPrereq(self):
6772     """Check prerequisites.
6773
6774     """
6775     if self.op.kind == constants.TAG_CLUSTER:
6776       self.target = self.cfg.GetClusterInfo()
6777     elif self.op.kind == constants.TAG_NODE:
6778       self.target = self.cfg.GetNodeInfo(self.op.name)
6779     elif self.op.kind == constants.TAG_INSTANCE:
6780       self.target = self.cfg.GetInstanceInfo(self.op.name)
6781     else:
6782       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6783                                  str(self.op.kind))
6784
6785
6786 class LUGetTags(TagsLU):
6787   """Returns the tags of a given object.
6788
6789   """
6790   _OP_REQP = ["kind", "name"]
6791   REQ_BGL = False
6792
6793   def Exec(self, feedback_fn):
6794     """Returns the tag list.
6795
6796     """
6797     return list(self.target.GetTags())
6798
6799
6800 class LUSearchTags(NoHooksLU):
6801   """Searches the tags for a given pattern.
6802
6803   """
6804   _OP_REQP = ["pattern"]
6805   REQ_BGL = False
6806
6807   def ExpandNames(self):
6808     self.needed_locks = {}
6809
6810   def CheckPrereq(self):
6811     """Check prerequisites.
6812
6813     This checks the pattern passed for validity by compiling it.
6814
6815     """
6816     try:
6817       self.re = re.compile(self.op.pattern)
6818     except re.error, err:
6819       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6820                                  (self.op.pattern, err))
6821
6822   def Exec(self, feedback_fn):
6823     """Returns the tag list.
6824
6825     """
6826     cfg = self.cfg
6827     tgts = [("/cluster", cfg.GetClusterInfo())]
6828     ilist = cfg.GetAllInstancesInfo().values()
6829     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6830     nlist = cfg.GetAllNodesInfo().values()
6831     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6832     results = []
6833     for path, target in tgts:
6834       for tag in target.GetTags():
6835         if self.re.search(tag):
6836           results.append((path, tag))
6837     return results
6838
6839
6840 class LUAddTags(TagsLU):
6841   """Sets a tag on a given object.
6842
6843   """
6844   _OP_REQP = ["kind", "name", "tags"]
6845   REQ_BGL = False
6846
6847   def CheckPrereq(self):
6848     """Check prerequisites.
6849
6850     This checks the type and length of the tag name and value.
6851
6852     """
6853     TagsLU.CheckPrereq(self)
6854     for tag in self.op.tags:
6855       objects.TaggableObject.ValidateTag(tag)
6856
6857   def Exec(self, feedback_fn):
6858     """Sets the tag.
6859
6860     """
6861     try:
6862       for tag in self.op.tags:
6863         self.target.AddTag(tag)
6864     except errors.TagError, err:
6865       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6866     try:
6867       self.cfg.Update(self.target)
6868     except errors.ConfigurationError:
6869       raise errors.OpRetryError("There has been a modification to the"
6870                                 " config file and the operation has been"
6871                                 " aborted. Please retry.")
6872
6873
6874 class LUDelTags(TagsLU):
6875   """Delete a list of tags from a given object.
6876
6877   """
6878   _OP_REQP = ["kind", "name", "tags"]
6879   REQ_BGL = False
6880
6881   def CheckPrereq(self):
6882     """Check prerequisites.
6883
6884     This checks that we have the given tag.
6885
6886     """
6887     TagsLU.CheckPrereq(self)
6888     for tag in self.op.tags:
6889       objects.TaggableObject.ValidateTag(tag)
6890     del_tags = frozenset(self.op.tags)
6891     cur_tags = self.target.GetTags()
6892     if not del_tags <= cur_tags:
6893       diff_tags = del_tags - cur_tags
6894       diff_names = ["'%s'" % tag for tag in diff_tags]
6895       diff_names.sort()
6896       raise errors.OpPrereqError("Tag(s) %s not found" %
6897                                  (",".join(diff_names)))
6898
6899   def Exec(self, feedback_fn):
6900     """Remove the tag from the object.
6901
6902     """
6903     for tag in self.op.tags:
6904       self.target.RemoveTag(tag)
6905     try:
6906       self.cfg.Update(self.target)
6907     except errors.ConfigurationError:
6908       raise errors.OpRetryError("There has been a modification to the"
6909                                 " config file and the operation has been"
6910                                 " aborted. Please retry.")
6911
6912
6913 class LUTestDelay(NoHooksLU):
6914   """Sleep for a specified amount of time.
6915
6916   This LU sleeps on the master and/or nodes for a specified amount of
6917   time.
6918
6919   """
6920   _OP_REQP = ["duration", "on_master", "on_nodes"]
6921   REQ_BGL = False
6922
6923   def ExpandNames(self):
6924     """Expand names and set required locks.
6925
6926     This expands the node list, if any.
6927
6928     """
6929     self.needed_locks = {}
6930     if self.op.on_nodes:
6931       # _GetWantedNodes can be used here, but is not always appropriate to use
6932       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6933       # more information.
6934       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6935       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6936
6937   def CheckPrereq(self):
6938     """Check prerequisites.
6939
6940     """
6941
6942   def Exec(self, feedback_fn):
6943     """Do the actual sleep.
6944
6945     """
6946     if self.op.on_master:
6947       if not utils.TestDelay(self.op.duration):
6948         raise errors.OpExecError("Error during master delay test")
6949     if self.op.on_nodes:
6950       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6951       for node, node_result in result.items():
6952         node_result.Raise("Failure during rpc call to node %s" % node)
6953
6954
6955 class IAllocator(object):
6956   """IAllocator framework.
6957
6958   An IAllocator instance has three sets of attributes:
6959     - cfg that is needed to query the cluster
6960     - input data (all members of the _KEYS class attribute are required)
6961     - four buffer attributes (in|out_data|text), that represent the
6962       input (to the external script) in text and data structure format,
6963       and the output from it, again in two formats
6964     - the result variables from the script (success, info, nodes) for
6965       easy usage
6966
6967   """
6968   _ALLO_KEYS = [
6969     "mem_size", "disks", "disk_template",
6970     "os", "tags", "nics", "vcpus", "hypervisor",
6971     ]
6972   _RELO_KEYS = [
6973     "relocate_from",
6974     ]
6975
6976   def __init__(self, cfg, rpc, mode, name, **kwargs):
6977     self.cfg = cfg
6978     self.rpc = rpc
6979     # init buffer variables
6980     self.in_text = self.out_text = self.in_data = self.out_data = None
6981     # init all input fields so that pylint is happy
6982     self.mode = mode
6983     self.name = name
6984     self.mem_size = self.disks = self.disk_template = None
6985     self.os = self.tags = self.nics = self.vcpus = None
6986     self.hypervisor = None
6987     self.relocate_from = None
6988     # computed fields
6989     self.required_nodes = None
6990     # init result fields
6991     self.success = self.info = self.nodes = None
6992     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6993       keyset = self._ALLO_KEYS
6994     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6995       keyset = self._RELO_KEYS
6996     else:
6997       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6998                                    " IAllocator" % self.mode)
6999     for key in kwargs:
7000       if key not in keyset:
7001         raise errors.ProgrammerError("Invalid input parameter '%s' to"
7002                                      " IAllocator" % key)
7003       setattr(self, key, kwargs[key])
7004     for key in keyset:
7005       if key not in kwargs:
7006         raise errors.ProgrammerError("Missing input parameter '%s' to"
7007                                      " IAllocator" % key)
7008     self._BuildInputData()
7009
7010   def _ComputeClusterData(self):
7011     """Compute the generic allocator input data.
7012
7013     This is the data that is independent of the actual operation.
7014
7015     """
7016     cfg = self.cfg
7017     cluster_info = cfg.GetClusterInfo()
7018     # cluster data
7019     data = {
7020       "version": constants.IALLOCATOR_VERSION,
7021       "cluster_name": cfg.GetClusterName(),
7022       "cluster_tags": list(cluster_info.GetTags()),
7023       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
7024       # we don't have job IDs
7025       }
7026     iinfo = cfg.GetAllInstancesInfo().values()
7027     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
7028
7029     # node data
7030     node_results = {}
7031     node_list = cfg.GetNodeList()
7032
7033     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7034       hypervisor_name = self.hypervisor
7035     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7036       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
7037
7038     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
7039                                         hypervisor_name)
7040     node_iinfo = \
7041       self.rpc.call_all_instances_info(node_list,
7042                                        cluster_info.enabled_hypervisors)
7043     for nname, nresult in node_data.items():
7044       # first fill in static (config-based) values
7045       ninfo = cfg.GetNodeInfo(nname)
7046       pnr = {
7047         "tags": list(ninfo.GetTags()),
7048         "primary_ip": ninfo.primary_ip,
7049         "secondary_ip": ninfo.secondary_ip,
7050         "offline": ninfo.offline,
7051         "drained": ninfo.drained,
7052         "master_candidate": ninfo.master_candidate,
7053         }
7054
7055       if not ninfo.offline:
7056         nresult.Raise("Can't get data for node %s" % nname)
7057         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
7058                                 nname)
7059         remote_info = nresult.payload
7060         for attr in ['memory_total', 'memory_free', 'memory_dom0',
7061                      'vg_size', 'vg_free', 'cpu_total']:
7062           if attr not in remote_info:
7063             raise errors.OpExecError("Node '%s' didn't return attribute"
7064                                      " '%s'" % (nname, attr))
7065           if not isinstance(remote_info[attr], int):
7066             raise errors.OpExecError("Node '%s' returned invalid value"
7067                                      " for '%s': %s" %
7068                                      (nname, attr, remote_info[attr]))
7069         # compute memory used by primary instances
7070         i_p_mem = i_p_up_mem = 0
7071         for iinfo, beinfo in i_list:
7072           if iinfo.primary_node == nname:
7073             i_p_mem += beinfo[constants.BE_MEMORY]
7074             if iinfo.name not in node_iinfo[nname].payload:
7075               i_used_mem = 0
7076             else:
7077               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
7078             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
7079             remote_info['memory_free'] -= max(0, i_mem_diff)
7080
7081             if iinfo.admin_up:
7082               i_p_up_mem += beinfo[constants.BE_MEMORY]
7083
7084         # compute memory used by instances
7085         pnr_dyn = {
7086           "total_memory": remote_info['memory_total'],
7087           "reserved_memory": remote_info['memory_dom0'],
7088           "free_memory": remote_info['memory_free'],
7089           "total_disk": remote_info['vg_size'],
7090           "free_disk": remote_info['vg_free'],
7091           "total_cpus": remote_info['cpu_total'],
7092           "i_pri_memory": i_p_mem,
7093           "i_pri_up_memory": i_p_up_mem,
7094           }
7095         pnr.update(pnr_dyn)
7096
7097       node_results[nname] = pnr
7098     data["nodes"] = node_results
7099
7100     # instance data
7101     instance_data = {}
7102     for iinfo, beinfo in i_list:
7103       nic_data = []
7104       for nic in iinfo.nics:
7105         filled_params = objects.FillDict(
7106             cluster_info.nicparams[constants.PP_DEFAULT],
7107             nic.nicparams)
7108         nic_dict = {"mac": nic.mac,
7109                     "ip": nic.ip,
7110                     "mode": filled_params[constants.NIC_MODE],
7111                     "link": filled_params[constants.NIC_LINK],
7112                    }
7113         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7114           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7115         nic_data.append(nic_dict)
7116       pir = {
7117         "tags": list(iinfo.GetTags()),
7118         "admin_up": iinfo.admin_up,
7119         "vcpus": beinfo[constants.BE_VCPUS],
7120         "memory": beinfo[constants.BE_MEMORY],
7121         "os": iinfo.os,
7122         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7123         "nics": nic_data,
7124         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7125         "disk_template": iinfo.disk_template,
7126         "hypervisor": iinfo.hypervisor,
7127         }
7128       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7129                                                  pir["disks"])
7130       instance_data[iinfo.name] = pir
7131
7132     data["instances"] = instance_data
7133
7134     self.in_data = data
7135
7136   def _AddNewInstance(self):
7137     """Add new instance data to allocator structure.
7138
7139     This in combination with _AllocatorGetClusterData will create the
7140     correct structure needed as input for the allocator.
7141
7142     The checks for the completeness of the opcode must have already been
7143     done.
7144
7145     """
7146     data = self.in_data
7147
7148     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7149
7150     if self.disk_template in constants.DTS_NET_MIRROR:
7151       self.required_nodes = 2
7152     else:
7153       self.required_nodes = 1
7154     request = {
7155       "type": "allocate",
7156       "name": self.name,
7157       "disk_template": self.disk_template,
7158       "tags": self.tags,
7159       "os": self.os,
7160       "vcpus": self.vcpus,
7161       "memory": self.mem_size,
7162       "disks": self.disks,
7163       "disk_space_total": disk_space,
7164       "nics": self.nics,
7165       "required_nodes": self.required_nodes,
7166       }
7167     data["request"] = request
7168
7169   def _AddRelocateInstance(self):
7170     """Add relocate instance data to allocator structure.
7171
7172     This in combination with _IAllocatorGetClusterData will create the
7173     correct structure needed as input for the allocator.
7174
7175     The checks for the completeness of the opcode must have already been
7176     done.
7177
7178     """
7179     instance = self.cfg.GetInstanceInfo(self.name)
7180     if instance is None:
7181       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7182                                    " IAllocator" % self.name)
7183
7184     if instance.disk_template not in constants.DTS_NET_MIRROR:
7185       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7186
7187     if len(instance.secondary_nodes) != 1:
7188       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7189
7190     self.required_nodes = 1
7191     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7192     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7193
7194     request = {
7195       "type": "relocate",
7196       "name": self.name,
7197       "disk_space_total": disk_space,
7198       "required_nodes": self.required_nodes,
7199       "relocate_from": self.relocate_from,
7200       }
7201     self.in_data["request"] = request
7202
7203   def _BuildInputData(self):
7204     """Build input data structures.
7205
7206     """
7207     self._ComputeClusterData()
7208
7209     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7210       self._AddNewInstance()
7211     else:
7212       self._AddRelocateInstance()
7213
7214     self.in_text = serializer.Dump(self.in_data)
7215
7216   def Run(self, name, validate=True, call_fn=None):
7217     """Run an instance allocator and return the results.
7218
7219     """
7220     if call_fn is None:
7221       call_fn = self.rpc.call_iallocator_runner
7222
7223     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
7224     result.Raise("Failure while running the iallocator script")
7225
7226     self.out_text = result.payload
7227     if validate:
7228       self._ValidateResult()
7229
7230   def _ValidateResult(self):
7231     """Process the allocator results.
7232
7233     This will process and if successful save the result in
7234     self.out_data and the other parameters.
7235
7236     """
7237     try:
7238       rdict = serializer.Load(self.out_text)
7239     except Exception, err:
7240       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7241
7242     if not isinstance(rdict, dict):
7243       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7244
7245     for key in "success", "info", "nodes":
7246       if key not in rdict:
7247         raise errors.OpExecError("Can't parse iallocator results:"
7248                                  " missing key '%s'" % key)
7249       setattr(self, key, rdict[key])
7250
7251     if not isinstance(rdict["nodes"], list):
7252       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7253                                " is not a list")
7254     self.out_data = rdict
7255
7256
7257 class LUTestAllocator(NoHooksLU):
7258   """Run allocator tests.
7259
7260   This LU runs the allocator tests
7261
7262   """
7263   _OP_REQP = ["direction", "mode", "name"]
7264
7265   def CheckPrereq(self):
7266     """Check prerequisites.
7267
7268     This checks the opcode parameters depending on the director and mode test.
7269
7270     """
7271     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7272       for attr in ["name", "mem_size", "disks", "disk_template",
7273                    "os", "tags", "nics", "vcpus"]:
7274         if not hasattr(self.op, attr):
7275           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7276                                      attr)
7277       iname = self.cfg.ExpandInstanceName(self.op.name)
7278       if iname is not None:
7279         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7280                                    iname)
7281       if not isinstance(self.op.nics, list):
7282         raise errors.OpPrereqError("Invalid parameter 'nics'")
7283       for row in self.op.nics:
7284         if (not isinstance(row, dict) or
7285             "mac" not in row or
7286             "ip" not in row or
7287             "bridge" not in row):
7288           raise errors.OpPrereqError("Invalid contents of the"
7289                                      " 'nics' parameter")
7290       if not isinstance(self.op.disks, list):
7291         raise errors.OpPrereqError("Invalid parameter 'disks'")
7292       for row in self.op.disks:
7293         if (not isinstance(row, dict) or
7294             "size" not in row or
7295             not isinstance(row["size"], int) or
7296             "mode" not in row or
7297             row["mode"] not in ['r', 'w']):
7298           raise errors.OpPrereqError("Invalid contents of the"
7299                                      " 'disks' parameter")
7300       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7301         self.op.hypervisor = self.cfg.GetHypervisorType()
7302     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7303       if not hasattr(self.op, "name"):
7304         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7305       fname = self.cfg.ExpandInstanceName(self.op.name)
7306       if fname is None:
7307         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7308                                    self.op.name)
7309       self.op.name = fname
7310       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7311     else:
7312       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7313                                  self.op.mode)
7314
7315     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7316       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7317         raise errors.OpPrereqError("Missing allocator name")
7318     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7319       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7320                                  self.op.direction)
7321
7322   def Exec(self, feedback_fn):
7323     """Run the allocator test.
7324
7325     """
7326     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7327       ial = IAllocator(self.cfg, self.rpc,
7328                        mode=self.op.mode,
7329                        name=self.op.name,
7330                        mem_size=self.op.mem_size,
7331                        disks=self.op.disks,
7332                        disk_template=self.op.disk_template,
7333                        os=self.op.os,
7334                        tags=self.op.tags,
7335                        nics=self.op.nics,
7336                        vcpus=self.op.vcpus,
7337                        hypervisor=self.op.hypervisor,
7338                        )
7339     else:
7340       ial = IAllocator(self.cfg, self.rpc,
7341                        mode=self.op.mode,
7342                        name=self.op.name,
7343                        relocate_from=list(self.relocate_from),
7344                        )
7345
7346     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7347       result = ial.in_text
7348     else:
7349       ial.Run(self.op.allocator, validate=False)
7350       result = ial.out_text
7351     return result