RAPI: fixes related to write mode
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @rtype: dict
480   @return: the hook environment for this instance
481
482   """
483   if status:
484     str_status = "up"
485   else:
486     str_status = "down"
487   env = {
488     "OP_TARGET": name,
489     "INSTANCE_NAME": name,
490     "INSTANCE_PRIMARY": primary_node,
491     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
492     "INSTANCE_OS_TYPE": os_type,
493     "INSTANCE_STATUS": str_status,
494     "INSTANCE_MEMORY": memory,
495     "INSTANCE_VCPUS": vcpus,
496   }
497
498   if nics:
499     nic_count = len(nics)
500     for idx, (ip, bridge, mac) in enumerate(nics):
501       if ip is None:
502         ip = ""
503       env["INSTANCE_NIC%d_IP" % idx] = ip
504       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
505       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
506   else:
507     nic_count = 0
508
509   env["INSTANCE_NIC_COUNT"] = nic_count
510
511   return env
512
513
514 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
515   """Builds instance related env variables for hooks from an object.
516
517   @type lu: L{LogicalUnit}
518   @param lu: the logical unit on whose behalf we execute
519   @type instance: L{objects.Instance}
520   @param instance: the instance for which we should build the
521       environment
522   @type override: dict
523   @param override: dictionary with key/values that will override
524       our values
525   @rtype: dict
526   @return: the hook environment dictionary
527
528   """
529   bep = lu.cfg.GetClusterInfo().FillBE(instance)
530   args = {
531     'name': instance.name,
532     'primary_node': instance.primary_node,
533     'secondary_nodes': instance.secondary_nodes,
534     'os_type': instance.os,
535     'status': instance.admin_up,
536     'memory': bep[constants.BE_MEMORY],
537     'vcpus': bep[constants.BE_VCPUS],
538     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
539   }
540   if override:
541     args.update(override)
542   return _BuildInstanceHookEnv(**args)
543
544
545 def _AdjustCandidatePool(lu):
546   """Adjust the candidate pool after node operations.
547
548   """
549   mod_list = lu.cfg.MaintainCandidatePool()
550   if mod_list:
551     lu.LogInfo("Promoted nodes to master candidate role: %s",
552                ", ".join(node.name for node in mod_list))
553     for name in mod_list:
554       lu.context.ReaddNode(name)
555   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
556   if mc_now > mc_max:
557     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
558                (mc_now, mc_max))
559
560
561 def _CheckInstanceBridgesExist(lu, instance):
562   """Check that the brigdes needed by an instance exist.
563
564   """
565   # check bridges existance
566   brlist = [nic.bridge for nic in instance.nics]
567   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
568   result.Raise()
569   if not result.data:
570     raise errors.OpPrereqError("One or more target bridges %s does not"
571                                " exist on destination node '%s'" %
572                                (brlist, instance.primary_node))
573
574
575 class LUDestroyCluster(NoHooksLU):
576   """Logical unit for destroying the cluster.
577
578   """
579   _OP_REQP = []
580
581   def CheckPrereq(self):
582     """Check prerequisites.
583
584     This checks whether the cluster is empty.
585
586     Any errors are signalled by raising errors.OpPrereqError.
587
588     """
589     master = self.cfg.GetMasterNode()
590
591     nodelist = self.cfg.GetNodeList()
592     if len(nodelist) != 1 or nodelist[0] != master:
593       raise errors.OpPrereqError("There are still %d node(s) in"
594                                  " this cluster." % (len(nodelist) - 1))
595     instancelist = self.cfg.GetInstanceList()
596     if instancelist:
597       raise errors.OpPrereqError("There are still %d instance(s) in"
598                                  " this cluster." % len(instancelist))
599
600   def Exec(self, feedback_fn):
601     """Destroys the cluster.
602
603     """
604     master = self.cfg.GetMasterNode()
605     result = self.rpc.call_node_stop_master(master, False)
606     result.Raise()
607     if not result.data:
608       raise errors.OpExecError("Could not disable the master role")
609     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
610     utils.CreateBackup(priv_key)
611     utils.CreateBackup(pub_key)
612     return master
613
614
615 class LUVerifyCluster(LogicalUnit):
616   """Verifies the cluster status.
617
618   """
619   HPATH = "cluster-verify"
620   HTYPE = constants.HTYPE_CLUSTER
621   _OP_REQP = ["skip_checks"]
622   REQ_BGL = False
623
624   def ExpandNames(self):
625     self.needed_locks = {
626       locking.LEVEL_NODE: locking.ALL_SET,
627       locking.LEVEL_INSTANCE: locking.ALL_SET,
628     }
629     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
630
631   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
632                   node_result, feedback_fn, master_files,
633                   drbd_map):
634     """Run multiple tests against a node.
635
636     Test list:
637
638       - compares ganeti version
639       - checks vg existance and size > 20G
640       - checks config file checksum
641       - checks ssh to other nodes
642
643     @type nodeinfo: L{objects.Node}
644     @param nodeinfo: the node to check
645     @param file_list: required list of files
646     @param local_cksum: dictionary of local files and their checksums
647     @param node_result: the results from the node
648     @param feedback_fn: function used to accumulate results
649     @param master_files: list of files that only masters should have
650     @param drbd_map: the useddrbd minors for this node, in
651         form of minor: (instance, must_exist) which correspond to instances
652         and their running status
653
654     """
655     node = nodeinfo.name
656
657     # main result, node_result should be a non-empty dict
658     if not node_result or not isinstance(node_result, dict):
659       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
660       return True
661
662     # compares ganeti version
663     local_version = constants.PROTOCOL_VERSION
664     remote_version = node_result.get('version', None)
665     if not (remote_version and isinstance(remote_version, (list, tuple)) and
666             len(remote_version) == 2):
667       feedback_fn("  - ERROR: connection to %s failed" % (node))
668       return True
669
670     if local_version != remote_version[0]:
671       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
672                   " node %s %s" % (local_version, node, remote_version[0]))
673       return True
674
675     # node seems compatible, we can actually try to look into its results
676
677     bad = False
678
679     # full package version
680     if constants.RELEASE_VERSION != remote_version[1]:
681       feedback_fn("  - WARNING: software version mismatch: master %s,"
682                   " node %s %s" %
683                   (constants.RELEASE_VERSION, node, remote_version[1]))
684
685     # checks vg existence and size > 20G
686
687     vglist = node_result.get(constants.NV_VGLIST, None)
688     if not vglist:
689       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
690                       (node,))
691       bad = True
692     else:
693       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
694                                             constants.MIN_VG_SIZE)
695       if vgstatus:
696         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
697         bad = True
698
699     # checks config file checksum
700
701     remote_cksum = node_result.get(constants.NV_FILELIST, None)
702     if not isinstance(remote_cksum, dict):
703       bad = True
704       feedback_fn("  - ERROR: node hasn't returned file checksum data")
705     else:
706       for file_name in file_list:
707         node_is_mc = nodeinfo.master_candidate
708         must_have_file = file_name not in master_files
709         if file_name not in remote_cksum:
710           if node_is_mc or must_have_file:
711             bad = True
712             feedback_fn("  - ERROR: file '%s' missing" % file_name)
713         elif remote_cksum[file_name] != local_cksum[file_name]:
714           if node_is_mc or must_have_file:
715             bad = True
716             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
717           else:
718             # not candidate and this is not a must-have file
719             bad = True
720             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
721                         " '%s'" % file_name)
722         else:
723           # all good, except non-master/non-must have combination
724           if not node_is_mc and not must_have_file:
725             feedback_fn("  - ERROR: file '%s' should not exist on non master"
726                         " candidates" % file_name)
727
728     # checks ssh to any
729
730     if constants.NV_NODELIST not in node_result:
731       bad = True
732       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
733     else:
734       if node_result[constants.NV_NODELIST]:
735         bad = True
736         for node in node_result[constants.NV_NODELIST]:
737           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
738                           (node, node_result[constants.NV_NODELIST][node]))
739
740     if constants.NV_NODENETTEST not in node_result:
741       bad = True
742       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
743     else:
744       if node_result[constants.NV_NODENETTEST]:
745         bad = True
746         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
747         for node in nlist:
748           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
749                           (node, node_result[constants.NV_NODENETTEST][node]))
750
751     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
752     if isinstance(hyp_result, dict):
753       for hv_name, hv_result in hyp_result.iteritems():
754         if hv_result is not None:
755           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
756                       (hv_name, hv_result))
757
758     # check used drbd list
759     used_minors = node_result.get(constants.NV_DRBDLIST, [])
760     for minor, (iname, must_exist) in drbd_map.items():
761       if minor not in used_minors and must_exist:
762         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
763                     (minor, iname))
764         bad = True
765     for minor in used_minors:
766       if minor not in drbd_map:
767         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
768         bad = True
769
770     return bad
771
772   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
773                       node_instance, feedback_fn, n_offline):
774     """Verify an instance.
775
776     This function checks to see if the required block devices are
777     available on the instance's node.
778
779     """
780     bad = False
781
782     node_current = instanceconfig.primary_node
783
784     node_vol_should = {}
785     instanceconfig.MapLVsByNode(node_vol_should)
786
787     for node in node_vol_should:
788       if node in n_offline:
789         # ignore missing volumes on offline nodes
790         continue
791       for volume in node_vol_should[node]:
792         if node not in node_vol_is or volume not in node_vol_is[node]:
793           feedback_fn("  - ERROR: volume %s missing on node %s" %
794                           (volume, node))
795           bad = True
796
797     if instanceconfig.admin_up:
798       if ((node_current not in node_instance or
799           not instance in node_instance[node_current]) and
800           node_current not in n_offline):
801         feedback_fn("  - ERROR: instance %s not running on node %s" %
802                         (instance, node_current))
803         bad = True
804
805     for node in node_instance:
806       if (not node == node_current):
807         if instance in node_instance[node]:
808           feedback_fn("  - ERROR: instance %s should not run on node %s" %
809                           (instance, node))
810           bad = True
811
812     return bad
813
814   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
815     """Verify if there are any unknown volumes in the cluster.
816
817     The .os, .swap and backup volumes are ignored. All other volumes are
818     reported as unknown.
819
820     """
821     bad = False
822
823     for node in node_vol_is:
824       for volume in node_vol_is[node]:
825         if node not in node_vol_should or volume not in node_vol_should[node]:
826           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
827                       (volume, node))
828           bad = True
829     return bad
830
831   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
832     """Verify the list of running instances.
833
834     This checks what instances are running but unknown to the cluster.
835
836     """
837     bad = False
838     for node in node_instance:
839       for runninginstance in node_instance[node]:
840         if runninginstance not in instancelist:
841           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
842                           (runninginstance, node))
843           bad = True
844     return bad
845
846   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
847     """Verify N+1 Memory Resilience.
848
849     Check that if one single node dies we can still start all the instances it
850     was primary for.
851
852     """
853     bad = False
854
855     for node, nodeinfo in node_info.iteritems():
856       # This code checks that every node which is now listed as secondary has
857       # enough memory to host all instances it is supposed to should a single
858       # other node in the cluster fail.
859       # FIXME: not ready for failover to an arbitrary node
860       # FIXME: does not support file-backed instances
861       # WARNING: we currently take into account down instances as well as up
862       # ones, considering that even if they're down someone might want to start
863       # them even in the event of a node failure.
864       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
865         needed_mem = 0
866         for instance in instances:
867           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
868           if bep[constants.BE_AUTO_BALANCE]:
869             needed_mem += bep[constants.BE_MEMORY]
870         if nodeinfo['mfree'] < needed_mem:
871           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
872                       " failovers should node %s fail" % (node, prinode))
873           bad = True
874     return bad
875
876   def CheckPrereq(self):
877     """Check prerequisites.
878
879     Transform the list of checks we're going to skip into a set and check that
880     all its members are valid.
881
882     """
883     self.skip_set = frozenset(self.op.skip_checks)
884     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
885       raise errors.OpPrereqError("Invalid checks to be skipped specified")
886
887   def BuildHooksEnv(self):
888     """Build hooks env.
889
890     Cluster-Verify hooks just rone in the post phase and their failure makes
891     the output be logged in the verify output and the verification to fail.
892
893     """
894     all_nodes = self.cfg.GetNodeList()
895     # TODO: populate the environment with useful information for verify hooks
896     env = {}
897     return env, [], all_nodes
898
899   def Exec(self, feedback_fn):
900     """Verify integrity of cluster, performing various test on nodes.
901
902     """
903     bad = False
904     feedback_fn("* Verifying global settings")
905     for msg in self.cfg.VerifyConfig():
906       feedback_fn("  - ERROR: %s" % msg)
907
908     vg_name = self.cfg.GetVGName()
909     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
910     nodelist = utils.NiceSort(self.cfg.GetNodeList())
911     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
912     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
913     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
914                         for iname in instancelist)
915     i_non_redundant = [] # Non redundant instances
916     i_non_a_balanced = [] # Non auto-balanced instances
917     n_offline = [] # List of offline nodes
918     n_drained = [] # List of nodes being drained
919     node_volume = {}
920     node_instance = {}
921     node_info = {}
922     instance_cfg = {}
923
924     # FIXME: verify OS list
925     # do local checksums
926     master_files = [constants.CLUSTER_CONF_FILE]
927
928     file_names = ssconf.SimpleStore().GetFileList()
929     file_names.append(constants.SSL_CERT_FILE)
930     file_names.append(constants.RAPI_CERT_FILE)
931     file_names.extend(master_files)
932
933     local_checksums = utils.FingerprintFiles(file_names)
934
935     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
936     node_verify_param = {
937       constants.NV_FILELIST: file_names,
938       constants.NV_NODELIST: [node.name for node in nodeinfo
939                               if not node.offline],
940       constants.NV_HYPERVISOR: hypervisors,
941       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
942                                   node.secondary_ip) for node in nodeinfo
943                                  if not node.offline],
944       constants.NV_LVLIST: vg_name,
945       constants.NV_INSTANCELIST: hypervisors,
946       constants.NV_VGLIST: None,
947       constants.NV_VERSION: None,
948       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
949       constants.NV_DRBDLIST: None,
950       }
951     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
952                                            self.cfg.GetClusterName())
953
954     cluster = self.cfg.GetClusterInfo()
955     master_node = self.cfg.GetMasterNode()
956     all_drbd_map = self.cfg.ComputeDRBDMap()
957
958     for node_i in nodeinfo:
959       node = node_i.name
960       nresult = all_nvinfo[node].data
961
962       if node_i.offline:
963         feedback_fn("* Skipping offline node %s" % (node,))
964         n_offline.append(node)
965         continue
966
967       if node == master_node:
968         ntype = "master"
969       elif node_i.master_candidate:
970         ntype = "master candidate"
971       elif node_i.drained:
972         ntype = "drained"
973         n_drained.append(node)
974       else:
975         ntype = "regular"
976       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
977
978       if all_nvinfo[node].failed or not isinstance(nresult, dict):
979         feedback_fn("  - ERROR: connection to %s failed" % (node,))
980         bad = True
981         continue
982
983       node_drbd = {}
984       for minor, instance in all_drbd_map[node].items():
985         instance = instanceinfo[instance]
986         node_drbd[minor] = (instance.name, instance.admin_up)
987       result = self._VerifyNode(node_i, file_names, local_checksums,
988                                 nresult, feedback_fn, master_files,
989                                 node_drbd)
990       bad = bad or result
991
992       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
993       if isinstance(lvdata, basestring):
994         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
995                     (node, utils.SafeEncode(lvdata)))
996         bad = True
997         node_volume[node] = {}
998       elif not isinstance(lvdata, dict):
999         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1000         bad = True
1001         continue
1002       else:
1003         node_volume[node] = lvdata
1004
1005       # node_instance
1006       idata = nresult.get(constants.NV_INSTANCELIST, None)
1007       if not isinstance(idata, list):
1008         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1009                     (node,))
1010         bad = True
1011         continue
1012
1013       node_instance[node] = idata
1014
1015       # node_info
1016       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1017       if not isinstance(nodeinfo, dict):
1018         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1019         bad = True
1020         continue
1021
1022       try:
1023         node_info[node] = {
1024           "mfree": int(nodeinfo['memory_free']),
1025           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1026           "pinst": [],
1027           "sinst": [],
1028           # dictionary holding all instances this node is secondary for,
1029           # grouped by their primary node. Each key is a cluster node, and each
1030           # value is a list of instances which have the key as primary and the
1031           # current node as secondary.  this is handy to calculate N+1 memory
1032           # availability if you can only failover from a primary to its
1033           # secondary.
1034           "sinst-by-pnode": {},
1035         }
1036       except ValueError:
1037         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1038         bad = True
1039         continue
1040
1041     node_vol_should = {}
1042
1043     for instance in instancelist:
1044       feedback_fn("* Verifying instance %s" % instance)
1045       inst_config = instanceinfo[instance]
1046       result =  self._VerifyInstance(instance, inst_config, node_volume,
1047                                      node_instance, feedback_fn, n_offline)
1048       bad = bad or result
1049       inst_nodes_offline = []
1050
1051       inst_config.MapLVsByNode(node_vol_should)
1052
1053       instance_cfg[instance] = inst_config
1054
1055       pnode = inst_config.primary_node
1056       if pnode in node_info:
1057         node_info[pnode]['pinst'].append(instance)
1058       elif pnode not in n_offline:
1059         feedback_fn("  - ERROR: instance %s, connection to primary node"
1060                     " %s failed" % (instance, pnode))
1061         bad = True
1062
1063       if pnode in n_offline:
1064         inst_nodes_offline.append(pnode)
1065
1066       # If the instance is non-redundant we cannot survive losing its primary
1067       # node, so we are not N+1 compliant. On the other hand we have no disk
1068       # templates with more than one secondary so that situation is not well
1069       # supported either.
1070       # FIXME: does not support file-backed instances
1071       if len(inst_config.secondary_nodes) == 0:
1072         i_non_redundant.append(instance)
1073       elif len(inst_config.secondary_nodes) > 1:
1074         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1075                     % instance)
1076
1077       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1078         i_non_a_balanced.append(instance)
1079
1080       for snode in inst_config.secondary_nodes:
1081         if snode in node_info:
1082           node_info[snode]['sinst'].append(instance)
1083           if pnode not in node_info[snode]['sinst-by-pnode']:
1084             node_info[snode]['sinst-by-pnode'][pnode] = []
1085           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1086         elif snode not in n_offline:
1087           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1088                       " %s failed" % (instance, snode))
1089           bad = True
1090         if snode in n_offline:
1091           inst_nodes_offline.append(snode)
1092
1093       if inst_nodes_offline:
1094         # warn that the instance lives on offline nodes, and set bad=True
1095         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1096                     ", ".join(inst_nodes_offline))
1097         bad = True
1098
1099     feedback_fn("* Verifying orphan volumes")
1100     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1101                                        feedback_fn)
1102     bad = bad or result
1103
1104     feedback_fn("* Verifying remaining instances")
1105     result = self._VerifyOrphanInstances(instancelist, node_instance,
1106                                          feedback_fn)
1107     bad = bad or result
1108
1109     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1110       feedback_fn("* Verifying N+1 Memory redundancy")
1111       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1112       bad = bad or result
1113
1114     feedback_fn("* Other Notes")
1115     if i_non_redundant:
1116       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1117                   % len(i_non_redundant))
1118
1119     if i_non_a_balanced:
1120       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1121                   % len(i_non_a_balanced))
1122
1123     if n_offline:
1124       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1125
1126     if n_drained:
1127       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1128
1129     return not bad
1130
1131   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1132     """Analize the post-hooks' result
1133
1134     This method analyses the hook result, handles it, and sends some
1135     nicely-formatted feedback back to the user.
1136
1137     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1138         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1139     @param hooks_results: the results of the multi-node hooks rpc call
1140     @param feedback_fn: function used send feedback back to the caller
1141     @param lu_result: previous Exec result
1142     @return: the new Exec result, based on the previous result
1143         and hook results
1144
1145     """
1146     # We only really run POST phase hooks, and are only interested in
1147     # their results
1148     if phase == constants.HOOKS_PHASE_POST:
1149       # Used to change hooks' output to proper indentation
1150       indent_re = re.compile('^', re.M)
1151       feedback_fn("* Hooks Results")
1152       if not hooks_results:
1153         feedback_fn("  - ERROR: general communication failure")
1154         lu_result = 1
1155       else:
1156         for node_name in hooks_results:
1157           show_node_header = True
1158           res = hooks_results[node_name]
1159           if res.failed or res.data is False or not isinstance(res.data, list):
1160             if res.offline:
1161               # no need to warn or set fail return value
1162               continue
1163             feedback_fn("    Communication failure in hooks execution")
1164             lu_result = 1
1165             continue
1166           for script, hkr, output in res.data:
1167             if hkr == constants.HKR_FAIL:
1168               # The node header is only shown once, if there are
1169               # failing hooks on that node
1170               if show_node_header:
1171                 feedback_fn("  Node %s:" % node_name)
1172                 show_node_header = False
1173               feedback_fn("    ERROR: Script %s failed, output:" % script)
1174               output = indent_re.sub('      ', output)
1175               feedback_fn("%s" % output)
1176               lu_result = 1
1177
1178       return lu_result
1179
1180
1181 class LUVerifyDisks(NoHooksLU):
1182   """Verifies the cluster disks status.
1183
1184   """
1185   _OP_REQP = []
1186   REQ_BGL = False
1187
1188   def ExpandNames(self):
1189     self.needed_locks = {
1190       locking.LEVEL_NODE: locking.ALL_SET,
1191       locking.LEVEL_INSTANCE: locking.ALL_SET,
1192     }
1193     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1194
1195   def CheckPrereq(self):
1196     """Check prerequisites.
1197
1198     This has no prerequisites.
1199
1200     """
1201     pass
1202
1203   def Exec(self, feedback_fn):
1204     """Verify integrity of cluster disks.
1205
1206     """
1207     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1208
1209     vg_name = self.cfg.GetVGName()
1210     nodes = utils.NiceSort(self.cfg.GetNodeList())
1211     instances = [self.cfg.GetInstanceInfo(name)
1212                  for name in self.cfg.GetInstanceList()]
1213
1214     nv_dict = {}
1215     for inst in instances:
1216       inst_lvs = {}
1217       if (not inst.admin_up or
1218           inst.disk_template not in constants.DTS_NET_MIRROR):
1219         continue
1220       inst.MapLVsByNode(inst_lvs)
1221       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1222       for node, vol_list in inst_lvs.iteritems():
1223         for vol in vol_list:
1224           nv_dict[(node, vol)] = inst
1225
1226     if not nv_dict:
1227       return result
1228
1229     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1230
1231     to_act = set()
1232     for node in nodes:
1233       # node_volume
1234       lvs = node_lvs[node]
1235       if lvs.failed:
1236         if not lvs.offline:
1237           self.LogWarning("Connection to node %s failed: %s" %
1238                           (node, lvs.data))
1239         continue
1240       lvs = lvs.data
1241       if isinstance(lvs, basestring):
1242         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1243         res_nlvm[node] = lvs
1244       elif not isinstance(lvs, dict):
1245         logging.warning("Connection to node %s failed or invalid data"
1246                         " returned", node)
1247         res_nodes.append(node)
1248         continue
1249
1250       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1251         inst = nv_dict.pop((node, lv_name), None)
1252         if (not lv_online and inst is not None
1253             and inst.name not in res_instances):
1254           res_instances.append(inst.name)
1255
1256     # any leftover items in nv_dict are missing LVs, let's arrange the
1257     # data better
1258     for key, inst in nv_dict.iteritems():
1259       if inst.name not in res_missing:
1260         res_missing[inst.name] = []
1261       res_missing[inst.name].append(key)
1262
1263     return result
1264
1265
1266 class LURenameCluster(LogicalUnit):
1267   """Rename the cluster.
1268
1269   """
1270   HPATH = "cluster-rename"
1271   HTYPE = constants.HTYPE_CLUSTER
1272   _OP_REQP = ["name"]
1273
1274   def BuildHooksEnv(self):
1275     """Build hooks env.
1276
1277     """
1278     env = {
1279       "OP_TARGET": self.cfg.GetClusterName(),
1280       "NEW_NAME": self.op.name,
1281       }
1282     mn = self.cfg.GetMasterNode()
1283     return env, [mn], [mn]
1284
1285   def CheckPrereq(self):
1286     """Verify that the passed name is a valid one.
1287
1288     """
1289     hostname = utils.HostInfo(self.op.name)
1290
1291     new_name = hostname.name
1292     self.ip = new_ip = hostname.ip
1293     old_name = self.cfg.GetClusterName()
1294     old_ip = self.cfg.GetMasterIP()
1295     if new_name == old_name and new_ip == old_ip:
1296       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1297                                  " cluster has changed")
1298     if new_ip != old_ip:
1299       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1300         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1301                                    " reachable on the network. Aborting." %
1302                                    new_ip)
1303
1304     self.op.name = new_name
1305
1306   def Exec(self, feedback_fn):
1307     """Rename the cluster.
1308
1309     """
1310     clustername = self.op.name
1311     ip = self.ip
1312
1313     # shutdown the master IP
1314     master = self.cfg.GetMasterNode()
1315     result = self.rpc.call_node_stop_master(master, False)
1316     if result.failed or not result.data:
1317       raise errors.OpExecError("Could not disable the master role")
1318
1319     try:
1320       cluster = self.cfg.GetClusterInfo()
1321       cluster.cluster_name = clustername
1322       cluster.master_ip = ip
1323       self.cfg.Update(cluster)
1324
1325       # update the known hosts file
1326       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1327       node_list = self.cfg.GetNodeList()
1328       try:
1329         node_list.remove(master)
1330       except ValueError:
1331         pass
1332       result = self.rpc.call_upload_file(node_list,
1333                                          constants.SSH_KNOWN_HOSTS_FILE)
1334       for to_node, to_result in result.iteritems():
1335         if to_result.failed or not to_result.data:
1336           logging.error("Copy of file %s to node %s failed",
1337                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1338
1339     finally:
1340       result = self.rpc.call_node_start_master(master, False)
1341       if result.failed or not result.data:
1342         self.LogWarning("Could not re-enable the master role on"
1343                         " the master, please restart manually.")
1344
1345
1346 def _RecursiveCheckIfLVMBased(disk):
1347   """Check if the given disk or its children are lvm-based.
1348
1349   @type disk: L{objects.Disk}
1350   @param disk: the disk to check
1351   @rtype: booleean
1352   @return: boolean indicating whether a LD_LV dev_type was found or not
1353
1354   """
1355   if disk.children:
1356     for chdisk in disk.children:
1357       if _RecursiveCheckIfLVMBased(chdisk):
1358         return True
1359   return disk.dev_type == constants.LD_LV
1360
1361
1362 class LUSetClusterParams(LogicalUnit):
1363   """Change the parameters of the cluster.
1364
1365   """
1366   HPATH = "cluster-modify"
1367   HTYPE = constants.HTYPE_CLUSTER
1368   _OP_REQP = []
1369   REQ_BGL = False
1370
1371   def CheckParameters(self):
1372     """Check parameters
1373
1374     """
1375     if not hasattr(self.op, "candidate_pool_size"):
1376       self.op.candidate_pool_size = None
1377     if self.op.candidate_pool_size is not None:
1378       try:
1379         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1380       except ValueError, err:
1381         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1382                                    str(err))
1383       if self.op.candidate_pool_size < 1:
1384         raise errors.OpPrereqError("At least one master candidate needed")
1385
1386   def ExpandNames(self):
1387     # FIXME: in the future maybe other cluster params won't require checking on
1388     # all nodes to be modified.
1389     self.needed_locks = {
1390       locking.LEVEL_NODE: locking.ALL_SET,
1391     }
1392     self.share_locks[locking.LEVEL_NODE] = 1
1393
1394   def BuildHooksEnv(self):
1395     """Build hooks env.
1396
1397     """
1398     env = {
1399       "OP_TARGET": self.cfg.GetClusterName(),
1400       "NEW_VG_NAME": self.op.vg_name,
1401       }
1402     mn = self.cfg.GetMasterNode()
1403     return env, [mn], [mn]
1404
1405   def CheckPrereq(self):
1406     """Check prerequisites.
1407
1408     This checks whether the given params don't conflict and
1409     if the given volume group is valid.
1410
1411     """
1412     if self.op.vg_name is not None and not self.op.vg_name:
1413       instances = self.cfg.GetAllInstancesInfo().values()
1414       for inst in instances:
1415         for disk in inst.disks:
1416           if _RecursiveCheckIfLVMBased(disk):
1417             raise errors.OpPrereqError("Cannot disable lvm storage while"
1418                                        " lvm-based instances exist")
1419
1420     node_list = self.acquired_locks[locking.LEVEL_NODE]
1421
1422     # if vg_name not None, checks given volume group on all nodes
1423     if self.op.vg_name:
1424       vglist = self.rpc.call_vg_list(node_list)
1425       for node in node_list:
1426         if vglist[node].failed:
1427           # ignoring down node
1428           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1429           continue
1430         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1431                                               self.op.vg_name,
1432                                               constants.MIN_VG_SIZE)
1433         if vgstatus:
1434           raise errors.OpPrereqError("Error on node '%s': %s" %
1435                                      (node, vgstatus))
1436
1437     self.cluster = cluster = self.cfg.GetClusterInfo()
1438     # validate beparams changes
1439     if self.op.beparams:
1440       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1441       self.new_beparams = cluster.FillDict(
1442         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1443
1444     # hypervisor list/parameters
1445     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1446     if self.op.hvparams:
1447       if not isinstance(self.op.hvparams, dict):
1448         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1449       for hv_name, hv_dict in self.op.hvparams.items():
1450         if hv_name not in self.new_hvparams:
1451           self.new_hvparams[hv_name] = hv_dict
1452         else:
1453           self.new_hvparams[hv_name].update(hv_dict)
1454
1455     if self.op.enabled_hypervisors is not None:
1456       self.hv_list = self.op.enabled_hypervisors
1457     else:
1458       self.hv_list = cluster.enabled_hypervisors
1459
1460     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1461       # either the enabled list has changed, or the parameters have, validate
1462       for hv_name, hv_params in self.new_hvparams.items():
1463         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1464             (self.op.enabled_hypervisors and
1465              hv_name in self.op.enabled_hypervisors)):
1466           # either this is a new hypervisor, or its parameters have changed
1467           hv_class = hypervisor.GetHypervisor(hv_name)
1468           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1469           hv_class.CheckParameterSyntax(hv_params)
1470           _CheckHVParams(self, node_list, hv_name, hv_params)
1471
1472   def Exec(self, feedback_fn):
1473     """Change the parameters of the cluster.
1474
1475     """
1476     if self.op.vg_name is not None:
1477       if self.op.vg_name != self.cfg.GetVGName():
1478         self.cfg.SetVGName(self.op.vg_name)
1479       else:
1480         feedback_fn("Cluster LVM configuration already in desired"
1481                     " state, not changing")
1482     if self.op.hvparams:
1483       self.cluster.hvparams = self.new_hvparams
1484     if self.op.enabled_hypervisors is not None:
1485       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1486     if self.op.beparams:
1487       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1488     if self.op.candidate_pool_size is not None:
1489       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1490
1491     self.cfg.Update(self.cluster)
1492
1493     # we want to update nodes after the cluster so that if any errors
1494     # happen, we have recorded and saved the cluster info
1495     if self.op.candidate_pool_size is not None:
1496       _AdjustCandidatePool(self)
1497
1498
1499 class LURedistributeConfig(NoHooksLU):
1500   """Force the redistribution of cluster configuration.
1501
1502   This is a very simple LU.
1503
1504   """
1505   _OP_REQP = []
1506   REQ_BGL = False
1507
1508   def ExpandNames(self):
1509     self.needed_locks = {
1510       locking.LEVEL_NODE: locking.ALL_SET,
1511     }
1512     self.share_locks[locking.LEVEL_NODE] = 1
1513
1514   def CheckPrereq(self):
1515     """Check prerequisites.
1516
1517     """
1518
1519   def Exec(self, feedback_fn):
1520     """Redistribute the configuration.
1521
1522     """
1523     self.cfg.Update(self.cfg.GetClusterInfo())
1524
1525
1526 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1527   """Sleep and poll for an instance's disk to sync.
1528
1529   """
1530   if not instance.disks:
1531     return True
1532
1533   if not oneshot:
1534     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1535
1536   node = instance.primary_node
1537
1538   for dev in instance.disks:
1539     lu.cfg.SetDiskID(dev, node)
1540
1541   retries = 0
1542   while True:
1543     max_time = 0
1544     done = True
1545     cumul_degraded = False
1546     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1547     if rstats.failed or not rstats.data:
1548       lu.LogWarning("Can't get any data from node %s", node)
1549       retries += 1
1550       if retries >= 10:
1551         raise errors.RemoteError("Can't contact node %s for mirror data,"
1552                                  " aborting." % node)
1553       time.sleep(6)
1554       continue
1555     rstats = rstats.data
1556     retries = 0
1557     for i, mstat in enumerate(rstats):
1558       if mstat is None:
1559         lu.LogWarning("Can't compute data for node %s/%s",
1560                            node, instance.disks[i].iv_name)
1561         continue
1562       # we ignore the ldisk parameter
1563       perc_done, est_time, is_degraded, _ = mstat
1564       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1565       if perc_done is not None:
1566         done = False
1567         if est_time is not None:
1568           rem_time = "%d estimated seconds remaining" % est_time
1569           max_time = est_time
1570         else:
1571           rem_time = "no time estimate"
1572         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1573                         (instance.disks[i].iv_name, perc_done, rem_time))
1574     if done or oneshot:
1575       break
1576
1577     time.sleep(min(60, max_time))
1578
1579   if done:
1580     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1581   return not cumul_degraded
1582
1583
1584 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1585   """Check that mirrors are not degraded.
1586
1587   The ldisk parameter, if True, will change the test from the
1588   is_degraded attribute (which represents overall non-ok status for
1589   the device(s)) to the ldisk (representing the local storage status).
1590
1591   """
1592   lu.cfg.SetDiskID(dev, node)
1593   if ldisk:
1594     idx = 6
1595   else:
1596     idx = 5
1597
1598   result = True
1599   if on_primary or dev.AssembleOnSecondary():
1600     rstats = lu.rpc.call_blockdev_find(node, dev)
1601     msg = rstats.RemoteFailMsg()
1602     if msg:
1603       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1604       result = False
1605     elif not rstats.payload:
1606       lu.LogWarning("Can't find disk on node %s", node)
1607       result = False
1608     else:
1609       result = result and (not rstats.payload[idx])
1610   if dev.children:
1611     for child in dev.children:
1612       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1613
1614   return result
1615
1616
1617 class LUDiagnoseOS(NoHooksLU):
1618   """Logical unit for OS diagnose/query.
1619
1620   """
1621   _OP_REQP = ["output_fields", "names"]
1622   REQ_BGL = False
1623   _FIELDS_STATIC = utils.FieldSet()
1624   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1625
1626   def ExpandNames(self):
1627     if self.op.names:
1628       raise errors.OpPrereqError("Selective OS query not supported")
1629
1630     _CheckOutputFields(static=self._FIELDS_STATIC,
1631                        dynamic=self._FIELDS_DYNAMIC,
1632                        selected=self.op.output_fields)
1633
1634     # Lock all nodes, in shared mode
1635     self.needed_locks = {}
1636     self.share_locks[locking.LEVEL_NODE] = 1
1637     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1638
1639   def CheckPrereq(self):
1640     """Check prerequisites.
1641
1642     """
1643
1644   @staticmethod
1645   def _DiagnoseByOS(node_list, rlist):
1646     """Remaps a per-node return list into an a per-os per-node dictionary
1647
1648     @param node_list: a list with the names of all nodes
1649     @param rlist: a map with node names as keys and OS objects as values
1650
1651     @rtype: dict
1652     @returns: a dictionary with osnames as keys and as value another map, with
1653         nodes as keys and list of OS objects as values, eg::
1654
1655           {"debian-etch": {"node1": [<object>,...],
1656                            "node2": [<object>,]}
1657           }
1658
1659     """
1660     all_os = {}
1661     for node_name, nr in rlist.iteritems():
1662       if nr.failed or not nr.data:
1663         continue
1664       for os_obj in nr.data:
1665         if os_obj.name not in all_os:
1666           # build a list of nodes for this os containing empty lists
1667           # for each node in node_list
1668           all_os[os_obj.name] = {}
1669           for nname in node_list:
1670             all_os[os_obj.name][nname] = []
1671         all_os[os_obj.name][node_name].append(os_obj)
1672     return all_os
1673
1674   def Exec(self, feedback_fn):
1675     """Compute the list of OSes.
1676
1677     """
1678     node_list = self.acquired_locks[locking.LEVEL_NODE]
1679     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1680                    if node in node_list]
1681     node_data = self.rpc.call_os_diagnose(valid_nodes)
1682     if node_data == False:
1683       raise errors.OpExecError("Can't gather the list of OSes")
1684     pol = self._DiagnoseByOS(valid_nodes, node_data)
1685     output = []
1686     for os_name, os_data in pol.iteritems():
1687       row = []
1688       for field in self.op.output_fields:
1689         if field == "name":
1690           val = os_name
1691         elif field == "valid":
1692           val = utils.all([osl and osl[0] for osl in os_data.values()])
1693         elif field == "node_status":
1694           val = {}
1695           for node_name, nos_list in os_data.iteritems():
1696             val[node_name] = [(v.status, v.path) for v in nos_list]
1697         else:
1698           raise errors.ParameterError(field)
1699         row.append(val)
1700       output.append(row)
1701
1702     return output
1703
1704
1705 class LURemoveNode(LogicalUnit):
1706   """Logical unit for removing a node.
1707
1708   """
1709   HPATH = "node-remove"
1710   HTYPE = constants.HTYPE_NODE
1711   _OP_REQP = ["node_name"]
1712
1713   def BuildHooksEnv(self):
1714     """Build hooks env.
1715
1716     This doesn't run on the target node in the pre phase as a failed
1717     node would then be impossible to remove.
1718
1719     """
1720     env = {
1721       "OP_TARGET": self.op.node_name,
1722       "NODE_NAME": self.op.node_name,
1723       }
1724     all_nodes = self.cfg.GetNodeList()
1725     all_nodes.remove(self.op.node_name)
1726     return env, all_nodes, all_nodes
1727
1728   def CheckPrereq(self):
1729     """Check prerequisites.
1730
1731     This checks:
1732      - the node exists in the configuration
1733      - it does not have primary or secondary instances
1734      - it's not the master
1735
1736     Any errors are signalled by raising errors.OpPrereqError.
1737
1738     """
1739     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1740     if node is None:
1741       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1742
1743     instance_list = self.cfg.GetInstanceList()
1744
1745     masternode = self.cfg.GetMasterNode()
1746     if node.name == masternode:
1747       raise errors.OpPrereqError("Node is the master node,"
1748                                  " you need to failover first.")
1749
1750     for instance_name in instance_list:
1751       instance = self.cfg.GetInstanceInfo(instance_name)
1752       if node.name in instance.all_nodes:
1753         raise errors.OpPrereqError("Instance %s is still running on the node,"
1754                                    " please remove first." % instance_name)
1755     self.op.node_name = node.name
1756     self.node = node
1757
1758   def Exec(self, feedback_fn):
1759     """Removes the node from the cluster.
1760
1761     """
1762     node = self.node
1763     logging.info("Stopping the node daemon and removing configs from node %s",
1764                  node.name)
1765
1766     self.context.RemoveNode(node.name)
1767
1768     self.rpc.call_node_leave_cluster(node.name)
1769
1770     # Promote nodes to master candidate as needed
1771     _AdjustCandidatePool(self)
1772
1773
1774 class LUQueryNodes(NoHooksLU):
1775   """Logical unit for querying nodes.
1776
1777   """
1778   _OP_REQP = ["output_fields", "names", "use_locking"]
1779   REQ_BGL = False
1780   _FIELDS_DYNAMIC = utils.FieldSet(
1781     "dtotal", "dfree",
1782     "mtotal", "mnode", "mfree",
1783     "bootid",
1784     "ctotal", "cnodes", "csockets",
1785     )
1786
1787   _FIELDS_STATIC = utils.FieldSet(
1788     "name", "pinst_cnt", "sinst_cnt",
1789     "pinst_list", "sinst_list",
1790     "pip", "sip", "tags",
1791     "serial_no",
1792     "master_candidate",
1793     "master",
1794     "offline",
1795     "drained",
1796     )
1797
1798   def ExpandNames(self):
1799     _CheckOutputFields(static=self._FIELDS_STATIC,
1800                        dynamic=self._FIELDS_DYNAMIC,
1801                        selected=self.op.output_fields)
1802
1803     self.needed_locks = {}
1804     self.share_locks[locking.LEVEL_NODE] = 1
1805
1806     if self.op.names:
1807       self.wanted = _GetWantedNodes(self, self.op.names)
1808     else:
1809       self.wanted = locking.ALL_SET
1810
1811     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1812     self.do_locking = self.do_node_query and self.op.use_locking
1813     if self.do_locking:
1814       # if we don't request only static fields, we need to lock the nodes
1815       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1816
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     """
1822     # The validation of the node list is done in the _GetWantedNodes,
1823     # if non empty, and if empty, there's no validation to do
1824     pass
1825
1826   def Exec(self, feedback_fn):
1827     """Computes the list of nodes and their attributes.
1828
1829     """
1830     all_info = self.cfg.GetAllNodesInfo()
1831     if self.do_locking:
1832       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1833     elif self.wanted != locking.ALL_SET:
1834       nodenames = self.wanted
1835       missing = set(nodenames).difference(all_info.keys())
1836       if missing:
1837         raise errors.OpExecError(
1838           "Some nodes were removed before retrieving their data: %s" % missing)
1839     else:
1840       nodenames = all_info.keys()
1841
1842     nodenames = utils.NiceSort(nodenames)
1843     nodelist = [all_info[name] for name in nodenames]
1844
1845     # begin data gathering
1846
1847     if self.do_node_query:
1848       live_data = {}
1849       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1850                                           self.cfg.GetHypervisorType())
1851       for name in nodenames:
1852         nodeinfo = node_data[name]
1853         if not nodeinfo.failed and nodeinfo.data:
1854           nodeinfo = nodeinfo.data
1855           fn = utils.TryConvert
1856           live_data[name] = {
1857             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1858             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1859             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1860             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1861             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1862             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1863             "bootid": nodeinfo.get('bootid', None),
1864             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1865             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1866             }
1867         else:
1868           live_data[name] = {}
1869     else:
1870       live_data = dict.fromkeys(nodenames, {})
1871
1872     node_to_primary = dict([(name, set()) for name in nodenames])
1873     node_to_secondary = dict([(name, set()) for name in nodenames])
1874
1875     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1876                              "sinst_cnt", "sinst_list"))
1877     if inst_fields & frozenset(self.op.output_fields):
1878       instancelist = self.cfg.GetInstanceList()
1879
1880       for instance_name in instancelist:
1881         inst = self.cfg.GetInstanceInfo(instance_name)
1882         if inst.primary_node in node_to_primary:
1883           node_to_primary[inst.primary_node].add(inst.name)
1884         for secnode in inst.secondary_nodes:
1885           if secnode in node_to_secondary:
1886             node_to_secondary[secnode].add(inst.name)
1887
1888     master_node = self.cfg.GetMasterNode()
1889
1890     # end data gathering
1891
1892     output = []
1893     for node in nodelist:
1894       node_output = []
1895       for field in self.op.output_fields:
1896         if field == "name":
1897           val = node.name
1898         elif field == "pinst_list":
1899           val = list(node_to_primary[node.name])
1900         elif field == "sinst_list":
1901           val = list(node_to_secondary[node.name])
1902         elif field == "pinst_cnt":
1903           val = len(node_to_primary[node.name])
1904         elif field == "sinst_cnt":
1905           val = len(node_to_secondary[node.name])
1906         elif field == "pip":
1907           val = node.primary_ip
1908         elif field == "sip":
1909           val = node.secondary_ip
1910         elif field == "tags":
1911           val = list(node.GetTags())
1912         elif field == "serial_no":
1913           val = node.serial_no
1914         elif field == "master_candidate":
1915           val = node.master_candidate
1916         elif field == "master":
1917           val = node.name == master_node
1918         elif field == "offline":
1919           val = node.offline
1920         elif field == "drained":
1921           val = node.drained
1922         elif self._FIELDS_DYNAMIC.Matches(field):
1923           val = live_data[node.name].get(field, None)
1924         else:
1925           raise errors.ParameterError(field)
1926         node_output.append(val)
1927       output.append(node_output)
1928
1929     return output
1930
1931
1932 class LUQueryNodeVolumes(NoHooksLU):
1933   """Logical unit for getting volumes on node(s).
1934
1935   """
1936   _OP_REQP = ["nodes", "output_fields"]
1937   REQ_BGL = False
1938   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1939   _FIELDS_STATIC = utils.FieldSet("node")
1940
1941   def ExpandNames(self):
1942     _CheckOutputFields(static=self._FIELDS_STATIC,
1943                        dynamic=self._FIELDS_DYNAMIC,
1944                        selected=self.op.output_fields)
1945
1946     self.needed_locks = {}
1947     self.share_locks[locking.LEVEL_NODE] = 1
1948     if not self.op.nodes:
1949       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1950     else:
1951       self.needed_locks[locking.LEVEL_NODE] = \
1952         _GetWantedNodes(self, self.op.nodes)
1953
1954   def CheckPrereq(self):
1955     """Check prerequisites.
1956
1957     This checks that the fields required are valid output fields.
1958
1959     """
1960     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1961
1962   def Exec(self, feedback_fn):
1963     """Computes the list of nodes and their attributes.
1964
1965     """
1966     nodenames = self.nodes
1967     volumes = self.rpc.call_node_volumes(nodenames)
1968
1969     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1970              in self.cfg.GetInstanceList()]
1971
1972     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1973
1974     output = []
1975     for node in nodenames:
1976       if node not in volumes or volumes[node].failed or not volumes[node].data:
1977         continue
1978
1979       node_vols = volumes[node].data[:]
1980       node_vols.sort(key=lambda vol: vol['dev'])
1981
1982       for vol in node_vols:
1983         node_output = []
1984         for field in self.op.output_fields:
1985           if field == "node":
1986             val = node
1987           elif field == "phys":
1988             val = vol['dev']
1989           elif field == "vg":
1990             val = vol['vg']
1991           elif field == "name":
1992             val = vol['name']
1993           elif field == "size":
1994             val = int(float(vol['size']))
1995           elif field == "instance":
1996             for inst in ilist:
1997               if node not in lv_by_node[inst]:
1998                 continue
1999               if vol['name'] in lv_by_node[inst][node]:
2000                 val = inst.name
2001                 break
2002             else:
2003               val = '-'
2004           else:
2005             raise errors.ParameterError(field)
2006           node_output.append(str(val))
2007
2008         output.append(node_output)
2009
2010     return output
2011
2012
2013 class LUAddNode(LogicalUnit):
2014   """Logical unit for adding node to the cluster.
2015
2016   """
2017   HPATH = "node-add"
2018   HTYPE = constants.HTYPE_NODE
2019   _OP_REQP = ["node_name"]
2020
2021   def BuildHooksEnv(self):
2022     """Build hooks env.
2023
2024     This will run on all nodes before, and on all nodes + the new node after.
2025
2026     """
2027     env = {
2028       "OP_TARGET": self.op.node_name,
2029       "NODE_NAME": self.op.node_name,
2030       "NODE_PIP": self.op.primary_ip,
2031       "NODE_SIP": self.op.secondary_ip,
2032       }
2033     nodes_0 = self.cfg.GetNodeList()
2034     nodes_1 = nodes_0 + [self.op.node_name, ]
2035     return env, nodes_0, nodes_1
2036
2037   def CheckPrereq(self):
2038     """Check prerequisites.
2039
2040     This checks:
2041      - the new node is not already in the config
2042      - it is resolvable
2043      - its parameters (single/dual homed) matches the cluster
2044
2045     Any errors are signalled by raising errors.OpPrereqError.
2046
2047     """
2048     node_name = self.op.node_name
2049     cfg = self.cfg
2050
2051     dns_data = utils.HostInfo(node_name)
2052
2053     node = dns_data.name
2054     primary_ip = self.op.primary_ip = dns_data.ip
2055     secondary_ip = getattr(self.op, "secondary_ip", None)
2056     if secondary_ip is None:
2057       secondary_ip = primary_ip
2058     if not utils.IsValidIP(secondary_ip):
2059       raise errors.OpPrereqError("Invalid secondary IP given")
2060     self.op.secondary_ip = secondary_ip
2061
2062     node_list = cfg.GetNodeList()
2063     if not self.op.readd and node in node_list:
2064       raise errors.OpPrereqError("Node %s is already in the configuration" %
2065                                  node)
2066     elif self.op.readd and node not in node_list:
2067       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2068
2069     for existing_node_name in node_list:
2070       existing_node = cfg.GetNodeInfo(existing_node_name)
2071
2072       if self.op.readd and node == existing_node_name:
2073         if (existing_node.primary_ip != primary_ip or
2074             existing_node.secondary_ip != secondary_ip):
2075           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2076                                      " address configuration as before")
2077         continue
2078
2079       if (existing_node.primary_ip == primary_ip or
2080           existing_node.secondary_ip == primary_ip or
2081           existing_node.primary_ip == secondary_ip or
2082           existing_node.secondary_ip == secondary_ip):
2083         raise errors.OpPrereqError("New node ip address(es) conflict with"
2084                                    " existing node %s" % existing_node.name)
2085
2086     # check that the type of the node (single versus dual homed) is the
2087     # same as for the master
2088     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2089     master_singlehomed = myself.secondary_ip == myself.primary_ip
2090     newbie_singlehomed = secondary_ip == primary_ip
2091     if master_singlehomed != newbie_singlehomed:
2092       if master_singlehomed:
2093         raise errors.OpPrereqError("The master has no private ip but the"
2094                                    " new node has one")
2095       else:
2096         raise errors.OpPrereqError("The master has a private ip but the"
2097                                    " new node doesn't have one")
2098
2099     # checks reachablity
2100     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2101       raise errors.OpPrereqError("Node not reachable by ping")
2102
2103     if not newbie_singlehomed:
2104       # check reachability from my secondary ip to newbie's secondary ip
2105       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2106                            source=myself.secondary_ip):
2107         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2108                                    " based ping to noded port")
2109
2110     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2111     mc_now, _ = self.cfg.GetMasterCandidateStats()
2112     master_candidate = mc_now < cp_size
2113
2114     self.new_node = objects.Node(name=node,
2115                                  primary_ip=primary_ip,
2116                                  secondary_ip=secondary_ip,
2117                                  master_candidate=master_candidate,
2118                                  offline=False, drained=False)
2119
2120   def Exec(self, feedback_fn):
2121     """Adds the new node to the cluster.
2122
2123     """
2124     new_node = self.new_node
2125     node = new_node.name
2126
2127     # check connectivity
2128     result = self.rpc.call_version([node])[node]
2129     result.Raise()
2130     if result.data:
2131       if constants.PROTOCOL_VERSION == result.data:
2132         logging.info("Communication to node %s fine, sw version %s match",
2133                      node, result.data)
2134       else:
2135         raise errors.OpExecError("Version mismatch master version %s,"
2136                                  " node version %s" %
2137                                  (constants.PROTOCOL_VERSION, result.data))
2138     else:
2139       raise errors.OpExecError("Cannot get version from the new node")
2140
2141     # setup ssh on node
2142     logging.info("Copy ssh key to node %s", node)
2143     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2144     keyarray = []
2145     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2146                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2147                 priv_key, pub_key]
2148
2149     for i in keyfiles:
2150       f = open(i, 'r')
2151       try:
2152         keyarray.append(f.read())
2153       finally:
2154         f.close()
2155
2156     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2157                                     keyarray[2],
2158                                     keyarray[3], keyarray[4], keyarray[5])
2159
2160     msg = result.RemoteFailMsg()
2161     if msg:
2162       raise errors.OpExecError("Cannot transfer ssh keys to the"
2163                                " new node: %s" % msg)
2164
2165     # Add node to our /etc/hosts, and add key to known_hosts
2166     utils.AddHostToEtcHosts(new_node.name)
2167
2168     if new_node.secondary_ip != new_node.primary_ip:
2169       result = self.rpc.call_node_has_ip_address(new_node.name,
2170                                                  new_node.secondary_ip)
2171       if result.failed or not result.data:
2172         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2173                                  " you gave (%s). Please fix and re-run this"
2174                                  " command." % new_node.secondary_ip)
2175
2176     node_verify_list = [self.cfg.GetMasterNode()]
2177     node_verify_param = {
2178       'nodelist': [node],
2179       # TODO: do a node-net-test as well?
2180     }
2181
2182     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2183                                        self.cfg.GetClusterName())
2184     for verifier in node_verify_list:
2185       if result[verifier].failed or not result[verifier].data:
2186         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2187                                  " for remote verification" % verifier)
2188       if result[verifier].data['nodelist']:
2189         for failed in result[verifier].data['nodelist']:
2190           feedback_fn("ssh/hostname verification failed %s -> %s" %
2191                       (verifier, result[verifier].data['nodelist'][failed]))
2192         raise errors.OpExecError("ssh/hostname verification failed.")
2193
2194     # Distribute updated /etc/hosts and known_hosts to all nodes,
2195     # including the node just added
2196     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2197     dist_nodes = self.cfg.GetNodeList()
2198     if not self.op.readd:
2199       dist_nodes.append(node)
2200     if myself.name in dist_nodes:
2201       dist_nodes.remove(myself.name)
2202
2203     logging.debug("Copying hosts and known_hosts to all nodes")
2204     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2205       result = self.rpc.call_upload_file(dist_nodes, fname)
2206       for to_node, to_result in result.iteritems():
2207         if to_result.failed or not to_result.data:
2208           logging.error("Copy of file %s to node %s failed", fname, to_node)
2209
2210     to_copy = []
2211     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2212     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2213       to_copy.append(constants.VNC_PASSWORD_FILE)
2214
2215     for fname in to_copy:
2216       result = self.rpc.call_upload_file([node], fname)
2217       if result[node].failed or not result[node]:
2218         logging.error("Could not copy file %s to node %s", fname, node)
2219
2220     if self.op.readd:
2221       self.context.ReaddNode(new_node)
2222     else:
2223       self.context.AddNode(new_node)
2224
2225
2226 class LUSetNodeParams(LogicalUnit):
2227   """Modifies the parameters of a node.
2228
2229   """
2230   HPATH = "node-modify"
2231   HTYPE = constants.HTYPE_NODE
2232   _OP_REQP = ["node_name"]
2233   REQ_BGL = False
2234
2235   def CheckArguments(self):
2236     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2237     if node_name is None:
2238       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2239     self.op.node_name = node_name
2240     _CheckBooleanOpField(self.op, 'master_candidate')
2241     _CheckBooleanOpField(self.op, 'offline')
2242     _CheckBooleanOpField(self.op, 'drained')
2243     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2244     if all_mods.count(None) == 3:
2245       raise errors.OpPrereqError("Please pass at least one modification")
2246     if all_mods.count(True) > 1:
2247       raise errors.OpPrereqError("Can't set the node into more than one"
2248                                  " state at the same time")
2249
2250   def ExpandNames(self):
2251     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2252
2253   def BuildHooksEnv(self):
2254     """Build hooks env.
2255
2256     This runs on the master node.
2257
2258     """
2259     env = {
2260       "OP_TARGET": self.op.node_name,
2261       "MASTER_CANDIDATE": str(self.op.master_candidate),
2262       "OFFLINE": str(self.op.offline),
2263       "DRAINED": str(self.op.drained),
2264       }
2265     nl = [self.cfg.GetMasterNode(),
2266           self.op.node_name]
2267     return env, nl, nl
2268
2269   def CheckPrereq(self):
2270     """Check prerequisites.
2271
2272     This only checks the instance list against the existing names.
2273
2274     """
2275     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2276
2277     if ((self.op.master_candidate == False or self.op.offline == True or
2278          self.op.drained == True) and node.master_candidate):
2279       # we will demote the node from master_candidate
2280       if self.op.node_name == self.cfg.GetMasterNode():
2281         raise errors.OpPrereqError("The master node has to be a"
2282                                    " master candidate, online and not drained")
2283       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2284       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2285       if num_candidates <= cp_size:
2286         msg = ("Not enough master candidates (desired"
2287                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2288         if self.op.force:
2289           self.LogWarning(msg)
2290         else:
2291           raise errors.OpPrereqError(msg)
2292
2293     if (self.op.master_candidate == True and
2294         ((node.offline and not self.op.offline == False) or
2295          (node.drained and not self.op.drained == False))):
2296       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2297                                  " to master_candidate")
2298
2299     return
2300
2301   def Exec(self, feedback_fn):
2302     """Modifies a node.
2303
2304     """
2305     node = self.node
2306
2307     result = []
2308     changed_mc = False
2309
2310     if self.op.offline is not None:
2311       node.offline = self.op.offline
2312       result.append(("offline", str(self.op.offline)))
2313       if self.op.offline == True:
2314         if node.master_candidate:
2315           node.master_candidate = False
2316           changed_mc = True
2317           result.append(("master_candidate", "auto-demotion due to offline"))
2318         if node.drained:
2319           node.drained = False
2320           result.append(("drained", "clear drained status due to offline"))
2321
2322     if self.op.master_candidate is not None:
2323       node.master_candidate = self.op.master_candidate
2324       changed_mc = True
2325       result.append(("master_candidate", str(self.op.master_candidate)))
2326       if self.op.master_candidate == False:
2327         rrc = self.rpc.call_node_demote_from_mc(node.name)
2328         msg = rrc.RemoteFailMsg()
2329         if msg:
2330           self.LogWarning("Node failed to demote itself: %s" % msg)
2331
2332     if self.op.drained is not None:
2333       node.drained = self.op.drained
2334       result.append(("drained", str(self.op.drained)))
2335       if self.op.drained == True:
2336         if node.master_candidate:
2337           node.master_candidate = False
2338           changed_mc = True
2339           result.append(("master_candidate", "auto-demotion due to drain"))
2340         if node.offline:
2341           node.offline = False
2342           result.append(("offline", "clear offline status due to drain"))
2343
2344     # this will trigger configuration file update, if needed
2345     self.cfg.Update(node)
2346     # this will trigger job queue propagation or cleanup
2347     if changed_mc:
2348       self.context.ReaddNode(node)
2349
2350     return result
2351
2352
2353 class LUQueryClusterInfo(NoHooksLU):
2354   """Query cluster configuration.
2355
2356   """
2357   _OP_REQP = []
2358   REQ_BGL = False
2359
2360   def ExpandNames(self):
2361     self.needed_locks = {}
2362
2363   def CheckPrereq(self):
2364     """No prerequsites needed for this LU.
2365
2366     """
2367     pass
2368
2369   def Exec(self, feedback_fn):
2370     """Return cluster config.
2371
2372     """
2373     cluster = self.cfg.GetClusterInfo()
2374     result = {
2375       "software_version": constants.RELEASE_VERSION,
2376       "protocol_version": constants.PROTOCOL_VERSION,
2377       "config_version": constants.CONFIG_VERSION,
2378       "os_api_version": constants.OS_API_VERSION,
2379       "export_version": constants.EXPORT_VERSION,
2380       "architecture": (platform.architecture()[0], platform.machine()),
2381       "name": cluster.cluster_name,
2382       "master": cluster.master_node,
2383       "default_hypervisor": cluster.default_hypervisor,
2384       "enabled_hypervisors": cluster.enabled_hypervisors,
2385       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2386                         for hypervisor in cluster.enabled_hypervisors]),
2387       "beparams": cluster.beparams,
2388       "candidate_pool_size": cluster.candidate_pool_size,
2389       }
2390
2391     return result
2392
2393
2394 class LUQueryConfigValues(NoHooksLU):
2395   """Return configuration values.
2396
2397   """
2398   _OP_REQP = []
2399   REQ_BGL = False
2400   _FIELDS_DYNAMIC = utils.FieldSet()
2401   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2402
2403   def ExpandNames(self):
2404     self.needed_locks = {}
2405
2406     _CheckOutputFields(static=self._FIELDS_STATIC,
2407                        dynamic=self._FIELDS_DYNAMIC,
2408                        selected=self.op.output_fields)
2409
2410   def CheckPrereq(self):
2411     """No prerequisites.
2412
2413     """
2414     pass
2415
2416   def Exec(self, feedback_fn):
2417     """Dump a representation of the cluster config to the standard output.
2418
2419     """
2420     values = []
2421     for field in self.op.output_fields:
2422       if field == "cluster_name":
2423         entry = self.cfg.GetClusterName()
2424       elif field == "master_node":
2425         entry = self.cfg.GetMasterNode()
2426       elif field == "drain_flag":
2427         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2428       else:
2429         raise errors.ParameterError(field)
2430       values.append(entry)
2431     return values
2432
2433
2434 class LUActivateInstanceDisks(NoHooksLU):
2435   """Bring up an instance's disks.
2436
2437   """
2438   _OP_REQP = ["instance_name"]
2439   REQ_BGL = False
2440
2441   def ExpandNames(self):
2442     self._ExpandAndLockInstance()
2443     self.needed_locks[locking.LEVEL_NODE] = []
2444     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2445
2446   def DeclareLocks(self, level):
2447     if level == locking.LEVEL_NODE:
2448       self._LockInstancesNodes()
2449
2450   def CheckPrereq(self):
2451     """Check prerequisites.
2452
2453     This checks that the instance is in the cluster.
2454
2455     """
2456     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2457     assert self.instance is not None, \
2458       "Cannot retrieve locked instance %s" % self.op.instance_name
2459     _CheckNodeOnline(self, self.instance.primary_node)
2460
2461   def Exec(self, feedback_fn):
2462     """Activate the disks.
2463
2464     """
2465     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2466     if not disks_ok:
2467       raise errors.OpExecError("Cannot activate block devices")
2468
2469     return disks_info
2470
2471
2472 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2473   """Prepare the block devices for an instance.
2474
2475   This sets up the block devices on all nodes.
2476
2477   @type lu: L{LogicalUnit}
2478   @param lu: the logical unit on whose behalf we execute
2479   @type instance: L{objects.Instance}
2480   @param instance: the instance for whose disks we assemble
2481   @type ignore_secondaries: boolean
2482   @param ignore_secondaries: if true, errors on secondary nodes
2483       won't result in an error return from the function
2484   @return: False if the operation failed, otherwise a list of
2485       (host, instance_visible_name, node_visible_name)
2486       with the mapping from node devices to instance devices
2487
2488   """
2489   device_info = []
2490   disks_ok = True
2491   iname = instance.name
2492   # With the two passes mechanism we try to reduce the window of
2493   # opportunity for the race condition of switching DRBD to primary
2494   # before handshaking occured, but we do not eliminate it
2495
2496   # The proper fix would be to wait (with some limits) until the
2497   # connection has been made and drbd transitions from WFConnection
2498   # into any other network-connected state (Connected, SyncTarget,
2499   # SyncSource, etc.)
2500
2501   # 1st pass, assemble on all nodes in secondary mode
2502   for inst_disk in instance.disks:
2503     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2504       lu.cfg.SetDiskID(node_disk, node)
2505       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2506       msg = result.RemoteFailMsg()
2507       if msg:
2508         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2509                            " (is_primary=False, pass=1): %s",
2510                            inst_disk.iv_name, node, msg)
2511         if not ignore_secondaries:
2512           disks_ok = False
2513
2514   # FIXME: race condition on drbd migration to primary
2515
2516   # 2nd pass, do only the primary node
2517   for inst_disk in instance.disks:
2518     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2519       if node != instance.primary_node:
2520         continue
2521       lu.cfg.SetDiskID(node_disk, node)
2522       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2523       msg = result.RemoteFailMsg()
2524       if msg:
2525         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2526                            " (is_primary=True, pass=2): %s",
2527                            inst_disk.iv_name, node, msg)
2528         disks_ok = False
2529     device_info.append((instance.primary_node, inst_disk.iv_name,
2530                         result.payload))
2531
2532   # leave the disks configured for the primary node
2533   # this is a workaround that would be fixed better by
2534   # improving the logical/physical id handling
2535   for disk in instance.disks:
2536     lu.cfg.SetDiskID(disk, instance.primary_node)
2537
2538   return disks_ok, device_info
2539
2540
2541 def _StartInstanceDisks(lu, instance, force):
2542   """Start the disks of an instance.
2543
2544   """
2545   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2546                                            ignore_secondaries=force)
2547   if not disks_ok:
2548     _ShutdownInstanceDisks(lu, instance)
2549     if force is not None and not force:
2550       lu.proc.LogWarning("", hint="If the message above refers to a"
2551                          " secondary node,"
2552                          " you can retry the operation using '--force'.")
2553     raise errors.OpExecError("Disk consistency error")
2554
2555
2556 class LUDeactivateInstanceDisks(NoHooksLU):
2557   """Shutdown an instance's disks.
2558
2559   """
2560   _OP_REQP = ["instance_name"]
2561   REQ_BGL = False
2562
2563   def ExpandNames(self):
2564     self._ExpandAndLockInstance()
2565     self.needed_locks[locking.LEVEL_NODE] = []
2566     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2567
2568   def DeclareLocks(self, level):
2569     if level == locking.LEVEL_NODE:
2570       self._LockInstancesNodes()
2571
2572   def CheckPrereq(self):
2573     """Check prerequisites.
2574
2575     This checks that the instance is in the cluster.
2576
2577     """
2578     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2579     assert self.instance is not None, \
2580       "Cannot retrieve locked instance %s" % self.op.instance_name
2581
2582   def Exec(self, feedback_fn):
2583     """Deactivate the disks
2584
2585     """
2586     instance = self.instance
2587     _SafeShutdownInstanceDisks(self, instance)
2588
2589
2590 def _SafeShutdownInstanceDisks(lu, instance):
2591   """Shutdown block devices of an instance.
2592
2593   This function checks if an instance is running, before calling
2594   _ShutdownInstanceDisks.
2595
2596   """
2597   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2598                                       [instance.hypervisor])
2599   ins_l = ins_l[instance.primary_node]
2600   if ins_l.failed or not isinstance(ins_l.data, list):
2601     raise errors.OpExecError("Can't contact node '%s'" %
2602                              instance.primary_node)
2603
2604   if instance.name in ins_l.data:
2605     raise errors.OpExecError("Instance is running, can't shutdown"
2606                              " block devices.")
2607
2608   _ShutdownInstanceDisks(lu, instance)
2609
2610
2611 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2612   """Shutdown block devices of an instance.
2613
2614   This does the shutdown on all nodes of the instance.
2615
2616   If the ignore_primary is false, errors on the primary node are
2617   ignored.
2618
2619   """
2620   all_result = True
2621   for disk in instance.disks:
2622     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2623       lu.cfg.SetDiskID(top_disk, node)
2624       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2625       msg = result.RemoteFailMsg()
2626       if msg:
2627         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2628                       disk.iv_name, node, msg)
2629         if not ignore_primary or node != instance.primary_node:
2630           all_result = False
2631   return all_result
2632
2633
2634 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2635   """Checks if a node has enough free memory.
2636
2637   This function check if a given node has the needed amount of free
2638   memory. In case the node has less memory or we cannot get the
2639   information from the node, this function raise an OpPrereqError
2640   exception.
2641
2642   @type lu: C{LogicalUnit}
2643   @param lu: a logical unit from which we get configuration data
2644   @type node: C{str}
2645   @param node: the node to check
2646   @type reason: C{str}
2647   @param reason: string to use in the error message
2648   @type requested: C{int}
2649   @param requested: the amount of memory in MiB to check for
2650   @type hypervisor_name: C{str}
2651   @param hypervisor_name: the hypervisor to ask for memory stats
2652   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2653       we cannot check the node
2654
2655   """
2656   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2657   nodeinfo[node].Raise()
2658   free_mem = nodeinfo[node].data.get('memory_free')
2659   if not isinstance(free_mem, int):
2660     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2661                              " was '%s'" % (node, free_mem))
2662   if requested > free_mem:
2663     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2664                              " needed %s MiB, available %s MiB" %
2665                              (node, reason, requested, free_mem))
2666
2667
2668 class LUStartupInstance(LogicalUnit):
2669   """Starts an instance.
2670
2671   """
2672   HPATH = "instance-start"
2673   HTYPE = constants.HTYPE_INSTANCE
2674   _OP_REQP = ["instance_name", "force"]
2675   REQ_BGL = False
2676
2677   def ExpandNames(self):
2678     self._ExpandAndLockInstance()
2679
2680   def BuildHooksEnv(self):
2681     """Build hooks env.
2682
2683     This runs on master, primary and secondary nodes of the instance.
2684
2685     """
2686     env = {
2687       "FORCE": self.op.force,
2688       }
2689     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2690     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2691     return env, nl, nl
2692
2693   def CheckPrereq(self):
2694     """Check prerequisites.
2695
2696     This checks that the instance is in the cluster.
2697
2698     """
2699     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2700     assert self.instance is not None, \
2701       "Cannot retrieve locked instance %s" % self.op.instance_name
2702
2703     _CheckNodeOnline(self, instance.primary_node)
2704
2705     bep = self.cfg.GetClusterInfo().FillBE(instance)
2706     # check bridges existance
2707     _CheckInstanceBridgesExist(self, instance)
2708
2709     _CheckNodeFreeMemory(self, instance.primary_node,
2710                          "starting instance %s" % instance.name,
2711                          bep[constants.BE_MEMORY], instance.hypervisor)
2712
2713   def Exec(self, feedback_fn):
2714     """Start the instance.
2715
2716     """
2717     instance = self.instance
2718     force = self.op.force
2719     extra_args = getattr(self.op, "extra_args", "")
2720
2721     self.cfg.MarkInstanceUp(instance.name)
2722
2723     node_current = instance.primary_node
2724
2725     _StartInstanceDisks(self, instance, force)
2726
2727     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2728     msg = result.RemoteFailMsg()
2729     if msg:
2730       _ShutdownInstanceDisks(self, instance)
2731       raise errors.OpExecError("Could not start instance: %s" % msg)
2732
2733
2734 class LURebootInstance(LogicalUnit):
2735   """Reboot an instance.
2736
2737   """
2738   HPATH = "instance-reboot"
2739   HTYPE = constants.HTYPE_INSTANCE
2740   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2741   REQ_BGL = False
2742
2743   def ExpandNames(self):
2744     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2745                                    constants.INSTANCE_REBOOT_HARD,
2746                                    constants.INSTANCE_REBOOT_FULL]:
2747       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2748                                   (constants.INSTANCE_REBOOT_SOFT,
2749                                    constants.INSTANCE_REBOOT_HARD,
2750                                    constants.INSTANCE_REBOOT_FULL))
2751     self._ExpandAndLockInstance()
2752
2753   def BuildHooksEnv(self):
2754     """Build hooks env.
2755
2756     This runs on master, primary and secondary nodes of the instance.
2757
2758     """
2759     env = {
2760       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2761       }
2762     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2763     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2764     return env, nl, nl
2765
2766   def CheckPrereq(self):
2767     """Check prerequisites.
2768
2769     This checks that the instance is in the cluster.
2770
2771     """
2772     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2773     assert self.instance is not None, \
2774       "Cannot retrieve locked instance %s" % self.op.instance_name
2775
2776     _CheckNodeOnline(self, instance.primary_node)
2777
2778     # check bridges existance
2779     _CheckInstanceBridgesExist(self, instance)
2780
2781   def Exec(self, feedback_fn):
2782     """Reboot the instance.
2783
2784     """
2785     instance = self.instance
2786     ignore_secondaries = self.op.ignore_secondaries
2787     reboot_type = self.op.reboot_type
2788     extra_args = getattr(self.op, "extra_args", "")
2789
2790     node_current = instance.primary_node
2791
2792     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2793                        constants.INSTANCE_REBOOT_HARD]:
2794       result = self.rpc.call_instance_reboot(node_current, instance,
2795                                              reboot_type, extra_args)
2796       msg = result.RemoteFailMsg()
2797       if msg:
2798         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2799     else:
2800       result = self.rpc.call_instance_shutdown(node_current, instance)
2801       msg = result.RemoteFailMsg()
2802       if msg:
2803         raise errors.OpExecError("Could not shutdown instance for"
2804                                  " full reboot: %s" % msg)
2805       _ShutdownInstanceDisks(self, instance)
2806       _StartInstanceDisks(self, instance, ignore_secondaries)
2807       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2808       msg = result.RemoteFailMsg()
2809       if msg:
2810         _ShutdownInstanceDisks(self, instance)
2811         raise errors.OpExecError("Could not start instance for"
2812                                  " full reboot: %s" % msg)
2813
2814     self.cfg.MarkInstanceUp(instance.name)
2815
2816
2817 class LUShutdownInstance(LogicalUnit):
2818   """Shutdown an instance.
2819
2820   """
2821   HPATH = "instance-stop"
2822   HTYPE = constants.HTYPE_INSTANCE
2823   _OP_REQP = ["instance_name"]
2824   REQ_BGL = False
2825
2826   def ExpandNames(self):
2827     self._ExpandAndLockInstance()
2828
2829   def BuildHooksEnv(self):
2830     """Build hooks env.
2831
2832     This runs on master, primary and secondary nodes of the instance.
2833
2834     """
2835     env = _BuildInstanceHookEnvByObject(self, self.instance)
2836     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2837     return env, nl, nl
2838
2839   def CheckPrereq(self):
2840     """Check prerequisites.
2841
2842     This checks that the instance is in the cluster.
2843
2844     """
2845     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2846     assert self.instance is not None, \
2847       "Cannot retrieve locked instance %s" % self.op.instance_name
2848     _CheckNodeOnline(self, self.instance.primary_node)
2849
2850   def Exec(self, feedback_fn):
2851     """Shutdown the instance.
2852
2853     """
2854     instance = self.instance
2855     node_current = instance.primary_node
2856     self.cfg.MarkInstanceDown(instance.name)
2857     result = self.rpc.call_instance_shutdown(node_current, instance)
2858     msg = result.RemoteFailMsg()
2859     if msg:
2860       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2861
2862     _ShutdownInstanceDisks(self, instance)
2863
2864
2865 class LUReinstallInstance(LogicalUnit):
2866   """Reinstall an instance.
2867
2868   """
2869   HPATH = "instance-reinstall"
2870   HTYPE = constants.HTYPE_INSTANCE
2871   _OP_REQP = ["instance_name"]
2872   REQ_BGL = False
2873
2874   def ExpandNames(self):
2875     self._ExpandAndLockInstance()
2876
2877   def BuildHooksEnv(self):
2878     """Build hooks env.
2879
2880     This runs on master, primary and secondary nodes of the instance.
2881
2882     """
2883     env = _BuildInstanceHookEnvByObject(self, self.instance)
2884     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2885     return env, nl, nl
2886
2887   def CheckPrereq(self):
2888     """Check prerequisites.
2889
2890     This checks that the instance is in the cluster and is not running.
2891
2892     """
2893     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2894     assert instance is not None, \
2895       "Cannot retrieve locked instance %s" % self.op.instance_name
2896     _CheckNodeOnline(self, instance.primary_node)
2897
2898     if instance.disk_template == constants.DT_DISKLESS:
2899       raise errors.OpPrereqError("Instance '%s' has no disks" %
2900                                  self.op.instance_name)
2901     if instance.admin_up:
2902       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2903                                  self.op.instance_name)
2904     remote_info = self.rpc.call_instance_info(instance.primary_node,
2905                                               instance.name,
2906                                               instance.hypervisor)
2907     if remote_info.failed or remote_info.data:
2908       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2909                                  (self.op.instance_name,
2910                                   instance.primary_node))
2911
2912     self.op.os_type = getattr(self.op, "os_type", None)
2913     if self.op.os_type is not None:
2914       # OS verification
2915       pnode = self.cfg.GetNodeInfo(
2916         self.cfg.ExpandNodeName(instance.primary_node))
2917       if pnode is None:
2918         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2919                                    self.op.pnode)
2920       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2921       result.Raise()
2922       if not isinstance(result.data, objects.OS):
2923         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2924                                    " primary node"  % self.op.os_type)
2925
2926     self.instance = instance
2927
2928   def Exec(self, feedback_fn):
2929     """Reinstall the instance.
2930
2931     """
2932     inst = self.instance
2933
2934     if self.op.os_type is not None:
2935       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2936       inst.os = self.op.os_type
2937       self.cfg.Update(inst)
2938
2939     _StartInstanceDisks(self, inst, None)
2940     try:
2941       feedback_fn("Running the instance OS create scripts...")
2942       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2943       msg = result.RemoteFailMsg()
2944       if msg:
2945         raise errors.OpExecError("Could not install OS for instance %s"
2946                                  " on node %s: %s" %
2947                                  (inst.name, inst.primary_node, msg))
2948     finally:
2949       _ShutdownInstanceDisks(self, inst)
2950
2951
2952 class LURenameInstance(LogicalUnit):
2953   """Rename an instance.
2954
2955   """
2956   HPATH = "instance-rename"
2957   HTYPE = constants.HTYPE_INSTANCE
2958   _OP_REQP = ["instance_name", "new_name"]
2959
2960   def BuildHooksEnv(self):
2961     """Build hooks env.
2962
2963     This runs on master, primary and secondary nodes of the instance.
2964
2965     """
2966     env = _BuildInstanceHookEnvByObject(self, self.instance)
2967     env["INSTANCE_NEW_NAME"] = self.op.new_name
2968     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2969     return env, nl, nl
2970
2971   def CheckPrereq(self):
2972     """Check prerequisites.
2973
2974     This checks that the instance is in the cluster and is not running.
2975
2976     """
2977     instance = self.cfg.GetInstanceInfo(
2978       self.cfg.ExpandInstanceName(self.op.instance_name))
2979     if instance is None:
2980       raise errors.OpPrereqError("Instance '%s' not known" %
2981                                  self.op.instance_name)
2982     _CheckNodeOnline(self, instance.primary_node)
2983
2984     if instance.admin_up:
2985       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2986                                  self.op.instance_name)
2987     remote_info = self.rpc.call_instance_info(instance.primary_node,
2988                                               instance.name,
2989                                               instance.hypervisor)
2990     remote_info.Raise()
2991     if remote_info.data:
2992       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2993                                  (self.op.instance_name,
2994                                   instance.primary_node))
2995     self.instance = instance
2996
2997     # new name verification
2998     name_info = utils.HostInfo(self.op.new_name)
2999
3000     self.op.new_name = new_name = name_info.name
3001     instance_list = self.cfg.GetInstanceList()
3002     if new_name in instance_list:
3003       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3004                                  new_name)
3005
3006     if not getattr(self.op, "ignore_ip", False):
3007       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3008         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3009                                    (name_info.ip, new_name))
3010
3011
3012   def Exec(self, feedback_fn):
3013     """Reinstall the instance.
3014
3015     """
3016     inst = self.instance
3017     old_name = inst.name
3018
3019     if inst.disk_template == constants.DT_FILE:
3020       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3021
3022     self.cfg.RenameInstance(inst.name, self.op.new_name)
3023     # Change the instance lock. This is definitely safe while we hold the BGL
3024     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3025     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3026
3027     # re-read the instance from the configuration after rename
3028     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3029
3030     if inst.disk_template == constants.DT_FILE:
3031       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3032       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3033                                                      old_file_storage_dir,
3034                                                      new_file_storage_dir)
3035       result.Raise()
3036       if not result.data:
3037         raise errors.OpExecError("Could not connect to node '%s' to rename"
3038                                  " directory '%s' to '%s' (but the instance"
3039                                  " has been renamed in Ganeti)" % (
3040                                  inst.primary_node, old_file_storage_dir,
3041                                  new_file_storage_dir))
3042
3043       if not result.data[0]:
3044         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3045                                  " (but the instance has been renamed in"
3046                                  " Ganeti)" % (old_file_storage_dir,
3047                                                new_file_storage_dir))
3048
3049     _StartInstanceDisks(self, inst, None)
3050     try:
3051       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3052                                                  old_name)
3053       msg = result.RemoteFailMsg()
3054       if msg:
3055         msg = ("Could not run OS rename script for instance %s on node %s"
3056                " (but the instance has been renamed in Ganeti): %s" %
3057                (inst.name, inst.primary_node, msg))
3058         self.proc.LogWarning(msg)
3059     finally:
3060       _ShutdownInstanceDisks(self, inst)
3061
3062
3063 class LURemoveInstance(LogicalUnit):
3064   """Remove an instance.
3065
3066   """
3067   HPATH = "instance-remove"
3068   HTYPE = constants.HTYPE_INSTANCE
3069   _OP_REQP = ["instance_name", "ignore_failures"]
3070   REQ_BGL = False
3071
3072   def ExpandNames(self):
3073     self._ExpandAndLockInstance()
3074     self.needed_locks[locking.LEVEL_NODE] = []
3075     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3076
3077   def DeclareLocks(self, level):
3078     if level == locking.LEVEL_NODE:
3079       self._LockInstancesNodes()
3080
3081   def BuildHooksEnv(self):
3082     """Build hooks env.
3083
3084     This runs on master, primary and secondary nodes of the instance.
3085
3086     """
3087     env = _BuildInstanceHookEnvByObject(self, self.instance)
3088     nl = [self.cfg.GetMasterNode()]
3089     return env, nl, nl
3090
3091   def CheckPrereq(self):
3092     """Check prerequisites.
3093
3094     This checks that the instance is in the cluster.
3095
3096     """
3097     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3098     assert self.instance is not None, \
3099       "Cannot retrieve locked instance %s" % self.op.instance_name
3100
3101   def Exec(self, feedback_fn):
3102     """Remove the instance.
3103
3104     """
3105     instance = self.instance
3106     logging.info("Shutting down instance %s on node %s",
3107                  instance.name, instance.primary_node)
3108
3109     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3110     msg = result.RemoteFailMsg()
3111     if msg:
3112       if self.op.ignore_failures:
3113         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3114       else:
3115         raise errors.OpExecError("Could not shutdown instance %s on"
3116                                  " node %s: %s" %
3117                                  (instance.name, instance.primary_node, msg))
3118
3119     logging.info("Removing block devices for instance %s", instance.name)
3120
3121     if not _RemoveDisks(self, instance):
3122       if self.op.ignore_failures:
3123         feedback_fn("Warning: can't remove instance's disks")
3124       else:
3125         raise errors.OpExecError("Can't remove instance's disks")
3126
3127     logging.info("Removing instance %s out of cluster config", instance.name)
3128
3129     self.cfg.RemoveInstance(instance.name)
3130     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3131
3132
3133 class LUQueryInstances(NoHooksLU):
3134   """Logical unit for querying instances.
3135
3136   """
3137   _OP_REQP = ["output_fields", "names", "use_locking"]
3138   REQ_BGL = False
3139   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3140                                     "admin_state",
3141                                     "disk_template", "ip", "mac", "bridge",
3142                                     "sda_size", "sdb_size", "vcpus", "tags",
3143                                     "network_port", "beparams",
3144                                     r"(disk)\.(size)/([0-9]+)",
3145                                     r"(disk)\.(sizes)", "disk_usage",
3146                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3147                                     r"(nic)\.(macs|ips|bridges)",
3148                                     r"(disk|nic)\.(count)",
3149                                     "serial_no", "hypervisor", "hvparams",] +
3150                                   ["hv/%s" % name
3151                                    for name in constants.HVS_PARAMETERS] +
3152                                   ["be/%s" % name
3153                                    for name in constants.BES_PARAMETERS])
3154   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3155
3156
3157   def ExpandNames(self):
3158     _CheckOutputFields(static=self._FIELDS_STATIC,
3159                        dynamic=self._FIELDS_DYNAMIC,
3160                        selected=self.op.output_fields)
3161
3162     self.needed_locks = {}
3163     self.share_locks[locking.LEVEL_INSTANCE] = 1
3164     self.share_locks[locking.LEVEL_NODE] = 1
3165
3166     if self.op.names:
3167       self.wanted = _GetWantedInstances(self, self.op.names)
3168     else:
3169       self.wanted = locking.ALL_SET
3170
3171     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3172     self.do_locking = self.do_node_query and self.op.use_locking
3173     if self.do_locking:
3174       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3175       self.needed_locks[locking.LEVEL_NODE] = []
3176       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3177
3178   def DeclareLocks(self, level):
3179     if level == locking.LEVEL_NODE and self.do_locking:
3180       self._LockInstancesNodes()
3181
3182   def CheckPrereq(self):
3183     """Check prerequisites.
3184
3185     """
3186     pass
3187
3188   def Exec(self, feedback_fn):
3189     """Computes the list of nodes and their attributes.
3190
3191     """
3192     all_info = self.cfg.GetAllInstancesInfo()
3193     if self.wanted == locking.ALL_SET:
3194       # caller didn't specify instance names, so ordering is not important
3195       if self.do_locking:
3196         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3197       else:
3198         instance_names = all_info.keys()
3199       instance_names = utils.NiceSort(instance_names)
3200     else:
3201       # caller did specify names, so we must keep the ordering
3202       if self.do_locking:
3203         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3204       else:
3205         tgt_set = all_info.keys()
3206       missing = set(self.wanted).difference(tgt_set)
3207       if missing:
3208         raise errors.OpExecError("Some instances were removed before"
3209                                  " retrieving their data: %s" % missing)
3210       instance_names = self.wanted
3211
3212     instance_list = [all_info[iname] for iname in instance_names]
3213
3214     # begin data gathering
3215
3216     nodes = frozenset([inst.primary_node for inst in instance_list])
3217     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3218
3219     bad_nodes = []
3220     off_nodes = []
3221     if self.do_node_query:
3222       live_data = {}
3223       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3224       for name in nodes:
3225         result = node_data[name]
3226         if result.offline:
3227           # offline nodes will be in both lists
3228           off_nodes.append(name)
3229         if result.failed:
3230           bad_nodes.append(name)
3231         else:
3232           if result.data:
3233             live_data.update(result.data)
3234             # else no instance is alive
3235     else:
3236       live_data = dict([(name, {}) for name in instance_names])
3237
3238     # end data gathering
3239
3240     HVPREFIX = "hv/"
3241     BEPREFIX = "be/"
3242     output = []
3243     for instance in instance_list:
3244       iout = []
3245       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3246       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3247       for field in self.op.output_fields:
3248         st_match = self._FIELDS_STATIC.Matches(field)
3249         if field == "name":
3250           val = instance.name
3251         elif field == "os":
3252           val = instance.os
3253         elif field == "pnode":
3254           val = instance.primary_node
3255         elif field == "snodes":
3256           val = list(instance.secondary_nodes)
3257         elif field == "admin_state":
3258           val = instance.admin_up
3259         elif field == "oper_state":
3260           if instance.primary_node in bad_nodes:
3261             val = None
3262           else:
3263             val = bool(live_data.get(instance.name))
3264         elif field == "status":
3265           if instance.primary_node in off_nodes:
3266             val = "ERROR_nodeoffline"
3267           elif instance.primary_node in bad_nodes:
3268             val = "ERROR_nodedown"
3269           else:
3270             running = bool(live_data.get(instance.name))
3271             if running:
3272               if instance.admin_up:
3273                 val = "running"
3274               else:
3275                 val = "ERROR_up"
3276             else:
3277               if instance.admin_up:
3278                 val = "ERROR_down"
3279               else:
3280                 val = "ADMIN_down"
3281         elif field == "oper_ram":
3282           if instance.primary_node in bad_nodes:
3283             val = None
3284           elif instance.name in live_data:
3285             val = live_data[instance.name].get("memory", "?")
3286           else:
3287             val = "-"
3288         elif field == "disk_template":
3289           val = instance.disk_template
3290         elif field == "ip":
3291           val = instance.nics[0].ip
3292         elif field == "bridge":
3293           val = instance.nics[0].bridge
3294         elif field == "mac":
3295           val = instance.nics[0].mac
3296         elif field == "sda_size" or field == "sdb_size":
3297           idx = ord(field[2]) - ord('a')
3298           try:
3299             val = instance.FindDisk(idx).size
3300           except errors.OpPrereqError:
3301             val = None
3302         elif field == "disk_usage": # total disk usage per node
3303           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3304           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3305         elif field == "tags":
3306           val = list(instance.GetTags())
3307         elif field == "serial_no":
3308           val = instance.serial_no
3309         elif field == "network_port":
3310           val = instance.network_port
3311         elif field == "hypervisor":
3312           val = instance.hypervisor
3313         elif field == "hvparams":
3314           val = i_hv
3315         elif (field.startswith(HVPREFIX) and
3316               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3317           val = i_hv.get(field[len(HVPREFIX):], None)
3318         elif field == "beparams":
3319           val = i_be
3320         elif (field.startswith(BEPREFIX) and
3321               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3322           val = i_be.get(field[len(BEPREFIX):], None)
3323         elif st_match and st_match.groups():
3324           # matches a variable list
3325           st_groups = st_match.groups()
3326           if st_groups and st_groups[0] == "disk":
3327             if st_groups[1] == "count":
3328               val = len(instance.disks)
3329             elif st_groups[1] == "sizes":
3330               val = [disk.size for disk in instance.disks]
3331             elif st_groups[1] == "size":
3332               try:
3333                 val = instance.FindDisk(st_groups[2]).size
3334               except errors.OpPrereqError:
3335                 val = None
3336             else:
3337               assert False, "Unhandled disk parameter"
3338           elif st_groups[0] == "nic":
3339             if st_groups[1] == "count":
3340               val = len(instance.nics)
3341             elif st_groups[1] == "macs":
3342               val = [nic.mac for nic in instance.nics]
3343             elif st_groups[1] == "ips":
3344               val = [nic.ip for nic in instance.nics]
3345             elif st_groups[1] == "bridges":
3346               val = [nic.bridge for nic in instance.nics]
3347             else:
3348               # index-based item
3349               nic_idx = int(st_groups[2])
3350               if nic_idx >= len(instance.nics):
3351                 val = None
3352               else:
3353                 if st_groups[1] == "mac":
3354                   val = instance.nics[nic_idx].mac
3355                 elif st_groups[1] == "ip":
3356                   val = instance.nics[nic_idx].ip
3357                 elif st_groups[1] == "bridge":
3358                   val = instance.nics[nic_idx].bridge
3359                 else:
3360                   assert False, "Unhandled NIC parameter"
3361           else:
3362             assert False, "Unhandled variable parameter"
3363         else:
3364           raise errors.ParameterError(field)
3365         iout.append(val)
3366       output.append(iout)
3367
3368     return output
3369
3370
3371 class LUFailoverInstance(LogicalUnit):
3372   """Failover an instance.
3373
3374   """
3375   HPATH = "instance-failover"
3376   HTYPE = constants.HTYPE_INSTANCE
3377   _OP_REQP = ["instance_name", "ignore_consistency"]
3378   REQ_BGL = False
3379
3380   def ExpandNames(self):
3381     self._ExpandAndLockInstance()
3382     self.needed_locks[locking.LEVEL_NODE] = []
3383     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3384
3385   def DeclareLocks(self, level):
3386     if level == locking.LEVEL_NODE:
3387       self._LockInstancesNodes()
3388
3389   def BuildHooksEnv(self):
3390     """Build hooks env.
3391
3392     This runs on master, primary and secondary nodes of the instance.
3393
3394     """
3395     env = {
3396       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3397       }
3398     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3399     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3400     return env, nl, nl
3401
3402   def CheckPrereq(self):
3403     """Check prerequisites.
3404
3405     This checks that the instance is in the cluster.
3406
3407     """
3408     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3409     assert self.instance is not None, \
3410       "Cannot retrieve locked instance %s" % self.op.instance_name
3411
3412     bep = self.cfg.GetClusterInfo().FillBE(instance)
3413     if instance.disk_template not in constants.DTS_NET_MIRROR:
3414       raise errors.OpPrereqError("Instance's disk layout is not"
3415                                  " network mirrored, cannot failover.")
3416
3417     secondary_nodes = instance.secondary_nodes
3418     if not secondary_nodes:
3419       raise errors.ProgrammerError("no secondary node but using "
3420                                    "a mirrored disk template")
3421
3422     target_node = secondary_nodes[0]
3423     _CheckNodeOnline(self, target_node)
3424     _CheckNodeNotDrained(self, target_node)
3425     # check memory requirements on the secondary node
3426     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3427                          instance.name, bep[constants.BE_MEMORY],
3428                          instance.hypervisor)
3429
3430     # check bridge existance
3431     brlist = [nic.bridge for nic in instance.nics]
3432     result = self.rpc.call_bridges_exist(target_node, brlist)
3433     result.Raise()
3434     if not result.data:
3435       raise errors.OpPrereqError("One or more target bridges %s does not"
3436                                  " exist on destination node '%s'" %
3437                                  (brlist, target_node))
3438
3439   def Exec(self, feedback_fn):
3440     """Failover an instance.
3441
3442     The failover is done by shutting it down on its present node and
3443     starting it on the secondary.
3444
3445     """
3446     instance = self.instance
3447
3448     source_node = instance.primary_node
3449     target_node = instance.secondary_nodes[0]
3450
3451     feedback_fn("* checking disk consistency between source and target")
3452     for dev in instance.disks:
3453       # for drbd, these are drbd over lvm
3454       if not _CheckDiskConsistency(self, dev, target_node, False):
3455         if instance.admin_up and not self.op.ignore_consistency:
3456           raise errors.OpExecError("Disk %s is degraded on target node,"
3457                                    " aborting failover." % dev.iv_name)
3458
3459     feedback_fn("* shutting down instance on source node")
3460     logging.info("Shutting down instance %s on node %s",
3461                  instance.name, source_node)
3462
3463     result = self.rpc.call_instance_shutdown(source_node, instance)
3464     msg = result.RemoteFailMsg()
3465     if msg:
3466       if self.op.ignore_consistency:
3467         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3468                              " Proceeding anyway. Please make sure node"
3469                              " %s is down. Error details: %s",
3470                              instance.name, source_node, source_node, msg)
3471       else:
3472         raise errors.OpExecError("Could not shutdown instance %s on"
3473                                  " node %s: %s" %
3474                                  (instance.name, source_node, msg))
3475
3476     feedback_fn("* deactivating the instance's disks on source node")
3477     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3478       raise errors.OpExecError("Can't shut down the instance's disks.")
3479
3480     instance.primary_node = target_node
3481     # distribute new instance config to the other nodes
3482     self.cfg.Update(instance)
3483
3484     # Only start the instance if it's marked as up
3485     if instance.admin_up:
3486       feedback_fn("* activating the instance's disks on target node")
3487       logging.info("Starting instance %s on node %s",
3488                    instance.name, target_node)
3489
3490       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3491                                                ignore_secondaries=True)
3492       if not disks_ok:
3493         _ShutdownInstanceDisks(self, instance)
3494         raise errors.OpExecError("Can't activate the instance's disks")
3495
3496       feedback_fn("* starting the instance on the target node")
3497       result = self.rpc.call_instance_start(target_node, instance, None)
3498       msg = result.RemoteFailMsg()
3499       if msg:
3500         _ShutdownInstanceDisks(self, instance)
3501         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3502                                  (instance.name, target_node, msg))
3503
3504
3505 class LUMigrateInstance(LogicalUnit):
3506   """Migrate an instance.
3507
3508   This is migration without shutting down, compared to the failover,
3509   which is done with shutdown.
3510
3511   """
3512   HPATH = "instance-migrate"
3513   HTYPE = constants.HTYPE_INSTANCE
3514   _OP_REQP = ["instance_name", "live", "cleanup"]
3515
3516   REQ_BGL = False
3517
3518   def ExpandNames(self):
3519     self._ExpandAndLockInstance()
3520     self.needed_locks[locking.LEVEL_NODE] = []
3521     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3522
3523   def DeclareLocks(self, level):
3524     if level == locking.LEVEL_NODE:
3525       self._LockInstancesNodes()
3526
3527   def BuildHooksEnv(self):
3528     """Build hooks env.
3529
3530     This runs on master, primary and secondary nodes of the instance.
3531
3532     """
3533     env = _BuildInstanceHookEnvByObject(self, self.instance)
3534     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3535     return env, nl, nl
3536
3537   def CheckPrereq(self):
3538     """Check prerequisites.
3539
3540     This checks that the instance is in the cluster.
3541
3542     """
3543     instance = self.cfg.GetInstanceInfo(
3544       self.cfg.ExpandInstanceName(self.op.instance_name))
3545     if instance is None:
3546       raise errors.OpPrereqError("Instance '%s' not known" %
3547                                  self.op.instance_name)
3548
3549     if instance.disk_template != constants.DT_DRBD8:
3550       raise errors.OpPrereqError("Instance's disk layout is not"
3551                                  " drbd8, cannot migrate.")
3552
3553     secondary_nodes = instance.secondary_nodes
3554     if not secondary_nodes:
3555       raise errors.ConfigurationError("No secondary node but using"
3556                                       " drbd8 disk template")
3557
3558     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3559
3560     target_node = secondary_nodes[0]
3561     # check memory requirements on the secondary node
3562     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3563                          instance.name, i_be[constants.BE_MEMORY],
3564                          instance.hypervisor)
3565
3566     # check bridge existance
3567     brlist = [nic.bridge for nic in instance.nics]
3568     result = self.rpc.call_bridges_exist(target_node, brlist)
3569     if result.failed or not result.data:
3570       raise errors.OpPrereqError("One or more target bridges %s does not"
3571                                  " exist on destination node '%s'" %
3572                                  (brlist, target_node))
3573
3574     if not self.op.cleanup:
3575       _CheckNodeNotDrained(self, target_node)
3576       result = self.rpc.call_instance_migratable(instance.primary_node,
3577                                                  instance)
3578       msg = result.RemoteFailMsg()
3579       if msg:
3580         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3581                                    msg)
3582
3583     self.instance = instance
3584
3585   def _WaitUntilSync(self):
3586     """Poll with custom rpc for disk sync.
3587
3588     This uses our own step-based rpc call.
3589
3590     """
3591     self.feedback_fn("* wait until resync is done")
3592     all_done = False
3593     while not all_done:
3594       all_done = True
3595       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3596                                             self.nodes_ip,
3597                                             self.instance.disks)
3598       min_percent = 100
3599       for node, nres in result.items():
3600         msg = nres.RemoteFailMsg()
3601         if msg:
3602           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3603                                    (node, msg))
3604         node_done, node_percent = nres.payload
3605         all_done = all_done and node_done
3606         if node_percent is not None:
3607           min_percent = min(min_percent, node_percent)
3608       if not all_done:
3609         if min_percent < 100:
3610           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3611         time.sleep(2)
3612
3613   def _EnsureSecondary(self, node):
3614     """Demote a node to secondary.
3615
3616     """
3617     self.feedback_fn("* switching node %s to secondary mode" % node)
3618
3619     for dev in self.instance.disks:
3620       self.cfg.SetDiskID(dev, node)
3621
3622     result = self.rpc.call_blockdev_close(node, self.instance.name,
3623                                           self.instance.disks)
3624     msg = result.RemoteFailMsg()
3625     if msg:
3626       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3627                                " error %s" % (node, msg))
3628
3629   def _GoStandalone(self):
3630     """Disconnect from the network.
3631
3632     """
3633     self.feedback_fn("* changing into standalone mode")
3634     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3635                                                self.instance.disks)
3636     for node, nres in result.items():
3637       msg = nres.RemoteFailMsg()
3638       if msg:
3639         raise errors.OpExecError("Cannot disconnect disks node %s,"
3640                                  " error %s" % (node, msg))
3641
3642   def _GoReconnect(self, multimaster):
3643     """Reconnect to the network.
3644
3645     """
3646     if multimaster:
3647       msg = "dual-master"
3648     else:
3649       msg = "single-master"
3650     self.feedback_fn("* changing disks into %s mode" % msg)
3651     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3652                                            self.instance.disks,
3653                                            self.instance.name, multimaster)
3654     for node, nres in result.items():
3655       msg = nres.RemoteFailMsg()
3656       if msg:
3657         raise errors.OpExecError("Cannot change disks config on node %s,"
3658                                  " error: %s" % (node, msg))
3659
3660   def _ExecCleanup(self):
3661     """Try to cleanup after a failed migration.
3662
3663     The cleanup is done by:
3664       - check that the instance is running only on one node
3665         (and update the config if needed)
3666       - change disks on its secondary node to secondary
3667       - wait until disks are fully synchronized
3668       - disconnect from the network
3669       - change disks into single-master mode
3670       - wait again until disks are fully synchronized
3671
3672     """
3673     instance = self.instance
3674     target_node = self.target_node
3675     source_node = self.source_node
3676
3677     # check running on only one node
3678     self.feedback_fn("* checking where the instance actually runs"
3679                      " (if this hangs, the hypervisor might be in"
3680                      " a bad state)")
3681     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3682     for node, result in ins_l.items():
3683       result.Raise()
3684       if not isinstance(result.data, list):
3685         raise errors.OpExecError("Can't contact node '%s'" % node)
3686
3687     runningon_source = instance.name in ins_l[source_node].data
3688     runningon_target = instance.name in ins_l[target_node].data
3689
3690     if runningon_source and runningon_target:
3691       raise errors.OpExecError("Instance seems to be running on two nodes,"
3692                                " or the hypervisor is confused. You will have"
3693                                " to ensure manually that it runs only on one"
3694                                " and restart this operation.")
3695
3696     if not (runningon_source or runningon_target):
3697       raise errors.OpExecError("Instance does not seem to be running at all."
3698                                " In this case, it's safer to repair by"
3699                                " running 'gnt-instance stop' to ensure disk"
3700                                " shutdown, and then restarting it.")
3701
3702     if runningon_target:
3703       # the migration has actually succeeded, we need to update the config
3704       self.feedback_fn("* instance running on secondary node (%s),"
3705                        " updating config" % target_node)
3706       instance.primary_node = target_node
3707       self.cfg.Update(instance)
3708       demoted_node = source_node
3709     else:
3710       self.feedback_fn("* instance confirmed to be running on its"
3711                        " primary node (%s)" % source_node)
3712       demoted_node = target_node
3713
3714     self._EnsureSecondary(demoted_node)
3715     try:
3716       self._WaitUntilSync()
3717     except errors.OpExecError:
3718       # we ignore here errors, since if the device is standalone, it
3719       # won't be able to sync
3720       pass
3721     self._GoStandalone()
3722     self._GoReconnect(False)
3723     self._WaitUntilSync()
3724
3725     self.feedback_fn("* done")
3726
3727   def _RevertDiskStatus(self):
3728     """Try to revert the disk status after a failed migration.
3729
3730     """
3731     target_node = self.target_node
3732     try:
3733       self._EnsureSecondary(target_node)
3734       self._GoStandalone()
3735       self._GoReconnect(False)
3736       self._WaitUntilSync()
3737     except errors.OpExecError, err:
3738       self.LogWarning("Migration failed and I can't reconnect the"
3739                       " drives: error '%s'\n"
3740                       "Please look and recover the instance status" %
3741                       str(err))
3742
3743   def _AbortMigration(self):
3744     """Call the hypervisor code to abort a started migration.
3745
3746     """
3747     instance = self.instance
3748     target_node = self.target_node
3749     migration_info = self.migration_info
3750
3751     abort_result = self.rpc.call_finalize_migration(target_node,
3752                                                     instance,
3753                                                     migration_info,
3754                                                     False)
3755     abort_msg = abort_result.RemoteFailMsg()
3756     if abort_msg:
3757       logging.error("Aborting migration failed on target node %s: %s" %
3758                     (target_node, abort_msg))
3759       # Don't raise an exception here, as we stil have to try to revert the
3760       # disk status, even if this step failed.
3761
3762   def _ExecMigration(self):
3763     """Migrate an instance.
3764
3765     The migrate is done by:
3766       - change the disks into dual-master mode
3767       - wait until disks are fully synchronized again
3768       - migrate the instance
3769       - change disks on the new secondary node (the old primary) to secondary
3770       - wait until disks are fully synchronized
3771       - change disks into single-master mode
3772
3773     """
3774     instance = self.instance
3775     target_node = self.target_node
3776     source_node = self.source_node
3777
3778     self.feedback_fn("* checking disk consistency between source and target")
3779     for dev in instance.disks:
3780       if not _CheckDiskConsistency(self, dev, target_node, False):
3781         raise errors.OpExecError("Disk %s is degraded or not fully"
3782                                  " synchronized on target node,"
3783                                  " aborting migrate." % dev.iv_name)
3784
3785     # First get the migration information from the remote node
3786     result = self.rpc.call_migration_info(source_node, instance)
3787     msg = result.RemoteFailMsg()
3788     if msg:
3789       log_err = ("Failed fetching source migration information from %s: %s" %
3790                  (source_node, msg))
3791       logging.error(log_err)
3792       raise errors.OpExecError(log_err)
3793
3794     self.migration_info = migration_info = result.payload
3795
3796     # Then switch the disks to master/master mode
3797     self._EnsureSecondary(target_node)
3798     self._GoStandalone()
3799     self._GoReconnect(True)
3800     self._WaitUntilSync()
3801
3802     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3803     result = self.rpc.call_accept_instance(target_node,
3804                                            instance,
3805                                            migration_info,
3806                                            self.nodes_ip[target_node])
3807
3808     msg = result.RemoteFailMsg()
3809     if msg:
3810       logging.error("Instance pre-migration failed, trying to revert"
3811                     " disk status: %s", msg)
3812       self._AbortMigration()
3813       self._RevertDiskStatus()
3814       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3815                                (instance.name, msg))
3816
3817     self.feedback_fn("* migrating instance to %s" % target_node)
3818     time.sleep(10)
3819     result = self.rpc.call_instance_migrate(source_node, instance,
3820                                             self.nodes_ip[target_node],
3821                                             self.op.live)
3822     msg = result.RemoteFailMsg()
3823     if msg:
3824       logging.error("Instance migration failed, trying to revert"
3825                     " disk status: %s", msg)
3826       self._AbortMigration()
3827       self._RevertDiskStatus()
3828       raise errors.OpExecError("Could not migrate instance %s: %s" %
3829                                (instance.name, msg))
3830     time.sleep(10)
3831
3832     instance.primary_node = target_node
3833     # distribute new instance config to the other nodes
3834     self.cfg.Update(instance)
3835
3836     result = self.rpc.call_finalize_migration(target_node,
3837                                               instance,
3838                                               migration_info,
3839                                               True)
3840     msg = result.RemoteFailMsg()
3841     if msg:
3842       logging.error("Instance migration succeeded, but finalization failed:"
3843                     " %s" % msg)
3844       raise errors.OpExecError("Could not finalize instance migration: %s" %
3845                                msg)
3846
3847     self._EnsureSecondary(source_node)
3848     self._WaitUntilSync()
3849     self._GoStandalone()
3850     self._GoReconnect(False)
3851     self._WaitUntilSync()
3852
3853     self.feedback_fn("* done")
3854
3855   def Exec(self, feedback_fn):
3856     """Perform the migration.
3857
3858     """
3859     self.feedback_fn = feedback_fn
3860
3861     self.source_node = self.instance.primary_node
3862     self.target_node = self.instance.secondary_nodes[0]
3863     self.all_nodes = [self.source_node, self.target_node]
3864     self.nodes_ip = {
3865       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3866       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3867       }
3868     if self.op.cleanup:
3869       return self._ExecCleanup()
3870     else:
3871       return self._ExecMigration()
3872
3873
3874 def _CreateBlockDev(lu, node, instance, device, force_create,
3875                     info, force_open):
3876   """Create a tree of block devices on a given node.
3877
3878   If this device type has to be created on secondaries, create it and
3879   all its children.
3880
3881   If not, just recurse to children keeping the same 'force' value.
3882
3883   @param lu: the lu on whose behalf we execute
3884   @param node: the node on which to create the device
3885   @type instance: L{objects.Instance}
3886   @param instance: the instance which owns the device
3887   @type device: L{objects.Disk}
3888   @param device: the device to create
3889   @type force_create: boolean
3890   @param force_create: whether to force creation of this device; this
3891       will be change to True whenever we find a device which has
3892       CreateOnSecondary() attribute
3893   @param info: the extra 'metadata' we should attach to the device
3894       (this will be represented as a LVM tag)
3895   @type force_open: boolean
3896   @param force_open: this parameter will be passes to the
3897       L{backend.BlockdevCreate} function where it specifies
3898       whether we run on primary or not, and it affects both
3899       the child assembly and the device own Open() execution
3900
3901   """
3902   if device.CreateOnSecondary():
3903     force_create = True
3904
3905   if device.children:
3906     for child in device.children:
3907       _CreateBlockDev(lu, node, instance, child, force_create,
3908                       info, force_open)
3909
3910   if not force_create:
3911     return
3912
3913   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3914
3915
3916 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3917   """Create a single block device on a given node.
3918
3919   This will not recurse over children of the device, so they must be
3920   created in advance.
3921
3922   @param lu: the lu on whose behalf we execute
3923   @param node: the node on which to create the device
3924   @type instance: L{objects.Instance}
3925   @param instance: the instance which owns the device
3926   @type device: L{objects.Disk}
3927   @param device: the device to create
3928   @param info: the extra 'metadata' we should attach to the device
3929       (this will be represented as a LVM tag)
3930   @type force_open: boolean
3931   @param force_open: this parameter will be passes to the
3932       L{backend.BlockdevCreate} function where it specifies
3933       whether we run on primary or not, and it affects both
3934       the child assembly and the device own Open() execution
3935
3936   """
3937   lu.cfg.SetDiskID(device, node)
3938   result = lu.rpc.call_blockdev_create(node, device, device.size,
3939                                        instance.name, force_open, info)
3940   msg = result.RemoteFailMsg()
3941   if msg:
3942     raise errors.OpExecError("Can't create block device %s on"
3943                              " node %s for instance %s: %s" %
3944                              (device, node, instance.name, msg))
3945   if device.physical_id is None:
3946     device.physical_id = result.payload
3947
3948
3949 def _GenerateUniqueNames(lu, exts):
3950   """Generate a suitable LV name.
3951
3952   This will generate a logical volume name for the given instance.
3953
3954   """
3955   results = []
3956   for val in exts:
3957     new_id = lu.cfg.GenerateUniqueID()
3958     results.append("%s%s" % (new_id, val))
3959   return results
3960
3961
3962 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3963                          p_minor, s_minor):
3964   """Generate a drbd8 device complete with its children.
3965
3966   """
3967   port = lu.cfg.AllocatePort()
3968   vgname = lu.cfg.GetVGName()
3969   shared_secret = lu.cfg.GenerateDRBDSecret()
3970   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3971                           logical_id=(vgname, names[0]))
3972   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3973                           logical_id=(vgname, names[1]))
3974   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3975                           logical_id=(primary, secondary, port,
3976                                       p_minor, s_minor,
3977                                       shared_secret),
3978                           children=[dev_data, dev_meta],
3979                           iv_name=iv_name)
3980   return drbd_dev
3981
3982
3983 def _GenerateDiskTemplate(lu, template_name,
3984                           instance_name, primary_node,
3985                           secondary_nodes, disk_info,
3986                           file_storage_dir, file_driver,
3987                           base_index):
3988   """Generate the entire disk layout for a given template type.
3989
3990   """
3991   #TODO: compute space requirements
3992
3993   vgname = lu.cfg.GetVGName()
3994   disk_count = len(disk_info)
3995   disks = []
3996   if template_name == constants.DT_DISKLESS:
3997     pass
3998   elif template_name == constants.DT_PLAIN:
3999     if len(secondary_nodes) != 0:
4000       raise errors.ProgrammerError("Wrong template configuration")
4001
4002     names = _GenerateUniqueNames(lu, [".disk%d" % i
4003                                       for i in range(disk_count)])
4004     for idx, disk in enumerate(disk_info):
4005       disk_index = idx + base_index
4006       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4007                               logical_id=(vgname, names[idx]),
4008                               iv_name="disk/%d" % disk_index,
4009                               mode=disk["mode"])
4010       disks.append(disk_dev)
4011   elif template_name == constants.DT_DRBD8:
4012     if len(secondary_nodes) != 1:
4013       raise errors.ProgrammerError("Wrong template configuration")
4014     remote_node = secondary_nodes[0]
4015     minors = lu.cfg.AllocateDRBDMinor(
4016       [primary_node, remote_node] * len(disk_info), instance_name)
4017
4018     names = []
4019     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4020                                                for i in range(disk_count)]):
4021       names.append(lv_prefix + "_data")
4022       names.append(lv_prefix + "_meta")
4023     for idx, disk in enumerate(disk_info):
4024       disk_index = idx + base_index
4025       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4026                                       disk["size"], names[idx*2:idx*2+2],
4027                                       "disk/%d" % disk_index,
4028                                       minors[idx*2], minors[idx*2+1])
4029       disk_dev.mode = disk["mode"]
4030       disks.append(disk_dev)
4031   elif template_name == constants.DT_FILE:
4032     if len(secondary_nodes) != 0:
4033       raise errors.ProgrammerError("Wrong template configuration")
4034
4035     for idx, disk in enumerate(disk_info):
4036       disk_index = idx + base_index
4037       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4038                               iv_name="disk/%d" % disk_index,
4039                               logical_id=(file_driver,
4040                                           "%s/disk%d" % (file_storage_dir,
4041                                                          disk_index)),
4042                               mode=disk["mode"])
4043       disks.append(disk_dev)
4044   else:
4045     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4046   return disks
4047
4048
4049 def _GetInstanceInfoText(instance):
4050   """Compute that text that should be added to the disk's metadata.
4051
4052   """
4053   return "originstname+%s" % instance.name
4054
4055
4056 def _CreateDisks(lu, instance):
4057   """Create all disks for an instance.
4058
4059   This abstracts away some work from AddInstance.
4060
4061   @type lu: L{LogicalUnit}
4062   @param lu: the logical unit on whose behalf we execute
4063   @type instance: L{objects.Instance}
4064   @param instance: the instance whose disks we should create
4065   @rtype: boolean
4066   @return: the success of the creation
4067
4068   """
4069   info = _GetInstanceInfoText(instance)
4070   pnode = instance.primary_node
4071
4072   if instance.disk_template == constants.DT_FILE:
4073     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4074     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4075
4076     if result.failed or not result.data:
4077       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4078
4079     if not result.data[0]:
4080       raise errors.OpExecError("Failed to create directory '%s'" %
4081                                file_storage_dir)
4082
4083   # Note: this needs to be kept in sync with adding of disks in
4084   # LUSetInstanceParams
4085   for device in instance.disks:
4086     logging.info("Creating volume %s for instance %s",
4087                  device.iv_name, instance.name)
4088     #HARDCODE
4089     for node in instance.all_nodes:
4090       f_create = node == pnode
4091       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4092
4093
4094 def _RemoveDisks(lu, instance):
4095   """Remove all disks for an instance.
4096
4097   This abstracts away some work from `AddInstance()` and
4098   `RemoveInstance()`. Note that in case some of the devices couldn't
4099   be removed, the removal will continue with the other ones (compare
4100   with `_CreateDisks()`).
4101
4102   @type lu: L{LogicalUnit}
4103   @param lu: the logical unit on whose behalf we execute
4104   @type instance: L{objects.Instance}
4105   @param instance: the instance whose disks we should remove
4106   @rtype: boolean
4107   @return: the success of the removal
4108
4109   """
4110   logging.info("Removing block devices for instance %s", instance.name)
4111
4112   all_result = True
4113   for device in instance.disks:
4114     for node, disk in device.ComputeNodeTree(instance.primary_node):
4115       lu.cfg.SetDiskID(disk, node)
4116       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4117       if msg:
4118         lu.LogWarning("Could not remove block device %s on node %s,"
4119                       " continuing anyway: %s", device.iv_name, node, msg)
4120         all_result = False
4121
4122   if instance.disk_template == constants.DT_FILE:
4123     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4124     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4125                                                  file_storage_dir)
4126     if result.failed or not result.data:
4127       logging.error("Could not remove directory '%s'", file_storage_dir)
4128       all_result = False
4129
4130   return all_result
4131
4132
4133 def _ComputeDiskSize(disk_template, disks):
4134   """Compute disk size requirements in the volume group
4135
4136   """
4137   # Required free disk space as a function of disk and swap space
4138   req_size_dict = {
4139     constants.DT_DISKLESS: None,
4140     constants.DT_PLAIN: sum(d["size"] for d in disks),
4141     # 128 MB are added for drbd metadata for each disk
4142     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4143     constants.DT_FILE: None,
4144   }
4145
4146   if disk_template not in req_size_dict:
4147     raise errors.ProgrammerError("Disk template '%s' size requirement"
4148                                  " is unknown" %  disk_template)
4149
4150   return req_size_dict[disk_template]
4151
4152
4153 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4154   """Hypervisor parameter validation.
4155
4156   This function abstract the hypervisor parameter validation to be
4157   used in both instance create and instance modify.
4158
4159   @type lu: L{LogicalUnit}
4160   @param lu: the logical unit for which we check
4161   @type nodenames: list
4162   @param nodenames: the list of nodes on which we should check
4163   @type hvname: string
4164   @param hvname: the name of the hypervisor we should use
4165   @type hvparams: dict
4166   @param hvparams: the parameters which we need to check
4167   @raise errors.OpPrereqError: if the parameters are not valid
4168
4169   """
4170   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4171                                                   hvname,
4172                                                   hvparams)
4173   for node in nodenames:
4174     info = hvinfo[node]
4175     if info.offline:
4176       continue
4177     msg = info.RemoteFailMsg()
4178     if msg:
4179       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4180                                  " %s" % msg)
4181
4182
4183 class LUCreateInstance(LogicalUnit):
4184   """Create an instance.
4185
4186   """
4187   HPATH = "instance-add"
4188   HTYPE = constants.HTYPE_INSTANCE
4189   _OP_REQP = ["instance_name", "disks", "disk_template",
4190               "mode", "start",
4191               "wait_for_sync", "ip_check", "nics",
4192               "hvparams", "beparams"]
4193   REQ_BGL = False
4194
4195   def _ExpandNode(self, node):
4196     """Expands and checks one node name.
4197
4198     """
4199     node_full = self.cfg.ExpandNodeName(node)
4200     if node_full is None:
4201       raise errors.OpPrereqError("Unknown node %s" % node)
4202     return node_full
4203
4204   def ExpandNames(self):
4205     """ExpandNames for CreateInstance.
4206
4207     Figure out the right locks for instance creation.
4208
4209     """
4210     self.needed_locks = {}
4211
4212     # set optional parameters to none if they don't exist
4213     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4214       if not hasattr(self.op, attr):
4215         setattr(self.op, attr, None)
4216
4217     # cheap checks, mostly valid constants given
4218
4219     # verify creation mode
4220     if self.op.mode not in (constants.INSTANCE_CREATE,
4221                             constants.INSTANCE_IMPORT):
4222       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4223                                  self.op.mode)
4224
4225     # disk template and mirror node verification
4226     if self.op.disk_template not in constants.DISK_TEMPLATES:
4227       raise errors.OpPrereqError("Invalid disk template name")
4228
4229     if self.op.hypervisor is None:
4230       self.op.hypervisor = self.cfg.GetHypervisorType()
4231
4232     cluster = self.cfg.GetClusterInfo()
4233     enabled_hvs = cluster.enabled_hypervisors
4234     if self.op.hypervisor not in enabled_hvs:
4235       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4236                                  " cluster (%s)" % (self.op.hypervisor,
4237                                   ",".join(enabled_hvs)))
4238
4239     # check hypervisor parameter syntax (locally)
4240     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4241     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4242                                   self.op.hvparams)
4243     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4244     hv_type.CheckParameterSyntax(filled_hvp)
4245
4246     # fill and remember the beparams dict
4247     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4248     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4249                                     self.op.beparams)
4250
4251     #### instance parameters check
4252
4253     # instance name verification
4254     hostname1 = utils.HostInfo(self.op.instance_name)
4255     self.op.instance_name = instance_name = hostname1.name
4256
4257     # this is just a preventive check, but someone might still add this
4258     # instance in the meantime, and creation will fail at lock-add time
4259     if instance_name in self.cfg.GetInstanceList():
4260       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4261                                  instance_name)
4262
4263     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4264
4265     # NIC buildup
4266     self.nics = []
4267     for nic in self.op.nics:
4268       # ip validity checks
4269       ip = nic.get("ip", None)
4270       if ip is None or ip.lower() == "none":
4271         nic_ip = None
4272       elif ip.lower() == constants.VALUE_AUTO:
4273         nic_ip = hostname1.ip
4274       else:
4275         if not utils.IsValidIP(ip):
4276           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4277                                      " like a valid IP" % ip)
4278         nic_ip = ip
4279
4280       # MAC address verification
4281       mac = nic.get("mac", constants.VALUE_AUTO)
4282       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4283         if not utils.IsValidMac(mac.lower()):
4284           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4285                                      mac)
4286       # bridge verification
4287       bridge = nic.get("bridge", None)
4288       if bridge is None:
4289         bridge = self.cfg.GetDefBridge()
4290       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4291
4292     # disk checks/pre-build
4293     self.disks = []
4294     for disk in self.op.disks:
4295       mode = disk.get("mode", constants.DISK_RDWR)
4296       if mode not in constants.DISK_ACCESS_SET:
4297         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4298                                    mode)
4299       size = disk.get("size", None)
4300       if size is None:
4301         raise errors.OpPrereqError("Missing disk size")
4302       try:
4303         size = int(size)
4304       except ValueError:
4305         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4306       self.disks.append({"size": size, "mode": mode})
4307
4308     # used in CheckPrereq for ip ping check
4309     self.check_ip = hostname1.ip
4310
4311     # file storage checks
4312     if (self.op.file_driver and
4313         not self.op.file_driver in constants.FILE_DRIVER):
4314       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4315                                  self.op.file_driver)
4316
4317     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4318       raise errors.OpPrereqError("File storage directory path not absolute")
4319
4320     ### Node/iallocator related checks
4321     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4322       raise errors.OpPrereqError("One and only one of iallocator and primary"
4323                                  " node must be given")
4324
4325     if self.op.iallocator:
4326       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4327     else:
4328       self.op.pnode = self._ExpandNode(self.op.pnode)
4329       nodelist = [self.op.pnode]
4330       if self.op.snode is not None:
4331         self.op.snode = self._ExpandNode(self.op.snode)
4332         nodelist.append(self.op.snode)
4333       self.needed_locks[locking.LEVEL_NODE] = nodelist
4334
4335     # in case of import lock the source node too
4336     if self.op.mode == constants.INSTANCE_IMPORT:
4337       src_node = getattr(self.op, "src_node", None)
4338       src_path = getattr(self.op, "src_path", None)
4339
4340       if src_path is None:
4341         self.op.src_path = src_path = self.op.instance_name
4342
4343       if src_node is None:
4344         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4345         self.op.src_node = None
4346         if os.path.isabs(src_path):
4347           raise errors.OpPrereqError("Importing an instance from an absolute"
4348                                      " path requires a source node option.")
4349       else:
4350         self.op.src_node = src_node = self._ExpandNode(src_node)
4351         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4352           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4353         if not os.path.isabs(src_path):
4354           self.op.src_path = src_path = \
4355             os.path.join(constants.EXPORT_DIR, src_path)
4356
4357     else: # INSTANCE_CREATE
4358       if getattr(self.op, "os_type", None) is None:
4359         raise errors.OpPrereqError("No guest OS specified")
4360
4361   def _RunAllocator(self):
4362     """Run the allocator based on input opcode.
4363
4364     """
4365     nics = [n.ToDict() for n in self.nics]
4366     ial = IAllocator(self,
4367                      mode=constants.IALLOCATOR_MODE_ALLOC,
4368                      name=self.op.instance_name,
4369                      disk_template=self.op.disk_template,
4370                      tags=[],
4371                      os=self.op.os_type,
4372                      vcpus=self.be_full[constants.BE_VCPUS],
4373                      mem_size=self.be_full[constants.BE_MEMORY],
4374                      disks=self.disks,
4375                      nics=nics,
4376                      hypervisor=self.op.hypervisor,
4377                      )
4378
4379     ial.Run(self.op.iallocator)
4380
4381     if not ial.success:
4382       raise errors.OpPrereqError("Can't compute nodes using"
4383                                  " iallocator '%s': %s" % (self.op.iallocator,
4384                                                            ial.info))
4385     if len(ial.nodes) != ial.required_nodes:
4386       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4387                                  " of nodes (%s), required %s" %
4388                                  (self.op.iallocator, len(ial.nodes),
4389                                   ial.required_nodes))
4390     self.op.pnode = ial.nodes[0]
4391     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4392                  self.op.instance_name, self.op.iallocator,
4393                  ", ".join(ial.nodes))
4394     if ial.required_nodes == 2:
4395       self.op.snode = ial.nodes[1]
4396
4397   def BuildHooksEnv(self):
4398     """Build hooks env.
4399
4400     This runs on master, primary and secondary nodes of the instance.
4401
4402     """
4403     env = {
4404       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4405       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4406       "INSTANCE_ADD_MODE": self.op.mode,
4407       }
4408     if self.op.mode == constants.INSTANCE_IMPORT:
4409       env["INSTANCE_SRC_NODE"] = self.op.src_node
4410       env["INSTANCE_SRC_PATH"] = self.op.src_path
4411       env["INSTANCE_SRC_IMAGES"] = self.src_images
4412
4413     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4414       primary_node=self.op.pnode,
4415       secondary_nodes=self.secondaries,
4416       status=self.op.start,
4417       os_type=self.op.os_type,
4418       memory=self.be_full[constants.BE_MEMORY],
4419       vcpus=self.be_full[constants.BE_VCPUS],
4420       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4421     ))
4422
4423     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4424           self.secondaries)
4425     return env, nl, nl
4426
4427
4428   def CheckPrereq(self):
4429     """Check prerequisites.
4430
4431     """
4432     if (not self.cfg.GetVGName() and
4433         self.op.disk_template not in constants.DTS_NOT_LVM):
4434       raise errors.OpPrereqError("Cluster does not support lvm-based"
4435                                  " instances")
4436
4437     if self.op.mode == constants.INSTANCE_IMPORT:
4438       src_node = self.op.src_node
4439       src_path = self.op.src_path
4440
4441       if src_node is None:
4442         exp_list = self.rpc.call_export_list(
4443           self.acquired_locks[locking.LEVEL_NODE])
4444         found = False
4445         for node in exp_list:
4446           if not exp_list[node].failed and src_path in exp_list[node].data:
4447             found = True
4448             self.op.src_node = src_node = node
4449             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4450                                                        src_path)
4451             break
4452         if not found:
4453           raise errors.OpPrereqError("No export found for relative path %s" %
4454                                       src_path)
4455
4456       _CheckNodeOnline(self, src_node)
4457       result = self.rpc.call_export_info(src_node, src_path)
4458       result.Raise()
4459       if not result.data:
4460         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4461
4462       export_info = result.data
4463       if not export_info.has_section(constants.INISECT_EXP):
4464         raise errors.ProgrammerError("Corrupted export config")
4465
4466       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4467       if (int(ei_version) != constants.EXPORT_VERSION):
4468         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4469                                    (ei_version, constants.EXPORT_VERSION))
4470
4471       # Check that the new instance doesn't have less disks than the export
4472       instance_disks = len(self.disks)
4473       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4474       if instance_disks < export_disks:
4475         raise errors.OpPrereqError("Not enough disks to import."
4476                                    " (instance: %d, export: %d)" %
4477                                    (instance_disks, export_disks))
4478
4479       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4480       disk_images = []
4481       for idx in range(export_disks):
4482         option = 'disk%d_dump' % idx
4483         if export_info.has_option(constants.INISECT_INS, option):
4484           # FIXME: are the old os-es, disk sizes, etc. useful?
4485           export_name = export_info.get(constants.INISECT_INS, option)
4486           image = os.path.join(src_path, export_name)
4487           disk_images.append(image)
4488         else:
4489           disk_images.append(False)
4490
4491       self.src_images = disk_images
4492
4493       old_name = export_info.get(constants.INISECT_INS, 'name')
4494       # FIXME: int() here could throw a ValueError on broken exports
4495       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4496       if self.op.instance_name == old_name:
4497         for idx, nic in enumerate(self.nics):
4498           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4499             nic_mac_ini = 'nic%d_mac' % idx
4500             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4501
4502     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4503     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4504     if self.op.start and not self.op.ip_check:
4505       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4506                                  " adding an instance in start mode")
4507
4508     if self.op.ip_check:
4509       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4510         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4511                                    (self.check_ip, self.op.instance_name))
4512
4513     #### mac address generation
4514     # By generating here the mac address both the allocator and the hooks get
4515     # the real final mac address rather than the 'auto' or 'generate' value.
4516     # There is a race condition between the generation and the instance object
4517     # creation, which means that we know the mac is valid now, but we're not
4518     # sure it will be when we actually add the instance. If things go bad
4519     # adding the instance will abort because of a duplicate mac, and the
4520     # creation job will fail.
4521     for nic in self.nics:
4522       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4523         nic.mac = self.cfg.GenerateMAC()
4524
4525     #### allocator run
4526
4527     if self.op.iallocator is not None:
4528       self._RunAllocator()
4529
4530     #### node related checks
4531
4532     # check primary node
4533     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4534     assert self.pnode is not None, \
4535       "Cannot retrieve locked node %s" % self.op.pnode
4536     if pnode.offline:
4537       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4538                                  pnode.name)
4539     if pnode.drained:
4540       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4541                                  pnode.name)
4542
4543     self.secondaries = []
4544
4545     # mirror node verification
4546     if self.op.disk_template in constants.DTS_NET_MIRROR:
4547       if self.op.snode is None:
4548         raise errors.OpPrereqError("The networked disk templates need"
4549                                    " a mirror node")
4550       if self.op.snode == pnode.name:
4551         raise errors.OpPrereqError("The secondary node cannot be"
4552                                    " the primary node.")
4553       _CheckNodeOnline(self, self.op.snode)
4554       _CheckNodeNotDrained(self, self.op.snode)
4555       self.secondaries.append(self.op.snode)
4556
4557     nodenames = [pnode.name] + self.secondaries
4558
4559     req_size = _ComputeDiskSize(self.op.disk_template,
4560                                 self.disks)
4561
4562     # Check lv size requirements
4563     if req_size is not None:
4564       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4565                                          self.op.hypervisor)
4566       for node in nodenames:
4567         info = nodeinfo[node]
4568         info.Raise()
4569         info = info.data
4570         if not info:
4571           raise errors.OpPrereqError("Cannot get current information"
4572                                      " from node '%s'" % node)
4573         vg_free = info.get('vg_free', None)
4574         if not isinstance(vg_free, int):
4575           raise errors.OpPrereqError("Can't compute free disk space on"
4576                                      " node %s" % node)
4577         if req_size > info['vg_free']:
4578           raise errors.OpPrereqError("Not enough disk space on target node %s."
4579                                      " %d MB available, %d MB required" %
4580                                      (node, info['vg_free'], req_size))
4581
4582     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4583
4584     # os verification
4585     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4586     result.Raise()
4587     if not isinstance(result.data, objects.OS):
4588       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4589                                  " primary node"  % self.op.os_type)
4590
4591     # bridge check on primary node
4592     bridges = [n.bridge for n in self.nics]
4593     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4594     result.Raise()
4595     if not result.data:
4596       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4597                                  " exist on destination node '%s'" %
4598                                  (",".join(bridges), pnode.name))
4599
4600     # memory check on primary node
4601     if self.op.start:
4602       _CheckNodeFreeMemory(self, self.pnode.name,
4603                            "creating instance %s" % self.op.instance_name,
4604                            self.be_full[constants.BE_MEMORY],
4605                            self.op.hypervisor)
4606
4607   def Exec(self, feedback_fn):
4608     """Create and add the instance to the cluster.
4609
4610     """
4611     instance = self.op.instance_name
4612     pnode_name = self.pnode.name
4613
4614     ht_kind = self.op.hypervisor
4615     if ht_kind in constants.HTS_REQ_PORT:
4616       network_port = self.cfg.AllocatePort()
4617     else:
4618       network_port = None
4619
4620     ##if self.op.vnc_bind_address is None:
4621     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4622
4623     # this is needed because os.path.join does not accept None arguments
4624     if self.op.file_storage_dir is None:
4625       string_file_storage_dir = ""
4626     else:
4627       string_file_storage_dir = self.op.file_storage_dir
4628
4629     # build the full file storage dir path
4630     file_storage_dir = os.path.normpath(os.path.join(
4631                                         self.cfg.GetFileStorageDir(),
4632                                         string_file_storage_dir, instance))
4633
4634
4635     disks = _GenerateDiskTemplate(self,
4636                                   self.op.disk_template,
4637                                   instance, pnode_name,
4638                                   self.secondaries,
4639                                   self.disks,
4640                                   file_storage_dir,
4641                                   self.op.file_driver,
4642                                   0)
4643
4644     iobj = objects.Instance(name=instance, os=self.op.os_type,
4645                             primary_node=pnode_name,
4646                             nics=self.nics, disks=disks,
4647                             disk_template=self.op.disk_template,
4648                             admin_up=False,
4649                             network_port=network_port,
4650                             beparams=self.op.beparams,
4651                             hvparams=self.op.hvparams,
4652                             hypervisor=self.op.hypervisor,
4653                             )
4654
4655     feedback_fn("* creating instance disks...")
4656     try:
4657       _CreateDisks(self, iobj)
4658     except errors.OpExecError:
4659       self.LogWarning("Device creation failed, reverting...")
4660       try:
4661         _RemoveDisks(self, iobj)
4662       finally:
4663         self.cfg.ReleaseDRBDMinors(instance)
4664         raise
4665
4666     feedback_fn("adding instance %s to cluster config" % instance)
4667
4668     self.cfg.AddInstance(iobj)
4669     # Declare that we don't want to remove the instance lock anymore, as we've
4670     # added the instance to the config
4671     del self.remove_locks[locking.LEVEL_INSTANCE]
4672     # Unlock all the nodes
4673     if self.op.mode == constants.INSTANCE_IMPORT:
4674       nodes_keep = [self.op.src_node]
4675       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4676                        if node != self.op.src_node]
4677       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4678       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4679     else:
4680       self.context.glm.release(locking.LEVEL_NODE)
4681       del self.acquired_locks[locking.LEVEL_NODE]
4682
4683     if self.op.wait_for_sync:
4684       disk_abort = not _WaitForSync(self, iobj)
4685     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4686       # make sure the disks are not degraded (still sync-ing is ok)
4687       time.sleep(15)
4688       feedback_fn("* checking mirrors status")
4689       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4690     else:
4691       disk_abort = False
4692
4693     if disk_abort:
4694       _RemoveDisks(self, iobj)
4695       self.cfg.RemoveInstance(iobj.name)
4696       # Make sure the instance lock gets removed
4697       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4698       raise errors.OpExecError("There are some degraded disks for"
4699                                " this instance")
4700
4701     feedback_fn("creating os for instance %s on node %s" %
4702                 (instance, pnode_name))
4703
4704     if iobj.disk_template != constants.DT_DISKLESS:
4705       if self.op.mode == constants.INSTANCE_CREATE:
4706         feedback_fn("* running the instance OS create scripts...")
4707         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4708         msg = result.RemoteFailMsg()
4709         if msg:
4710           raise errors.OpExecError("Could not add os for instance %s"
4711                                    " on node %s: %s" %
4712                                    (instance, pnode_name, msg))
4713
4714       elif self.op.mode == constants.INSTANCE_IMPORT:
4715         feedback_fn("* running the instance OS import scripts...")
4716         src_node = self.op.src_node
4717         src_images = self.src_images
4718         cluster_name = self.cfg.GetClusterName()
4719         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4720                                                          src_node, src_images,
4721                                                          cluster_name)
4722         import_result.Raise()
4723         for idx, result in enumerate(import_result.data):
4724           if not result:
4725             self.LogWarning("Could not import the image %s for instance"
4726                             " %s, disk %d, on node %s" %
4727                             (src_images[idx], instance, idx, pnode_name))
4728       else:
4729         # also checked in the prereq part
4730         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4731                                      % self.op.mode)
4732
4733     if self.op.start:
4734       iobj.admin_up = True
4735       self.cfg.Update(iobj)
4736       logging.info("Starting instance %s on node %s", instance, pnode_name)
4737       feedback_fn("* starting instance...")
4738       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4739       msg = result.RemoteFailMsg()
4740       if msg:
4741         raise errors.OpExecError("Could not start instance: %s" % msg)
4742
4743
4744 class LUConnectConsole(NoHooksLU):
4745   """Connect to an instance's console.
4746
4747   This is somewhat special in that it returns the command line that
4748   you need to run on the master node in order to connect to the
4749   console.
4750
4751   """
4752   _OP_REQP = ["instance_name"]
4753   REQ_BGL = False
4754
4755   def ExpandNames(self):
4756     self._ExpandAndLockInstance()
4757
4758   def CheckPrereq(self):
4759     """Check prerequisites.
4760
4761     This checks that the instance is in the cluster.
4762
4763     """
4764     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4765     assert self.instance is not None, \
4766       "Cannot retrieve locked instance %s" % self.op.instance_name
4767     _CheckNodeOnline(self, self.instance.primary_node)
4768
4769   def Exec(self, feedback_fn):
4770     """Connect to the console of an instance
4771
4772     """
4773     instance = self.instance
4774     node = instance.primary_node
4775
4776     node_insts = self.rpc.call_instance_list([node],
4777                                              [instance.hypervisor])[node]
4778     node_insts.Raise()
4779
4780     if instance.name not in node_insts.data:
4781       raise errors.OpExecError("Instance %s is not running." % instance.name)
4782
4783     logging.debug("Connecting to console of %s on %s", instance.name, node)
4784
4785     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4786     cluster = self.cfg.GetClusterInfo()
4787     # beparams and hvparams are passed separately, to avoid editing the
4788     # instance and then saving the defaults in the instance itself.
4789     hvparams = cluster.FillHV(instance)
4790     beparams = cluster.FillBE(instance)
4791     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4792
4793     # build ssh cmdline
4794     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4795
4796
4797 class LUReplaceDisks(LogicalUnit):
4798   """Replace the disks of an instance.
4799
4800   """
4801   HPATH = "mirrors-replace"
4802   HTYPE = constants.HTYPE_INSTANCE
4803   _OP_REQP = ["instance_name", "mode", "disks"]
4804   REQ_BGL = False
4805
4806   def CheckArguments(self):
4807     if not hasattr(self.op, "remote_node"):
4808       self.op.remote_node = None
4809     if not hasattr(self.op, "iallocator"):
4810       self.op.iallocator = None
4811
4812     # check for valid parameter combination
4813     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4814     if self.op.mode == constants.REPLACE_DISK_CHG:
4815       if cnt == 2:
4816         raise errors.OpPrereqError("When changing the secondary either an"
4817                                    " iallocator script must be used or the"
4818                                    " new node given")
4819       elif cnt == 0:
4820         raise errors.OpPrereqError("Give either the iallocator or the new"
4821                                    " secondary, not both")
4822     else: # not replacing the secondary
4823       if cnt != 2:
4824         raise errors.OpPrereqError("The iallocator and new node options can"
4825                                    " be used only when changing the"
4826                                    " secondary node")
4827
4828   def ExpandNames(self):
4829     self._ExpandAndLockInstance()
4830
4831     if self.op.iallocator is not None:
4832       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4833     elif self.op.remote_node is not None:
4834       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4835       if remote_node is None:
4836         raise errors.OpPrereqError("Node '%s' not known" %
4837                                    self.op.remote_node)
4838       self.op.remote_node = remote_node
4839       # Warning: do not remove the locking of the new secondary here
4840       # unless DRBD8.AddChildren is changed to work in parallel;
4841       # currently it doesn't since parallel invocations of
4842       # FindUnusedMinor will conflict
4843       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4844       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4845     else:
4846       self.needed_locks[locking.LEVEL_NODE] = []
4847       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4848
4849   def DeclareLocks(self, level):
4850     # If we're not already locking all nodes in the set we have to declare the
4851     # instance's primary/secondary nodes.
4852     if (level == locking.LEVEL_NODE and
4853         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4854       self._LockInstancesNodes()
4855
4856   def _RunAllocator(self):
4857     """Compute a new secondary node using an IAllocator.
4858
4859     """
4860     ial = IAllocator(self,
4861                      mode=constants.IALLOCATOR_MODE_RELOC,
4862                      name=self.op.instance_name,
4863                      relocate_from=[self.sec_node])
4864
4865     ial.Run(self.op.iallocator)
4866
4867     if not ial.success:
4868       raise errors.OpPrereqError("Can't compute nodes using"
4869                                  " iallocator '%s': %s" % (self.op.iallocator,
4870                                                            ial.info))
4871     if len(ial.nodes) != ial.required_nodes:
4872       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4873                                  " of nodes (%s), required %s" %
4874                                  (len(ial.nodes), ial.required_nodes))
4875     self.op.remote_node = ial.nodes[0]
4876     self.LogInfo("Selected new secondary for the instance: %s",
4877                  self.op.remote_node)
4878
4879   def BuildHooksEnv(self):
4880     """Build hooks env.
4881
4882     This runs on the master, the primary and all the secondaries.
4883
4884     """
4885     env = {
4886       "MODE": self.op.mode,
4887       "NEW_SECONDARY": self.op.remote_node,
4888       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4889       }
4890     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4891     nl = [
4892       self.cfg.GetMasterNode(),
4893       self.instance.primary_node,
4894       ]
4895     if self.op.remote_node is not None:
4896       nl.append(self.op.remote_node)
4897     return env, nl, nl
4898
4899   def CheckPrereq(self):
4900     """Check prerequisites.
4901
4902     This checks that the instance is in the cluster.
4903
4904     """
4905     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4906     assert instance is not None, \
4907       "Cannot retrieve locked instance %s" % self.op.instance_name
4908     self.instance = instance
4909
4910     if instance.disk_template != constants.DT_DRBD8:
4911       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4912                                  " instances")
4913
4914     if len(instance.secondary_nodes) != 1:
4915       raise errors.OpPrereqError("The instance has a strange layout,"
4916                                  " expected one secondary but found %d" %
4917                                  len(instance.secondary_nodes))
4918
4919     self.sec_node = instance.secondary_nodes[0]
4920
4921     if self.op.iallocator is not None:
4922       self._RunAllocator()
4923
4924     remote_node = self.op.remote_node
4925     if remote_node is not None:
4926       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4927       assert self.remote_node_info is not None, \
4928         "Cannot retrieve locked node %s" % remote_node
4929     else:
4930       self.remote_node_info = None
4931     if remote_node == instance.primary_node:
4932       raise errors.OpPrereqError("The specified node is the primary node of"
4933                                  " the instance.")
4934     elif remote_node == self.sec_node:
4935       raise errors.OpPrereqError("The specified node is already the"
4936                                  " secondary node of the instance.")
4937
4938     if self.op.mode == constants.REPLACE_DISK_PRI:
4939       n1 = self.tgt_node = instance.primary_node
4940       n2 = self.oth_node = self.sec_node
4941     elif self.op.mode == constants.REPLACE_DISK_SEC:
4942       n1 = self.tgt_node = self.sec_node
4943       n2 = self.oth_node = instance.primary_node
4944     elif self.op.mode == constants.REPLACE_DISK_CHG:
4945       n1 = self.new_node = remote_node
4946       n2 = self.oth_node = instance.primary_node
4947       self.tgt_node = self.sec_node
4948       _CheckNodeNotDrained(self, remote_node)
4949     else:
4950       raise errors.ProgrammerError("Unhandled disk replace mode")
4951
4952     _CheckNodeOnline(self, n1)
4953     _CheckNodeOnline(self, n2)
4954
4955     if not self.op.disks:
4956       self.op.disks = range(len(instance.disks))
4957
4958     for disk_idx in self.op.disks:
4959       instance.FindDisk(disk_idx)
4960
4961   def _ExecD8DiskOnly(self, feedback_fn):
4962     """Replace a disk on the primary or secondary for dbrd8.
4963
4964     The algorithm for replace is quite complicated:
4965
4966       1. for each disk to be replaced:
4967
4968         1. create new LVs on the target node with unique names
4969         1. detach old LVs from the drbd device
4970         1. rename old LVs to name_replaced.<time_t>
4971         1. rename new LVs to old LVs
4972         1. attach the new LVs (with the old names now) to the drbd device
4973
4974       1. wait for sync across all devices
4975
4976       1. for each modified disk:
4977
4978         1. remove old LVs (which have the name name_replaces.<time_t>)
4979
4980     Failures are not very well handled.
4981
4982     """
4983     steps_total = 6
4984     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4985     instance = self.instance
4986     iv_names = {}
4987     vgname = self.cfg.GetVGName()
4988     # start of work
4989     cfg = self.cfg
4990     tgt_node = self.tgt_node
4991     oth_node = self.oth_node
4992
4993     # Step: check device activation
4994     self.proc.LogStep(1, steps_total, "check device existence")
4995     info("checking volume groups")
4996     my_vg = cfg.GetVGName()
4997     results = self.rpc.call_vg_list([oth_node, tgt_node])
4998     if not results:
4999       raise errors.OpExecError("Can't list volume groups on the nodes")
5000     for node in oth_node, tgt_node:
5001       res = results[node]
5002       if res.failed or not res.data or my_vg not in res.data:
5003         raise errors.OpExecError("Volume group '%s' not found on %s" %
5004                                  (my_vg, node))
5005     for idx, dev in enumerate(instance.disks):
5006       if idx not in self.op.disks:
5007         continue
5008       for node in tgt_node, oth_node:
5009         info("checking disk/%d on %s" % (idx, node))
5010         cfg.SetDiskID(dev, node)
5011         result = self.rpc.call_blockdev_find(node, dev)
5012         msg = result.RemoteFailMsg()
5013         if not msg and not result.payload:
5014           msg = "disk not found"
5015         if msg:
5016           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5017                                    (idx, node, msg))
5018
5019     # Step: check other node consistency
5020     self.proc.LogStep(2, steps_total, "check peer consistency")
5021     for idx, dev in enumerate(instance.disks):
5022       if idx not in self.op.disks:
5023         continue
5024       info("checking disk/%d consistency on %s" % (idx, oth_node))
5025       if not _CheckDiskConsistency(self, dev, oth_node,
5026                                    oth_node==instance.primary_node):
5027         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5028                                  " to replace disks on this node (%s)" %
5029                                  (oth_node, tgt_node))
5030
5031     # Step: create new storage
5032     self.proc.LogStep(3, steps_total, "allocate new storage")
5033     for idx, dev in enumerate(instance.disks):
5034       if idx not in self.op.disks:
5035         continue
5036       size = dev.size
5037       cfg.SetDiskID(dev, tgt_node)
5038       lv_names = [".disk%d_%s" % (idx, suf)
5039                   for suf in ["data", "meta"]]
5040       names = _GenerateUniqueNames(self, lv_names)
5041       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5042                              logical_id=(vgname, names[0]))
5043       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5044                              logical_id=(vgname, names[1]))
5045       new_lvs = [lv_data, lv_meta]
5046       old_lvs = dev.children
5047       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5048       info("creating new local storage on %s for %s" %
5049            (tgt_node, dev.iv_name))
5050       # we pass force_create=True to force the LVM creation
5051       for new_lv in new_lvs:
5052         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5053                         _GetInstanceInfoText(instance), False)
5054
5055     # Step: for each lv, detach+rename*2+attach
5056     self.proc.LogStep(4, steps_total, "change drbd configuration")
5057     for dev, old_lvs, new_lvs in iv_names.itervalues():
5058       info("detaching %s drbd from local storage" % dev.iv_name)
5059       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5060       result.Raise()
5061       if not result.data:
5062         raise errors.OpExecError("Can't detach drbd from local storage on node"
5063                                  " %s for device %s" % (tgt_node, dev.iv_name))
5064       #dev.children = []
5065       #cfg.Update(instance)
5066
5067       # ok, we created the new LVs, so now we know we have the needed
5068       # storage; as such, we proceed on the target node to rename
5069       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5070       # using the assumption that logical_id == physical_id (which in
5071       # turn is the unique_id on that node)
5072
5073       # FIXME(iustin): use a better name for the replaced LVs
5074       temp_suffix = int(time.time())
5075       ren_fn = lambda d, suff: (d.physical_id[0],
5076                                 d.physical_id[1] + "_replaced-%s" % suff)
5077       # build the rename list based on what LVs exist on the node
5078       rlist = []
5079       for to_ren in old_lvs:
5080         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5081         if not result.RemoteFailMsg() and result.payload:
5082           # device exists
5083           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5084
5085       info("renaming the old LVs on the target node")
5086       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5087       result.Raise()
5088       if not result.data:
5089         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5090       # now we rename the new LVs to the old LVs
5091       info("renaming the new LVs on the target node")
5092       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5093       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5094       result.Raise()
5095       if not result.data:
5096         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5097
5098       for old, new in zip(old_lvs, new_lvs):
5099         new.logical_id = old.logical_id
5100         cfg.SetDiskID(new, tgt_node)
5101
5102       for disk in old_lvs:
5103         disk.logical_id = ren_fn(disk, temp_suffix)
5104         cfg.SetDiskID(disk, tgt_node)
5105
5106       # now that the new lvs have the old name, we can add them to the device
5107       info("adding new mirror component on %s" % tgt_node)
5108       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5109       if result.failed or not result.data:
5110         for new_lv in new_lvs:
5111           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5112           if msg:
5113             warning("Can't rollback device %s: %s", dev, msg,
5114                     hint="cleanup manually the unused logical volumes")
5115         raise errors.OpExecError("Can't add local storage to drbd")
5116
5117       dev.children = new_lvs
5118       cfg.Update(instance)
5119
5120     # Step: wait for sync
5121
5122     # this can fail as the old devices are degraded and _WaitForSync
5123     # does a combined result over all disks, so we don't check its
5124     # return value
5125     self.proc.LogStep(5, steps_total, "sync devices")
5126     _WaitForSync(self, instance, unlock=True)
5127
5128     # so check manually all the devices
5129     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5130       cfg.SetDiskID(dev, instance.primary_node)
5131       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5132       msg = result.RemoteFailMsg()
5133       if not msg and not result.payload:
5134         msg = "disk not found"
5135       if msg:
5136         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5137                                  (name, msg))
5138       if result.payload[5]:
5139         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5140
5141     # Step: remove old storage
5142     self.proc.LogStep(6, steps_total, "removing old storage")
5143     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5144       info("remove logical volumes for %s" % name)
5145       for lv in old_lvs:
5146         cfg.SetDiskID(lv, tgt_node)
5147         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5148         if msg:
5149           warning("Can't remove old LV: %s" % msg,
5150                   hint="manually remove unused LVs")
5151           continue
5152
5153   def _ExecD8Secondary(self, feedback_fn):
5154     """Replace the secondary node for drbd8.
5155
5156     The algorithm for replace is quite complicated:
5157       - for all disks of the instance:
5158         - create new LVs on the new node with same names
5159         - shutdown the drbd device on the old secondary
5160         - disconnect the drbd network on the primary
5161         - create the drbd device on the new secondary
5162         - network attach the drbd on the primary, using an artifice:
5163           the drbd code for Attach() will connect to the network if it
5164           finds a device which is connected to the good local disks but
5165           not network enabled
5166       - wait for sync across all devices
5167       - remove all disks from the old secondary
5168
5169     Failures are not very well handled.
5170
5171     """
5172     steps_total = 6
5173     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5174     instance = self.instance
5175     iv_names = {}
5176     # start of work
5177     cfg = self.cfg
5178     old_node = self.tgt_node
5179     new_node = self.new_node
5180     pri_node = instance.primary_node
5181     nodes_ip = {
5182       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5183       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5184       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5185       }
5186
5187     # Step: check device activation
5188     self.proc.LogStep(1, steps_total, "check device existence")
5189     info("checking volume groups")
5190     my_vg = cfg.GetVGName()
5191     results = self.rpc.call_vg_list([pri_node, new_node])
5192     for node in pri_node, new_node:
5193       res = results[node]
5194       if res.failed or not res.data or my_vg not in res.data:
5195         raise errors.OpExecError("Volume group '%s' not found on %s" %
5196                                  (my_vg, node))
5197     for idx, dev in enumerate(instance.disks):
5198       if idx not in self.op.disks:
5199         continue
5200       info("checking disk/%d on %s" % (idx, pri_node))
5201       cfg.SetDiskID(dev, pri_node)
5202       result = self.rpc.call_blockdev_find(pri_node, dev)
5203       msg = result.RemoteFailMsg()
5204       if not msg and not result.payload:
5205         msg = "disk not found"
5206       if msg:
5207         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5208                                  (idx, pri_node, msg))
5209
5210     # Step: check other node consistency
5211     self.proc.LogStep(2, steps_total, "check peer consistency")
5212     for idx, dev in enumerate(instance.disks):
5213       if idx not in self.op.disks:
5214         continue
5215       info("checking disk/%d consistency on %s" % (idx, pri_node))
5216       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5217         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5218                                  " unsafe to replace the secondary" %
5219                                  pri_node)
5220
5221     # Step: create new storage
5222     self.proc.LogStep(3, steps_total, "allocate new storage")
5223     for idx, dev in enumerate(instance.disks):
5224       info("adding new local storage on %s for disk/%d" %
5225            (new_node, idx))
5226       # we pass force_create=True to force LVM creation
5227       for new_lv in dev.children:
5228         _CreateBlockDev(self, new_node, instance, new_lv, True,
5229                         _GetInstanceInfoText(instance), False)
5230
5231     # Step 4: dbrd minors and drbd setups changes
5232     # after this, we must manually remove the drbd minors on both the
5233     # error and the success paths
5234     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5235                                    instance.name)
5236     logging.debug("Allocated minors %s" % (minors,))
5237     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5238     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5239       size = dev.size
5240       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5241       # create new devices on new_node; note that we create two IDs:
5242       # one without port, so the drbd will be activated without
5243       # networking information on the new node at this stage, and one
5244       # with network, for the latter activation in step 4
5245       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5246       if pri_node == o_node1:
5247         p_minor = o_minor1
5248       else:
5249         p_minor = o_minor2
5250
5251       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5252       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5253
5254       iv_names[idx] = (dev, dev.children, new_net_id)
5255       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5256                     new_net_id)
5257       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5258                               logical_id=new_alone_id,
5259                               children=dev.children)
5260       try:
5261         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5262                               _GetInstanceInfoText(instance), False)
5263       except errors.BlockDeviceError:
5264         self.cfg.ReleaseDRBDMinors(instance.name)
5265         raise
5266
5267     for idx, dev in enumerate(instance.disks):
5268       # we have new devices, shutdown the drbd on the old secondary
5269       info("shutting down drbd for disk/%d on old node" % idx)
5270       cfg.SetDiskID(dev, old_node)
5271       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5272       if msg:
5273         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5274                 (idx, msg),
5275                 hint="Please cleanup this device manually as soon as possible")
5276
5277     info("detaching primary drbds from the network (=> standalone)")
5278     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5279                                                instance.disks)[pri_node]
5280
5281     msg = result.RemoteFailMsg()
5282     if msg:
5283       # detaches didn't succeed (unlikely)
5284       self.cfg.ReleaseDRBDMinors(instance.name)
5285       raise errors.OpExecError("Can't detach the disks from the network on"
5286                                " old node: %s" % (msg,))
5287
5288     # if we managed to detach at least one, we update all the disks of
5289     # the instance to point to the new secondary
5290     info("updating instance configuration")
5291     for dev, _, new_logical_id in iv_names.itervalues():
5292       dev.logical_id = new_logical_id
5293       cfg.SetDiskID(dev, pri_node)
5294     cfg.Update(instance)
5295
5296     # and now perform the drbd attach
5297     info("attaching primary drbds to new secondary (standalone => connected)")
5298     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5299                                            instance.disks, instance.name,
5300                                            False)
5301     for to_node, to_result in result.items():
5302       msg = to_result.RemoteFailMsg()
5303       if msg:
5304         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5305                 hint="please do a gnt-instance info to see the"
5306                 " status of disks")
5307
5308     # this can fail as the old devices are degraded and _WaitForSync
5309     # does a combined result over all disks, so we don't check its
5310     # return value
5311     self.proc.LogStep(5, steps_total, "sync devices")
5312     _WaitForSync(self, instance, unlock=True)
5313
5314     # so check manually all the devices
5315     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5316       cfg.SetDiskID(dev, pri_node)
5317       result = self.rpc.call_blockdev_find(pri_node, dev)
5318       msg = result.RemoteFailMsg()
5319       if not msg and not result.payload:
5320         msg = "disk not found"
5321       if msg:
5322         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5323                                  (idx, msg))
5324       if result.payload[5]:
5325         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5326
5327     self.proc.LogStep(6, steps_total, "removing old storage")
5328     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5329       info("remove logical volumes for disk/%d" % idx)
5330       for lv in old_lvs:
5331         cfg.SetDiskID(lv, old_node)
5332         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5333         if msg:
5334           warning("Can't remove LV on old secondary: %s", msg,
5335                   hint="Cleanup stale volumes by hand")
5336
5337   def Exec(self, feedback_fn):
5338     """Execute disk replacement.
5339
5340     This dispatches the disk replacement to the appropriate handler.
5341
5342     """
5343     instance = self.instance
5344
5345     # Activate the instance disks if we're replacing them on a down instance
5346     if not instance.admin_up:
5347       _StartInstanceDisks(self, instance, True)
5348
5349     if self.op.mode == constants.REPLACE_DISK_CHG:
5350       fn = self._ExecD8Secondary
5351     else:
5352       fn = self._ExecD8DiskOnly
5353
5354     ret = fn(feedback_fn)
5355
5356     # Deactivate the instance disks if we're replacing them on a down instance
5357     if not instance.admin_up:
5358       _SafeShutdownInstanceDisks(self, instance)
5359
5360     return ret
5361
5362
5363 class LUGrowDisk(LogicalUnit):
5364   """Grow a disk of an instance.
5365
5366   """
5367   HPATH = "disk-grow"
5368   HTYPE = constants.HTYPE_INSTANCE
5369   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5370   REQ_BGL = False
5371
5372   def ExpandNames(self):
5373     self._ExpandAndLockInstance()
5374     self.needed_locks[locking.LEVEL_NODE] = []
5375     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5376
5377   def DeclareLocks(self, level):
5378     if level == locking.LEVEL_NODE:
5379       self._LockInstancesNodes()
5380
5381   def BuildHooksEnv(self):
5382     """Build hooks env.
5383
5384     This runs on the master, the primary and all the secondaries.
5385
5386     """
5387     env = {
5388       "DISK": self.op.disk,
5389       "AMOUNT": self.op.amount,
5390       }
5391     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5392     nl = [
5393       self.cfg.GetMasterNode(),
5394       self.instance.primary_node,
5395       ]
5396     return env, nl, nl
5397
5398   def CheckPrereq(self):
5399     """Check prerequisites.
5400
5401     This checks that the instance is in the cluster.
5402
5403     """
5404     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5405     assert instance is not None, \
5406       "Cannot retrieve locked instance %s" % self.op.instance_name
5407     nodenames = list(instance.all_nodes)
5408     for node in nodenames:
5409       _CheckNodeOnline(self, node)
5410
5411
5412     self.instance = instance
5413
5414     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5415       raise errors.OpPrereqError("Instance's disk layout does not support"
5416                                  " growing.")
5417
5418     self.disk = instance.FindDisk(self.op.disk)
5419
5420     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5421                                        instance.hypervisor)
5422     for node in nodenames:
5423       info = nodeinfo[node]
5424       if info.failed or not info.data:
5425         raise errors.OpPrereqError("Cannot get current information"
5426                                    " from node '%s'" % node)
5427       vg_free = info.data.get('vg_free', None)
5428       if not isinstance(vg_free, int):
5429         raise errors.OpPrereqError("Can't compute free disk space on"
5430                                    " node %s" % node)
5431       if self.op.amount > vg_free:
5432         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5433                                    " %d MiB available, %d MiB required" %
5434                                    (node, vg_free, self.op.amount))
5435
5436   def Exec(self, feedback_fn):
5437     """Execute disk grow.
5438
5439     """
5440     instance = self.instance
5441     disk = self.disk
5442     for node in instance.all_nodes:
5443       self.cfg.SetDiskID(disk, node)
5444       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5445       msg = result.RemoteFailMsg()
5446       if msg:
5447         raise errors.OpExecError("Grow request failed to node %s: %s" %
5448                                  (node, msg))
5449     disk.RecordGrow(self.op.amount)
5450     self.cfg.Update(instance)
5451     if self.op.wait_for_sync:
5452       disk_abort = not _WaitForSync(self, instance)
5453       if disk_abort:
5454         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5455                              " status.\nPlease check the instance.")
5456
5457
5458 class LUQueryInstanceData(NoHooksLU):
5459   """Query runtime instance data.
5460
5461   """
5462   _OP_REQP = ["instances", "static"]
5463   REQ_BGL = False
5464
5465   def ExpandNames(self):
5466     self.needed_locks = {}
5467     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5468
5469     if not isinstance(self.op.instances, list):
5470       raise errors.OpPrereqError("Invalid argument type 'instances'")
5471
5472     if self.op.instances:
5473       self.wanted_names = []
5474       for name in self.op.instances:
5475         full_name = self.cfg.ExpandInstanceName(name)
5476         if full_name is None:
5477           raise errors.OpPrereqError("Instance '%s' not known" % name)
5478         self.wanted_names.append(full_name)
5479       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5480     else:
5481       self.wanted_names = None
5482       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5483
5484     self.needed_locks[locking.LEVEL_NODE] = []
5485     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5486
5487   def DeclareLocks(self, level):
5488     if level == locking.LEVEL_NODE:
5489       self._LockInstancesNodes()
5490
5491   def CheckPrereq(self):
5492     """Check prerequisites.
5493
5494     This only checks the optional instance list against the existing names.
5495
5496     """
5497     if self.wanted_names is None:
5498       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5499
5500     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5501                              in self.wanted_names]
5502     return
5503
5504   def _ComputeDiskStatus(self, instance, snode, dev):
5505     """Compute block device status.
5506
5507     """
5508     static = self.op.static
5509     if not static:
5510       self.cfg.SetDiskID(dev, instance.primary_node)
5511       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5512       msg = dev_pstatus.RemoteFailMsg()
5513       if msg:
5514         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5515                                  (instance.name, msg))
5516       dev_pstatus = dev_pstatus.payload
5517     else:
5518       dev_pstatus = None
5519
5520     if dev.dev_type in constants.LDS_DRBD:
5521       # we change the snode then (otherwise we use the one passed in)
5522       if dev.logical_id[0] == instance.primary_node:
5523         snode = dev.logical_id[1]
5524       else:
5525         snode = dev.logical_id[0]
5526
5527     if snode and not static:
5528       self.cfg.SetDiskID(dev, snode)
5529       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5530       msg = dev_sstatus.RemoteFailMsg()
5531       if msg:
5532         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5533                                  (instance.name, msg))
5534       dev_sstatus = dev_sstatus.payload
5535     else:
5536       dev_sstatus = None
5537
5538     if dev.children:
5539       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5540                       for child in dev.children]
5541     else:
5542       dev_children = []
5543
5544     data = {
5545       "iv_name": dev.iv_name,
5546       "dev_type": dev.dev_type,
5547       "logical_id": dev.logical_id,
5548       "physical_id": dev.physical_id,
5549       "pstatus": dev_pstatus,
5550       "sstatus": dev_sstatus,
5551       "children": dev_children,
5552       "mode": dev.mode,
5553       }
5554
5555     return data
5556
5557   def Exec(self, feedback_fn):
5558     """Gather and return data"""
5559     result = {}
5560
5561     cluster = self.cfg.GetClusterInfo()
5562
5563     for instance in self.wanted_instances:
5564       if not self.op.static:
5565         remote_info = self.rpc.call_instance_info(instance.primary_node,
5566                                                   instance.name,
5567                                                   instance.hypervisor)
5568         remote_info.Raise()
5569         remote_info = remote_info.data
5570         if remote_info and "state" in remote_info:
5571           remote_state = "up"
5572         else:
5573           remote_state = "down"
5574       else:
5575         remote_state = None
5576       if instance.admin_up:
5577         config_state = "up"
5578       else:
5579         config_state = "down"
5580
5581       disks = [self._ComputeDiskStatus(instance, None, device)
5582                for device in instance.disks]
5583
5584       idict = {
5585         "name": instance.name,
5586         "config_state": config_state,
5587         "run_state": remote_state,
5588         "pnode": instance.primary_node,
5589         "snodes": instance.secondary_nodes,
5590         "os": instance.os,
5591         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5592         "disks": disks,
5593         "hypervisor": instance.hypervisor,
5594         "network_port": instance.network_port,
5595         "hv_instance": instance.hvparams,
5596         "hv_actual": cluster.FillHV(instance),
5597         "be_instance": instance.beparams,
5598         "be_actual": cluster.FillBE(instance),
5599         }
5600
5601       result[instance.name] = idict
5602
5603     return result
5604
5605
5606 class LUSetInstanceParams(LogicalUnit):
5607   """Modifies an instances's parameters.
5608
5609   """
5610   HPATH = "instance-modify"
5611   HTYPE = constants.HTYPE_INSTANCE
5612   _OP_REQP = ["instance_name"]
5613   REQ_BGL = False
5614
5615   def CheckArguments(self):
5616     if not hasattr(self.op, 'nics'):
5617       self.op.nics = []
5618     if not hasattr(self.op, 'disks'):
5619       self.op.disks = []
5620     if not hasattr(self.op, 'beparams'):
5621       self.op.beparams = {}
5622     if not hasattr(self.op, 'hvparams'):
5623       self.op.hvparams = {}
5624     self.op.force = getattr(self.op, "force", False)
5625     if not (self.op.nics or self.op.disks or
5626             self.op.hvparams or self.op.beparams):
5627       raise errors.OpPrereqError("No changes submitted")
5628
5629     # Disk validation
5630     disk_addremove = 0
5631     for disk_op, disk_dict in self.op.disks:
5632       if disk_op == constants.DDM_REMOVE:
5633         disk_addremove += 1
5634         continue
5635       elif disk_op == constants.DDM_ADD:
5636         disk_addremove += 1
5637       else:
5638         if not isinstance(disk_op, int):
5639           raise errors.OpPrereqError("Invalid disk index")
5640       if disk_op == constants.DDM_ADD:
5641         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5642         if mode not in constants.DISK_ACCESS_SET:
5643           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5644         size = disk_dict.get('size', None)
5645         if size is None:
5646           raise errors.OpPrereqError("Required disk parameter size missing")
5647         try:
5648           size = int(size)
5649         except ValueError, err:
5650           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5651                                      str(err))
5652         disk_dict['size'] = size
5653       else:
5654         # modification of disk
5655         if 'size' in disk_dict:
5656           raise errors.OpPrereqError("Disk size change not possible, use"
5657                                      " grow-disk")
5658
5659     if disk_addremove > 1:
5660       raise errors.OpPrereqError("Only one disk add or remove operation"
5661                                  " supported at a time")
5662
5663     # NIC validation
5664     nic_addremove = 0
5665     for nic_op, nic_dict in self.op.nics:
5666       if nic_op == constants.DDM_REMOVE:
5667         nic_addremove += 1
5668         continue
5669       elif nic_op == constants.DDM_ADD:
5670         nic_addremove += 1
5671       else:
5672         if not isinstance(nic_op, int):
5673           raise errors.OpPrereqError("Invalid nic index")
5674
5675       # nic_dict should be a dict
5676       nic_ip = nic_dict.get('ip', None)
5677       if nic_ip is not None:
5678         if nic_ip.lower() == constants.VALUE_NONE:
5679           nic_dict['ip'] = None
5680         else:
5681           if not utils.IsValidIP(nic_ip):
5682             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5683
5684       if nic_op == constants.DDM_ADD:
5685         nic_bridge = nic_dict.get('bridge', None)
5686         if nic_bridge is None:
5687           nic_dict['bridge'] = self.cfg.GetDefBridge()
5688         nic_mac = nic_dict.get('mac', None)
5689         if nic_mac is None:
5690           nic_dict['mac'] = constants.VALUE_AUTO
5691
5692       if 'mac' in nic_dict:
5693         nic_mac = nic_dict['mac']
5694         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5695           if not utils.IsValidMac(nic_mac):
5696             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5697         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5698           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5699                                      " modifying an existing nic")
5700
5701     if nic_addremove > 1:
5702       raise errors.OpPrereqError("Only one NIC add or remove operation"
5703                                  " supported at a time")
5704
5705   def ExpandNames(self):
5706     self._ExpandAndLockInstance()
5707     self.needed_locks[locking.LEVEL_NODE] = []
5708     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5709
5710   def DeclareLocks(self, level):
5711     if level == locking.LEVEL_NODE:
5712       self._LockInstancesNodes()
5713
5714   def BuildHooksEnv(self):
5715     """Build hooks env.
5716
5717     This runs on the master, primary and secondaries.
5718
5719     """
5720     args = dict()
5721     if constants.BE_MEMORY in self.be_new:
5722       args['memory'] = self.be_new[constants.BE_MEMORY]
5723     if constants.BE_VCPUS in self.be_new:
5724       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5725     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5726     # information at all.
5727     if self.op.nics:
5728       args['nics'] = []
5729       nic_override = dict(self.op.nics)
5730       for idx, nic in enumerate(self.instance.nics):
5731         if idx in nic_override:
5732           this_nic_override = nic_override[idx]
5733         else:
5734           this_nic_override = {}
5735         if 'ip' in this_nic_override:
5736           ip = this_nic_override['ip']
5737         else:
5738           ip = nic.ip
5739         if 'bridge' in this_nic_override:
5740           bridge = this_nic_override['bridge']
5741         else:
5742           bridge = nic.bridge
5743         if 'mac' in this_nic_override:
5744           mac = this_nic_override['mac']
5745         else:
5746           mac = nic.mac
5747         args['nics'].append((ip, bridge, mac))
5748       if constants.DDM_ADD in nic_override:
5749         ip = nic_override[constants.DDM_ADD].get('ip', None)
5750         bridge = nic_override[constants.DDM_ADD]['bridge']
5751         mac = nic_override[constants.DDM_ADD]['mac']
5752         args['nics'].append((ip, bridge, mac))
5753       elif constants.DDM_REMOVE in nic_override:
5754         del args['nics'][-1]
5755
5756     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5757     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5758     return env, nl, nl
5759
5760   def CheckPrereq(self):
5761     """Check prerequisites.
5762
5763     This only checks the instance list against the existing names.
5764
5765     """
5766     force = self.force = self.op.force
5767
5768     # checking the new params on the primary/secondary nodes
5769
5770     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5771     assert self.instance is not None, \
5772       "Cannot retrieve locked instance %s" % self.op.instance_name
5773     pnode = instance.primary_node
5774     nodelist = list(instance.all_nodes)
5775
5776     # hvparams processing
5777     if self.op.hvparams:
5778       i_hvdict = copy.deepcopy(instance.hvparams)
5779       for key, val in self.op.hvparams.iteritems():
5780         if val == constants.VALUE_DEFAULT:
5781           try:
5782             del i_hvdict[key]
5783           except KeyError:
5784             pass
5785         else:
5786           i_hvdict[key] = val
5787       cluster = self.cfg.GetClusterInfo()
5788       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5789       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5790                                 i_hvdict)
5791       # local check
5792       hypervisor.GetHypervisor(
5793         instance.hypervisor).CheckParameterSyntax(hv_new)
5794       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5795       self.hv_new = hv_new # the new actual values
5796       self.hv_inst = i_hvdict # the new dict (without defaults)
5797     else:
5798       self.hv_new = self.hv_inst = {}
5799
5800     # beparams processing
5801     if self.op.beparams:
5802       i_bedict = copy.deepcopy(instance.beparams)
5803       for key, val in self.op.beparams.iteritems():
5804         if val == constants.VALUE_DEFAULT:
5805           try:
5806             del i_bedict[key]
5807           except KeyError:
5808             pass
5809         else:
5810           i_bedict[key] = val
5811       cluster = self.cfg.GetClusterInfo()
5812       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5813       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5814                                 i_bedict)
5815       self.be_new = be_new # the new actual values
5816       self.be_inst = i_bedict # the new dict (without defaults)
5817     else:
5818       self.be_new = self.be_inst = {}
5819
5820     self.warn = []
5821
5822     if constants.BE_MEMORY in self.op.beparams and not self.force:
5823       mem_check_list = [pnode]
5824       if be_new[constants.BE_AUTO_BALANCE]:
5825         # either we changed auto_balance to yes or it was from before
5826         mem_check_list.extend(instance.secondary_nodes)
5827       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5828                                                   instance.hypervisor)
5829       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5830                                          instance.hypervisor)
5831       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5832         # Assume the primary node is unreachable and go ahead
5833         self.warn.append("Can't get info from primary node %s" % pnode)
5834       else:
5835         if not instance_info.failed and instance_info.data:
5836           current_mem = instance_info.data['memory']
5837         else:
5838           # Assume instance not running
5839           # (there is a slight race condition here, but it's not very probable,
5840           # and we have no other way to check)
5841           current_mem = 0
5842         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5843                     nodeinfo[pnode].data['memory_free'])
5844         if miss_mem > 0:
5845           raise errors.OpPrereqError("This change will prevent the instance"
5846                                      " from starting, due to %d MB of memory"
5847                                      " missing on its primary node" % miss_mem)
5848
5849       if be_new[constants.BE_AUTO_BALANCE]:
5850         for node, nres in nodeinfo.iteritems():
5851           if node not in instance.secondary_nodes:
5852             continue
5853           if nres.failed or not isinstance(nres.data, dict):
5854             self.warn.append("Can't get info from secondary node %s" % node)
5855           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5856             self.warn.append("Not enough memory to failover instance to"
5857                              " secondary node %s" % node)
5858
5859     # NIC processing
5860     for nic_op, nic_dict in self.op.nics:
5861       if nic_op == constants.DDM_REMOVE:
5862         if not instance.nics:
5863           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5864         continue
5865       if nic_op != constants.DDM_ADD:
5866         # an existing nic
5867         if nic_op < 0 or nic_op >= len(instance.nics):
5868           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5869                                      " are 0 to %d" %
5870                                      (nic_op, len(instance.nics)))
5871       if 'bridge' in nic_dict:
5872         nic_bridge = nic_dict['bridge']
5873         if nic_bridge is None:
5874           raise errors.OpPrereqError('Cannot set the nic bridge to None')
5875         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5876           msg = ("Bridge '%s' doesn't exist on one of"
5877                  " the instance nodes" % nic_bridge)
5878           if self.force:
5879             self.warn.append(msg)
5880           else:
5881             raise errors.OpPrereqError(msg)
5882       if 'mac' in nic_dict:
5883         nic_mac = nic_dict['mac']
5884         if nic_mac is None:
5885           raise errors.OpPrereqError('Cannot set the nic mac to None')
5886         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5887           # otherwise generate the mac
5888           nic_dict['mac'] = self.cfg.GenerateMAC()
5889         else:
5890           # or validate/reserve the current one
5891           if self.cfg.IsMacInUse(nic_mac):
5892             raise errors.OpPrereqError("MAC address %s already in use"
5893                                        " in cluster" % nic_mac)
5894
5895     # DISK processing
5896     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5897       raise errors.OpPrereqError("Disk operations not supported for"
5898                                  " diskless instances")
5899     for disk_op, disk_dict in self.op.disks:
5900       if disk_op == constants.DDM_REMOVE:
5901         if len(instance.disks) == 1:
5902           raise errors.OpPrereqError("Cannot remove the last disk of"
5903                                      " an instance")
5904         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5905         ins_l = ins_l[pnode]
5906         if ins_l.failed or not isinstance(ins_l.data, list):
5907           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5908         if instance.name in ins_l.data:
5909           raise errors.OpPrereqError("Instance is running, can't remove"
5910                                      " disks.")
5911
5912       if (disk_op == constants.DDM_ADD and
5913           len(instance.nics) >= constants.MAX_DISKS):
5914         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5915                                    " add more" % constants.MAX_DISKS)
5916       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5917         # an existing disk
5918         if disk_op < 0 or disk_op >= len(instance.disks):
5919           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5920                                      " are 0 to %d" %
5921                                      (disk_op, len(instance.disks)))
5922
5923     return
5924
5925   def Exec(self, feedback_fn):
5926     """Modifies an instance.
5927
5928     All parameters take effect only at the next restart of the instance.
5929
5930     """
5931     # Process here the warnings from CheckPrereq, as we don't have a
5932     # feedback_fn there.
5933     for warn in self.warn:
5934       feedback_fn("WARNING: %s" % warn)
5935
5936     result = []
5937     instance = self.instance
5938     # disk changes
5939     for disk_op, disk_dict in self.op.disks:
5940       if disk_op == constants.DDM_REMOVE:
5941         # remove the last disk
5942         device = instance.disks.pop()
5943         device_idx = len(instance.disks)
5944         for node, disk in device.ComputeNodeTree(instance.primary_node):
5945           self.cfg.SetDiskID(disk, node)
5946           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5947           if msg:
5948             self.LogWarning("Could not remove disk/%d on node %s: %s,"
5949                             " continuing anyway", device_idx, node, msg)
5950         result.append(("disk/%d" % device_idx, "remove"))
5951       elif disk_op == constants.DDM_ADD:
5952         # add a new disk
5953         if instance.disk_template == constants.DT_FILE:
5954           file_driver, file_path = instance.disks[0].logical_id
5955           file_path = os.path.dirname(file_path)
5956         else:
5957           file_driver = file_path = None
5958         disk_idx_base = len(instance.disks)
5959         new_disk = _GenerateDiskTemplate(self,
5960                                          instance.disk_template,
5961                                          instance.name, instance.primary_node,
5962                                          instance.secondary_nodes,
5963                                          [disk_dict],
5964                                          file_path,
5965                                          file_driver,
5966                                          disk_idx_base)[0]
5967         instance.disks.append(new_disk)
5968         info = _GetInstanceInfoText(instance)
5969
5970         logging.info("Creating volume %s for instance %s",
5971                      new_disk.iv_name, instance.name)
5972         # Note: this needs to be kept in sync with _CreateDisks
5973         #HARDCODE
5974         for node in instance.all_nodes:
5975           f_create = node == instance.primary_node
5976           try:
5977             _CreateBlockDev(self, node, instance, new_disk,
5978                             f_create, info, f_create)
5979           except errors.OpExecError, err:
5980             self.LogWarning("Failed to create volume %s (%s) on"
5981                             " node %s: %s",
5982                             new_disk.iv_name, new_disk, node, err)
5983         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5984                        (new_disk.size, new_disk.mode)))
5985       else:
5986         # change a given disk
5987         instance.disks[disk_op].mode = disk_dict['mode']
5988         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5989     # NIC changes
5990     for nic_op, nic_dict in self.op.nics:
5991       if nic_op == constants.DDM_REMOVE:
5992         # remove the last nic
5993         del instance.nics[-1]
5994         result.append(("nic.%d" % len(instance.nics), "remove"))
5995       elif nic_op == constants.DDM_ADD:
5996         # mac and bridge should be set, by now
5997         mac = nic_dict['mac']
5998         bridge = nic_dict['bridge']
5999         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6000                               bridge=bridge)
6001         instance.nics.append(new_nic)
6002         result.append(("nic.%d" % (len(instance.nics) - 1),
6003                        "add:mac=%s,ip=%s,bridge=%s" %
6004                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6005       else:
6006         # change a given nic
6007         for key in 'mac', 'ip', 'bridge':
6008           if key in nic_dict:
6009             setattr(instance.nics[nic_op], key, nic_dict[key])
6010             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6011
6012     # hvparams changes
6013     if self.op.hvparams:
6014       instance.hvparams = self.hv_inst
6015       for key, val in self.op.hvparams.iteritems():
6016         result.append(("hv/%s" % key, val))
6017
6018     # beparams changes
6019     if self.op.beparams:
6020       instance.beparams = self.be_inst
6021       for key, val in self.op.beparams.iteritems():
6022         result.append(("be/%s" % key, val))
6023
6024     self.cfg.Update(instance)
6025
6026     return result
6027
6028
6029 class LUQueryExports(NoHooksLU):
6030   """Query the exports list
6031
6032   """
6033   _OP_REQP = ['nodes']
6034   REQ_BGL = False
6035
6036   def ExpandNames(self):
6037     self.needed_locks = {}
6038     self.share_locks[locking.LEVEL_NODE] = 1
6039     if not self.op.nodes:
6040       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6041     else:
6042       self.needed_locks[locking.LEVEL_NODE] = \
6043         _GetWantedNodes(self, self.op.nodes)
6044
6045   def CheckPrereq(self):
6046     """Check prerequisites.
6047
6048     """
6049     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6050
6051   def Exec(self, feedback_fn):
6052     """Compute the list of all the exported system images.
6053
6054     @rtype: dict
6055     @return: a dictionary with the structure node->(export-list)
6056         where export-list is a list of the instances exported on
6057         that node.
6058
6059     """
6060     rpcresult = self.rpc.call_export_list(self.nodes)
6061     result = {}
6062     for node in rpcresult:
6063       if rpcresult[node].failed:
6064         result[node] = False
6065       else:
6066         result[node] = rpcresult[node].data
6067
6068     return result
6069
6070
6071 class LUExportInstance(LogicalUnit):
6072   """Export an instance to an image in the cluster.
6073
6074   """
6075   HPATH = "instance-export"
6076   HTYPE = constants.HTYPE_INSTANCE
6077   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6078   REQ_BGL = False
6079
6080   def ExpandNames(self):
6081     self._ExpandAndLockInstance()
6082     # FIXME: lock only instance primary and destination node
6083     #
6084     # Sad but true, for now we have do lock all nodes, as we don't know where
6085     # the previous export might be, and and in this LU we search for it and
6086     # remove it from its current node. In the future we could fix this by:
6087     #  - making a tasklet to search (share-lock all), then create the new one,
6088     #    then one to remove, after
6089     #  - removing the removal operation altoghether
6090     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6091
6092   def DeclareLocks(self, level):
6093     """Last minute lock declaration."""
6094     # All nodes are locked anyway, so nothing to do here.
6095
6096   def BuildHooksEnv(self):
6097     """Build hooks env.
6098
6099     This will run on the master, primary node and target node.
6100
6101     """
6102     env = {
6103       "EXPORT_NODE": self.op.target_node,
6104       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6105       }
6106     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6107     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6108           self.op.target_node]
6109     return env, nl, nl
6110
6111   def CheckPrereq(self):
6112     """Check prerequisites.
6113
6114     This checks that the instance and node names are valid.
6115
6116     """
6117     instance_name = self.op.instance_name
6118     self.instance = self.cfg.GetInstanceInfo(instance_name)
6119     assert self.instance is not None, \
6120           "Cannot retrieve locked instance %s" % self.op.instance_name
6121     _CheckNodeOnline(self, self.instance.primary_node)
6122
6123     self.dst_node = self.cfg.GetNodeInfo(
6124       self.cfg.ExpandNodeName(self.op.target_node))
6125
6126     if self.dst_node is None:
6127       # This is wrong node name, not a non-locked node
6128       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6129     _CheckNodeOnline(self, self.dst_node.name)
6130     _CheckNodeNotDrained(self, self.dst_node.name)
6131
6132     # instance disk type verification
6133     for disk in self.instance.disks:
6134       if disk.dev_type == constants.LD_FILE:
6135         raise errors.OpPrereqError("Export not supported for instances with"
6136                                    " file-based disks")
6137
6138   def Exec(self, feedback_fn):
6139     """Export an instance to an image in the cluster.
6140
6141     """
6142     instance = self.instance
6143     dst_node = self.dst_node
6144     src_node = instance.primary_node
6145     if self.op.shutdown:
6146       # shutdown the instance, but not the disks
6147       result = self.rpc.call_instance_shutdown(src_node, instance)
6148       msg = result.RemoteFailMsg()
6149       if msg:
6150         raise errors.OpExecError("Could not shutdown instance %s on"
6151                                  " node %s: %s" %
6152                                  (instance.name, src_node, msg))
6153
6154     vgname = self.cfg.GetVGName()
6155
6156     snap_disks = []
6157
6158     # set the disks ID correctly since call_instance_start needs the
6159     # correct drbd minor to create the symlinks
6160     for disk in instance.disks:
6161       self.cfg.SetDiskID(disk, src_node)
6162
6163     try:
6164       for disk in instance.disks:
6165         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6166         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6167         if new_dev_name.failed or not new_dev_name.data:
6168           self.LogWarning("Could not snapshot block device %s on node %s",
6169                           disk.logical_id[1], src_node)
6170           snap_disks.append(False)
6171         else:
6172           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6173                                  logical_id=(vgname, new_dev_name.data),
6174                                  physical_id=(vgname, new_dev_name.data),
6175                                  iv_name=disk.iv_name)
6176           snap_disks.append(new_dev)
6177
6178     finally:
6179       if self.op.shutdown and instance.admin_up:
6180         result = self.rpc.call_instance_start(src_node, instance, None)
6181         msg = result.RemoteFailMsg()
6182         if msg:
6183           _ShutdownInstanceDisks(self, instance)
6184           raise errors.OpExecError("Could not start instance: %s" % msg)
6185
6186     # TODO: check for size
6187
6188     cluster_name = self.cfg.GetClusterName()
6189     for idx, dev in enumerate(snap_disks):
6190       if dev:
6191         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6192                                                instance, cluster_name, idx)
6193         if result.failed or not result.data:
6194           self.LogWarning("Could not export block device %s from node %s to"
6195                           " node %s", dev.logical_id[1], src_node,
6196                           dst_node.name)
6197         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6198         if msg:
6199           self.LogWarning("Could not remove snapshot block device %s from node"
6200                           " %s: %s", dev.logical_id[1], src_node, msg)
6201
6202     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6203     if result.failed or not result.data:
6204       self.LogWarning("Could not finalize export for instance %s on node %s",
6205                       instance.name, dst_node.name)
6206
6207     nodelist = self.cfg.GetNodeList()
6208     nodelist.remove(dst_node.name)
6209
6210     # on one-node clusters nodelist will be empty after the removal
6211     # if we proceed the backup would be removed because OpQueryExports
6212     # substitutes an empty list with the full cluster node list.
6213     if nodelist:
6214       exportlist = self.rpc.call_export_list(nodelist)
6215       for node in exportlist:
6216         if exportlist[node].failed:
6217           continue
6218         if instance.name in exportlist[node].data:
6219           if not self.rpc.call_export_remove(node, instance.name):
6220             self.LogWarning("Could not remove older export for instance %s"
6221                             " on node %s", instance.name, node)
6222
6223
6224 class LURemoveExport(NoHooksLU):
6225   """Remove exports related to the named instance.
6226
6227   """
6228   _OP_REQP = ["instance_name"]
6229   REQ_BGL = False
6230
6231   def ExpandNames(self):
6232     self.needed_locks = {}
6233     # We need all nodes to be locked in order for RemoveExport to work, but we
6234     # don't need to lock the instance itself, as nothing will happen to it (and
6235     # we can remove exports also for a removed instance)
6236     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6237
6238   def CheckPrereq(self):
6239     """Check prerequisites.
6240     """
6241     pass
6242
6243   def Exec(self, feedback_fn):
6244     """Remove any export.
6245
6246     """
6247     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6248     # If the instance was not found we'll try with the name that was passed in.
6249     # This will only work if it was an FQDN, though.
6250     fqdn_warn = False
6251     if not instance_name:
6252       fqdn_warn = True
6253       instance_name = self.op.instance_name
6254
6255     exportlist = self.rpc.call_export_list(self.acquired_locks[
6256       locking.LEVEL_NODE])
6257     found = False
6258     for node in exportlist:
6259       if exportlist[node].failed:
6260         self.LogWarning("Failed to query node %s, continuing" % node)
6261         continue
6262       if instance_name in exportlist[node].data:
6263         found = True
6264         result = self.rpc.call_export_remove(node, instance_name)
6265         if result.failed or not result.data:
6266           logging.error("Could not remove export for instance %s"
6267                         " on node %s", instance_name, node)
6268
6269     if fqdn_warn and not found:
6270       feedback_fn("Export not found. If trying to remove an export belonging"
6271                   " to a deleted instance please use its Fully Qualified"
6272                   " Domain Name.")
6273
6274
6275 class TagsLU(NoHooksLU):
6276   """Generic tags LU.
6277
6278   This is an abstract class which is the parent of all the other tags LUs.
6279
6280   """
6281
6282   def ExpandNames(self):
6283     self.needed_locks = {}
6284     if self.op.kind == constants.TAG_NODE:
6285       name = self.cfg.ExpandNodeName(self.op.name)
6286       if name is None:
6287         raise errors.OpPrereqError("Invalid node name (%s)" %
6288                                    (self.op.name,))
6289       self.op.name = name
6290       self.needed_locks[locking.LEVEL_NODE] = name
6291     elif self.op.kind == constants.TAG_INSTANCE:
6292       name = self.cfg.ExpandInstanceName(self.op.name)
6293       if name is None:
6294         raise errors.OpPrereqError("Invalid instance name (%s)" %
6295                                    (self.op.name,))
6296       self.op.name = name
6297       self.needed_locks[locking.LEVEL_INSTANCE] = name
6298
6299   def CheckPrereq(self):
6300     """Check prerequisites.
6301
6302     """
6303     if self.op.kind == constants.TAG_CLUSTER:
6304       self.target = self.cfg.GetClusterInfo()
6305     elif self.op.kind == constants.TAG_NODE:
6306       self.target = self.cfg.GetNodeInfo(self.op.name)
6307     elif self.op.kind == constants.TAG_INSTANCE:
6308       self.target = self.cfg.GetInstanceInfo(self.op.name)
6309     else:
6310       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6311                                  str(self.op.kind))
6312
6313
6314 class LUGetTags(TagsLU):
6315   """Returns the tags of a given object.
6316
6317   """
6318   _OP_REQP = ["kind", "name"]
6319   REQ_BGL = False
6320
6321   def Exec(self, feedback_fn):
6322     """Returns the tag list.
6323
6324     """
6325     return list(self.target.GetTags())
6326
6327
6328 class LUSearchTags(NoHooksLU):
6329   """Searches the tags for a given pattern.
6330
6331   """
6332   _OP_REQP = ["pattern"]
6333   REQ_BGL = False
6334
6335   def ExpandNames(self):
6336     self.needed_locks = {}
6337
6338   def CheckPrereq(self):
6339     """Check prerequisites.
6340
6341     This checks the pattern passed for validity by compiling it.
6342
6343     """
6344     try:
6345       self.re = re.compile(self.op.pattern)
6346     except re.error, err:
6347       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6348                                  (self.op.pattern, err))
6349
6350   def Exec(self, feedback_fn):
6351     """Returns the tag list.
6352
6353     """
6354     cfg = self.cfg
6355     tgts = [("/cluster", cfg.GetClusterInfo())]
6356     ilist = cfg.GetAllInstancesInfo().values()
6357     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6358     nlist = cfg.GetAllNodesInfo().values()
6359     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6360     results = []
6361     for path, target in tgts:
6362       for tag in target.GetTags():
6363         if self.re.search(tag):
6364           results.append((path, tag))
6365     return results
6366
6367
6368 class LUAddTags(TagsLU):
6369   """Sets a tag on a given object.
6370
6371   """
6372   _OP_REQP = ["kind", "name", "tags"]
6373   REQ_BGL = False
6374
6375   def CheckPrereq(self):
6376     """Check prerequisites.
6377
6378     This checks the type and length of the tag name and value.
6379
6380     """
6381     TagsLU.CheckPrereq(self)
6382     for tag in self.op.tags:
6383       objects.TaggableObject.ValidateTag(tag)
6384
6385   def Exec(self, feedback_fn):
6386     """Sets the tag.
6387
6388     """
6389     try:
6390       for tag in self.op.tags:
6391         self.target.AddTag(tag)
6392     except errors.TagError, err:
6393       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6394     try:
6395       self.cfg.Update(self.target)
6396     except errors.ConfigurationError:
6397       raise errors.OpRetryError("There has been a modification to the"
6398                                 " config file and the operation has been"
6399                                 " aborted. Please retry.")
6400
6401
6402 class LUDelTags(TagsLU):
6403   """Delete a list of tags from a given object.
6404
6405   """
6406   _OP_REQP = ["kind", "name", "tags"]
6407   REQ_BGL = False
6408
6409   def CheckPrereq(self):
6410     """Check prerequisites.
6411
6412     This checks that we have the given tag.
6413
6414     """
6415     TagsLU.CheckPrereq(self)
6416     for tag in self.op.tags:
6417       objects.TaggableObject.ValidateTag(tag)
6418     del_tags = frozenset(self.op.tags)
6419     cur_tags = self.target.GetTags()
6420     if not del_tags <= cur_tags:
6421       diff_tags = del_tags - cur_tags
6422       diff_names = ["'%s'" % tag for tag in diff_tags]
6423       diff_names.sort()
6424       raise errors.OpPrereqError("Tag(s) %s not found" %
6425                                  (",".join(diff_names)))
6426
6427   def Exec(self, feedback_fn):
6428     """Remove the tag from the object.
6429
6430     """
6431     for tag in self.op.tags:
6432       self.target.RemoveTag(tag)
6433     try:
6434       self.cfg.Update(self.target)
6435     except errors.ConfigurationError:
6436       raise errors.OpRetryError("There has been a modification to the"
6437                                 " config file and the operation has been"
6438                                 " aborted. Please retry.")
6439
6440
6441 class LUTestDelay(NoHooksLU):
6442   """Sleep for a specified amount of time.
6443
6444   This LU sleeps on the master and/or nodes for a specified amount of
6445   time.
6446
6447   """
6448   _OP_REQP = ["duration", "on_master", "on_nodes"]
6449   REQ_BGL = False
6450
6451   def ExpandNames(self):
6452     """Expand names and set required locks.
6453
6454     This expands the node list, if any.
6455
6456     """
6457     self.needed_locks = {}
6458     if self.op.on_nodes:
6459       # _GetWantedNodes can be used here, but is not always appropriate to use
6460       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6461       # more information.
6462       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6463       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6464
6465   def CheckPrereq(self):
6466     """Check prerequisites.
6467
6468     """
6469
6470   def Exec(self, feedback_fn):
6471     """Do the actual sleep.
6472
6473     """
6474     if self.op.on_master:
6475       if not utils.TestDelay(self.op.duration):
6476         raise errors.OpExecError("Error during master delay test")
6477     if self.op.on_nodes:
6478       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6479       if not result:
6480         raise errors.OpExecError("Complete failure from rpc call")
6481       for node, node_result in result.items():
6482         node_result.Raise()
6483         if not node_result.data:
6484           raise errors.OpExecError("Failure during rpc call to node %s,"
6485                                    " result: %s" % (node, node_result.data))
6486
6487
6488 class IAllocator(object):
6489   """IAllocator framework.
6490
6491   An IAllocator instance has three sets of attributes:
6492     - cfg that is needed to query the cluster
6493     - input data (all members of the _KEYS class attribute are required)
6494     - four buffer attributes (in|out_data|text), that represent the
6495       input (to the external script) in text and data structure format,
6496       and the output from it, again in two formats
6497     - the result variables from the script (success, info, nodes) for
6498       easy usage
6499
6500   """
6501   _ALLO_KEYS = [
6502     "mem_size", "disks", "disk_template",
6503     "os", "tags", "nics", "vcpus", "hypervisor",
6504     ]
6505   _RELO_KEYS = [
6506     "relocate_from",
6507     ]
6508
6509   def __init__(self, lu, mode, name, **kwargs):
6510     self.lu = lu
6511     # init buffer variables
6512     self.in_text = self.out_text = self.in_data = self.out_data = None
6513     # init all input fields so that pylint is happy
6514     self.mode = mode
6515     self.name = name
6516     self.mem_size = self.disks = self.disk_template = None
6517     self.os = self.tags = self.nics = self.vcpus = None
6518     self.hypervisor = None
6519     self.relocate_from = None
6520     # computed fields
6521     self.required_nodes = None
6522     # init result fields
6523     self.success = self.info = self.nodes = None
6524     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6525       keyset = self._ALLO_KEYS
6526     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6527       keyset = self._RELO_KEYS
6528     else:
6529       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6530                                    " IAllocator" % self.mode)
6531     for key in kwargs:
6532       if key not in keyset:
6533         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6534                                      " IAllocator" % key)
6535       setattr(self, key, kwargs[key])
6536     for key in keyset:
6537       if key not in kwargs:
6538         raise errors.ProgrammerError("Missing input parameter '%s' to"
6539                                      " IAllocator" % key)
6540     self._BuildInputData()
6541
6542   def _ComputeClusterData(self):
6543     """Compute the generic allocator input data.
6544
6545     This is the data that is independent of the actual operation.
6546
6547     """
6548     cfg = self.lu.cfg
6549     cluster_info = cfg.GetClusterInfo()
6550     # cluster data
6551     data = {
6552       "version": 1,
6553       "cluster_name": cfg.GetClusterName(),
6554       "cluster_tags": list(cluster_info.GetTags()),
6555       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6556       # we don't have job IDs
6557       }
6558     iinfo = cfg.GetAllInstancesInfo().values()
6559     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6560
6561     # node data
6562     node_results = {}
6563     node_list = cfg.GetNodeList()
6564
6565     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6566       hypervisor_name = self.hypervisor
6567     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6568       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6569
6570     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6571                                            hypervisor_name)
6572     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6573                        cluster_info.enabled_hypervisors)
6574     for nname, nresult in node_data.items():
6575       # first fill in static (config-based) values
6576       ninfo = cfg.GetNodeInfo(nname)
6577       pnr = {
6578         "tags": list(ninfo.GetTags()),
6579         "primary_ip": ninfo.primary_ip,
6580         "secondary_ip": ninfo.secondary_ip,
6581         "offline": ninfo.offline,
6582         "drained": ninfo.drained,
6583         "master_candidate": ninfo.master_candidate,
6584         }
6585
6586       if not ninfo.offline:
6587         nresult.Raise()
6588         if not isinstance(nresult.data, dict):
6589           raise errors.OpExecError("Can't get data for node %s" % nname)
6590         remote_info = nresult.data
6591         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6592                      'vg_size', 'vg_free', 'cpu_total']:
6593           if attr not in remote_info:
6594             raise errors.OpExecError("Node '%s' didn't return attribute"
6595                                      " '%s'" % (nname, attr))
6596           try:
6597             remote_info[attr] = int(remote_info[attr])
6598           except ValueError, err:
6599             raise errors.OpExecError("Node '%s' returned invalid value"
6600                                      " for '%s': %s" % (nname, attr, err))
6601         # compute memory used by primary instances
6602         i_p_mem = i_p_up_mem = 0
6603         for iinfo, beinfo in i_list:
6604           if iinfo.primary_node == nname:
6605             i_p_mem += beinfo[constants.BE_MEMORY]
6606             if iinfo.name not in node_iinfo[nname].data:
6607               i_used_mem = 0
6608             else:
6609               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6610             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6611             remote_info['memory_free'] -= max(0, i_mem_diff)
6612
6613             if iinfo.admin_up:
6614               i_p_up_mem += beinfo[constants.BE_MEMORY]
6615
6616         # compute memory used by instances
6617         pnr_dyn = {
6618           "total_memory": remote_info['memory_total'],
6619           "reserved_memory": remote_info['memory_dom0'],
6620           "free_memory": remote_info['memory_free'],
6621           "total_disk": remote_info['vg_size'],
6622           "free_disk": remote_info['vg_free'],
6623           "total_cpus": remote_info['cpu_total'],
6624           "i_pri_memory": i_p_mem,
6625           "i_pri_up_memory": i_p_up_mem,
6626           }
6627         pnr.update(pnr_dyn)
6628
6629       node_results[nname] = pnr
6630     data["nodes"] = node_results
6631
6632     # instance data
6633     instance_data = {}
6634     for iinfo, beinfo in i_list:
6635       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6636                   for n in iinfo.nics]
6637       pir = {
6638         "tags": list(iinfo.GetTags()),
6639         "admin_up": iinfo.admin_up,
6640         "vcpus": beinfo[constants.BE_VCPUS],
6641         "memory": beinfo[constants.BE_MEMORY],
6642         "os": iinfo.os,
6643         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6644         "nics": nic_data,
6645         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6646         "disk_template": iinfo.disk_template,
6647         "hypervisor": iinfo.hypervisor,
6648         }
6649       instance_data[iinfo.name] = pir
6650
6651     data["instances"] = instance_data
6652
6653     self.in_data = data
6654
6655   def _AddNewInstance(self):
6656     """Add new instance data to allocator structure.
6657
6658     This in combination with _AllocatorGetClusterData will create the
6659     correct structure needed as input for the allocator.
6660
6661     The checks for the completeness of the opcode must have already been
6662     done.
6663
6664     """
6665     data = self.in_data
6666
6667     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6668
6669     if self.disk_template in constants.DTS_NET_MIRROR:
6670       self.required_nodes = 2
6671     else:
6672       self.required_nodes = 1
6673     request = {
6674       "type": "allocate",
6675       "name": self.name,
6676       "disk_template": self.disk_template,
6677       "tags": self.tags,
6678       "os": self.os,
6679       "vcpus": self.vcpus,
6680       "memory": self.mem_size,
6681       "disks": self.disks,
6682       "disk_space_total": disk_space,
6683       "nics": self.nics,
6684       "required_nodes": self.required_nodes,
6685       }
6686     data["request"] = request
6687
6688   def _AddRelocateInstance(self):
6689     """Add relocate instance data to allocator structure.
6690
6691     This in combination with _IAllocatorGetClusterData will create the
6692     correct structure needed as input for the allocator.
6693
6694     The checks for the completeness of the opcode must have already been
6695     done.
6696
6697     """
6698     instance = self.lu.cfg.GetInstanceInfo(self.name)
6699     if instance is None:
6700       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6701                                    " IAllocator" % self.name)
6702
6703     if instance.disk_template not in constants.DTS_NET_MIRROR:
6704       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6705
6706     if len(instance.secondary_nodes) != 1:
6707       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6708
6709     self.required_nodes = 1
6710     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6711     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6712
6713     request = {
6714       "type": "relocate",
6715       "name": self.name,
6716       "disk_space_total": disk_space,
6717       "required_nodes": self.required_nodes,
6718       "relocate_from": self.relocate_from,
6719       }
6720     self.in_data["request"] = request
6721
6722   def _BuildInputData(self):
6723     """Build input data structures.
6724
6725     """
6726     self._ComputeClusterData()
6727
6728     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6729       self._AddNewInstance()
6730     else:
6731       self._AddRelocateInstance()
6732
6733     self.in_text = serializer.Dump(self.in_data)
6734
6735   def Run(self, name, validate=True, call_fn=None):
6736     """Run an instance allocator and return the results.
6737
6738     """
6739     if call_fn is None:
6740       call_fn = self.lu.rpc.call_iallocator_runner
6741     data = self.in_text
6742
6743     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6744     result.Raise()
6745
6746     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6747       raise errors.OpExecError("Invalid result from master iallocator runner")
6748
6749     rcode, stdout, stderr, fail = result.data
6750
6751     if rcode == constants.IARUN_NOTFOUND:
6752       raise errors.OpExecError("Can't find allocator '%s'" % name)
6753     elif rcode == constants.IARUN_FAILURE:
6754       raise errors.OpExecError("Instance allocator call failed: %s,"
6755                                " output: %s" % (fail, stdout+stderr))
6756     self.out_text = stdout
6757     if validate:
6758       self._ValidateResult()
6759
6760   def _ValidateResult(self):
6761     """Process the allocator results.
6762
6763     This will process and if successful save the result in
6764     self.out_data and the other parameters.
6765
6766     """
6767     try:
6768       rdict = serializer.Load(self.out_text)
6769     except Exception, err:
6770       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6771
6772     if not isinstance(rdict, dict):
6773       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6774
6775     for key in "success", "info", "nodes":
6776       if key not in rdict:
6777         raise errors.OpExecError("Can't parse iallocator results:"
6778                                  " missing key '%s'" % key)
6779       setattr(self, key, rdict[key])
6780
6781     if not isinstance(rdict["nodes"], list):
6782       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6783                                " is not a list")
6784     self.out_data = rdict
6785
6786
6787 class LUTestAllocator(NoHooksLU):
6788   """Run allocator tests.
6789
6790   This LU runs the allocator tests
6791
6792   """
6793   _OP_REQP = ["direction", "mode", "name"]
6794
6795   def CheckPrereq(self):
6796     """Check prerequisites.
6797
6798     This checks the opcode parameters depending on the director and mode test.
6799
6800     """
6801     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6802       for attr in ["name", "mem_size", "disks", "disk_template",
6803                    "os", "tags", "nics", "vcpus"]:
6804         if not hasattr(self.op, attr):
6805           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6806                                      attr)
6807       iname = self.cfg.ExpandInstanceName(self.op.name)
6808       if iname is not None:
6809         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6810                                    iname)
6811       if not isinstance(self.op.nics, list):
6812         raise errors.OpPrereqError("Invalid parameter 'nics'")
6813       for row in self.op.nics:
6814         if (not isinstance(row, dict) or
6815             "mac" not in row or
6816             "ip" not in row or
6817             "bridge" not in row):
6818           raise errors.OpPrereqError("Invalid contents of the"
6819                                      " 'nics' parameter")
6820       if not isinstance(self.op.disks, list):
6821         raise errors.OpPrereqError("Invalid parameter 'disks'")
6822       for row in self.op.disks:
6823         if (not isinstance(row, dict) or
6824             "size" not in row or
6825             not isinstance(row["size"], int) or
6826             "mode" not in row or
6827             row["mode"] not in ['r', 'w']):
6828           raise errors.OpPrereqError("Invalid contents of the"
6829                                      " 'disks' parameter")
6830       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6831         self.op.hypervisor = self.cfg.GetHypervisorType()
6832     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6833       if not hasattr(self.op, "name"):
6834         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6835       fname = self.cfg.ExpandInstanceName(self.op.name)
6836       if fname is None:
6837         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6838                                    self.op.name)
6839       self.op.name = fname
6840       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6841     else:
6842       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6843                                  self.op.mode)
6844
6845     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6846       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6847         raise errors.OpPrereqError("Missing allocator name")
6848     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6849       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6850                                  self.op.direction)
6851
6852   def Exec(self, feedback_fn):
6853     """Run the allocator test.
6854
6855     """
6856     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6857       ial = IAllocator(self,
6858                        mode=self.op.mode,
6859                        name=self.op.name,
6860                        mem_size=self.op.mem_size,
6861                        disks=self.op.disks,
6862                        disk_template=self.op.disk_template,
6863                        os=self.op.os,
6864                        tags=self.op.tags,
6865                        nics=self.op.nics,
6866                        vcpus=self.op.vcpus,
6867                        hypervisor=self.op.hypervisor,
6868                        )
6869     else:
6870       ial = IAllocator(self,
6871                        mode=self.op.mode,
6872                        name=self.op.name,
6873                        relocate_from=list(self.relocate_from),
6874                        )
6875
6876     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6877       result = ial.in_text
6878     else:
6879       ial.Run(self.op.allocator, validate=False)
6880       result = ial.out_text
6881     return result