13480c6af903a2619ebbb3497c28dde321364e74
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @rtype: dict
480   @return: the hook environment for this instance
481
482   """
483   if status:
484     str_status = "up"
485   else:
486     str_status = "down"
487   env = {
488     "OP_TARGET": name,
489     "INSTANCE_NAME": name,
490     "INSTANCE_PRIMARY": primary_node,
491     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
492     "INSTANCE_OS_TYPE": os_type,
493     "INSTANCE_STATUS": str_status,
494     "INSTANCE_MEMORY": memory,
495     "INSTANCE_VCPUS": vcpus,
496   }
497
498   if nics:
499     nic_count = len(nics)
500     for idx, (ip, bridge, mac) in enumerate(nics):
501       if ip is None:
502         ip = ""
503       env["INSTANCE_NIC%d_IP" % idx] = ip
504       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
505       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
506   else:
507     nic_count = 0
508
509   env["INSTANCE_NIC_COUNT"] = nic_count
510
511   return env
512
513
514 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
515   """Builds instance related env variables for hooks from an object.
516
517   @type lu: L{LogicalUnit}
518   @param lu: the logical unit on whose behalf we execute
519   @type instance: L{objects.Instance}
520   @param instance: the instance for which we should build the
521       environment
522   @type override: dict
523   @param override: dictionary with key/values that will override
524       our values
525   @rtype: dict
526   @return: the hook environment dictionary
527
528   """
529   bep = lu.cfg.GetClusterInfo().FillBE(instance)
530   args = {
531     'name': instance.name,
532     'primary_node': instance.primary_node,
533     'secondary_nodes': instance.secondary_nodes,
534     'os_type': instance.os,
535     'status': instance.admin_up,
536     'memory': bep[constants.BE_MEMORY],
537     'vcpus': bep[constants.BE_VCPUS],
538     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
539   }
540   if override:
541     args.update(override)
542   return _BuildInstanceHookEnv(**args)
543
544
545 def _AdjustCandidatePool(lu):
546   """Adjust the candidate pool after node operations.
547
548   """
549   mod_list = lu.cfg.MaintainCandidatePool()
550   if mod_list:
551     lu.LogInfo("Promoted nodes to master candidate role: %s",
552                ", ".join(node.name for node in mod_list))
553     for name in mod_list:
554       lu.context.ReaddNode(name)
555   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
556   if mc_now > mc_max:
557     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
558                (mc_now, mc_max))
559
560
561 def _CheckInstanceBridgesExist(lu, instance):
562   """Check that the brigdes needed by an instance exist.
563
564   """
565   # check bridges existance
566   brlist = [nic.bridge for nic in instance.nics]
567   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
568   result.Raise()
569   if not result.data:
570     raise errors.OpPrereqError("One or more target bridges %s does not"
571                                " exist on destination node '%s'" %
572                                (brlist, instance.primary_node))
573
574
575 class LUDestroyCluster(NoHooksLU):
576   """Logical unit for destroying the cluster.
577
578   """
579   _OP_REQP = []
580
581   def CheckPrereq(self):
582     """Check prerequisites.
583
584     This checks whether the cluster is empty.
585
586     Any errors are signalled by raising errors.OpPrereqError.
587
588     """
589     master = self.cfg.GetMasterNode()
590
591     nodelist = self.cfg.GetNodeList()
592     if len(nodelist) != 1 or nodelist[0] != master:
593       raise errors.OpPrereqError("There are still %d node(s) in"
594                                  " this cluster." % (len(nodelist) - 1))
595     instancelist = self.cfg.GetInstanceList()
596     if instancelist:
597       raise errors.OpPrereqError("There are still %d instance(s) in"
598                                  " this cluster." % len(instancelist))
599
600   def Exec(self, feedback_fn):
601     """Destroys the cluster.
602
603     """
604     master = self.cfg.GetMasterNode()
605     result = self.rpc.call_node_stop_master(master, False)
606     result.Raise()
607     if not result.data:
608       raise errors.OpExecError("Could not disable the master role")
609     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
610     utils.CreateBackup(priv_key)
611     utils.CreateBackup(pub_key)
612     return master
613
614
615 class LUVerifyCluster(LogicalUnit):
616   """Verifies the cluster status.
617
618   """
619   HPATH = "cluster-verify"
620   HTYPE = constants.HTYPE_CLUSTER
621   _OP_REQP = ["skip_checks"]
622   REQ_BGL = False
623
624   def ExpandNames(self):
625     self.needed_locks = {
626       locking.LEVEL_NODE: locking.ALL_SET,
627       locking.LEVEL_INSTANCE: locking.ALL_SET,
628     }
629     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
630
631   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
632                   node_result, feedback_fn, master_files,
633                   drbd_map):
634     """Run multiple tests against a node.
635
636     Test list:
637
638       - compares ganeti version
639       - checks vg existance and size > 20G
640       - checks config file checksum
641       - checks ssh to other nodes
642
643     @type nodeinfo: L{objects.Node}
644     @param nodeinfo: the node to check
645     @param file_list: required list of files
646     @param local_cksum: dictionary of local files and their checksums
647     @param node_result: the results from the node
648     @param feedback_fn: function used to accumulate results
649     @param master_files: list of files that only masters should have
650     @param drbd_map: the useddrbd minors for this node, in
651         form of minor: (instance, must_exist) which correspond to instances
652         and their running status
653
654     """
655     node = nodeinfo.name
656
657     # main result, node_result should be a non-empty dict
658     if not node_result or not isinstance(node_result, dict):
659       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
660       return True
661
662     # compares ganeti version
663     local_version = constants.PROTOCOL_VERSION
664     remote_version = node_result.get('version', None)
665     if not (remote_version and isinstance(remote_version, (list, tuple)) and
666             len(remote_version) == 2):
667       feedback_fn("  - ERROR: connection to %s failed" % (node))
668       return True
669
670     if local_version != remote_version[0]:
671       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
672                   " node %s %s" % (local_version, node, remote_version[0]))
673       return True
674
675     # node seems compatible, we can actually try to look into its results
676
677     bad = False
678
679     # full package version
680     if constants.RELEASE_VERSION != remote_version[1]:
681       feedback_fn("  - WARNING: software version mismatch: master %s,"
682                   " node %s %s" %
683                   (constants.RELEASE_VERSION, node, remote_version[1]))
684
685     # checks vg existence and size > 20G
686
687     vglist = node_result.get(constants.NV_VGLIST, None)
688     if not vglist:
689       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
690                       (node,))
691       bad = True
692     else:
693       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
694                                             constants.MIN_VG_SIZE)
695       if vgstatus:
696         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
697         bad = True
698
699     # checks config file checksum
700
701     remote_cksum = node_result.get(constants.NV_FILELIST, None)
702     if not isinstance(remote_cksum, dict):
703       bad = True
704       feedback_fn("  - ERROR: node hasn't returned file checksum data")
705     else:
706       for file_name in file_list:
707         node_is_mc = nodeinfo.master_candidate
708         must_have_file = file_name not in master_files
709         if file_name not in remote_cksum:
710           if node_is_mc or must_have_file:
711             bad = True
712             feedback_fn("  - ERROR: file '%s' missing" % file_name)
713         elif remote_cksum[file_name] != local_cksum[file_name]:
714           if node_is_mc or must_have_file:
715             bad = True
716             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
717           else:
718             # not candidate and this is not a must-have file
719             bad = True
720             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
721                         " '%s'" % file_name)
722         else:
723           # all good, except non-master/non-must have combination
724           if not node_is_mc and not must_have_file:
725             feedback_fn("  - ERROR: file '%s' should not exist on non master"
726                         " candidates" % file_name)
727
728     # checks ssh to any
729
730     if constants.NV_NODELIST not in node_result:
731       bad = True
732       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
733     else:
734       if node_result[constants.NV_NODELIST]:
735         bad = True
736         for node in node_result[constants.NV_NODELIST]:
737           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
738                           (node, node_result[constants.NV_NODELIST][node]))
739
740     if constants.NV_NODENETTEST not in node_result:
741       bad = True
742       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
743     else:
744       if node_result[constants.NV_NODENETTEST]:
745         bad = True
746         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
747         for node in nlist:
748           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
749                           (node, node_result[constants.NV_NODENETTEST][node]))
750
751     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
752     if isinstance(hyp_result, dict):
753       for hv_name, hv_result in hyp_result.iteritems():
754         if hv_result is not None:
755           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
756                       (hv_name, hv_result))
757
758     # check used drbd list
759     used_minors = node_result.get(constants.NV_DRBDLIST, [])
760     for minor, (iname, must_exist) in drbd_map.items():
761       if minor not in used_minors and must_exist:
762         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
763                     (minor, iname))
764         bad = True
765     for minor in used_minors:
766       if minor not in drbd_map:
767         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
768         bad = True
769
770     return bad
771
772   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
773                       node_instance, feedback_fn, n_offline):
774     """Verify an instance.
775
776     This function checks to see if the required block devices are
777     available on the instance's node.
778
779     """
780     bad = False
781
782     node_current = instanceconfig.primary_node
783
784     node_vol_should = {}
785     instanceconfig.MapLVsByNode(node_vol_should)
786
787     for node in node_vol_should:
788       if node in n_offline:
789         # ignore missing volumes on offline nodes
790         continue
791       for volume in node_vol_should[node]:
792         if node not in node_vol_is or volume not in node_vol_is[node]:
793           feedback_fn("  - ERROR: volume %s missing on node %s" %
794                           (volume, node))
795           bad = True
796
797     if instanceconfig.admin_up:
798       if ((node_current not in node_instance or
799           not instance in node_instance[node_current]) and
800           node_current not in n_offline):
801         feedback_fn("  - ERROR: instance %s not running on node %s" %
802                         (instance, node_current))
803         bad = True
804
805     for node in node_instance:
806       if (not node == node_current):
807         if instance in node_instance[node]:
808           feedback_fn("  - ERROR: instance %s should not run on node %s" %
809                           (instance, node))
810           bad = True
811
812     return bad
813
814   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
815     """Verify if there are any unknown volumes in the cluster.
816
817     The .os, .swap and backup volumes are ignored. All other volumes are
818     reported as unknown.
819
820     """
821     bad = False
822
823     for node in node_vol_is:
824       for volume in node_vol_is[node]:
825         if node not in node_vol_should or volume not in node_vol_should[node]:
826           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
827                       (volume, node))
828           bad = True
829     return bad
830
831   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
832     """Verify the list of running instances.
833
834     This checks what instances are running but unknown to the cluster.
835
836     """
837     bad = False
838     for node in node_instance:
839       for runninginstance in node_instance[node]:
840         if runninginstance not in instancelist:
841           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
842                           (runninginstance, node))
843           bad = True
844     return bad
845
846   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
847     """Verify N+1 Memory Resilience.
848
849     Check that if one single node dies we can still start all the instances it
850     was primary for.
851
852     """
853     bad = False
854
855     for node, nodeinfo in node_info.iteritems():
856       # This code checks that every node which is now listed as secondary has
857       # enough memory to host all instances it is supposed to should a single
858       # other node in the cluster fail.
859       # FIXME: not ready for failover to an arbitrary node
860       # FIXME: does not support file-backed instances
861       # WARNING: we currently take into account down instances as well as up
862       # ones, considering that even if they're down someone might want to start
863       # them even in the event of a node failure.
864       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
865         needed_mem = 0
866         for instance in instances:
867           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
868           if bep[constants.BE_AUTO_BALANCE]:
869             needed_mem += bep[constants.BE_MEMORY]
870         if nodeinfo['mfree'] < needed_mem:
871           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
872                       " failovers should node %s fail" % (node, prinode))
873           bad = True
874     return bad
875
876   def CheckPrereq(self):
877     """Check prerequisites.
878
879     Transform the list of checks we're going to skip into a set and check that
880     all its members are valid.
881
882     """
883     self.skip_set = frozenset(self.op.skip_checks)
884     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
885       raise errors.OpPrereqError("Invalid checks to be skipped specified")
886
887   def BuildHooksEnv(self):
888     """Build hooks env.
889
890     Cluster-Verify hooks just rone in the post phase and their failure makes
891     the output be logged in the verify output and the verification to fail.
892
893     """
894     all_nodes = self.cfg.GetNodeList()
895     # TODO: populate the environment with useful information for verify hooks
896     env = {}
897     return env, [], all_nodes
898
899   def Exec(self, feedback_fn):
900     """Verify integrity of cluster, performing various test on nodes.
901
902     """
903     bad = False
904     feedback_fn("* Verifying global settings")
905     for msg in self.cfg.VerifyConfig():
906       feedback_fn("  - ERROR: %s" % msg)
907
908     vg_name = self.cfg.GetVGName()
909     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
910     nodelist = utils.NiceSort(self.cfg.GetNodeList())
911     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
912     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
913     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
914                         for iname in instancelist)
915     i_non_redundant = [] # Non redundant instances
916     i_non_a_balanced = [] # Non auto-balanced instances
917     n_offline = [] # List of offline nodes
918     n_drained = [] # List of nodes being drained
919     node_volume = {}
920     node_instance = {}
921     node_info = {}
922     instance_cfg = {}
923
924     # FIXME: verify OS list
925     # do local checksums
926     master_files = [constants.CLUSTER_CONF_FILE]
927
928     file_names = ssconf.SimpleStore().GetFileList()
929     file_names.append(constants.SSL_CERT_FILE)
930     file_names.append(constants.RAPI_CERT_FILE)
931     file_names.extend(master_files)
932
933     local_checksums = utils.FingerprintFiles(file_names)
934
935     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
936     node_verify_param = {
937       constants.NV_FILELIST: file_names,
938       constants.NV_NODELIST: [node.name for node in nodeinfo
939                               if not node.offline],
940       constants.NV_HYPERVISOR: hypervisors,
941       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
942                                   node.secondary_ip) for node in nodeinfo
943                                  if not node.offline],
944       constants.NV_LVLIST: vg_name,
945       constants.NV_INSTANCELIST: hypervisors,
946       constants.NV_VGLIST: None,
947       constants.NV_VERSION: None,
948       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
949       constants.NV_DRBDLIST: None,
950       }
951     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
952                                            self.cfg.GetClusterName())
953
954     cluster = self.cfg.GetClusterInfo()
955     master_node = self.cfg.GetMasterNode()
956     all_drbd_map = self.cfg.ComputeDRBDMap()
957
958     for node_i in nodeinfo:
959       node = node_i.name
960       nresult = all_nvinfo[node].data
961
962       if node_i.offline:
963         feedback_fn("* Skipping offline node %s" % (node,))
964         n_offline.append(node)
965         continue
966
967       if node == master_node:
968         ntype = "master"
969       elif node_i.master_candidate:
970         ntype = "master candidate"
971       elif node_i.drained:
972         ntype = "drained"
973         n_drained.append(node)
974       else:
975         ntype = "regular"
976       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
977
978       if all_nvinfo[node].failed or not isinstance(nresult, dict):
979         feedback_fn("  - ERROR: connection to %s failed" % (node,))
980         bad = True
981         continue
982
983       node_drbd = {}
984       for minor, instance in all_drbd_map[node].items():
985         instance = instanceinfo[instance]
986         node_drbd[minor] = (instance.name, instance.admin_up)
987       result = self._VerifyNode(node_i, file_names, local_checksums,
988                                 nresult, feedback_fn, master_files,
989                                 node_drbd)
990       bad = bad or result
991
992       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
993       if isinstance(lvdata, basestring):
994         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
995                     (node, utils.SafeEncode(lvdata)))
996         bad = True
997         node_volume[node] = {}
998       elif not isinstance(lvdata, dict):
999         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1000         bad = True
1001         continue
1002       else:
1003         node_volume[node] = lvdata
1004
1005       # node_instance
1006       idata = nresult.get(constants.NV_INSTANCELIST, None)
1007       if not isinstance(idata, list):
1008         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1009                     (node,))
1010         bad = True
1011         continue
1012
1013       node_instance[node] = idata
1014
1015       # node_info
1016       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1017       if not isinstance(nodeinfo, dict):
1018         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1019         bad = True
1020         continue
1021
1022       try:
1023         node_info[node] = {
1024           "mfree": int(nodeinfo['memory_free']),
1025           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1026           "pinst": [],
1027           "sinst": [],
1028           # dictionary holding all instances this node is secondary for,
1029           # grouped by their primary node. Each key is a cluster node, and each
1030           # value is a list of instances which have the key as primary and the
1031           # current node as secondary.  this is handy to calculate N+1 memory
1032           # availability if you can only failover from a primary to its
1033           # secondary.
1034           "sinst-by-pnode": {},
1035         }
1036       except ValueError:
1037         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1038         bad = True
1039         continue
1040
1041     node_vol_should = {}
1042
1043     for instance in instancelist:
1044       feedback_fn("* Verifying instance %s" % instance)
1045       inst_config = instanceinfo[instance]
1046       result =  self._VerifyInstance(instance, inst_config, node_volume,
1047                                      node_instance, feedback_fn, n_offline)
1048       bad = bad or result
1049       inst_nodes_offline = []
1050
1051       inst_config.MapLVsByNode(node_vol_should)
1052
1053       instance_cfg[instance] = inst_config
1054
1055       pnode = inst_config.primary_node
1056       if pnode in node_info:
1057         node_info[pnode]['pinst'].append(instance)
1058       elif pnode not in n_offline:
1059         feedback_fn("  - ERROR: instance %s, connection to primary node"
1060                     " %s failed" % (instance, pnode))
1061         bad = True
1062
1063       if pnode in n_offline:
1064         inst_nodes_offline.append(pnode)
1065
1066       # If the instance is non-redundant we cannot survive losing its primary
1067       # node, so we are not N+1 compliant. On the other hand we have no disk
1068       # templates with more than one secondary so that situation is not well
1069       # supported either.
1070       # FIXME: does not support file-backed instances
1071       if len(inst_config.secondary_nodes) == 0:
1072         i_non_redundant.append(instance)
1073       elif len(inst_config.secondary_nodes) > 1:
1074         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1075                     % instance)
1076
1077       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1078         i_non_a_balanced.append(instance)
1079
1080       for snode in inst_config.secondary_nodes:
1081         if snode in node_info:
1082           node_info[snode]['sinst'].append(instance)
1083           if pnode not in node_info[snode]['sinst-by-pnode']:
1084             node_info[snode]['sinst-by-pnode'][pnode] = []
1085           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1086         elif snode not in n_offline:
1087           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1088                       " %s failed" % (instance, snode))
1089           bad = True
1090         if snode in n_offline:
1091           inst_nodes_offline.append(snode)
1092
1093       if inst_nodes_offline:
1094         # warn that the instance lives on offline nodes, and set bad=True
1095         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1096                     ", ".join(inst_nodes_offline))
1097         bad = True
1098
1099     feedback_fn("* Verifying orphan volumes")
1100     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1101                                        feedback_fn)
1102     bad = bad or result
1103
1104     feedback_fn("* Verifying remaining instances")
1105     result = self._VerifyOrphanInstances(instancelist, node_instance,
1106                                          feedback_fn)
1107     bad = bad or result
1108
1109     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1110       feedback_fn("* Verifying N+1 Memory redundancy")
1111       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1112       bad = bad or result
1113
1114     feedback_fn("* Other Notes")
1115     if i_non_redundant:
1116       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1117                   % len(i_non_redundant))
1118
1119     if i_non_a_balanced:
1120       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1121                   % len(i_non_a_balanced))
1122
1123     if n_offline:
1124       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1125
1126     if n_drained:
1127       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1128
1129     return not bad
1130
1131   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1132     """Analize the post-hooks' result
1133
1134     This method analyses the hook result, handles it, and sends some
1135     nicely-formatted feedback back to the user.
1136
1137     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1138         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1139     @param hooks_results: the results of the multi-node hooks rpc call
1140     @param feedback_fn: function used send feedback back to the caller
1141     @param lu_result: previous Exec result
1142     @return: the new Exec result, based on the previous result
1143         and hook results
1144
1145     """
1146     # We only really run POST phase hooks, and are only interested in
1147     # their results
1148     if phase == constants.HOOKS_PHASE_POST:
1149       # Used to change hooks' output to proper indentation
1150       indent_re = re.compile('^', re.M)
1151       feedback_fn("* Hooks Results")
1152       if not hooks_results:
1153         feedback_fn("  - ERROR: general communication failure")
1154         lu_result = 1
1155       else:
1156         for node_name in hooks_results:
1157           show_node_header = True
1158           res = hooks_results[node_name]
1159           if res.failed or res.data is False or not isinstance(res.data, list):
1160             if res.offline:
1161               # no need to warn or set fail return value
1162               continue
1163             feedback_fn("    Communication failure in hooks execution")
1164             lu_result = 1
1165             continue
1166           for script, hkr, output in res.data:
1167             if hkr == constants.HKR_FAIL:
1168               # The node header is only shown once, if there are
1169               # failing hooks on that node
1170               if show_node_header:
1171                 feedback_fn("  Node %s:" % node_name)
1172                 show_node_header = False
1173               feedback_fn("    ERROR: Script %s failed, output:" % script)
1174               output = indent_re.sub('      ', output)
1175               feedback_fn("%s" % output)
1176               lu_result = 1
1177
1178       return lu_result
1179
1180
1181 class LUVerifyDisks(NoHooksLU):
1182   """Verifies the cluster disks status.
1183
1184   """
1185   _OP_REQP = []
1186   REQ_BGL = False
1187
1188   def ExpandNames(self):
1189     self.needed_locks = {
1190       locking.LEVEL_NODE: locking.ALL_SET,
1191       locking.LEVEL_INSTANCE: locking.ALL_SET,
1192     }
1193     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1194
1195   def CheckPrereq(self):
1196     """Check prerequisites.
1197
1198     This has no prerequisites.
1199
1200     """
1201     pass
1202
1203   def Exec(self, feedback_fn):
1204     """Verify integrity of cluster disks.
1205
1206     """
1207     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1208
1209     vg_name = self.cfg.GetVGName()
1210     nodes = utils.NiceSort(self.cfg.GetNodeList())
1211     instances = [self.cfg.GetInstanceInfo(name)
1212                  for name in self.cfg.GetInstanceList()]
1213
1214     nv_dict = {}
1215     for inst in instances:
1216       inst_lvs = {}
1217       if (not inst.admin_up or
1218           inst.disk_template not in constants.DTS_NET_MIRROR):
1219         continue
1220       inst.MapLVsByNode(inst_lvs)
1221       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1222       for node, vol_list in inst_lvs.iteritems():
1223         for vol in vol_list:
1224           nv_dict[(node, vol)] = inst
1225
1226     if not nv_dict:
1227       return result
1228
1229     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1230
1231     to_act = set()
1232     for node in nodes:
1233       # node_volume
1234       lvs = node_lvs[node]
1235       if lvs.failed:
1236         if not lvs.offline:
1237           self.LogWarning("Connection to node %s failed: %s" %
1238                           (node, lvs.data))
1239         continue
1240       lvs = lvs.data
1241       if isinstance(lvs, basestring):
1242         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1243         res_nlvm[node] = lvs
1244       elif not isinstance(lvs, dict):
1245         logging.warning("Connection to node %s failed or invalid data"
1246                         " returned", node)
1247         res_nodes.append(node)
1248         continue
1249
1250       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1251         inst = nv_dict.pop((node, lv_name), None)
1252         if (not lv_online and inst is not None
1253             and inst.name not in res_instances):
1254           res_instances.append(inst.name)
1255
1256     # any leftover items in nv_dict are missing LVs, let's arrange the
1257     # data better
1258     for key, inst in nv_dict.iteritems():
1259       if inst.name not in res_missing:
1260         res_missing[inst.name] = []
1261       res_missing[inst.name].append(key)
1262
1263     return result
1264
1265
1266 class LURenameCluster(LogicalUnit):
1267   """Rename the cluster.
1268
1269   """
1270   HPATH = "cluster-rename"
1271   HTYPE = constants.HTYPE_CLUSTER
1272   _OP_REQP = ["name"]
1273
1274   def BuildHooksEnv(self):
1275     """Build hooks env.
1276
1277     """
1278     env = {
1279       "OP_TARGET": self.cfg.GetClusterName(),
1280       "NEW_NAME": self.op.name,
1281       }
1282     mn = self.cfg.GetMasterNode()
1283     return env, [mn], [mn]
1284
1285   def CheckPrereq(self):
1286     """Verify that the passed name is a valid one.
1287
1288     """
1289     hostname = utils.HostInfo(self.op.name)
1290
1291     new_name = hostname.name
1292     self.ip = new_ip = hostname.ip
1293     old_name = self.cfg.GetClusterName()
1294     old_ip = self.cfg.GetMasterIP()
1295     if new_name == old_name and new_ip == old_ip:
1296       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1297                                  " cluster has changed")
1298     if new_ip != old_ip:
1299       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1300         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1301                                    " reachable on the network. Aborting." %
1302                                    new_ip)
1303
1304     self.op.name = new_name
1305
1306   def Exec(self, feedback_fn):
1307     """Rename the cluster.
1308
1309     """
1310     clustername = self.op.name
1311     ip = self.ip
1312
1313     # shutdown the master IP
1314     master = self.cfg.GetMasterNode()
1315     result = self.rpc.call_node_stop_master(master, False)
1316     if result.failed or not result.data:
1317       raise errors.OpExecError("Could not disable the master role")
1318
1319     try:
1320       cluster = self.cfg.GetClusterInfo()
1321       cluster.cluster_name = clustername
1322       cluster.master_ip = ip
1323       self.cfg.Update(cluster)
1324
1325       # update the known hosts file
1326       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1327       node_list = self.cfg.GetNodeList()
1328       try:
1329         node_list.remove(master)
1330       except ValueError:
1331         pass
1332       result = self.rpc.call_upload_file(node_list,
1333                                          constants.SSH_KNOWN_HOSTS_FILE)
1334       for to_node, to_result in result.iteritems():
1335         if to_result.failed or not to_result.data:
1336           logging.error("Copy of file %s to node %s failed",
1337                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1338
1339     finally:
1340       result = self.rpc.call_node_start_master(master, False)
1341       if result.failed or not result.data:
1342         self.LogWarning("Could not re-enable the master role on"
1343                         " the master, please restart manually.")
1344
1345
1346 def _RecursiveCheckIfLVMBased(disk):
1347   """Check if the given disk or its children are lvm-based.
1348
1349   @type disk: L{objects.Disk}
1350   @param disk: the disk to check
1351   @rtype: booleean
1352   @return: boolean indicating whether a LD_LV dev_type was found or not
1353
1354   """
1355   if disk.children:
1356     for chdisk in disk.children:
1357       if _RecursiveCheckIfLVMBased(chdisk):
1358         return True
1359   return disk.dev_type == constants.LD_LV
1360
1361
1362 class LUSetClusterParams(LogicalUnit):
1363   """Change the parameters of the cluster.
1364
1365   """
1366   HPATH = "cluster-modify"
1367   HTYPE = constants.HTYPE_CLUSTER
1368   _OP_REQP = []
1369   REQ_BGL = False
1370
1371   def CheckParameters(self):
1372     """Check parameters
1373
1374     """
1375     if not hasattr(self.op, "candidate_pool_size"):
1376       self.op.candidate_pool_size = None
1377     if self.op.candidate_pool_size is not None:
1378       try:
1379         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1380       except ValueError, err:
1381         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1382                                    str(err))
1383       if self.op.candidate_pool_size < 1:
1384         raise errors.OpPrereqError("At least one master candidate needed")
1385
1386   def ExpandNames(self):
1387     # FIXME: in the future maybe other cluster params won't require checking on
1388     # all nodes to be modified.
1389     self.needed_locks = {
1390       locking.LEVEL_NODE: locking.ALL_SET,
1391     }
1392     self.share_locks[locking.LEVEL_NODE] = 1
1393
1394   def BuildHooksEnv(self):
1395     """Build hooks env.
1396
1397     """
1398     env = {
1399       "OP_TARGET": self.cfg.GetClusterName(),
1400       "NEW_VG_NAME": self.op.vg_name,
1401       }
1402     mn = self.cfg.GetMasterNode()
1403     return env, [mn], [mn]
1404
1405   def CheckPrereq(self):
1406     """Check prerequisites.
1407
1408     This checks whether the given params don't conflict and
1409     if the given volume group is valid.
1410
1411     """
1412     if self.op.vg_name is not None and not self.op.vg_name:
1413       instances = self.cfg.GetAllInstancesInfo().values()
1414       for inst in instances:
1415         for disk in inst.disks:
1416           if _RecursiveCheckIfLVMBased(disk):
1417             raise errors.OpPrereqError("Cannot disable lvm storage while"
1418                                        " lvm-based instances exist")
1419
1420     node_list = self.acquired_locks[locking.LEVEL_NODE]
1421
1422     # if vg_name not None, checks given volume group on all nodes
1423     if self.op.vg_name:
1424       vglist = self.rpc.call_vg_list(node_list)
1425       for node in node_list:
1426         if vglist[node].failed:
1427           # ignoring down node
1428           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1429           continue
1430         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1431                                               self.op.vg_name,
1432                                               constants.MIN_VG_SIZE)
1433         if vgstatus:
1434           raise errors.OpPrereqError("Error on node '%s': %s" %
1435                                      (node, vgstatus))
1436
1437     self.cluster = cluster = self.cfg.GetClusterInfo()
1438     # validate beparams changes
1439     if self.op.beparams:
1440       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1441       self.new_beparams = cluster.FillDict(
1442         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1443
1444     # hypervisor list/parameters
1445     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1446     if self.op.hvparams:
1447       if not isinstance(self.op.hvparams, dict):
1448         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1449       for hv_name, hv_dict in self.op.hvparams.items():
1450         if hv_name not in self.new_hvparams:
1451           self.new_hvparams[hv_name] = hv_dict
1452         else:
1453           self.new_hvparams[hv_name].update(hv_dict)
1454
1455     if self.op.enabled_hypervisors is not None:
1456       self.hv_list = self.op.enabled_hypervisors
1457     else:
1458       self.hv_list = cluster.enabled_hypervisors
1459
1460     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1461       # either the enabled list has changed, or the parameters have, validate
1462       for hv_name, hv_params in self.new_hvparams.items():
1463         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1464             (self.op.enabled_hypervisors and
1465              hv_name in self.op.enabled_hypervisors)):
1466           # either this is a new hypervisor, or its parameters have changed
1467           hv_class = hypervisor.GetHypervisor(hv_name)
1468           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1469           hv_class.CheckParameterSyntax(hv_params)
1470           _CheckHVParams(self, node_list, hv_name, hv_params)
1471
1472   def Exec(self, feedback_fn):
1473     """Change the parameters of the cluster.
1474
1475     """
1476     if self.op.vg_name is not None:
1477       if self.op.vg_name != self.cfg.GetVGName():
1478         self.cfg.SetVGName(self.op.vg_name)
1479       else:
1480         feedback_fn("Cluster LVM configuration already in desired"
1481                     " state, not changing")
1482     if self.op.hvparams:
1483       self.cluster.hvparams = self.new_hvparams
1484     if self.op.enabled_hypervisors is not None:
1485       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1486     if self.op.beparams:
1487       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1488     if self.op.candidate_pool_size is not None:
1489       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1490
1491     self.cfg.Update(self.cluster)
1492
1493     # we want to update nodes after the cluster so that if any errors
1494     # happen, we have recorded and saved the cluster info
1495     if self.op.candidate_pool_size is not None:
1496       _AdjustCandidatePool(self)
1497
1498
1499 class LURedistributeConfig(NoHooksLU):
1500   """Force the redistribution of cluster configuration.
1501
1502   This is a very simple LU.
1503
1504   """
1505   _OP_REQP = []
1506   REQ_BGL = False
1507
1508   def ExpandNames(self):
1509     self.needed_locks = {
1510       locking.LEVEL_NODE: locking.ALL_SET,
1511     }
1512     self.share_locks[locking.LEVEL_NODE] = 1
1513
1514   def CheckPrereq(self):
1515     """Check prerequisites.
1516
1517     """
1518
1519   def Exec(self, feedback_fn):
1520     """Redistribute the configuration.
1521
1522     """
1523     self.cfg.Update(self.cfg.GetClusterInfo())
1524
1525
1526 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1527   """Sleep and poll for an instance's disk to sync.
1528
1529   """
1530   if not instance.disks:
1531     return True
1532
1533   if not oneshot:
1534     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1535
1536   node = instance.primary_node
1537
1538   for dev in instance.disks:
1539     lu.cfg.SetDiskID(dev, node)
1540
1541   retries = 0
1542   while True:
1543     max_time = 0
1544     done = True
1545     cumul_degraded = False
1546     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1547     if rstats.failed or not rstats.data:
1548       lu.LogWarning("Can't get any data from node %s", node)
1549       retries += 1
1550       if retries >= 10:
1551         raise errors.RemoteError("Can't contact node %s for mirror data,"
1552                                  " aborting." % node)
1553       time.sleep(6)
1554       continue
1555     rstats = rstats.data
1556     retries = 0
1557     for i, mstat in enumerate(rstats):
1558       if mstat is None:
1559         lu.LogWarning("Can't compute data for node %s/%s",
1560                            node, instance.disks[i].iv_name)
1561         continue
1562       # we ignore the ldisk parameter
1563       perc_done, est_time, is_degraded, _ = mstat
1564       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1565       if perc_done is not None:
1566         done = False
1567         if est_time is not None:
1568           rem_time = "%d estimated seconds remaining" % est_time
1569           max_time = est_time
1570         else:
1571           rem_time = "no time estimate"
1572         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1573                         (instance.disks[i].iv_name, perc_done, rem_time))
1574     if done or oneshot:
1575       break
1576
1577     time.sleep(min(60, max_time))
1578
1579   if done:
1580     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1581   return not cumul_degraded
1582
1583
1584 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1585   """Check that mirrors are not degraded.
1586
1587   The ldisk parameter, if True, will change the test from the
1588   is_degraded attribute (which represents overall non-ok status for
1589   the device(s)) to the ldisk (representing the local storage status).
1590
1591   """
1592   lu.cfg.SetDiskID(dev, node)
1593   if ldisk:
1594     idx = 6
1595   else:
1596     idx = 5
1597
1598   result = True
1599   if on_primary or dev.AssembleOnSecondary():
1600     rstats = lu.rpc.call_blockdev_find(node, dev)
1601     msg = rstats.RemoteFailMsg()
1602     if msg:
1603       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1604       result = False
1605     elif not rstats.payload:
1606       lu.LogWarning("Can't find disk on node %s", node)
1607       result = False
1608     else:
1609       result = result and (not rstats.payload[idx])
1610   if dev.children:
1611     for child in dev.children:
1612       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1613
1614   return result
1615
1616
1617 class LUDiagnoseOS(NoHooksLU):
1618   """Logical unit for OS diagnose/query.
1619
1620   """
1621   _OP_REQP = ["output_fields", "names"]
1622   REQ_BGL = False
1623   _FIELDS_STATIC = utils.FieldSet()
1624   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1625
1626   def ExpandNames(self):
1627     if self.op.names:
1628       raise errors.OpPrereqError("Selective OS query not supported")
1629
1630     _CheckOutputFields(static=self._FIELDS_STATIC,
1631                        dynamic=self._FIELDS_DYNAMIC,
1632                        selected=self.op.output_fields)
1633
1634     # Lock all nodes, in shared mode
1635     self.needed_locks = {}
1636     self.share_locks[locking.LEVEL_NODE] = 1
1637     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1638
1639   def CheckPrereq(self):
1640     """Check prerequisites.
1641
1642     """
1643
1644   @staticmethod
1645   def _DiagnoseByOS(node_list, rlist):
1646     """Remaps a per-node return list into an a per-os per-node dictionary
1647
1648     @param node_list: a list with the names of all nodes
1649     @param rlist: a map with node names as keys and OS objects as values
1650
1651     @rtype: dict
1652     @returns: a dictionary with osnames as keys and as value another map, with
1653         nodes as keys and list of OS objects as values, eg::
1654
1655           {"debian-etch": {"node1": [<object>,...],
1656                            "node2": [<object>,]}
1657           }
1658
1659     """
1660     all_os = {}
1661     for node_name, nr in rlist.iteritems():
1662       if nr.failed or not nr.data:
1663         continue
1664       for os_obj in nr.data:
1665         if os_obj.name not in all_os:
1666           # build a list of nodes for this os containing empty lists
1667           # for each node in node_list
1668           all_os[os_obj.name] = {}
1669           for nname in node_list:
1670             all_os[os_obj.name][nname] = []
1671         all_os[os_obj.name][node_name].append(os_obj)
1672     return all_os
1673
1674   def Exec(self, feedback_fn):
1675     """Compute the list of OSes.
1676
1677     """
1678     node_list = self.acquired_locks[locking.LEVEL_NODE]
1679     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1680                    if node in node_list]
1681     node_data = self.rpc.call_os_diagnose(valid_nodes)
1682     if node_data == False:
1683       raise errors.OpExecError("Can't gather the list of OSes")
1684     pol = self._DiagnoseByOS(valid_nodes, node_data)
1685     output = []
1686     for os_name, os_data in pol.iteritems():
1687       row = []
1688       for field in self.op.output_fields:
1689         if field == "name":
1690           val = os_name
1691         elif field == "valid":
1692           val = utils.all([osl and osl[0] for osl in os_data.values()])
1693         elif field == "node_status":
1694           val = {}
1695           for node_name, nos_list in os_data.iteritems():
1696             val[node_name] = [(v.status, v.path) for v in nos_list]
1697         else:
1698           raise errors.ParameterError(field)
1699         row.append(val)
1700       output.append(row)
1701
1702     return output
1703
1704
1705 class LURemoveNode(LogicalUnit):
1706   """Logical unit for removing a node.
1707
1708   """
1709   HPATH = "node-remove"
1710   HTYPE = constants.HTYPE_NODE
1711   _OP_REQP = ["node_name"]
1712
1713   def BuildHooksEnv(self):
1714     """Build hooks env.
1715
1716     This doesn't run on the target node in the pre phase as a failed
1717     node would then be impossible to remove.
1718
1719     """
1720     env = {
1721       "OP_TARGET": self.op.node_name,
1722       "NODE_NAME": self.op.node_name,
1723       }
1724     all_nodes = self.cfg.GetNodeList()
1725     all_nodes.remove(self.op.node_name)
1726     return env, all_nodes, all_nodes
1727
1728   def CheckPrereq(self):
1729     """Check prerequisites.
1730
1731     This checks:
1732      - the node exists in the configuration
1733      - it does not have primary or secondary instances
1734      - it's not the master
1735
1736     Any errors are signalled by raising errors.OpPrereqError.
1737
1738     """
1739     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1740     if node is None:
1741       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1742
1743     instance_list = self.cfg.GetInstanceList()
1744
1745     masternode = self.cfg.GetMasterNode()
1746     if node.name == masternode:
1747       raise errors.OpPrereqError("Node is the master node,"
1748                                  " you need to failover first.")
1749
1750     for instance_name in instance_list:
1751       instance = self.cfg.GetInstanceInfo(instance_name)
1752       if node.name in instance.all_nodes:
1753         raise errors.OpPrereqError("Instance %s is still running on the node,"
1754                                    " please remove first." % instance_name)
1755     self.op.node_name = node.name
1756     self.node = node
1757
1758   def Exec(self, feedback_fn):
1759     """Removes the node from the cluster.
1760
1761     """
1762     node = self.node
1763     logging.info("Stopping the node daemon and removing configs from node %s",
1764                  node.name)
1765
1766     self.context.RemoveNode(node.name)
1767
1768     self.rpc.call_node_leave_cluster(node.name)
1769
1770     # Promote nodes to master candidate as needed
1771     _AdjustCandidatePool(self)
1772
1773
1774 class LUQueryNodes(NoHooksLU):
1775   """Logical unit for querying nodes.
1776
1777   """
1778   _OP_REQP = ["output_fields", "names", "use_locking"]
1779   REQ_BGL = False
1780   _FIELDS_DYNAMIC = utils.FieldSet(
1781     "dtotal", "dfree",
1782     "mtotal", "mnode", "mfree",
1783     "bootid",
1784     "ctotal", "cnodes", "csockets",
1785     )
1786
1787   _FIELDS_STATIC = utils.FieldSet(
1788     "name", "pinst_cnt", "sinst_cnt",
1789     "pinst_list", "sinst_list",
1790     "pip", "sip", "tags",
1791     "serial_no",
1792     "master_candidate",
1793     "master",
1794     "offline",
1795     "drained",
1796     )
1797
1798   def ExpandNames(self):
1799     _CheckOutputFields(static=self._FIELDS_STATIC,
1800                        dynamic=self._FIELDS_DYNAMIC,
1801                        selected=self.op.output_fields)
1802
1803     self.needed_locks = {}
1804     self.share_locks[locking.LEVEL_NODE] = 1
1805
1806     if self.op.names:
1807       self.wanted = _GetWantedNodes(self, self.op.names)
1808     else:
1809       self.wanted = locking.ALL_SET
1810
1811     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1812     self.do_locking = self.do_node_query and self.op.use_locking
1813     if self.do_locking:
1814       # if we don't request only static fields, we need to lock the nodes
1815       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1816
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     """
1822     # The validation of the node list is done in the _GetWantedNodes,
1823     # if non empty, and if empty, there's no validation to do
1824     pass
1825
1826   def Exec(self, feedback_fn):
1827     """Computes the list of nodes and their attributes.
1828
1829     """
1830     all_info = self.cfg.GetAllNodesInfo()
1831     if self.do_locking:
1832       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1833     elif self.wanted != locking.ALL_SET:
1834       nodenames = self.wanted
1835       missing = set(nodenames).difference(all_info.keys())
1836       if missing:
1837         raise errors.OpExecError(
1838           "Some nodes were removed before retrieving their data: %s" % missing)
1839     else:
1840       nodenames = all_info.keys()
1841
1842     nodenames = utils.NiceSort(nodenames)
1843     nodelist = [all_info[name] for name in nodenames]
1844
1845     # begin data gathering
1846
1847     if self.do_node_query:
1848       live_data = {}
1849       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1850                                           self.cfg.GetHypervisorType())
1851       for name in nodenames:
1852         nodeinfo = node_data[name]
1853         if not nodeinfo.failed and nodeinfo.data:
1854           nodeinfo = nodeinfo.data
1855           fn = utils.TryConvert
1856           live_data[name] = {
1857             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1858             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1859             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1860             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1861             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1862             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1863             "bootid": nodeinfo.get('bootid', None),
1864             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1865             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1866             }
1867         else:
1868           live_data[name] = {}
1869     else:
1870       live_data = dict.fromkeys(nodenames, {})
1871
1872     node_to_primary = dict([(name, set()) for name in nodenames])
1873     node_to_secondary = dict([(name, set()) for name in nodenames])
1874
1875     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1876                              "sinst_cnt", "sinst_list"))
1877     if inst_fields & frozenset(self.op.output_fields):
1878       instancelist = self.cfg.GetInstanceList()
1879
1880       for instance_name in instancelist:
1881         inst = self.cfg.GetInstanceInfo(instance_name)
1882         if inst.primary_node in node_to_primary:
1883           node_to_primary[inst.primary_node].add(inst.name)
1884         for secnode in inst.secondary_nodes:
1885           if secnode in node_to_secondary:
1886             node_to_secondary[secnode].add(inst.name)
1887
1888     master_node = self.cfg.GetMasterNode()
1889
1890     # end data gathering
1891
1892     output = []
1893     for node in nodelist:
1894       node_output = []
1895       for field in self.op.output_fields:
1896         if field == "name":
1897           val = node.name
1898         elif field == "pinst_list":
1899           val = list(node_to_primary[node.name])
1900         elif field == "sinst_list":
1901           val = list(node_to_secondary[node.name])
1902         elif field == "pinst_cnt":
1903           val = len(node_to_primary[node.name])
1904         elif field == "sinst_cnt":
1905           val = len(node_to_secondary[node.name])
1906         elif field == "pip":
1907           val = node.primary_ip
1908         elif field == "sip":
1909           val = node.secondary_ip
1910         elif field == "tags":
1911           val = list(node.GetTags())
1912         elif field == "serial_no":
1913           val = node.serial_no
1914         elif field == "master_candidate":
1915           val = node.master_candidate
1916         elif field == "master":
1917           val = node.name == master_node
1918         elif field == "offline":
1919           val = node.offline
1920         elif field == "drained":
1921           val = node.drained
1922         elif self._FIELDS_DYNAMIC.Matches(field):
1923           val = live_data[node.name].get(field, None)
1924         else:
1925           raise errors.ParameterError(field)
1926         node_output.append(val)
1927       output.append(node_output)
1928
1929     return output
1930
1931
1932 class LUQueryNodeVolumes(NoHooksLU):
1933   """Logical unit for getting volumes on node(s).
1934
1935   """
1936   _OP_REQP = ["nodes", "output_fields"]
1937   REQ_BGL = False
1938   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1939   _FIELDS_STATIC = utils.FieldSet("node")
1940
1941   def ExpandNames(self):
1942     _CheckOutputFields(static=self._FIELDS_STATIC,
1943                        dynamic=self._FIELDS_DYNAMIC,
1944                        selected=self.op.output_fields)
1945
1946     self.needed_locks = {}
1947     self.share_locks[locking.LEVEL_NODE] = 1
1948     if not self.op.nodes:
1949       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1950     else:
1951       self.needed_locks[locking.LEVEL_NODE] = \
1952         _GetWantedNodes(self, self.op.nodes)
1953
1954   def CheckPrereq(self):
1955     """Check prerequisites.
1956
1957     This checks that the fields required are valid output fields.
1958
1959     """
1960     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1961
1962   def Exec(self, feedback_fn):
1963     """Computes the list of nodes and their attributes.
1964
1965     """
1966     nodenames = self.nodes
1967     volumes = self.rpc.call_node_volumes(nodenames)
1968
1969     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1970              in self.cfg.GetInstanceList()]
1971
1972     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1973
1974     output = []
1975     for node in nodenames:
1976       if node not in volumes or volumes[node].failed or not volumes[node].data:
1977         continue
1978
1979       node_vols = volumes[node].data[:]
1980       node_vols.sort(key=lambda vol: vol['dev'])
1981
1982       for vol in node_vols:
1983         node_output = []
1984         for field in self.op.output_fields:
1985           if field == "node":
1986             val = node
1987           elif field == "phys":
1988             val = vol['dev']
1989           elif field == "vg":
1990             val = vol['vg']
1991           elif field == "name":
1992             val = vol['name']
1993           elif field == "size":
1994             val = int(float(vol['size']))
1995           elif field == "instance":
1996             for inst in ilist:
1997               if node not in lv_by_node[inst]:
1998                 continue
1999               if vol['name'] in lv_by_node[inst][node]:
2000                 val = inst.name
2001                 break
2002             else:
2003               val = '-'
2004           else:
2005             raise errors.ParameterError(field)
2006           node_output.append(str(val))
2007
2008         output.append(node_output)
2009
2010     return output
2011
2012
2013 class LUAddNode(LogicalUnit):
2014   """Logical unit for adding node to the cluster.
2015
2016   """
2017   HPATH = "node-add"
2018   HTYPE = constants.HTYPE_NODE
2019   _OP_REQP = ["node_name"]
2020
2021   def BuildHooksEnv(self):
2022     """Build hooks env.
2023
2024     This will run on all nodes before, and on all nodes + the new node after.
2025
2026     """
2027     env = {
2028       "OP_TARGET": self.op.node_name,
2029       "NODE_NAME": self.op.node_name,
2030       "NODE_PIP": self.op.primary_ip,
2031       "NODE_SIP": self.op.secondary_ip,
2032       }
2033     nodes_0 = self.cfg.GetNodeList()
2034     nodes_1 = nodes_0 + [self.op.node_name, ]
2035     return env, nodes_0, nodes_1
2036
2037   def CheckPrereq(self):
2038     """Check prerequisites.
2039
2040     This checks:
2041      - the new node is not already in the config
2042      - it is resolvable
2043      - its parameters (single/dual homed) matches the cluster
2044
2045     Any errors are signalled by raising errors.OpPrereqError.
2046
2047     """
2048     node_name = self.op.node_name
2049     cfg = self.cfg
2050
2051     dns_data = utils.HostInfo(node_name)
2052
2053     node = dns_data.name
2054     primary_ip = self.op.primary_ip = dns_data.ip
2055     secondary_ip = getattr(self.op, "secondary_ip", None)
2056     if secondary_ip is None:
2057       secondary_ip = primary_ip
2058     if not utils.IsValidIP(secondary_ip):
2059       raise errors.OpPrereqError("Invalid secondary IP given")
2060     self.op.secondary_ip = secondary_ip
2061
2062     node_list = cfg.GetNodeList()
2063     if not self.op.readd and node in node_list:
2064       raise errors.OpPrereqError("Node %s is already in the configuration" %
2065                                  node)
2066     elif self.op.readd and node not in node_list:
2067       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2068
2069     for existing_node_name in node_list:
2070       existing_node = cfg.GetNodeInfo(existing_node_name)
2071
2072       if self.op.readd and node == existing_node_name:
2073         if (existing_node.primary_ip != primary_ip or
2074             existing_node.secondary_ip != secondary_ip):
2075           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2076                                      " address configuration as before")
2077         continue
2078
2079       if (existing_node.primary_ip == primary_ip or
2080           existing_node.secondary_ip == primary_ip or
2081           existing_node.primary_ip == secondary_ip or
2082           existing_node.secondary_ip == secondary_ip):
2083         raise errors.OpPrereqError("New node ip address(es) conflict with"
2084                                    " existing node %s" % existing_node.name)
2085
2086     # check that the type of the node (single versus dual homed) is the
2087     # same as for the master
2088     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2089     master_singlehomed = myself.secondary_ip == myself.primary_ip
2090     newbie_singlehomed = secondary_ip == primary_ip
2091     if master_singlehomed != newbie_singlehomed:
2092       if master_singlehomed:
2093         raise errors.OpPrereqError("The master has no private ip but the"
2094                                    " new node has one")
2095       else:
2096         raise errors.OpPrereqError("The master has a private ip but the"
2097                                    " new node doesn't have one")
2098
2099     # checks reachablity
2100     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2101       raise errors.OpPrereqError("Node not reachable by ping")
2102
2103     if not newbie_singlehomed:
2104       # check reachability from my secondary ip to newbie's secondary ip
2105       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2106                            source=myself.secondary_ip):
2107         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2108                                    " based ping to noded port")
2109
2110     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2111     mc_now, _ = self.cfg.GetMasterCandidateStats()
2112     master_candidate = mc_now < cp_size
2113
2114     self.new_node = objects.Node(name=node,
2115                                  primary_ip=primary_ip,
2116                                  secondary_ip=secondary_ip,
2117                                  master_candidate=master_candidate,
2118                                  offline=False, drained=False)
2119
2120   def Exec(self, feedback_fn):
2121     """Adds the new node to the cluster.
2122
2123     """
2124     new_node = self.new_node
2125     node = new_node.name
2126
2127     # check connectivity
2128     result = self.rpc.call_version([node])[node]
2129     result.Raise()
2130     if result.data:
2131       if constants.PROTOCOL_VERSION == result.data:
2132         logging.info("Communication to node %s fine, sw version %s match",
2133                      node, result.data)
2134       else:
2135         raise errors.OpExecError("Version mismatch master version %s,"
2136                                  " node version %s" %
2137                                  (constants.PROTOCOL_VERSION, result.data))
2138     else:
2139       raise errors.OpExecError("Cannot get version from the new node")
2140
2141     # setup ssh on node
2142     logging.info("Copy ssh key to node %s", node)
2143     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2144     keyarray = []
2145     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2146                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2147                 priv_key, pub_key]
2148
2149     for i in keyfiles:
2150       f = open(i, 'r')
2151       try:
2152         keyarray.append(f.read())
2153       finally:
2154         f.close()
2155
2156     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2157                                     keyarray[2],
2158                                     keyarray[3], keyarray[4], keyarray[5])
2159
2160     msg = result.RemoteFailMsg()
2161     if msg:
2162       raise errors.OpExecError("Cannot transfer ssh keys to the"
2163                                " new node: %s" % msg)
2164
2165     # Add node to our /etc/hosts, and add key to known_hosts
2166     utils.AddHostToEtcHosts(new_node.name)
2167
2168     if new_node.secondary_ip != new_node.primary_ip:
2169       result = self.rpc.call_node_has_ip_address(new_node.name,
2170                                                  new_node.secondary_ip)
2171       if result.failed or not result.data:
2172         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2173                                  " you gave (%s). Please fix and re-run this"
2174                                  " command." % new_node.secondary_ip)
2175
2176     node_verify_list = [self.cfg.GetMasterNode()]
2177     node_verify_param = {
2178       'nodelist': [node],
2179       # TODO: do a node-net-test as well?
2180     }
2181
2182     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2183                                        self.cfg.GetClusterName())
2184     for verifier in node_verify_list:
2185       if result[verifier].failed or not result[verifier].data:
2186         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2187                                  " for remote verification" % verifier)
2188       if result[verifier].data['nodelist']:
2189         for failed in result[verifier].data['nodelist']:
2190           feedback_fn("ssh/hostname verification failed %s -> %s" %
2191                       (verifier, result[verifier].data['nodelist'][failed]))
2192         raise errors.OpExecError("ssh/hostname verification failed.")
2193
2194     # Distribute updated /etc/hosts and known_hosts to all nodes,
2195     # including the node just added
2196     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2197     dist_nodes = self.cfg.GetNodeList()
2198     if not self.op.readd:
2199       dist_nodes.append(node)
2200     if myself.name in dist_nodes:
2201       dist_nodes.remove(myself.name)
2202
2203     logging.debug("Copying hosts and known_hosts to all nodes")
2204     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2205       result = self.rpc.call_upload_file(dist_nodes, fname)
2206       for to_node, to_result in result.iteritems():
2207         if to_result.failed or not to_result.data:
2208           logging.error("Copy of file %s to node %s failed", fname, to_node)
2209
2210     to_copy = []
2211     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2212     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2213       to_copy.append(constants.VNC_PASSWORD_FILE)
2214
2215     for fname in to_copy:
2216       result = self.rpc.call_upload_file([node], fname)
2217       if result[node].failed or not result[node]:
2218         logging.error("Could not copy file %s to node %s", fname, node)
2219
2220     if self.op.readd:
2221       self.context.ReaddNode(new_node)
2222     else:
2223       self.context.AddNode(new_node)
2224
2225
2226 class LUSetNodeParams(LogicalUnit):
2227   """Modifies the parameters of a node.
2228
2229   """
2230   HPATH = "node-modify"
2231   HTYPE = constants.HTYPE_NODE
2232   _OP_REQP = ["node_name"]
2233   REQ_BGL = False
2234
2235   def CheckArguments(self):
2236     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2237     if node_name is None:
2238       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2239     self.op.node_name = node_name
2240     _CheckBooleanOpField(self.op, 'master_candidate')
2241     _CheckBooleanOpField(self.op, 'offline')
2242     _CheckBooleanOpField(self.op, 'drained')
2243     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2244     if all_mods.count(None) == 3:
2245       raise errors.OpPrereqError("Please pass at least one modification")
2246     if all_mods.count(True) > 1:
2247       raise errors.OpPrereqError("Can't set the node into more than one"
2248                                  " state at the same time")
2249
2250   def ExpandNames(self):
2251     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2252
2253   def BuildHooksEnv(self):
2254     """Build hooks env.
2255
2256     This runs on the master node.
2257
2258     """
2259     env = {
2260       "OP_TARGET": self.op.node_name,
2261       "MASTER_CANDIDATE": str(self.op.master_candidate),
2262       "OFFLINE": str(self.op.offline),
2263       "DRAINED": str(self.op.drained),
2264       }
2265     nl = [self.cfg.GetMasterNode(),
2266           self.op.node_name]
2267     return env, nl, nl
2268
2269   def CheckPrereq(self):
2270     """Check prerequisites.
2271
2272     This only checks the instance list against the existing names.
2273
2274     """
2275     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2276
2277     if ((self.op.master_candidate == False or self.op.offline == True or
2278          self.op.drained == True) and node.master_candidate):
2279       # we will demote the node from master_candidate
2280       if self.op.node_name == self.cfg.GetMasterNode():
2281         raise errors.OpPrereqError("The master node has to be a"
2282                                    " master candidate, online and not drained")
2283       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2284       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2285       if num_candidates <= cp_size:
2286         msg = ("Not enough master candidates (desired"
2287                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2288         if self.op.force:
2289           self.LogWarning(msg)
2290         else:
2291           raise errors.OpPrereqError(msg)
2292
2293     if (self.op.master_candidate == True and
2294         ((node.offline and not self.op.offline == False) or
2295          (node.drained and not self.op.drained == False))):
2296       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2297                                  " to master_candidate")
2298
2299     return
2300
2301   def Exec(self, feedback_fn):
2302     """Modifies a node.
2303
2304     """
2305     node = self.node
2306
2307     result = []
2308     changed_mc = False
2309
2310     if self.op.offline is not None:
2311       node.offline = self.op.offline
2312       result.append(("offline", str(self.op.offline)))
2313       if self.op.offline == True:
2314         if node.master_candidate:
2315           node.master_candidate = False
2316           changed_mc = True
2317           result.append(("master_candidate", "auto-demotion due to offline"))
2318         if node.drained:
2319           node.drained = False
2320           result.append(("drained", "clear drained status due to offline"))
2321
2322     if self.op.master_candidate is not None:
2323       node.master_candidate = self.op.master_candidate
2324       changed_mc = True
2325       result.append(("master_candidate", str(self.op.master_candidate)))
2326       if self.op.master_candidate == False:
2327         rrc = self.rpc.call_node_demote_from_mc(node.name)
2328         msg = rrc.RemoteFailMsg()
2329         if msg:
2330           self.LogWarning("Node failed to demote itself: %s" % msg)
2331
2332     if self.op.drained is not None:
2333       node.drained = self.op.drained
2334       result.append(("drained", str(self.op.drained)))
2335       if self.op.drained == True:
2336         if node.master_candidate:
2337           node.master_candidate = False
2338           changed_mc = True
2339           result.append(("master_candidate", "auto-demotion due to drain"))
2340         if node.offline:
2341           node.offline = False
2342           result.append(("offline", "clear offline status due to drain"))
2343
2344     # this will trigger configuration file update, if needed
2345     self.cfg.Update(node)
2346     # this will trigger job queue propagation or cleanup
2347     if changed_mc:
2348       self.context.ReaddNode(node)
2349
2350     return result
2351
2352
2353 class LUQueryClusterInfo(NoHooksLU):
2354   """Query cluster configuration.
2355
2356   """
2357   _OP_REQP = []
2358   REQ_BGL = False
2359
2360   def ExpandNames(self):
2361     self.needed_locks = {}
2362
2363   def CheckPrereq(self):
2364     """No prerequsites needed for this LU.
2365
2366     """
2367     pass
2368
2369   def Exec(self, feedback_fn):
2370     """Return cluster config.
2371
2372     """
2373     cluster = self.cfg.GetClusterInfo()
2374     result = {
2375       "software_version": constants.RELEASE_VERSION,
2376       "protocol_version": constants.PROTOCOL_VERSION,
2377       "config_version": constants.CONFIG_VERSION,
2378       "os_api_version": constants.OS_API_VERSION,
2379       "export_version": constants.EXPORT_VERSION,
2380       "architecture": (platform.architecture()[0], platform.machine()),
2381       "name": cluster.cluster_name,
2382       "master": cluster.master_node,
2383       "default_hypervisor": cluster.default_hypervisor,
2384       "enabled_hypervisors": cluster.enabled_hypervisors,
2385       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2386                         for hypervisor in cluster.enabled_hypervisors]),
2387       "beparams": cluster.beparams,
2388       "candidate_pool_size": cluster.candidate_pool_size,
2389       }
2390
2391     return result
2392
2393
2394 class LUQueryConfigValues(NoHooksLU):
2395   """Return configuration values.
2396
2397   """
2398   _OP_REQP = []
2399   REQ_BGL = False
2400   _FIELDS_DYNAMIC = utils.FieldSet()
2401   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2402
2403   def ExpandNames(self):
2404     self.needed_locks = {}
2405
2406     _CheckOutputFields(static=self._FIELDS_STATIC,
2407                        dynamic=self._FIELDS_DYNAMIC,
2408                        selected=self.op.output_fields)
2409
2410   def CheckPrereq(self):
2411     """No prerequisites.
2412
2413     """
2414     pass
2415
2416   def Exec(self, feedback_fn):
2417     """Dump a representation of the cluster config to the standard output.
2418
2419     """
2420     values = []
2421     for field in self.op.output_fields:
2422       if field == "cluster_name":
2423         entry = self.cfg.GetClusterName()
2424       elif field == "master_node":
2425         entry = self.cfg.GetMasterNode()
2426       elif field == "drain_flag":
2427         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2428       else:
2429         raise errors.ParameterError(field)
2430       values.append(entry)
2431     return values
2432
2433
2434 class LUActivateInstanceDisks(NoHooksLU):
2435   """Bring up an instance's disks.
2436
2437   """
2438   _OP_REQP = ["instance_name"]
2439   REQ_BGL = False
2440
2441   def ExpandNames(self):
2442     self._ExpandAndLockInstance()
2443     self.needed_locks[locking.LEVEL_NODE] = []
2444     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2445
2446   def DeclareLocks(self, level):
2447     if level == locking.LEVEL_NODE:
2448       self._LockInstancesNodes()
2449
2450   def CheckPrereq(self):
2451     """Check prerequisites.
2452
2453     This checks that the instance is in the cluster.
2454
2455     """
2456     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2457     assert self.instance is not None, \
2458       "Cannot retrieve locked instance %s" % self.op.instance_name
2459     _CheckNodeOnline(self, self.instance.primary_node)
2460
2461   def Exec(self, feedback_fn):
2462     """Activate the disks.
2463
2464     """
2465     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2466     if not disks_ok:
2467       raise errors.OpExecError("Cannot activate block devices")
2468
2469     return disks_info
2470
2471
2472 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2473   """Prepare the block devices for an instance.
2474
2475   This sets up the block devices on all nodes.
2476
2477   @type lu: L{LogicalUnit}
2478   @param lu: the logical unit on whose behalf we execute
2479   @type instance: L{objects.Instance}
2480   @param instance: the instance for whose disks we assemble
2481   @type ignore_secondaries: boolean
2482   @param ignore_secondaries: if true, errors on secondary nodes
2483       won't result in an error return from the function
2484   @return: False if the operation failed, otherwise a list of
2485       (host, instance_visible_name, node_visible_name)
2486       with the mapping from node devices to instance devices
2487
2488   """
2489   device_info = []
2490   disks_ok = True
2491   iname = instance.name
2492   # With the two passes mechanism we try to reduce the window of
2493   # opportunity for the race condition of switching DRBD to primary
2494   # before handshaking occured, but we do not eliminate it
2495
2496   # The proper fix would be to wait (with some limits) until the
2497   # connection has been made and drbd transitions from WFConnection
2498   # into any other network-connected state (Connected, SyncTarget,
2499   # SyncSource, etc.)
2500
2501   # 1st pass, assemble on all nodes in secondary mode
2502   for inst_disk in instance.disks:
2503     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2504       lu.cfg.SetDiskID(node_disk, node)
2505       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2506       msg = result.RemoteFailMsg()
2507       if msg:
2508         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2509                            " (is_primary=False, pass=1): %s",
2510                            inst_disk.iv_name, node, msg)
2511         if not ignore_secondaries:
2512           disks_ok = False
2513
2514   # FIXME: race condition on drbd migration to primary
2515
2516   # 2nd pass, do only the primary node
2517   for inst_disk in instance.disks:
2518     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2519       if node != instance.primary_node:
2520         continue
2521       lu.cfg.SetDiskID(node_disk, node)
2522       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2523       msg = result.RemoteFailMsg()
2524       if msg:
2525         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2526                            " (is_primary=True, pass=2): %s",
2527                            inst_disk.iv_name, node, msg)
2528         disks_ok = False
2529     device_info.append((instance.primary_node, inst_disk.iv_name,
2530                         result.payload))
2531
2532   # leave the disks configured for the primary node
2533   # this is a workaround that would be fixed better by
2534   # improving the logical/physical id handling
2535   for disk in instance.disks:
2536     lu.cfg.SetDiskID(disk, instance.primary_node)
2537
2538   return disks_ok, device_info
2539
2540
2541 def _StartInstanceDisks(lu, instance, force):
2542   """Start the disks of an instance.
2543
2544   """
2545   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2546                                            ignore_secondaries=force)
2547   if not disks_ok:
2548     _ShutdownInstanceDisks(lu, instance)
2549     if force is not None and not force:
2550       lu.proc.LogWarning("", hint="If the message above refers to a"
2551                          " secondary node,"
2552                          " you can retry the operation using '--force'.")
2553     raise errors.OpExecError("Disk consistency error")
2554
2555
2556 class LUDeactivateInstanceDisks(NoHooksLU):
2557   """Shutdown an instance's disks.
2558
2559   """
2560   _OP_REQP = ["instance_name"]
2561   REQ_BGL = False
2562
2563   def ExpandNames(self):
2564     self._ExpandAndLockInstance()
2565     self.needed_locks[locking.LEVEL_NODE] = []
2566     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2567
2568   def DeclareLocks(self, level):
2569     if level == locking.LEVEL_NODE:
2570       self._LockInstancesNodes()
2571
2572   def CheckPrereq(self):
2573     """Check prerequisites.
2574
2575     This checks that the instance is in the cluster.
2576
2577     """
2578     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2579     assert self.instance is not None, \
2580       "Cannot retrieve locked instance %s" % self.op.instance_name
2581
2582   def Exec(self, feedback_fn):
2583     """Deactivate the disks
2584
2585     """
2586     instance = self.instance
2587     _SafeShutdownInstanceDisks(self, instance)
2588
2589
2590 def _SafeShutdownInstanceDisks(lu, instance):
2591   """Shutdown block devices of an instance.
2592
2593   This function checks if an instance is running, before calling
2594   _ShutdownInstanceDisks.
2595
2596   """
2597   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2598                                       [instance.hypervisor])
2599   ins_l = ins_l[instance.primary_node]
2600   if ins_l.failed or not isinstance(ins_l.data, list):
2601     raise errors.OpExecError("Can't contact node '%s'" %
2602                              instance.primary_node)
2603
2604   if instance.name in ins_l.data:
2605     raise errors.OpExecError("Instance is running, can't shutdown"
2606                              " block devices.")
2607
2608   _ShutdownInstanceDisks(lu, instance)
2609
2610
2611 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2612   """Shutdown block devices of an instance.
2613
2614   This does the shutdown on all nodes of the instance.
2615
2616   If the ignore_primary is false, errors on the primary node are
2617   ignored.
2618
2619   """
2620   all_result = True
2621   for disk in instance.disks:
2622     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2623       lu.cfg.SetDiskID(top_disk, node)
2624       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2625       msg = result.RemoteFailMsg()
2626       if msg:
2627         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2628                       disk.iv_name, node, msg)
2629         if not ignore_primary or node != instance.primary_node:
2630           all_result = False
2631   return all_result
2632
2633
2634 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2635   """Checks if a node has enough free memory.
2636
2637   This function check if a given node has the needed amount of free
2638   memory. In case the node has less memory or we cannot get the
2639   information from the node, this function raise an OpPrereqError
2640   exception.
2641
2642   @type lu: C{LogicalUnit}
2643   @param lu: a logical unit from which we get configuration data
2644   @type node: C{str}
2645   @param node: the node to check
2646   @type reason: C{str}
2647   @param reason: string to use in the error message
2648   @type requested: C{int}
2649   @param requested: the amount of memory in MiB to check for
2650   @type hypervisor_name: C{str}
2651   @param hypervisor_name: the hypervisor to ask for memory stats
2652   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2653       we cannot check the node
2654
2655   """
2656   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2657   nodeinfo[node].Raise()
2658   free_mem = nodeinfo[node].data.get('memory_free')
2659   if not isinstance(free_mem, int):
2660     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2661                              " was '%s'" % (node, free_mem))
2662   if requested > free_mem:
2663     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2664                              " needed %s MiB, available %s MiB" %
2665                              (node, reason, requested, free_mem))
2666
2667
2668 class LUStartupInstance(LogicalUnit):
2669   """Starts an instance.
2670
2671   """
2672   HPATH = "instance-start"
2673   HTYPE = constants.HTYPE_INSTANCE
2674   _OP_REQP = ["instance_name", "force"]
2675   REQ_BGL = False
2676
2677   def ExpandNames(self):
2678     self._ExpandAndLockInstance()
2679
2680   def BuildHooksEnv(self):
2681     """Build hooks env.
2682
2683     This runs on master, primary and secondary nodes of the instance.
2684
2685     """
2686     env = {
2687       "FORCE": self.op.force,
2688       }
2689     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2690     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2691     return env, nl, nl
2692
2693   def CheckPrereq(self):
2694     """Check prerequisites.
2695
2696     This checks that the instance is in the cluster.
2697
2698     """
2699     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2700     assert self.instance is not None, \
2701       "Cannot retrieve locked instance %s" % self.op.instance_name
2702
2703     _CheckNodeOnline(self, instance.primary_node)
2704
2705     bep = self.cfg.GetClusterInfo().FillBE(instance)
2706     # check bridges existance
2707     _CheckInstanceBridgesExist(self, instance)
2708
2709     _CheckNodeFreeMemory(self, instance.primary_node,
2710                          "starting instance %s" % instance.name,
2711                          bep[constants.BE_MEMORY], instance.hypervisor)
2712
2713   def Exec(self, feedback_fn):
2714     """Start the instance.
2715
2716     """
2717     instance = self.instance
2718     force = self.op.force
2719     extra_args = getattr(self.op, "extra_args", "")
2720
2721     self.cfg.MarkInstanceUp(instance.name)
2722
2723     node_current = instance.primary_node
2724
2725     _StartInstanceDisks(self, instance, force)
2726
2727     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2728     msg = result.RemoteFailMsg()
2729     if msg:
2730       _ShutdownInstanceDisks(self, instance)
2731       raise errors.OpExecError("Could not start instance: %s" % msg)
2732
2733
2734 class LURebootInstance(LogicalUnit):
2735   """Reboot an instance.
2736
2737   """
2738   HPATH = "instance-reboot"
2739   HTYPE = constants.HTYPE_INSTANCE
2740   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2741   REQ_BGL = False
2742
2743   def ExpandNames(self):
2744     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2745                                    constants.INSTANCE_REBOOT_HARD,
2746                                    constants.INSTANCE_REBOOT_FULL]:
2747       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2748                                   (constants.INSTANCE_REBOOT_SOFT,
2749                                    constants.INSTANCE_REBOOT_HARD,
2750                                    constants.INSTANCE_REBOOT_FULL))
2751     self._ExpandAndLockInstance()
2752
2753   def BuildHooksEnv(self):
2754     """Build hooks env.
2755
2756     This runs on master, primary and secondary nodes of the instance.
2757
2758     """
2759     env = {
2760       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2761       }
2762     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2763     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2764     return env, nl, nl
2765
2766   def CheckPrereq(self):
2767     """Check prerequisites.
2768
2769     This checks that the instance is in the cluster.
2770
2771     """
2772     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2773     assert self.instance is not None, \
2774       "Cannot retrieve locked instance %s" % self.op.instance_name
2775
2776     _CheckNodeOnline(self, instance.primary_node)
2777
2778     # check bridges existance
2779     _CheckInstanceBridgesExist(self, instance)
2780
2781   def Exec(self, feedback_fn):
2782     """Reboot the instance.
2783
2784     """
2785     instance = self.instance
2786     ignore_secondaries = self.op.ignore_secondaries
2787     reboot_type = self.op.reboot_type
2788     extra_args = getattr(self.op, "extra_args", "")
2789
2790     node_current = instance.primary_node
2791
2792     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2793                        constants.INSTANCE_REBOOT_HARD]:
2794       result = self.rpc.call_instance_reboot(node_current, instance,
2795                                              reboot_type, extra_args)
2796       msg = result.RemoteFailMsg()
2797       if msg:
2798         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2799     else:
2800       result = self.rpc.call_instance_shutdown(node_current, instance)
2801       msg = result.RemoteFailMsg()
2802       if msg:
2803         raise errors.OpExecError("Could not shutdown instance for"
2804                                  " full reboot: %s" % msg)
2805       _ShutdownInstanceDisks(self, instance)
2806       _StartInstanceDisks(self, instance, ignore_secondaries)
2807       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2808       msg = result.RemoteFailMsg()
2809       if msg:
2810         _ShutdownInstanceDisks(self, instance)
2811         raise errors.OpExecError("Could not start instance for"
2812                                  " full reboot: %s" % msg)
2813
2814     self.cfg.MarkInstanceUp(instance.name)
2815
2816
2817 class LUShutdownInstance(LogicalUnit):
2818   """Shutdown an instance.
2819
2820   """
2821   HPATH = "instance-stop"
2822   HTYPE = constants.HTYPE_INSTANCE
2823   _OP_REQP = ["instance_name"]
2824   REQ_BGL = False
2825
2826   def ExpandNames(self):
2827     self._ExpandAndLockInstance()
2828
2829   def BuildHooksEnv(self):
2830     """Build hooks env.
2831
2832     This runs on master, primary and secondary nodes of the instance.
2833
2834     """
2835     env = _BuildInstanceHookEnvByObject(self, self.instance)
2836     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2837     return env, nl, nl
2838
2839   def CheckPrereq(self):
2840     """Check prerequisites.
2841
2842     This checks that the instance is in the cluster.
2843
2844     """
2845     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2846     assert self.instance is not None, \
2847       "Cannot retrieve locked instance %s" % self.op.instance_name
2848     _CheckNodeOnline(self, self.instance.primary_node)
2849
2850   def Exec(self, feedback_fn):
2851     """Shutdown the instance.
2852
2853     """
2854     instance = self.instance
2855     node_current = instance.primary_node
2856     self.cfg.MarkInstanceDown(instance.name)
2857     result = self.rpc.call_instance_shutdown(node_current, instance)
2858     msg = result.RemoteFailMsg()
2859     if msg:
2860       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2861
2862     _ShutdownInstanceDisks(self, instance)
2863
2864
2865 class LUReinstallInstance(LogicalUnit):
2866   """Reinstall an instance.
2867
2868   """
2869   HPATH = "instance-reinstall"
2870   HTYPE = constants.HTYPE_INSTANCE
2871   _OP_REQP = ["instance_name"]
2872   REQ_BGL = False
2873
2874   def ExpandNames(self):
2875     self._ExpandAndLockInstance()
2876
2877   def BuildHooksEnv(self):
2878     """Build hooks env.
2879
2880     This runs on master, primary and secondary nodes of the instance.
2881
2882     """
2883     env = _BuildInstanceHookEnvByObject(self, self.instance)
2884     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2885     return env, nl, nl
2886
2887   def CheckPrereq(self):
2888     """Check prerequisites.
2889
2890     This checks that the instance is in the cluster and is not running.
2891
2892     """
2893     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2894     assert instance is not None, \
2895       "Cannot retrieve locked instance %s" % self.op.instance_name
2896     _CheckNodeOnline(self, instance.primary_node)
2897
2898     if instance.disk_template == constants.DT_DISKLESS:
2899       raise errors.OpPrereqError("Instance '%s' has no disks" %
2900                                  self.op.instance_name)
2901     if instance.admin_up:
2902       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2903                                  self.op.instance_name)
2904     remote_info = self.rpc.call_instance_info(instance.primary_node,
2905                                               instance.name,
2906                                               instance.hypervisor)
2907     if remote_info.failed or remote_info.data:
2908       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2909                                  (self.op.instance_name,
2910                                   instance.primary_node))
2911
2912     self.op.os_type = getattr(self.op, "os_type", None)
2913     if self.op.os_type is not None:
2914       # OS verification
2915       pnode = self.cfg.GetNodeInfo(
2916         self.cfg.ExpandNodeName(instance.primary_node))
2917       if pnode is None:
2918         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2919                                    self.op.pnode)
2920       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2921       result.Raise()
2922       if not isinstance(result.data, objects.OS):
2923         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2924                                    " primary node"  % self.op.os_type)
2925
2926     self.instance = instance
2927
2928   def Exec(self, feedback_fn):
2929     """Reinstall the instance.
2930
2931     """
2932     inst = self.instance
2933
2934     if self.op.os_type is not None:
2935       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2936       inst.os = self.op.os_type
2937       self.cfg.Update(inst)
2938
2939     _StartInstanceDisks(self, inst, None)
2940     try:
2941       feedback_fn("Running the instance OS create scripts...")
2942       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2943       msg = result.RemoteFailMsg()
2944       if msg:
2945         raise errors.OpExecError("Could not install OS for instance %s"
2946                                  " on node %s: %s" %
2947                                  (inst.name, inst.primary_node, msg))
2948     finally:
2949       _ShutdownInstanceDisks(self, inst)
2950
2951
2952 class LURenameInstance(LogicalUnit):
2953   """Rename an instance.
2954
2955   """
2956   HPATH = "instance-rename"
2957   HTYPE = constants.HTYPE_INSTANCE
2958   _OP_REQP = ["instance_name", "new_name"]
2959
2960   def BuildHooksEnv(self):
2961     """Build hooks env.
2962
2963     This runs on master, primary and secondary nodes of the instance.
2964
2965     """
2966     env = _BuildInstanceHookEnvByObject(self, self.instance)
2967     env["INSTANCE_NEW_NAME"] = self.op.new_name
2968     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2969     return env, nl, nl
2970
2971   def CheckPrereq(self):
2972     """Check prerequisites.
2973
2974     This checks that the instance is in the cluster and is not running.
2975
2976     """
2977     instance = self.cfg.GetInstanceInfo(
2978       self.cfg.ExpandInstanceName(self.op.instance_name))
2979     if instance is None:
2980       raise errors.OpPrereqError("Instance '%s' not known" %
2981                                  self.op.instance_name)
2982     _CheckNodeOnline(self, instance.primary_node)
2983
2984     if instance.admin_up:
2985       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2986                                  self.op.instance_name)
2987     remote_info = self.rpc.call_instance_info(instance.primary_node,
2988                                               instance.name,
2989                                               instance.hypervisor)
2990     remote_info.Raise()
2991     if remote_info.data:
2992       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2993                                  (self.op.instance_name,
2994                                   instance.primary_node))
2995     self.instance = instance
2996
2997     # new name verification
2998     name_info = utils.HostInfo(self.op.new_name)
2999
3000     self.op.new_name = new_name = name_info.name
3001     instance_list = self.cfg.GetInstanceList()
3002     if new_name in instance_list:
3003       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3004                                  new_name)
3005
3006     if not getattr(self.op, "ignore_ip", False):
3007       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3008         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3009                                    (name_info.ip, new_name))
3010
3011
3012   def Exec(self, feedback_fn):
3013     """Reinstall the instance.
3014
3015     """
3016     inst = self.instance
3017     old_name = inst.name
3018
3019     if inst.disk_template == constants.DT_FILE:
3020       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3021
3022     self.cfg.RenameInstance(inst.name, self.op.new_name)
3023     # Change the instance lock. This is definitely safe while we hold the BGL
3024     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3025     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3026
3027     # re-read the instance from the configuration after rename
3028     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3029
3030     if inst.disk_template == constants.DT_FILE:
3031       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3032       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3033                                                      old_file_storage_dir,
3034                                                      new_file_storage_dir)
3035       result.Raise()
3036       if not result.data:
3037         raise errors.OpExecError("Could not connect to node '%s' to rename"
3038                                  " directory '%s' to '%s' (but the instance"
3039                                  " has been renamed in Ganeti)" % (
3040                                  inst.primary_node, old_file_storage_dir,
3041                                  new_file_storage_dir))
3042
3043       if not result.data[0]:
3044         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3045                                  " (but the instance has been renamed in"
3046                                  " Ganeti)" % (old_file_storage_dir,
3047                                                new_file_storage_dir))
3048
3049     _StartInstanceDisks(self, inst, None)
3050     try:
3051       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3052                                                  old_name)
3053       msg = result.RemoteFailMsg()
3054       if msg:
3055         msg = ("Could not run OS rename script for instance %s on node %s"
3056                " (but the instance has been renamed in Ganeti): %s" %
3057                (inst.name, inst.primary_node, msg))
3058         self.proc.LogWarning(msg)
3059     finally:
3060       _ShutdownInstanceDisks(self, inst)
3061
3062
3063 class LURemoveInstance(LogicalUnit):
3064   """Remove an instance.
3065
3066   """
3067   HPATH = "instance-remove"
3068   HTYPE = constants.HTYPE_INSTANCE
3069   _OP_REQP = ["instance_name", "ignore_failures"]
3070   REQ_BGL = False
3071
3072   def ExpandNames(self):
3073     self._ExpandAndLockInstance()
3074     self.needed_locks[locking.LEVEL_NODE] = []
3075     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3076
3077   def DeclareLocks(self, level):
3078     if level == locking.LEVEL_NODE:
3079       self._LockInstancesNodes()
3080
3081   def BuildHooksEnv(self):
3082     """Build hooks env.
3083
3084     This runs on master, primary and secondary nodes of the instance.
3085
3086     """
3087     env = _BuildInstanceHookEnvByObject(self, self.instance)
3088     nl = [self.cfg.GetMasterNode()]
3089     return env, nl, nl
3090
3091   def CheckPrereq(self):
3092     """Check prerequisites.
3093
3094     This checks that the instance is in the cluster.
3095
3096     """
3097     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3098     assert self.instance is not None, \
3099       "Cannot retrieve locked instance %s" % self.op.instance_name
3100
3101   def Exec(self, feedback_fn):
3102     """Remove the instance.
3103
3104     """
3105     instance = self.instance
3106     logging.info("Shutting down instance %s on node %s",
3107                  instance.name, instance.primary_node)
3108
3109     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3110     msg = result.RemoteFailMsg()
3111     if msg:
3112       if self.op.ignore_failures:
3113         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3114       else:
3115         raise errors.OpExecError("Could not shutdown instance %s on"
3116                                  " node %s: %s" %
3117                                  (instance.name, instance.primary_node, msg))
3118
3119     logging.info("Removing block devices for instance %s", instance.name)
3120
3121     if not _RemoveDisks(self, instance):
3122       if self.op.ignore_failures:
3123         feedback_fn("Warning: can't remove instance's disks")
3124       else:
3125         raise errors.OpExecError("Can't remove instance's disks")
3126
3127     logging.info("Removing instance %s out of cluster config", instance.name)
3128
3129     self.cfg.RemoveInstance(instance.name)
3130     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3131
3132
3133 class LUQueryInstances(NoHooksLU):
3134   """Logical unit for querying instances.
3135
3136   """
3137   _OP_REQP = ["output_fields", "names", "use_locking"]
3138   REQ_BGL = False
3139   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3140                                     "admin_state", "admin_ram",
3141                                     "disk_template", "ip", "mac", "bridge",
3142                                     "sda_size", "sdb_size", "vcpus", "tags",
3143                                     "network_port", "beparams",
3144                                     "(disk).(size)/([0-9]+)",
3145                                     "(disk).(sizes)", "disk_usage",
3146                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3147                                     "(nic).(macs|ips|bridges)",
3148                                     "(disk|nic).(count)",
3149                                     "serial_no", "hypervisor", "hvparams",] +
3150                                   ["hv/%s" % name
3151                                    for name in constants.HVS_PARAMETERS] +
3152                                   ["be/%s" % name
3153                                    for name in constants.BES_PARAMETERS])
3154   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3155
3156
3157   def ExpandNames(self):
3158     _CheckOutputFields(static=self._FIELDS_STATIC,
3159                        dynamic=self._FIELDS_DYNAMIC,
3160                        selected=self.op.output_fields)
3161
3162     self.needed_locks = {}
3163     self.share_locks[locking.LEVEL_INSTANCE] = 1
3164     self.share_locks[locking.LEVEL_NODE] = 1
3165
3166     if self.op.names:
3167       self.wanted = _GetWantedInstances(self, self.op.names)
3168     else:
3169       self.wanted = locking.ALL_SET
3170
3171     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3172     self.do_locking = self.do_node_query and self.op.use_locking
3173     if self.do_locking:
3174       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3175       self.needed_locks[locking.LEVEL_NODE] = []
3176       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3177
3178   def DeclareLocks(self, level):
3179     if level == locking.LEVEL_NODE and self.do_locking:
3180       self._LockInstancesNodes()
3181
3182   def CheckPrereq(self):
3183     """Check prerequisites.
3184
3185     """
3186     pass
3187
3188   def Exec(self, feedback_fn):
3189     """Computes the list of nodes and their attributes.
3190
3191     """
3192     all_info = self.cfg.GetAllInstancesInfo()
3193     if self.wanted == locking.ALL_SET:
3194       # caller didn't specify instance names, so ordering is not important
3195       if self.do_locking:
3196         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3197       else:
3198         instance_names = all_info.keys()
3199       instance_names = utils.NiceSort(instance_names)
3200     else:
3201       # caller did specify names, so we must keep the ordering
3202       if self.do_locking:
3203         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3204       else:
3205         tgt_set = all_info.keys()
3206       missing = set(self.wanted).difference(tgt_set)
3207       if missing:
3208         raise errors.OpExecError("Some instances were removed before"
3209                                  " retrieving their data: %s" % missing)
3210       instance_names = self.wanted
3211
3212     instance_list = [all_info[iname] for iname in instance_names]
3213
3214     # begin data gathering
3215
3216     nodes = frozenset([inst.primary_node for inst in instance_list])
3217     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3218
3219     bad_nodes = []
3220     off_nodes = []
3221     if self.do_node_query:
3222       live_data = {}
3223       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3224       for name in nodes:
3225         result = node_data[name]
3226         if result.offline:
3227           # offline nodes will be in both lists
3228           off_nodes.append(name)
3229         if result.failed:
3230           bad_nodes.append(name)
3231         else:
3232           if result.data:
3233             live_data.update(result.data)
3234             # else no instance is alive
3235     else:
3236       live_data = dict([(name, {}) for name in instance_names])
3237
3238     # end data gathering
3239
3240     HVPREFIX = "hv/"
3241     BEPREFIX = "be/"
3242     output = []
3243     for instance in instance_list:
3244       iout = []
3245       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3246       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3247       for field in self.op.output_fields:
3248         st_match = self._FIELDS_STATIC.Matches(field)
3249         if field == "name":
3250           val = instance.name
3251         elif field == "os":
3252           val = instance.os
3253         elif field == "pnode":
3254           val = instance.primary_node
3255         elif field == "snodes":
3256           val = list(instance.secondary_nodes)
3257         elif field == "admin_state":
3258           val = instance.admin_up
3259         elif field == "oper_state":
3260           if instance.primary_node in bad_nodes:
3261             val = None
3262           else:
3263             val = bool(live_data.get(instance.name))
3264         elif field == "status":
3265           if instance.primary_node in off_nodes:
3266             val = "ERROR_nodeoffline"
3267           elif instance.primary_node in bad_nodes:
3268             val = "ERROR_nodedown"
3269           else:
3270             running = bool(live_data.get(instance.name))
3271             if running:
3272               if instance.admin_up:
3273                 val = "running"
3274               else:
3275                 val = "ERROR_up"
3276             else:
3277               if instance.admin_up:
3278                 val = "ERROR_down"
3279               else:
3280                 val = "ADMIN_down"
3281         elif field == "oper_ram":
3282           if instance.primary_node in bad_nodes:
3283             val = None
3284           elif instance.name in live_data:
3285             val = live_data[instance.name].get("memory", "?")
3286           else:
3287             val = "-"
3288         elif field == "disk_template":
3289           val = instance.disk_template
3290         elif field == "ip":
3291           val = instance.nics[0].ip
3292         elif field == "bridge":
3293           val = instance.nics[0].bridge
3294         elif field == "mac":
3295           val = instance.nics[0].mac
3296         elif field == "sda_size" or field == "sdb_size":
3297           idx = ord(field[2]) - ord('a')
3298           try:
3299             val = instance.FindDisk(idx).size
3300           except errors.OpPrereqError:
3301             val = None
3302         elif field == "disk_usage": # total disk usage per node
3303           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3304           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3305         elif field == "tags":
3306           val = list(instance.GetTags())
3307         elif field == "serial_no":
3308           val = instance.serial_no
3309         elif field == "network_port":
3310           val = instance.network_port
3311         elif field == "hypervisor":
3312           val = instance.hypervisor
3313         elif field == "hvparams":
3314           val = i_hv
3315         elif (field.startswith(HVPREFIX) and
3316               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3317           val = i_hv.get(field[len(HVPREFIX):], None)
3318         elif field == "beparams":
3319           val = i_be
3320         elif (field.startswith(BEPREFIX) and
3321               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3322           val = i_be.get(field[len(BEPREFIX):], None)
3323         elif st_match and st_match.groups():
3324           # matches a variable list
3325           st_groups = st_match.groups()
3326           if st_groups and st_groups[0] == "disk":
3327             if st_groups[1] == "count":
3328               val = len(instance.disks)
3329             elif st_groups[1] == "sizes":
3330               val = [disk.size for disk in instance.disks]
3331             elif st_groups[1] == "size":
3332               try:
3333                 val = instance.FindDisk(st_groups[2]).size
3334               except errors.OpPrereqError:
3335                 val = None
3336             else:
3337               assert False, "Unhandled disk parameter"
3338           elif st_groups[0] == "nic":
3339             if st_groups[1] == "count":
3340               val = len(instance.nics)
3341             elif st_groups[1] == "macs":
3342               val = [nic.mac for nic in instance.nics]
3343             elif st_groups[1] == "ips":
3344               val = [nic.ip for nic in instance.nics]
3345             elif st_groups[1] == "bridges":
3346               val = [nic.bridge for nic in instance.nics]
3347             else:
3348               # index-based item
3349               nic_idx = int(st_groups[2])
3350               if nic_idx >= len(instance.nics):
3351                 val = None
3352               else:
3353                 if st_groups[1] == "mac":
3354                   val = instance.nics[nic_idx].mac
3355                 elif st_groups[1] == "ip":
3356                   val = instance.nics[nic_idx].ip
3357                 elif st_groups[1] == "bridge":
3358                   val = instance.nics[nic_idx].bridge
3359                 else:
3360                   assert False, "Unhandled NIC parameter"
3361           else:
3362             assert False, "Unhandled variable parameter"
3363         else:
3364           raise errors.ParameterError(field)
3365         iout.append(val)
3366       output.append(iout)
3367
3368     return output
3369
3370
3371 class LUFailoverInstance(LogicalUnit):
3372   """Failover an instance.
3373
3374   """
3375   HPATH = "instance-failover"
3376   HTYPE = constants.HTYPE_INSTANCE
3377   _OP_REQP = ["instance_name", "ignore_consistency"]
3378   REQ_BGL = False
3379
3380   def ExpandNames(self):
3381     self._ExpandAndLockInstance()
3382     self.needed_locks[locking.LEVEL_NODE] = []
3383     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3384
3385   def DeclareLocks(self, level):
3386     if level == locking.LEVEL_NODE:
3387       self._LockInstancesNodes()
3388
3389   def BuildHooksEnv(self):
3390     """Build hooks env.
3391
3392     This runs on master, primary and secondary nodes of the instance.
3393
3394     """
3395     env = {
3396       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3397       }
3398     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3399     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3400     return env, nl, nl
3401
3402   def CheckPrereq(self):
3403     """Check prerequisites.
3404
3405     This checks that the instance is in the cluster.
3406
3407     """
3408     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3409     assert self.instance is not None, \
3410       "Cannot retrieve locked instance %s" % self.op.instance_name
3411
3412     bep = self.cfg.GetClusterInfo().FillBE(instance)
3413     if instance.disk_template not in constants.DTS_NET_MIRROR:
3414       raise errors.OpPrereqError("Instance's disk layout is not"
3415                                  " network mirrored, cannot failover.")
3416
3417     secondary_nodes = instance.secondary_nodes
3418     if not secondary_nodes:
3419       raise errors.ProgrammerError("no secondary node but using "
3420                                    "a mirrored disk template")
3421
3422     target_node = secondary_nodes[0]
3423     _CheckNodeOnline(self, target_node)
3424     _CheckNodeNotDrained(self, target_node)
3425     # check memory requirements on the secondary node
3426     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3427                          instance.name, bep[constants.BE_MEMORY],
3428                          instance.hypervisor)
3429
3430     # check bridge existance
3431     brlist = [nic.bridge for nic in instance.nics]
3432     result = self.rpc.call_bridges_exist(target_node, brlist)
3433     result.Raise()
3434     if not result.data:
3435       raise errors.OpPrereqError("One or more target bridges %s does not"
3436                                  " exist on destination node '%s'" %
3437                                  (brlist, target_node))
3438
3439   def Exec(self, feedback_fn):
3440     """Failover an instance.
3441
3442     The failover is done by shutting it down on its present node and
3443     starting it on the secondary.
3444
3445     """
3446     instance = self.instance
3447
3448     source_node = instance.primary_node
3449     target_node = instance.secondary_nodes[0]
3450
3451     feedback_fn("* checking disk consistency between source and target")
3452     for dev in instance.disks:
3453       # for drbd, these are drbd over lvm
3454       if not _CheckDiskConsistency(self, dev, target_node, False):
3455         if instance.admin_up and not self.op.ignore_consistency:
3456           raise errors.OpExecError("Disk %s is degraded on target node,"
3457                                    " aborting failover." % dev.iv_name)
3458
3459     feedback_fn("* shutting down instance on source node")
3460     logging.info("Shutting down instance %s on node %s",
3461                  instance.name, source_node)
3462
3463     result = self.rpc.call_instance_shutdown(source_node, instance)
3464     msg = result.RemoteFailMsg()
3465     if msg:
3466       if self.op.ignore_consistency:
3467         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3468                              " Proceeding anyway. Please make sure node"
3469                              " %s is down. Error details: %s",
3470                              instance.name, source_node, source_node, msg)
3471       else:
3472         raise errors.OpExecError("Could not shutdown instance %s on"
3473                                  " node %s: %s" %
3474                                  (instance.name, source_node, msg))
3475
3476     feedback_fn("* deactivating the instance's disks on source node")
3477     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3478       raise errors.OpExecError("Can't shut down the instance's disks.")
3479
3480     instance.primary_node = target_node
3481     # distribute new instance config to the other nodes
3482     self.cfg.Update(instance)
3483
3484     # Only start the instance if it's marked as up
3485     if instance.admin_up:
3486       feedback_fn("* activating the instance's disks on target node")
3487       logging.info("Starting instance %s on node %s",
3488                    instance.name, target_node)
3489
3490       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3491                                                ignore_secondaries=True)
3492       if not disks_ok:
3493         _ShutdownInstanceDisks(self, instance)
3494         raise errors.OpExecError("Can't activate the instance's disks")
3495
3496       feedback_fn("* starting the instance on the target node")
3497       result = self.rpc.call_instance_start(target_node, instance, None)
3498       msg = result.RemoteFailMsg()
3499       if msg:
3500         _ShutdownInstanceDisks(self, instance)
3501         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3502                                  (instance.name, target_node, msg))
3503
3504
3505 class LUMigrateInstance(LogicalUnit):
3506   """Migrate an instance.
3507
3508   This is migration without shutting down, compared to the failover,
3509   which is done with shutdown.
3510
3511   """
3512   HPATH = "instance-migrate"
3513   HTYPE = constants.HTYPE_INSTANCE
3514   _OP_REQP = ["instance_name", "live", "cleanup"]
3515
3516   REQ_BGL = False
3517
3518   def ExpandNames(self):
3519     self._ExpandAndLockInstance()
3520     self.needed_locks[locking.LEVEL_NODE] = []
3521     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3522
3523   def DeclareLocks(self, level):
3524     if level == locking.LEVEL_NODE:
3525       self._LockInstancesNodes()
3526
3527   def BuildHooksEnv(self):
3528     """Build hooks env.
3529
3530     This runs on master, primary and secondary nodes of the instance.
3531
3532     """
3533     env = _BuildInstanceHookEnvByObject(self, self.instance)
3534     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3535     return env, nl, nl
3536
3537   def CheckPrereq(self):
3538     """Check prerequisites.
3539
3540     This checks that the instance is in the cluster.
3541
3542     """
3543     instance = self.cfg.GetInstanceInfo(
3544       self.cfg.ExpandInstanceName(self.op.instance_name))
3545     if instance is None:
3546       raise errors.OpPrereqError("Instance '%s' not known" %
3547                                  self.op.instance_name)
3548
3549     if instance.disk_template != constants.DT_DRBD8:
3550       raise errors.OpPrereqError("Instance's disk layout is not"
3551                                  " drbd8, cannot migrate.")
3552
3553     secondary_nodes = instance.secondary_nodes
3554     if not secondary_nodes:
3555       raise errors.ConfigurationError("No secondary node but using"
3556                                       " drbd8 disk template")
3557
3558     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3559
3560     target_node = secondary_nodes[0]
3561     # check memory requirements on the secondary node
3562     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3563                          instance.name, i_be[constants.BE_MEMORY],
3564                          instance.hypervisor)
3565
3566     # check bridge existance
3567     brlist = [nic.bridge for nic in instance.nics]
3568     result = self.rpc.call_bridges_exist(target_node, brlist)
3569     if result.failed or not result.data:
3570       raise errors.OpPrereqError("One or more target bridges %s does not"
3571                                  " exist on destination node '%s'" %
3572                                  (brlist, target_node))
3573
3574     if not self.op.cleanup:
3575       _CheckNodeNotDrained(self, target_node)
3576       result = self.rpc.call_instance_migratable(instance.primary_node,
3577                                                  instance)
3578       msg = result.RemoteFailMsg()
3579       if msg:
3580         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3581                                    msg)
3582
3583     self.instance = instance
3584
3585   def _WaitUntilSync(self):
3586     """Poll with custom rpc for disk sync.
3587
3588     This uses our own step-based rpc call.
3589
3590     """
3591     self.feedback_fn("* wait until resync is done")
3592     all_done = False
3593     while not all_done:
3594       all_done = True
3595       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3596                                             self.nodes_ip,
3597                                             self.instance.disks)
3598       min_percent = 100
3599       for node, nres in result.items():
3600         msg = nres.RemoteFailMsg()
3601         if msg:
3602           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3603                                    (node, msg))
3604         node_done, node_percent = nres.payload
3605         all_done = all_done and node_done
3606         if node_percent is not None:
3607           min_percent = min(min_percent, node_percent)
3608       if not all_done:
3609         if min_percent < 100:
3610           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3611         time.sleep(2)
3612
3613   def _EnsureSecondary(self, node):
3614     """Demote a node to secondary.
3615
3616     """
3617     self.feedback_fn("* switching node %s to secondary mode" % node)
3618
3619     for dev in self.instance.disks:
3620       self.cfg.SetDiskID(dev, node)
3621
3622     result = self.rpc.call_blockdev_close(node, self.instance.name,
3623                                           self.instance.disks)
3624     msg = result.RemoteFailMsg()
3625     if msg:
3626       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3627                                " error %s" % (node, msg))
3628
3629   def _GoStandalone(self):
3630     """Disconnect from the network.
3631
3632     """
3633     self.feedback_fn("* changing into standalone mode")
3634     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3635                                                self.instance.disks)
3636     for node, nres in result.items():
3637       msg = nres.RemoteFailMsg()
3638       if msg:
3639         raise errors.OpExecError("Cannot disconnect disks node %s,"
3640                                  " error %s" % (node, msg))
3641
3642   def _GoReconnect(self, multimaster):
3643     """Reconnect to the network.
3644
3645     """
3646     if multimaster:
3647       msg = "dual-master"
3648     else:
3649       msg = "single-master"
3650     self.feedback_fn("* changing disks into %s mode" % msg)
3651     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3652                                            self.instance.disks,
3653                                            self.instance.name, multimaster)
3654     for node, nres in result.items():
3655       msg = nres.RemoteFailMsg()
3656       if msg:
3657         raise errors.OpExecError("Cannot change disks config on node %s,"
3658                                  " error: %s" % (node, msg))
3659
3660   def _ExecCleanup(self):
3661     """Try to cleanup after a failed migration.
3662
3663     The cleanup is done by:
3664       - check that the instance is running only on one node
3665         (and update the config if needed)
3666       - change disks on its secondary node to secondary
3667       - wait until disks are fully synchronized
3668       - disconnect from the network
3669       - change disks into single-master mode
3670       - wait again until disks are fully synchronized
3671
3672     """
3673     instance = self.instance
3674     target_node = self.target_node
3675     source_node = self.source_node
3676
3677     # check running on only one node
3678     self.feedback_fn("* checking where the instance actually runs"
3679                      " (if this hangs, the hypervisor might be in"
3680                      " a bad state)")
3681     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3682     for node, result in ins_l.items():
3683       result.Raise()
3684       if not isinstance(result.data, list):
3685         raise errors.OpExecError("Can't contact node '%s'" % node)
3686
3687     runningon_source = instance.name in ins_l[source_node].data
3688     runningon_target = instance.name in ins_l[target_node].data
3689
3690     if runningon_source and runningon_target:
3691       raise errors.OpExecError("Instance seems to be running on two nodes,"
3692                                " or the hypervisor is confused. You will have"
3693                                " to ensure manually that it runs only on one"
3694                                " and restart this operation.")
3695
3696     if not (runningon_source or runningon_target):
3697       raise errors.OpExecError("Instance does not seem to be running at all."
3698                                " In this case, it's safer to repair by"
3699                                " running 'gnt-instance stop' to ensure disk"
3700                                " shutdown, and then restarting it.")
3701
3702     if runningon_target:
3703       # the migration has actually succeeded, we need to update the config
3704       self.feedback_fn("* instance running on secondary node (%s),"
3705                        " updating config" % target_node)
3706       instance.primary_node = target_node
3707       self.cfg.Update(instance)
3708       demoted_node = source_node
3709     else:
3710       self.feedback_fn("* instance confirmed to be running on its"
3711                        " primary node (%s)" % source_node)
3712       demoted_node = target_node
3713
3714     self._EnsureSecondary(demoted_node)
3715     try:
3716       self._WaitUntilSync()
3717     except errors.OpExecError:
3718       # we ignore here errors, since if the device is standalone, it
3719       # won't be able to sync
3720       pass
3721     self._GoStandalone()
3722     self._GoReconnect(False)
3723     self._WaitUntilSync()
3724
3725     self.feedback_fn("* done")
3726
3727   def _RevertDiskStatus(self):
3728     """Try to revert the disk status after a failed migration.
3729
3730     """
3731     target_node = self.target_node
3732     try:
3733       self._EnsureSecondary(target_node)
3734       self._GoStandalone()
3735       self._GoReconnect(False)
3736       self._WaitUntilSync()
3737     except errors.OpExecError, err:
3738       self.LogWarning("Migration failed and I can't reconnect the"
3739                       " drives: error '%s'\n"
3740                       "Please look and recover the instance status" %
3741                       str(err))
3742
3743   def _AbortMigration(self):
3744     """Call the hypervisor code to abort a started migration.
3745
3746     """
3747     instance = self.instance
3748     target_node = self.target_node
3749     migration_info = self.migration_info
3750
3751     abort_result = self.rpc.call_finalize_migration(target_node,
3752                                                     instance,
3753                                                     migration_info,
3754                                                     False)
3755     abort_msg = abort_result.RemoteFailMsg()
3756     if abort_msg:
3757       logging.error("Aborting migration failed on target node %s: %s" %
3758                     (target_node, abort_msg))
3759       # Don't raise an exception here, as we stil have to try to revert the
3760       # disk status, even if this step failed.
3761
3762   def _ExecMigration(self):
3763     """Migrate an instance.
3764
3765     The migrate is done by:
3766       - change the disks into dual-master mode
3767       - wait until disks are fully synchronized again
3768       - migrate the instance
3769       - change disks on the new secondary node (the old primary) to secondary
3770       - wait until disks are fully synchronized
3771       - change disks into single-master mode
3772
3773     """
3774     instance = self.instance
3775     target_node = self.target_node
3776     source_node = self.source_node
3777
3778     self.feedback_fn("* checking disk consistency between source and target")
3779     for dev in instance.disks:
3780       if not _CheckDiskConsistency(self, dev, target_node, False):
3781         raise errors.OpExecError("Disk %s is degraded or not fully"
3782                                  " synchronized on target node,"
3783                                  " aborting migrate." % dev.iv_name)
3784
3785     # First get the migration information from the remote node
3786     result = self.rpc.call_migration_info(source_node, instance)
3787     msg = result.RemoteFailMsg()
3788     if msg:
3789       log_err = ("Failed fetching source migration information from %s: %s" %
3790                  (source_node, msg))
3791       logging.error(log_err)
3792       raise errors.OpExecError(log_err)
3793
3794     self.migration_info = migration_info = result.payload
3795
3796     # Then switch the disks to master/master mode
3797     self._EnsureSecondary(target_node)
3798     self._GoStandalone()
3799     self._GoReconnect(True)
3800     self._WaitUntilSync()
3801
3802     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3803     result = self.rpc.call_accept_instance(target_node,
3804                                            instance,
3805                                            migration_info,
3806                                            self.nodes_ip[target_node])
3807
3808     msg = result.RemoteFailMsg()
3809     if msg:
3810       logging.error("Instance pre-migration failed, trying to revert"
3811                     " disk status: %s", msg)
3812       self._AbortMigration()
3813       self._RevertDiskStatus()
3814       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3815                                (instance.name, msg))
3816
3817     self.feedback_fn("* migrating instance to %s" % target_node)
3818     time.sleep(10)
3819     result = self.rpc.call_instance_migrate(source_node, instance,
3820                                             self.nodes_ip[target_node],
3821                                             self.op.live)
3822     msg = result.RemoteFailMsg()
3823     if msg:
3824       logging.error("Instance migration failed, trying to revert"
3825                     " disk status: %s", msg)
3826       self._AbortMigration()
3827       self._RevertDiskStatus()
3828       raise errors.OpExecError("Could not migrate instance %s: %s" %
3829                                (instance.name, msg))
3830     time.sleep(10)
3831
3832     instance.primary_node = target_node
3833     # distribute new instance config to the other nodes
3834     self.cfg.Update(instance)
3835
3836     result = self.rpc.call_finalize_migration(target_node,
3837                                               instance,
3838                                               migration_info,
3839                                               True)
3840     msg = result.RemoteFailMsg()
3841     if msg:
3842       logging.error("Instance migration succeeded, but finalization failed:"
3843                     " %s" % msg)
3844       raise errors.OpExecError("Could not finalize instance migration: %s" %
3845                                msg)
3846
3847     self._EnsureSecondary(source_node)
3848     self._WaitUntilSync()
3849     self._GoStandalone()
3850     self._GoReconnect(False)
3851     self._WaitUntilSync()
3852
3853     self.feedback_fn("* done")
3854
3855   def Exec(self, feedback_fn):
3856     """Perform the migration.
3857
3858     """
3859     self.feedback_fn = feedback_fn
3860
3861     self.source_node = self.instance.primary_node
3862     self.target_node = self.instance.secondary_nodes[0]
3863     self.all_nodes = [self.source_node, self.target_node]
3864     self.nodes_ip = {
3865       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3866       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3867       }
3868     if self.op.cleanup:
3869       return self._ExecCleanup()
3870     else:
3871       return self._ExecMigration()
3872
3873
3874 def _CreateBlockDev(lu, node, instance, device, force_create,
3875                     info, force_open):
3876   """Create a tree of block devices on a given node.
3877
3878   If this device type has to be created on secondaries, create it and
3879   all its children.
3880
3881   If not, just recurse to children keeping the same 'force' value.
3882
3883   @param lu: the lu on whose behalf we execute
3884   @param node: the node on which to create the device
3885   @type instance: L{objects.Instance}
3886   @param instance: the instance which owns the device
3887   @type device: L{objects.Disk}
3888   @param device: the device to create
3889   @type force_create: boolean
3890   @param force_create: whether to force creation of this device; this
3891       will be change to True whenever we find a device which has
3892       CreateOnSecondary() attribute
3893   @param info: the extra 'metadata' we should attach to the device
3894       (this will be represented as a LVM tag)
3895   @type force_open: boolean
3896   @param force_open: this parameter will be passes to the
3897       L{backend.BlockdevCreate} function where it specifies
3898       whether we run on primary or not, and it affects both
3899       the child assembly and the device own Open() execution
3900
3901   """
3902   if device.CreateOnSecondary():
3903     force_create = True
3904
3905   if device.children:
3906     for child in device.children:
3907       _CreateBlockDev(lu, node, instance, child, force_create,
3908                       info, force_open)
3909
3910   if not force_create:
3911     return
3912
3913   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3914
3915
3916 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3917   """Create a single block device on a given node.
3918
3919   This will not recurse over children of the device, so they must be
3920   created in advance.
3921
3922   @param lu: the lu on whose behalf we execute
3923   @param node: the node on which to create the device
3924   @type instance: L{objects.Instance}
3925   @param instance: the instance which owns the device
3926   @type device: L{objects.Disk}
3927   @param device: the device to create
3928   @param info: the extra 'metadata' we should attach to the device
3929       (this will be represented as a LVM tag)
3930   @type force_open: boolean
3931   @param force_open: this parameter will be passes to the
3932       L{backend.BlockdevCreate} function where it specifies
3933       whether we run on primary or not, and it affects both
3934       the child assembly and the device own Open() execution
3935
3936   """
3937   lu.cfg.SetDiskID(device, node)
3938   result = lu.rpc.call_blockdev_create(node, device, device.size,
3939                                        instance.name, force_open, info)
3940   msg = result.RemoteFailMsg()
3941   if msg:
3942     raise errors.OpExecError("Can't create block device %s on"
3943                              " node %s for instance %s: %s" %
3944                              (device, node, instance.name, msg))
3945   if device.physical_id is None:
3946     device.physical_id = result.payload
3947
3948
3949 def _GenerateUniqueNames(lu, exts):
3950   """Generate a suitable LV name.
3951
3952   This will generate a logical volume name for the given instance.
3953
3954   """
3955   results = []
3956   for val in exts:
3957     new_id = lu.cfg.GenerateUniqueID()
3958     results.append("%s%s" % (new_id, val))
3959   return results
3960
3961
3962 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3963                          p_minor, s_minor):
3964   """Generate a drbd8 device complete with its children.
3965
3966   """
3967   port = lu.cfg.AllocatePort()
3968   vgname = lu.cfg.GetVGName()
3969   shared_secret = lu.cfg.GenerateDRBDSecret()
3970   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3971                           logical_id=(vgname, names[0]))
3972   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3973                           logical_id=(vgname, names[1]))
3974   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3975                           logical_id=(primary, secondary, port,
3976                                       p_minor, s_minor,
3977                                       shared_secret),
3978                           children=[dev_data, dev_meta],
3979                           iv_name=iv_name)
3980   return drbd_dev
3981
3982
3983 def _GenerateDiskTemplate(lu, template_name,
3984                           instance_name, primary_node,
3985                           secondary_nodes, disk_info,
3986                           file_storage_dir, file_driver,
3987                           base_index):
3988   """Generate the entire disk layout for a given template type.
3989
3990   """
3991   #TODO: compute space requirements
3992
3993   vgname = lu.cfg.GetVGName()
3994   disk_count = len(disk_info)
3995   disks = []
3996   if template_name == constants.DT_DISKLESS:
3997     pass
3998   elif template_name == constants.DT_PLAIN:
3999     if len(secondary_nodes) != 0:
4000       raise errors.ProgrammerError("Wrong template configuration")
4001
4002     names = _GenerateUniqueNames(lu, [".disk%d" % i
4003                                       for i in range(disk_count)])
4004     for idx, disk in enumerate(disk_info):
4005       disk_index = idx + base_index
4006       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4007                               logical_id=(vgname, names[idx]),
4008                               iv_name="disk/%d" % disk_index,
4009                               mode=disk["mode"])
4010       disks.append(disk_dev)
4011   elif template_name == constants.DT_DRBD8:
4012     if len(secondary_nodes) != 1:
4013       raise errors.ProgrammerError("Wrong template configuration")
4014     remote_node = secondary_nodes[0]
4015     minors = lu.cfg.AllocateDRBDMinor(
4016       [primary_node, remote_node] * len(disk_info), instance_name)
4017
4018     names = []
4019     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4020                                                for i in range(disk_count)]):
4021       names.append(lv_prefix + "_data")
4022       names.append(lv_prefix + "_meta")
4023     for idx, disk in enumerate(disk_info):
4024       disk_index = idx + base_index
4025       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4026                                       disk["size"], names[idx*2:idx*2+2],
4027                                       "disk/%d" % disk_index,
4028                                       minors[idx*2], minors[idx*2+1])
4029       disk_dev.mode = disk["mode"]
4030       disks.append(disk_dev)
4031   elif template_name == constants.DT_FILE:
4032     if len(secondary_nodes) != 0:
4033       raise errors.ProgrammerError("Wrong template configuration")
4034
4035     for idx, disk in enumerate(disk_info):
4036       disk_index = idx + base_index
4037       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4038                               iv_name="disk/%d" % disk_index,
4039                               logical_id=(file_driver,
4040                                           "%s/disk%d" % (file_storage_dir,
4041                                                          disk_index)),
4042                               mode=disk["mode"])
4043       disks.append(disk_dev)
4044   else:
4045     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4046   return disks
4047
4048
4049 def _GetInstanceInfoText(instance):
4050   """Compute that text that should be added to the disk's metadata.
4051
4052   """
4053   return "originstname+%s" % instance.name
4054
4055
4056 def _CreateDisks(lu, instance):
4057   """Create all disks for an instance.
4058
4059   This abstracts away some work from AddInstance.
4060
4061   @type lu: L{LogicalUnit}
4062   @param lu: the logical unit on whose behalf we execute
4063   @type instance: L{objects.Instance}
4064   @param instance: the instance whose disks we should create
4065   @rtype: boolean
4066   @return: the success of the creation
4067
4068   """
4069   info = _GetInstanceInfoText(instance)
4070   pnode = instance.primary_node
4071
4072   if instance.disk_template == constants.DT_FILE:
4073     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4074     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4075
4076     if result.failed or not result.data:
4077       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4078
4079     if not result.data[0]:
4080       raise errors.OpExecError("Failed to create directory '%s'" %
4081                                file_storage_dir)
4082
4083   # Note: this needs to be kept in sync with adding of disks in
4084   # LUSetInstanceParams
4085   for device in instance.disks:
4086     logging.info("Creating volume %s for instance %s",
4087                  device.iv_name, instance.name)
4088     #HARDCODE
4089     for node in instance.all_nodes:
4090       f_create = node == pnode
4091       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4092
4093
4094 def _RemoveDisks(lu, instance):
4095   """Remove all disks for an instance.
4096
4097   This abstracts away some work from `AddInstance()` and
4098   `RemoveInstance()`. Note that in case some of the devices couldn't
4099   be removed, the removal will continue with the other ones (compare
4100   with `_CreateDisks()`).
4101
4102   @type lu: L{LogicalUnit}
4103   @param lu: the logical unit on whose behalf we execute
4104   @type instance: L{objects.Instance}
4105   @param instance: the instance whose disks we should remove
4106   @rtype: boolean
4107   @return: the success of the removal
4108
4109   """
4110   logging.info("Removing block devices for instance %s", instance.name)
4111
4112   all_result = True
4113   for device in instance.disks:
4114     for node, disk in device.ComputeNodeTree(instance.primary_node):
4115       lu.cfg.SetDiskID(disk, node)
4116       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4117       if msg:
4118         lu.LogWarning("Could not remove block device %s on node %s,"
4119                       " continuing anyway: %s", device.iv_name, node, msg)
4120         all_result = False
4121
4122   if instance.disk_template == constants.DT_FILE:
4123     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4124     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4125                                                  file_storage_dir)
4126     if result.failed or not result.data:
4127       logging.error("Could not remove directory '%s'", file_storage_dir)
4128       all_result = False
4129
4130   return all_result
4131
4132
4133 def _ComputeDiskSize(disk_template, disks):
4134   """Compute disk size requirements in the volume group
4135
4136   """
4137   # Required free disk space as a function of disk and swap space
4138   req_size_dict = {
4139     constants.DT_DISKLESS: None,
4140     constants.DT_PLAIN: sum(d["size"] for d in disks),
4141     # 128 MB are added for drbd metadata for each disk
4142     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4143     constants.DT_FILE: None,
4144   }
4145
4146   if disk_template not in req_size_dict:
4147     raise errors.ProgrammerError("Disk template '%s' size requirement"
4148                                  " is unknown" %  disk_template)
4149
4150   return req_size_dict[disk_template]
4151
4152
4153 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4154   """Hypervisor parameter validation.
4155
4156   This function abstract the hypervisor parameter validation to be
4157   used in both instance create and instance modify.
4158
4159   @type lu: L{LogicalUnit}
4160   @param lu: the logical unit for which we check
4161   @type nodenames: list
4162   @param nodenames: the list of nodes on which we should check
4163   @type hvname: string
4164   @param hvname: the name of the hypervisor we should use
4165   @type hvparams: dict
4166   @param hvparams: the parameters which we need to check
4167   @raise errors.OpPrereqError: if the parameters are not valid
4168
4169   """
4170   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4171                                                   hvname,
4172                                                   hvparams)
4173   for node in nodenames:
4174     info = hvinfo[node]
4175     if info.offline:
4176       continue
4177     msg = info.RemoteFailMsg()
4178     if msg:
4179       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4180                                  " %s" % msg)
4181
4182
4183 class LUCreateInstance(LogicalUnit):
4184   """Create an instance.
4185
4186   """
4187   HPATH = "instance-add"
4188   HTYPE = constants.HTYPE_INSTANCE
4189   _OP_REQP = ["instance_name", "disks", "disk_template",
4190               "mode", "start",
4191               "wait_for_sync", "ip_check", "nics",
4192               "hvparams", "beparams"]
4193   REQ_BGL = False
4194
4195   def _ExpandNode(self, node):
4196     """Expands and checks one node name.
4197
4198     """
4199     node_full = self.cfg.ExpandNodeName(node)
4200     if node_full is None:
4201       raise errors.OpPrereqError("Unknown node %s" % node)
4202     return node_full
4203
4204   def ExpandNames(self):
4205     """ExpandNames for CreateInstance.
4206
4207     Figure out the right locks for instance creation.
4208
4209     """
4210     self.needed_locks = {}
4211
4212     # set optional parameters to none if they don't exist
4213     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4214       if not hasattr(self.op, attr):
4215         setattr(self.op, attr, None)
4216
4217     # cheap checks, mostly valid constants given
4218
4219     # verify creation mode
4220     if self.op.mode not in (constants.INSTANCE_CREATE,
4221                             constants.INSTANCE_IMPORT):
4222       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4223                                  self.op.mode)
4224
4225     # disk template and mirror node verification
4226     if self.op.disk_template not in constants.DISK_TEMPLATES:
4227       raise errors.OpPrereqError("Invalid disk template name")
4228
4229     if self.op.hypervisor is None:
4230       self.op.hypervisor = self.cfg.GetHypervisorType()
4231
4232     cluster = self.cfg.GetClusterInfo()
4233     enabled_hvs = cluster.enabled_hypervisors
4234     if self.op.hypervisor not in enabled_hvs:
4235       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4236                                  " cluster (%s)" % (self.op.hypervisor,
4237                                   ",".join(enabled_hvs)))
4238
4239     # check hypervisor parameter syntax (locally)
4240     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4241     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4242                                   self.op.hvparams)
4243     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4244     hv_type.CheckParameterSyntax(filled_hvp)
4245
4246     # fill and remember the beparams dict
4247     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4248     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4249                                     self.op.beparams)
4250
4251     #### instance parameters check
4252
4253     # instance name verification
4254     hostname1 = utils.HostInfo(self.op.instance_name)
4255     self.op.instance_name = instance_name = hostname1.name
4256
4257     # this is just a preventive check, but someone might still add this
4258     # instance in the meantime, and creation will fail at lock-add time
4259     if instance_name in self.cfg.GetInstanceList():
4260       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4261                                  instance_name)
4262
4263     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4264
4265     # NIC buildup
4266     self.nics = []
4267     for nic in self.op.nics:
4268       # ip validity checks
4269       ip = nic.get("ip", None)
4270       if ip is None or ip.lower() == "none":
4271         nic_ip = None
4272       elif ip.lower() == constants.VALUE_AUTO:
4273         nic_ip = hostname1.ip
4274       else:
4275         if not utils.IsValidIP(ip):
4276           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4277                                      " like a valid IP" % ip)
4278         nic_ip = ip
4279
4280       # MAC address verification
4281       mac = nic.get("mac", constants.VALUE_AUTO)
4282       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4283         if not utils.IsValidMac(mac.lower()):
4284           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4285                                      mac)
4286       # bridge verification
4287       bridge = nic.get("bridge", None)
4288       if bridge is None:
4289         bridge = self.cfg.GetDefBridge()
4290       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4291
4292     # disk checks/pre-build
4293     self.disks = []
4294     for disk in self.op.disks:
4295       mode = disk.get("mode", constants.DISK_RDWR)
4296       if mode not in constants.DISK_ACCESS_SET:
4297         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4298                                    mode)
4299       size = disk.get("size", None)
4300       if size is None:
4301         raise errors.OpPrereqError("Missing disk size")
4302       try:
4303         size = int(size)
4304       except ValueError:
4305         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4306       self.disks.append({"size": size, "mode": mode})
4307
4308     # used in CheckPrereq for ip ping check
4309     self.check_ip = hostname1.ip
4310
4311     # file storage checks
4312     if (self.op.file_driver and
4313         not self.op.file_driver in constants.FILE_DRIVER):
4314       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4315                                  self.op.file_driver)
4316
4317     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4318       raise errors.OpPrereqError("File storage directory path not absolute")
4319
4320     ### Node/iallocator related checks
4321     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4322       raise errors.OpPrereqError("One and only one of iallocator and primary"
4323                                  " node must be given")
4324
4325     if self.op.iallocator:
4326       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4327     else:
4328       self.op.pnode = self._ExpandNode(self.op.pnode)
4329       nodelist = [self.op.pnode]
4330       if self.op.snode is not None:
4331         self.op.snode = self._ExpandNode(self.op.snode)
4332         nodelist.append(self.op.snode)
4333       self.needed_locks[locking.LEVEL_NODE] = nodelist
4334
4335     # in case of import lock the source node too
4336     if self.op.mode == constants.INSTANCE_IMPORT:
4337       src_node = getattr(self.op, "src_node", None)
4338       src_path = getattr(self.op, "src_path", None)
4339
4340       if src_path is None:
4341         self.op.src_path = src_path = self.op.instance_name
4342
4343       if src_node is None:
4344         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4345         self.op.src_node = None
4346         if os.path.isabs(src_path):
4347           raise errors.OpPrereqError("Importing an instance from an absolute"
4348                                      " path requires a source node option.")
4349       else:
4350         self.op.src_node = src_node = self._ExpandNode(src_node)
4351         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4352           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4353         if not os.path.isabs(src_path):
4354           self.op.src_path = src_path = \
4355             os.path.join(constants.EXPORT_DIR, src_path)
4356
4357     else: # INSTANCE_CREATE
4358       if getattr(self.op, "os_type", None) is None:
4359         raise errors.OpPrereqError("No guest OS specified")
4360
4361   def _RunAllocator(self):
4362     """Run the allocator based on input opcode.
4363
4364     """
4365     nics = [n.ToDict() for n in self.nics]
4366     ial = IAllocator(self,
4367                      mode=constants.IALLOCATOR_MODE_ALLOC,
4368                      name=self.op.instance_name,
4369                      disk_template=self.op.disk_template,
4370                      tags=[],
4371                      os=self.op.os_type,
4372                      vcpus=self.be_full[constants.BE_VCPUS],
4373                      mem_size=self.be_full[constants.BE_MEMORY],
4374                      disks=self.disks,
4375                      nics=nics,
4376                      hypervisor=self.op.hypervisor,
4377                      )
4378
4379     ial.Run(self.op.iallocator)
4380
4381     if not ial.success:
4382       raise errors.OpPrereqError("Can't compute nodes using"
4383                                  " iallocator '%s': %s" % (self.op.iallocator,
4384                                                            ial.info))
4385     if len(ial.nodes) != ial.required_nodes:
4386       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4387                                  " of nodes (%s), required %s" %
4388                                  (self.op.iallocator, len(ial.nodes),
4389                                   ial.required_nodes))
4390     self.op.pnode = ial.nodes[0]
4391     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4392                  self.op.instance_name, self.op.iallocator,
4393                  ", ".join(ial.nodes))
4394     if ial.required_nodes == 2:
4395       self.op.snode = ial.nodes[1]
4396
4397   def BuildHooksEnv(self):
4398     """Build hooks env.
4399
4400     This runs on master, primary and secondary nodes of the instance.
4401
4402     """
4403     env = {
4404       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4405       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4406       "INSTANCE_ADD_MODE": self.op.mode,
4407       }
4408     if self.op.mode == constants.INSTANCE_IMPORT:
4409       env["INSTANCE_SRC_NODE"] = self.op.src_node
4410       env["INSTANCE_SRC_PATH"] = self.op.src_path
4411       env["INSTANCE_SRC_IMAGES"] = self.src_images
4412
4413     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4414       primary_node=self.op.pnode,
4415       secondary_nodes=self.secondaries,
4416       status=self.op.start,
4417       os_type=self.op.os_type,
4418       memory=self.be_full[constants.BE_MEMORY],
4419       vcpus=self.be_full[constants.BE_VCPUS],
4420       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4421     ))
4422
4423     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4424           self.secondaries)
4425     return env, nl, nl
4426
4427
4428   def CheckPrereq(self):
4429     """Check prerequisites.
4430
4431     """
4432     if (not self.cfg.GetVGName() and
4433         self.op.disk_template not in constants.DTS_NOT_LVM):
4434       raise errors.OpPrereqError("Cluster does not support lvm-based"
4435                                  " instances")
4436
4437
4438     if self.op.mode == constants.INSTANCE_IMPORT:
4439       src_node = self.op.src_node
4440       src_path = self.op.src_path
4441
4442       if src_node is None:
4443         exp_list = self.rpc.call_export_list(
4444           self.acquired_locks[locking.LEVEL_NODE])
4445         found = False
4446         for node in exp_list:
4447           if not exp_list[node].failed and src_path in exp_list[node].data:
4448             found = True
4449             self.op.src_node = src_node = node
4450             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4451                                                        src_path)
4452             break
4453         if not found:
4454           raise errors.OpPrereqError("No export found for relative path %s" %
4455                                       src_path)
4456
4457       _CheckNodeOnline(self, src_node)
4458       result = self.rpc.call_export_info(src_node, src_path)
4459       result.Raise()
4460       if not result.data:
4461         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4462
4463       export_info = result.data
4464       if not export_info.has_section(constants.INISECT_EXP):
4465         raise errors.ProgrammerError("Corrupted export config")
4466
4467       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4468       if (int(ei_version) != constants.EXPORT_VERSION):
4469         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4470                                    (ei_version, constants.EXPORT_VERSION))
4471
4472       # Check that the new instance doesn't have less disks than the export
4473       instance_disks = len(self.disks)
4474       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4475       if instance_disks < export_disks:
4476         raise errors.OpPrereqError("Not enough disks to import."
4477                                    " (instance: %d, export: %d)" %
4478                                    (instance_disks, export_disks))
4479
4480       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4481       disk_images = []
4482       for idx in range(export_disks):
4483         option = 'disk%d_dump' % idx
4484         if export_info.has_option(constants.INISECT_INS, option):
4485           # FIXME: are the old os-es, disk sizes, etc. useful?
4486           export_name = export_info.get(constants.INISECT_INS, option)
4487           image = os.path.join(src_path, export_name)
4488           disk_images.append(image)
4489         else:
4490           disk_images.append(False)
4491
4492       self.src_images = disk_images
4493
4494       old_name = export_info.get(constants.INISECT_INS, 'name')
4495       # FIXME: int() here could throw a ValueError on broken exports
4496       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4497       if self.op.instance_name == old_name:
4498         for idx, nic in enumerate(self.nics):
4499           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4500             nic_mac_ini = 'nic%d_mac' % idx
4501             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4502
4503     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4504     if self.op.start and not self.op.ip_check:
4505       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4506                                  " adding an instance in start mode")
4507
4508     if self.op.ip_check:
4509       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4510         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4511                                    (self.check_ip, self.op.instance_name))
4512
4513     #### allocator run
4514
4515     if self.op.iallocator is not None:
4516       self._RunAllocator()
4517
4518     #### node related checks
4519
4520     # check primary node
4521     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4522     assert self.pnode is not None, \
4523       "Cannot retrieve locked node %s" % self.op.pnode
4524     if pnode.offline:
4525       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4526                                  pnode.name)
4527     if pnode.drained:
4528       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4529                                  pnode.name)
4530
4531     self.secondaries = []
4532
4533     # mirror node verification
4534     if self.op.disk_template in constants.DTS_NET_MIRROR:
4535       if self.op.snode is None:
4536         raise errors.OpPrereqError("The networked disk templates need"
4537                                    " a mirror node")
4538       if self.op.snode == pnode.name:
4539         raise errors.OpPrereqError("The secondary node cannot be"
4540                                    " the primary node.")
4541       _CheckNodeOnline(self, self.op.snode)
4542       _CheckNodeNotDrained(self, self.op.snode)
4543       self.secondaries.append(self.op.snode)
4544
4545     nodenames = [pnode.name] + self.secondaries
4546
4547     req_size = _ComputeDiskSize(self.op.disk_template,
4548                                 self.disks)
4549
4550     # Check lv size requirements
4551     if req_size is not None:
4552       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4553                                          self.op.hypervisor)
4554       for node in nodenames:
4555         info = nodeinfo[node]
4556         info.Raise()
4557         info = info.data
4558         if not info:
4559           raise errors.OpPrereqError("Cannot get current information"
4560                                      " from node '%s'" % node)
4561         vg_free = info.get('vg_free', None)
4562         if not isinstance(vg_free, int):
4563           raise errors.OpPrereqError("Can't compute free disk space on"
4564                                      " node %s" % node)
4565         if req_size > info['vg_free']:
4566           raise errors.OpPrereqError("Not enough disk space on target node %s."
4567                                      " %d MB available, %d MB required" %
4568                                      (node, info['vg_free'], req_size))
4569
4570     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4571
4572     # os verification
4573     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4574     result.Raise()
4575     if not isinstance(result.data, objects.OS):
4576       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4577                                  " primary node"  % self.op.os_type)
4578
4579     # bridge check on primary node
4580     bridges = [n.bridge for n in self.nics]
4581     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4582     result.Raise()
4583     if not result.data:
4584       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4585                                  " exist on destination node '%s'" %
4586                                  (",".join(bridges), pnode.name))
4587
4588     # memory check on primary node
4589     if self.op.start:
4590       _CheckNodeFreeMemory(self, self.pnode.name,
4591                            "creating instance %s" % self.op.instance_name,
4592                            self.be_full[constants.BE_MEMORY],
4593                            self.op.hypervisor)
4594
4595   def Exec(self, feedback_fn):
4596     """Create and add the instance to the cluster.
4597
4598     """
4599     instance = self.op.instance_name
4600     pnode_name = self.pnode.name
4601
4602     for nic in self.nics:
4603       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4604         nic.mac = self.cfg.GenerateMAC()
4605
4606     ht_kind = self.op.hypervisor
4607     if ht_kind in constants.HTS_REQ_PORT:
4608       network_port = self.cfg.AllocatePort()
4609     else:
4610       network_port = None
4611
4612     ##if self.op.vnc_bind_address is None:
4613     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4614
4615     # this is needed because os.path.join does not accept None arguments
4616     if self.op.file_storage_dir is None:
4617       string_file_storage_dir = ""
4618     else:
4619       string_file_storage_dir = self.op.file_storage_dir
4620
4621     # build the full file storage dir path
4622     file_storage_dir = os.path.normpath(os.path.join(
4623                                         self.cfg.GetFileStorageDir(),
4624                                         string_file_storage_dir, instance))
4625
4626
4627     disks = _GenerateDiskTemplate(self,
4628                                   self.op.disk_template,
4629                                   instance, pnode_name,
4630                                   self.secondaries,
4631                                   self.disks,
4632                                   file_storage_dir,
4633                                   self.op.file_driver,
4634                                   0)
4635
4636     iobj = objects.Instance(name=instance, os=self.op.os_type,
4637                             primary_node=pnode_name,
4638                             nics=self.nics, disks=disks,
4639                             disk_template=self.op.disk_template,
4640                             admin_up=False,
4641                             network_port=network_port,
4642                             beparams=self.op.beparams,
4643                             hvparams=self.op.hvparams,
4644                             hypervisor=self.op.hypervisor,
4645                             )
4646
4647     feedback_fn("* creating instance disks...")
4648     try:
4649       _CreateDisks(self, iobj)
4650     except errors.OpExecError:
4651       self.LogWarning("Device creation failed, reverting...")
4652       try:
4653         _RemoveDisks(self, iobj)
4654       finally:
4655         self.cfg.ReleaseDRBDMinors(instance)
4656         raise
4657
4658     feedback_fn("adding instance %s to cluster config" % instance)
4659
4660     self.cfg.AddInstance(iobj)
4661     # Declare that we don't want to remove the instance lock anymore, as we've
4662     # added the instance to the config
4663     del self.remove_locks[locking.LEVEL_INSTANCE]
4664     # Unlock all the nodes
4665     if self.op.mode == constants.INSTANCE_IMPORT:
4666       nodes_keep = [self.op.src_node]
4667       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4668                        if node != self.op.src_node]
4669       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4670       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4671     else:
4672       self.context.glm.release(locking.LEVEL_NODE)
4673       del self.acquired_locks[locking.LEVEL_NODE]
4674
4675     if self.op.wait_for_sync:
4676       disk_abort = not _WaitForSync(self, iobj)
4677     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4678       # make sure the disks are not degraded (still sync-ing is ok)
4679       time.sleep(15)
4680       feedback_fn("* checking mirrors status")
4681       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4682     else:
4683       disk_abort = False
4684
4685     if disk_abort:
4686       _RemoveDisks(self, iobj)
4687       self.cfg.RemoveInstance(iobj.name)
4688       # Make sure the instance lock gets removed
4689       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4690       raise errors.OpExecError("There are some degraded disks for"
4691                                " this instance")
4692
4693     feedback_fn("creating os for instance %s on node %s" %
4694                 (instance, pnode_name))
4695
4696     if iobj.disk_template != constants.DT_DISKLESS:
4697       if self.op.mode == constants.INSTANCE_CREATE:
4698         feedback_fn("* running the instance OS create scripts...")
4699         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4700         msg = result.RemoteFailMsg()
4701         if msg:
4702           raise errors.OpExecError("Could not add os for instance %s"
4703                                    " on node %s: %s" %
4704                                    (instance, pnode_name, msg))
4705
4706       elif self.op.mode == constants.INSTANCE_IMPORT:
4707         feedback_fn("* running the instance OS import scripts...")
4708         src_node = self.op.src_node
4709         src_images = self.src_images
4710         cluster_name = self.cfg.GetClusterName()
4711         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4712                                                          src_node, src_images,
4713                                                          cluster_name)
4714         import_result.Raise()
4715         for idx, result in enumerate(import_result.data):
4716           if not result:
4717             self.LogWarning("Could not import the image %s for instance"
4718                             " %s, disk %d, on node %s" %
4719                             (src_images[idx], instance, idx, pnode_name))
4720       else:
4721         # also checked in the prereq part
4722         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4723                                      % self.op.mode)
4724
4725     if self.op.start:
4726       iobj.admin_up = True
4727       self.cfg.Update(iobj)
4728       logging.info("Starting instance %s on node %s", instance, pnode_name)
4729       feedback_fn("* starting instance...")
4730       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4731       msg = result.RemoteFailMsg()
4732       if msg:
4733         raise errors.OpExecError("Could not start instance: %s" % msg)
4734
4735
4736 class LUConnectConsole(NoHooksLU):
4737   """Connect to an instance's console.
4738
4739   This is somewhat special in that it returns the command line that
4740   you need to run on the master node in order to connect to the
4741   console.
4742
4743   """
4744   _OP_REQP = ["instance_name"]
4745   REQ_BGL = False
4746
4747   def ExpandNames(self):
4748     self._ExpandAndLockInstance()
4749
4750   def CheckPrereq(self):
4751     """Check prerequisites.
4752
4753     This checks that the instance is in the cluster.
4754
4755     """
4756     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4757     assert self.instance is not None, \
4758       "Cannot retrieve locked instance %s" % self.op.instance_name
4759     _CheckNodeOnline(self, self.instance.primary_node)
4760
4761   def Exec(self, feedback_fn):
4762     """Connect to the console of an instance
4763
4764     """
4765     instance = self.instance
4766     node = instance.primary_node
4767
4768     node_insts = self.rpc.call_instance_list([node],
4769                                              [instance.hypervisor])[node]
4770     node_insts.Raise()
4771
4772     if instance.name not in node_insts.data:
4773       raise errors.OpExecError("Instance %s is not running." % instance.name)
4774
4775     logging.debug("Connecting to console of %s on %s", instance.name, node)
4776
4777     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4778     cluster = self.cfg.GetClusterInfo()
4779     # beparams and hvparams are passed separately, to avoid editing the
4780     # instance and then saving the defaults in the instance itself.
4781     hvparams = cluster.FillHV(instance)
4782     beparams = cluster.FillBE(instance)
4783     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4784
4785     # build ssh cmdline
4786     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4787
4788
4789 class LUReplaceDisks(LogicalUnit):
4790   """Replace the disks of an instance.
4791
4792   """
4793   HPATH = "mirrors-replace"
4794   HTYPE = constants.HTYPE_INSTANCE
4795   _OP_REQP = ["instance_name", "mode", "disks"]
4796   REQ_BGL = False
4797
4798   def CheckArguments(self):
4799     if not hasattr(self.op, "remote_node"):
4800       self.op.remote_node = None
4801     if not hasattr(self.op, "iallocator"):
4802       self.op.iallocator = None
4803
4804     # check for valid parameter combination
4805     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4806     if self.op.mode == constants.REPLACE_DISK_CHG:
4807       if cnt == 2:
4808         raise errors.OpPrereqError("When changing the secondary either an"
4809                                    " iallocator script must be used or the"
4810                                    " new node given")
4811       elif cnt == 0:
4812         raise errors.OpPrereqError("Give either the iallocator or the new"
4813                                    " secondary, not both")
4814     else: # not replacing the secondary
4815       if cnt != 2:
4816         raise errors.OpPrereqError("The iallocator and new node options can"
4817                                    " be used only when changing the"
4818                                    " secondary node")
4819
4820   def ExpandNames(self):
4821     self._ExpandAndLockInstance()
4822
4823     if self.op.iallocator is not None:
4824       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4825     elif self.op.remote_node is not None:
4826       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4827       if remote_node is None:
4828         raise errors.OpPrereqError("Node '%s' not known" %
4829                                    self.op.remote_node)
4830       self.op.remote_node = remote_node
4831       # Warning: do not remove the locking of the new secondary here
4832       # unless DRBD8.AddChildren is changed to work in parallel;
4833       # currently it doesn't since parallel invocations of
4834       # FindUnusedMinor will conflict
4835       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4836       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4837     else:
4838       self.needed_locks[locking.LEVEL_NODE] = []
4839       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4840
4841   def DeclareLocks(self, level):
4842     # If we're not already locking all nodes in the set we have to declare the
4843     # instance's primary/secondary nodes.
4844     if (level == locking.LEVEL_NODE and
4845         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4846       self._LockInstancesNodes()
4847
4848   def _RunAllocator(self):
4849     """Compute a new secondary node using an IAllocator.
4850
4851     """
4852     ial = IAllocator(self,
4853                      mode=constants.IALLOCATOR_MODE_RELOC,
4854                      name=self.op.instance_name,
4855                      relocate_from=[self.sec_node])
4856
4857     ial.Run(self.op.iallocator)
4858
4859     if not ial.success:
4860       raise errors.OpPrereqError("Can't compute nodes using"
4861                                  " iallocator '%s': %s" % (self.op.iallocator,
4862                                                            ial.info))
4863     if len(ial.nodes) != ial.required_nodes:
4864       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4865                                  " of nodes (%s), required %s" %
4866                                  (len(ial.nodes), ial.required_nodes))
4867     self.op.remote_node = ial.nodes[0]
4868     self.LogInfo("Selected new secondary for the instance: %s",
4869                  self.op.remote_node)
4870
4871   def BuildHooksEnv(self):
4872     """Build hooks env.
4873
4874     This runs on the master, the primary and all the secondaries.
4875
4876     """
4877     env = {
4878       "MODE": self.op.mode,
4879       "NEW_SECONDARY": self.op.remote_node,
4880       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4881       }
4882     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4883     nl = [
4884       self.cfg.GetMasterNode(),
4885       self.instance.primary_node,
4886       ]
4887     if self.op.remote_node is not None:
4888       nl.append(self.op.remote_node)
4889     return env, nl, nl
4890
4891   def CheckPrereq(self):
4892     """Check prerequisites.
4893
4894     This checks that the instance is in the cluster.
4895
4896     """
4897     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4898     assert instance is not None, \
4899       "Cannot retrieve locked instance %s" % self.op.instance_name
4900     self.instance = instance
4901
4902     if instance.disk_template != constants.DT_DRBD8:
4903       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4904                                  " instances")
4905
4906     if len(instance.secondary_nodes) != 1:
4907       raise errors.OpPrereqError("The instance has a strange layout,"
4908                                  " expected one secondary but found %d" %
4909                                  len(instance.secondary_nodes))
4910
4911     self.sec_node = instance.secondary_nodes[0]
4912
4913     if self.op.iallocator is not None:
4914       self._RunAllocator()
4915
4916     remote_node = self.op.remote_node
4917     if remote_node is not None:
4918       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4919       assert self.remote_node_info is not None, \
4920         "Cannot retrieve locked node %s" % remote_node
4921     else:
4922       self.remote_node_info = None
4923     if remote_node == instance.primary_node:
4924       raise errors.OpPrereqError("The specified node is the primary node of"
4925                                  " the instance.")
4926     elif remote_node == self.sec_node:
4927       raise errors.OpPrereqError("The specified node is already the"
4928                                  " secondary node of the instance.")
4929
4930     if self.op.mode == constants.REPLACE_DISK_PRI:
4931       n1 = self.tgt_node = instance.primary_node
4932       n2 = self.oth_node = self.sec_node
4933     elif self.op.mode == constants.REPLACE_DISK_SEC:
4934       n1 = self.tgt_node = self.sec_node
4935       n2 = self.oth_node = instance.primary_node
4936     elif self.op.mode == constants.REPLACE_DISK_CHG:
4937       n1 = self.new_node = remote_node
4938       n2 = self.oth_node = instance.primary_node
4939       self.tgt_node = self.sec_node
4940       _CheckNodeNotDrained(self, remote_node)
4941     else:
4942       raise errors.ProgrammerError("Unhandled disk replace mode")
4943
4944     _CheckNodeOnline(self, n1)
4945     _CheckNodeOnline(self, n2)
4946
4947     if not self.op.disks:
4948       self.op.disks = range(len(instance.disks))
4949
4950     for disk_idx in self.op.disks:
4951       instance.FindDisk(disk_idx)
4952
4953   def _ExecD8DiskOnly(self, feedback_fn):
4954     """Replace a disk on the primary or secondary for dbrd8.
4955
4956     The algorithm for replace is quite complicated:
4957
4958       1. for each disk to be replaced:
4959
4960         1. create new LVs on the target node with unique names
4961         1. detach old LVs from the drbd device
4962         1. rename old LVs to name_replaced.<time_t>
4963         1. rename new LVs to old LVs
4964         1. attach the new LVs (with the old names now) to the drbd device
4965
4966       1. wait for sync across all devices
4967
4968       1. for each modified disk:
4969
4970         1. remove old LVs (which have the name name_replaces.<time_t>)
4971
4972     Failures are not very well handled.
4973
4974     """
4975     steps_total = 6
4976     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4977     instance = self.instance
4978     iv_names = {}
4979     vgname = self.cfg.GetVGName()
4980     # start of work
4981     cfg = self.cfg
4982     tgt_node = self.tgt_node
4983     oth_node = self.oth_node
4984
4985     # Step: check device activation
4986     self.proc.LogStep(1, steps_total, "check device existence")
4987     info("checking volume groups")
4988     my_vg = cfg.GetVGName()
4989     results = self.rpc.call_vg_list([oth_node, tgt_node])
4990     if not results:
4991       raise errors.OpExecError("Can't list volume groups on the nodes")
4992     for node in oth_node, tgt_node:
4993       res = results[node]
4994       if res.failed or not res.data or my_vg not in res.data:
4995         raise errors.OpExecError("Volume group '%s' not found on %s" %
4996                                  (my_vg, node))
4997     for idx, dev in enumerate(instance.disks):
4998       if idx not in self.op.disks:
4999         continue
5000       for node in tgt_node, oth_node:
5001         info("checking disk/%d on %s" % (idx, node))
5002         cfg.SetDiskID(dev, node)
5003         result = self.rpc.call_blockdev_find(node, dev)
5004         msg = result.RemoteFailMsg()
5005         if not msg and not result.payload:
5006           msg = "disk not found"
5007         if msg:
5008           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5009                                    (idx, node, msg))
5010
5011     # Step: check other node consistency
5012     self.proc.LogStep(2, steps_total, "check peer consistency")
5013     for idx, dev in enumerate(instance.disks):
5014       if idx not in self.op.disks:
5015         continue
5016       info("checking disk/%d consistency on %s" % (idx, oth_node))
5017       if not _CheckDiskConsistency(self, dev, oth_node,
5018                                    oth_node==instance.primary_node):
5019         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5020                                  " to replace disks on this node (%s)" %
5021                                  (oth_node, tgt_node))
5022
5023     # Step: create new storage
5024     self.proc.LogStep(3, steps_total, "allocate new storage")
5025     for idx, dev in enumerate(instance.disks):
5026       if idx not in self.op.disks:
5027         continue
5028       size = dev.size
5029       cfg.SetDiskID(dev, tgt_node)
5030       lv_names = [".disk%d_%s" % (idx, suf)
5031                   for suf in ["data", "meta"]]
5032       names = _GenerateUniqueNames(self, lv_names)
5033       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5034                              logical_id=(vgname, names[0]))
5035       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5036                              logical_id=(vgname, names[1]))
5037       new_lvs = [lv_data, lv_meta]
5038       old_lvs = dev.children
5039       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5040       info("creating new local storage on %s for %s" %
5041            (tgt_node, dev.iv_name))
5042       # we pass force_create=True to force the LVM creation
5043       for new_lv in new_lvs:
5044         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5045                         _GetInstanceInfoText(instance), False)
5046
5047     # Step: for each lv, detach+rename*2+attach
5048     self.proc.LogStep(4, steps_total, "change drbd configuration")
5049     for dev, old_lvs, new_lvs in iv_names.itervalues():
5050       info("detaching %s drbd from local storage" % dev.iv_name)
5051       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5052       result.Raise()
5053       if not result.data:
5054         raise errors.OpExecError("Can't detach drbd from local storage on node"
5055                                  " %s for device %s" % (tgt_node, dev.iv_name))
5056       #dev.children = []
5057       #cfg.Update(instance)
5058
5059       # ok, we created the new LVs, so now we know we have the needed
5060       # storage; as such, we proceed on the target node to rename
5061       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5062       # using the assumption that logical_id == physical_id (which in
5063       # turn is the unique_id on that node)
5064
5065       # FIXME(iustin): use a better name for the replaced LVs
5066       temp_suffix = int(time.time())
5067       ren_fn = lambda d, suff: (d.physical_id[0],
5068                                 d.physical_id[1] + "_replaced-%s" % suff)
5069       # build the rename list based on what LVs exist on the node
5070       rlist = []
5071       for to_ren in old_lvs:
5072         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5073         if not result.RemoteFailMsg() and result.payload:
5074           # device exists
5075           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5076
5077       info("renaming the old LVs on the target node")
5078       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5079       result.Raise()
5080       if not result.data:
5081         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5082       # now we rename the new LVs to the old LVs
5083       info("renaming the new LVs on the target node")
5084       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5085       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5086       result.Raise()
5087       if not result.data:
5088         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5089
5090       for old, new in zip(old_lvs, new_lvs):
5091         new.logical_id = old.logical_id
5092         cfg.SetDiskID(new, tgt_node)
5093
5094       for disk in old_lvs:
5095         disk.logical_id = ren_fn(disk, temp_suffix)
5096         cfg.SetDiskID(disk, tgt_node)
5097
5098       # now that the new lvs have the old name, we can add them to the device
5099       info("adding new mirror component on %s" % tgt_node)
5100       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5101       if result.failed or not result.data:
5102         for new_lv in new_lvs:
5103           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5104           if msg:
5105             warning("Can't rollback device %s: %s", dev, msg,
5106                     hint="cleanup manually the unused logical volumes")
5107         raise errors.OpExecError("Can't add local storage to drbd")
5108
5109       dev.children = new_lvs
5110       cfg.Update(instance)
5111
5112     # Step: wait for sync
5113
5114     # this can fail as the old devices are degraded and _WaitForSync
5115     # does a combined result over all disks, so we don't check its
5116     # return value
5117     self.proc.LogStep(5, steps_total, "sync devices")
5118     _WaitForSync(self, instance, unlock=True)
5119
5120     # so check manually all the devices
5121     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5122       cfg.SetDiskID(dev, instance.primary_node)
5123       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5124       msg = result.RemoteFailMsg()
5125       if not msg and not result.payload:
5126         msg = "disk not found"
5127       if msg:
5128         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5129                                  (name, msg))
5130       if result.payload[5]:
5131         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5132
5133     # Step: remove old storage
5134     self.proc.LogStep(6, steps_total, "removing old storage")
5135     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5136       info("remove logical volumes for %s" % name)
5137       for lv in old_lvs:
5138         cfg.SetDiskID(lv, tgt_node)
5139         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5140         if msg:
5141           warning("Can't remove old LV: %s" % msg,
5142                   hint="manually remove unused LVs")
5143           continue
5144
5145   def _ExecD8Secondary(self, feedback_fn):
5146     """Replace the secondary node for drbd8.
5147
5148     The algorithm for replace is quite complicated:
5149       - for all disks of the instance:
5150         - create new LVs on the new node with same names
5151         - shutdown the drbd device on the old secondary
5152         - disconnect the drbd network on the primary
5153         - create the drbd device on the new secondary
5154         - network attach the drbd on the primary, using an artifice:
5155           the drbd code for Attach() will connect to the network if it
5156           finds a device which is connected to the good local disks but
5157           not network enabled
5158       - wait for sync across all devices
5159       - remove all disks from the old secondary
5160
5161     Failures are not very well handled.
5162
5163     """
5164     steps_total = 6
5165     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5166     instance = self.instance
5167     iv_names = {}
5168     # start of work
5169     cfg = self.cfg
5170     old_node = self.tgt_node
5171     new_node = self.new_node
5172     pri_node = instance.primary_node
5173     nodes_ip = {
5174       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5175       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5176       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5177       }
5178
5179     # Step: check device activation
5180     self.proc.LogStep(1, steps_total, "check device existence")
5181     info("checking volume groups")
5182     my_vg = cfg.GetVGName()
5183     results = self.rpc.call_vg_list([pri_node, new_node])
5184     for node in pri_node, new_node:
5185       res = results[node]
5186       if res.failed or not res.data or my_vg not in res.data:
5187         raise errors.OpExecError("Volume group '%s' not found on %s" %
5188                                  (my_vg, node))
5189     for idx, dev in enumerate(instance.disks):
5190       if idx not in self.op.disks:
5191         continue
5192       info("checking disk/%d on %s" % (idx, pri_node))
5193       cfg.SetDiskID(dev, pri_node)
5194       result = self.rpc.call_blockdev_find(pri_node, dev)
5195       msg = result.RemoteFailMsg()
5196       if not msg and not result.payload:
5197         msg = "disk not found"
5198       if msg:
5199         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5200                                  (idx, pri_node, msg))
5201
5202     # Step: check other node consistency
5203     self.proc.LogStep(2, steps_total, "check peer consistency")
5204     for idx, dev in enumerate(instance.disks):
5205       if idx not in self.op.disks:
5206         continue
5207       info("checking disk/%d consistency on %s" % (idx, pri_node))
5208       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5209         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5210                                  " unsafe to replace the secondary" %
5211                                  pri_node)
5212
5213     # Step: create new storage
5214     self.proc.LogStep(3, steps_total, "allocate new storage")
5215     for idx, dev in enumerate(instance.disks):
5216       info("adding new local storage on %s for disk/%d" %
5217            (new_node, idx))
5218       # we pass force_create=True to force LVM creation
5219       for new_lv in dev.children:
5220         _CreateBlockDev(self, new_node, instance, new_lv, True,
5221                         _GetInstanceInfoText(instance), False)
5222
5223     # Step 4: dbrd minors and drbd setups changes
5224     # after this, we must manually remove the drbd minors on both the
5225     # error and the success paths
5226     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5227                                    instance.name)
5228     logging.debug("Allocated minors %s" % (minors,))
5229     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5230     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5231       size = dev.size
5232       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5233       # create new devices on new_node; note that we create two IDs:
5234       # one without port, so the drbd will be activated without
5235       # networking information on the new node at this stage, and one
5236       # with network, for the latter activation in step 4
5237       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5238       if pri_node == o_node1:
5239         p_minor = o_minor1
5240       else:
5241         p_minor = o_minor2
5242
5243       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5244       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5245
5246       iv_names[idx] = (dev, dev.children, new_net_id)
5247       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5248                     new_net_id)
5249       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5250                               logical_id=new_alone_id,
5251                               children=dev.children)
5252       try:
5253         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5254                               _GetInstanceInfoText(instance), False)
5255       except errors.BlockDeviceError:
5256         self.cfg.ReleaseDRBDMinors(instance.name)
5257         raise
5258
5259     for idx, dev in enumerate(instance.disks):
5260       # we have new devices, shutdown the drbd on the old secondary
5261       info("shutting down drbd for disk/%d on old node" % idx)
5262       cfg.SetDiskID(dev, old_node)
5263       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5264       if msg:
5265         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5266                 (idx, msg),
5267                 hint="Please cleanup this device manually as soon as possible")
5268
5269     info("detaching primary drbds from the network (=> standalone)")
5270     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5271                                                instance.disks)[pri_node]
5272
5273     msg = result.RemoteFailMsg()
5274     if msg:
5275       # detaches didn't succeed (unlikely)
5276       self.cfg.ReleaseDRBDMinors(instance.name)
5277       raise errors.OpExecError("Can't detach the disks from the network on"
5278                                " old node: %s" % (msg,))
5279
5280     # if we managed to detach at least one, we update all the disks of
5281     # the instance to point to the new secondary
5282     info("updating instance configuration")
5283     for dev, _, new_logical_id in iv_names.itervalues():
5284       dev.logical_id = new_logical_id
5285       cfg.SetDiskID(dev, pri_node)
5286     cfg.Update(instance)
5287
5288     # and now perform the drbd attach
5289     info("attaching primary drbds to new secondary (standalone => connected)")
5290     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5291                                            instance.disks, instance.name,
5292                                            False)
5293     for to_node, to_result in result.items():
5294       msg = to_result.RemoteFailMsg()
5295       if msg:
5296         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5297                 hint="please do a gnt-instance info to see the"
5298                 " status of disks")
5299
5300     # this can fail as the old devices are degraded and _WaitForSync
5301     # does a combined result over all disks, so we don't check its
5302     # return value
5303     self.proc.LogStep(5, steps_total, "sync devices")
5304     _WaitForSync(self, instance, unlock=True)
5305
5306     # so check manually all the devices
5307     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5308       cfg.SetDiskID(dev, pri_node)
5309       result = self.rpc.call_blockdev_find(pri_node, dev)
5310       msg = result.RemoteFailMsg()
5311       if not msg and not result.payload:
5312         msg = "disk not found"
5313       if msg:
5314         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5315                                  (idx, msg))
5316       if result.payload[5]:
5317         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5318
5319     self.proc.LogStep(6, steps_total, "removing old storage")
5320     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5321       info("remove logical volumes for disk/%d" % idx)
5322       for lv in old_lvs:
5323         cfg.SetDiskID(lv, old_node)
5324         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5325         if msg:
5326           warning("Can't remove LV on old secondary: %s", msg,
5327                   hint="Cleanup stale volumes by hand")
5328
5329   def Exec(self, feedback_fn):
5330     """Execute disk replacement.
5331
5332     This dispatches the disk replacement to the appropriate handler.
5333
5334     """
5335     instance = self.instance
5336
5337     # Activate the instance disks if we're replacing them on a down instance
5338     if not instance.admin_up:
5339       _StartInstanceDisks(self, instance, True)
5340
5341     if self.op.mode == constants.REPLACE_DISK_CHG:
5342       fn = self._ExecD8Secondary
5343     else:
5344       fn = self._ExecD8DiskOnly
5345
5346     ret = fn(feedback_fn)
5347
5348     # Deactivate the instance disks if we're replacing them on a down instance
5349     if not instance.admin_up:
5350       _SafeShutdownInstanceDisks(self, instance)
5351
5352     return ret
5353
5354
5355 class LUGrowDisk(LogicalUnit):
5356   """Grow a disk of an instance.
5357
5358   """
5359   HPATH = "disk-grow"
5360   HTYPE = constants.HTYPE_INSTANCE
5361   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5362   REQ_BGL = False
5363
5364   def ExpandNames(self):
5365     self._ExpandAndLockInstance()
5366     self.needed_locks[locking.LEVEL_NODE] = []
5367     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5368
5369   def DeclareLocks(self, level):
5370     if level == locking.LEVEL_NODE:
5371       self._LockInstancesNodes()
5372
5373   def BuildHooksEnv(self):
5374     """Build hooks env.
5375
5376     This runs on the master, the primary and all the secondaries.
5377
5378     """
5379     env = {
5380       "DISK": self.op.disk,
5381       "AMOUNT": self.op.amount,
5382       }
5383     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5384     nl = [
5385       self.cfg.GetMasterNode(),
5386       self.instance.primary_node,
5387       ]
5388     return env, nl, nl
5389
5390   def CheckPrereq(self):
5391     """Check prerequisites.
5392
5393     This checks that the instance is in the cluster.
5394
5395     """
5396     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5397     assert instance is not None, \
5398       "Cannot retrieve locked instance %s" % self.op.instance_name
5399     nodenames = list(instance.all_nodes)
5400     for node in nodenames:
5401       _CheckNodeOnline(self, node)
5402
5403
5404     self.instance = instance
5405
5406     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5407       raise errors.OpPrereqError("Instance's disk layout does not support"
5408                                  " growing.")
5409
5410     self.disk = instance.FindDisk(self.op.disk)
5411
5412     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5413                                        instance.hypervisor)
5414     for node in nodenames:
5415       info = nodeinfo[node]
5416       if info.failed or not info.data:
5417         raise errors.OpPrereqError("Cannot get current information"
5418                                    " from node '%s'" % node)
5419       vg_free = info.data.get('vg_free', None)
5420       if not isinstance(vg_free, int):
5421         raise errors.OpPrereqError("Can't compute free disk space on"
5422                                    " node %s" % node)
5423       if self.op.amount > vg_free:
5424         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5425                                    " %d MiB available, %d MiB required" %
5426                                    (node, vg_free, self.op.amount))
5427
5428   def Exec(self, feedback_fn):
5429     """Execute disk grow.
5430
5431     """
5432     instance = self.instance
5433     disk = self.disk
5434     for node in instance.all_nodes:
5435       self.cfg.SetDiskID(disk, node)
5436       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5437       msg = result.RemoteFailMsg()
5438       if msg:
5439         raise errors.OpExecError("Grow request failed to node %s: %s" %
5440                                  (node, msg))
5441     disk.RecordGrow(self.op.amount)
5442     self.cfg.Update(instance)
5443     if self.op.wait_for_sync:
5444       disk_abort = not _WaitForSync(self, instance)
5445       if disk_abort:
5446         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5447                              " status.\nPlease check the instance.")
5448
5449
5450 class LUQueryInstanceData(NoHooksLU):
5451   """Query runtime instance data.
5452
5453   """
5454   _OP_REQP = ["instances", "static"]
5455   REQ_BGL = False
5456
5457   def ExpandNames(self):
5458     self.needed_locks = {}
5459     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5460
5461     if not isinstance(self.op.instances, list):
5462       raise errors.OpPrereqError("Invalid argument type 'instances'")
5463
5464     if self.op.instances:
5465       self.wanted_names = []
5466       for name in self.op.instances:
5467         full_name = self.cfg.ExpandInstanceName(name)
5468         if full_name is None:
5469           raise errors.OpPrereqError("Instance '%s' not known" % name)
5470         self.wanted_names.append(full_name)
5471       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5472     else:
5473       self.wanted_names = None
5474       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5475
5476     self.needed_locks[locking.LEVEL_NODE] = []
5477     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5478
5479   def DeclareLocks(self, level):
5480     if level == locking.LEVEL_NODE:
5481       self._LockInstancesNodes()
5482
5483   def CheckPrereq(self):
5484     """Check prerequisites.
5485
5486     This only checks the optional instance list against the existing names.
5487
5488     """
5489     if self.wanted_names is None:
5490       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5491
5492     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5493                              in self.wanted_names]
5494     return
5495
5496   def _ComputeDiskStatus(self, instance, snode, dev):
5497     """Compute block device status.
5498
5499     """
5500     static = self.op.static
5501     if not static:
5502       self.cfg.SetDiskID(dev, instance.primary_node)
5503       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5504       msg = dev_pstatus.RemoteFailMsg()
5505       if msg:
5506         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5507                                  (instance.name, msg))
5508       dev_pstatus = dev_pstatus.payload
5509     else:
5510       dev_pstatus = None
5511
5512     if dev.dev_type in constants.LDS_DRBD:
5513       # we change the snode then (otherwise we use the one passed in)
5514       if dev.logical_id[0] == instance.primary_node:
5515         snode = dev.logical_id[1]
5516       else:
5517         snode = dev.logical_id[0]
5518
5519     if snode and not static:
5520       self.cfg.SetDiskID(dev, snode)
5521       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5522       msg = dev_sstatus.RemoteFailMsg()
5523       if msg:
5524         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5525                                  (instance.name, msg))
5526       dev_sstatus = dev_sstatus.payload
5527     else:
5528       dev_sstatus = None
5529
5530     if dev.children:
5531       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5532                       for child in dev.children]
5533     else:
5534       dev_children = []
5535
5536     data = {
5537       "iv_name": dev.iv_name,
5538       "dev_type": dev.dev_type,
5539       "logical_id": dev.logical_id,
5540       "physical_id": dev.physical_id,
5541       "pstatus": dev_pstatus,
5542       "sstatus": dev_sstatus,
5543       "children": dev_children,
5544       "mode": dev.mode,
5545       }
5546
5547     return data
5548
5549   def Exec(self, feedback_fn):
5550     """Gather and return data"""
5551     result = {}
5552
5553     cluster = self.cfg.GetClusterInfo()
5554
5555     for instance in self.wanted_instances:
5556       if not self.op.static:
5557         remote_info = self.rpc.call_instance_info(instance.primary_node,
5558                                                   instance.name,
5559                                                   instance.hypervisor)
5560         remote_info.Raise()
5561         remote_info = remote_info.data
5562         if remote_info and "state" in remote_info:
5563           remote_state = "up"
5564         else:
5565           remote_state = "down"
5566       else:
5567         remote_state = None
5568       if instance.admin_up:
5569         config_state = "up"
5570       else:
5571         config_state = "down"
5572
5573       disks = [self._ComputeDiskStatus(instance, None, device)
5574                for device in instance.disks]
5575
5576       idict = {
5577         "name": instance.name,
5578         "config_state": config_state,
5579         "run_state": remote_state,
5580         "pnode": instance.primary_node,
5581         "snodes": instance.secondary_nodes,
5582         "os": instance.os,
5583         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5584         "disks": disks,
5585         "hypervisor": instance.hypervisor,
5586         "network_port": instance.network_port,
5587         "hv_instance": instance.hvparams,
5588         "hv_actual": cluster.FillHV(instance),
5589         "be_instance": instance.beparams,
5590         "be_actual": cluster.FillBE(instance),
5591         }
5592
5593       result[instance.name] = idict
5594
5595     return result
5596
5597
5598 class LUSetInstanceParams(LogicalUnit):
5599   """Modifies an instances's parameters.
5600
5601   """
5602   HPATH = "instance-modify"
5603   HTYPE = constants.HTYPE_INSTANCE
5604   _OP_REQP = ["instance_name"]
5605   REQ_BGL = False
5606
5607   def CheckArguments(self):
5608     if not hasattr(self.op, 'nics'):
5609       self.op.nics = []
5610     if not hasattr(self.op, 'disks'):
5611       self.op.disks = []
5612     if not hasattr(self.op, 'beparams'):
5613       self.op.beparams = {}
5614     if not hasattr(self.op, 'hvparams'):
5615       self.op.hvparams = {}
5616     self.op.force = getattr(self.op, "force", False)
5617     if not (self.op.nics or self.op.disks or
5618             self.op.hvparams or self.op.beparams):
5619       raise errors.OpPrereqError("No changes submitted")
5620
5621     # Disk validation
5622     disk_addremove = 0
5623     for disk_op, disk_dict in self.op.disks:
5624       if disk_op == constants.DDM_REMOVE:
5625         disk_addremove += 1
5626         continue
5627       elif disk_op == constants.DDM_ADD:
5628         disk_addremove += 1
5629       else:
5630         if not isinstance(disk_op, int):
5631           raise errors.OpPrereqError("Invalid disk index")
5632       if disk_op == constants.DDM_ADD:
5633         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5634         if mode not in constants.DISK_ACCESS_SET:
5635           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5636         size = disk_dict.get('size', None)
5637         if size is None:
5638           raise errors.OpPrereqError("Required disk parameter size missing")
5639         try:
5640           size = int(size)
5641         except ValueError, err:
5642           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5643                                      str(err))
5644         disk_dict['size'] = size
5645       else:
5646         # modification of disk
5647         if 'size' in disk_dict:
5648           raise errors.OpPrereqError("Disk size change not possible, use"
5649                                      " grow-disk")
5650
5651     if disk_addremove > 1:
5652       raise errors.OpPrereqError("Only one disk add or remove operation"
5653                                  " supported at a time")
5654
5655     # NIC validation
5656     nic_addremove = 0
5657     for nic_op, nic_dict in self.op.nics:
5658       if nic_op == constants.DDM_REMOVE:
5659         nic_addremove += 1
5660         continue
5661       elif nic_op == constants.DDM_ADD:
5662         nic_addremove += 1
5663       else:
5664         if not isinstance(nic_op, int):
5665           raise errors.OpPrereqError("Invalid nic index")
5666
5667       # nic_dict should be a dict
5668       nic_ip = nic_dict.get('ip', None)
5669       if nic_ip is not None:
5670         if nic_ip.lower() == "none":
5671           nic_dict['ip'] = None
5672         else:
5673           if not utils.IsValidIP(nic_ip):
5674             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5675       # we can only check None bridges and assign the default one
5676       nic_bridge = nic_dict.get('bridge', None)
5677       if nic_bridge is None:
5678         nic_dict['bridge'] = self.cfg.GetDefBridge()
5679       # but we can validate MACs
5680       nic_mac = nic_dict.get('mac', None)
5681       if nic_mac is not None:
5682         if self.cfg.IsMacInUse(nic_mac):
5683           raise errors.OpPrereqError("MAC address %s already in use"
5684                                      " in cluster" % nic_mac)
5685         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5686           if not utils.IsValidMac(nic_mac):
5687             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5688     if nic_addremove > 1:
5689       raise errors.OpPrereqError("Only one NIC add or remove operation"
5690                                  " supported at a time")
5691
5692   def ExpandNames(self):
5693     self._ExpandAndLockInstance()
5694     self.needed_locks[locking.LEVEL_NODE] = []
5695     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5696
5697   def DeclareLocks(self, level):
5698     if level == locking.LEVEL_NODE:
5699       self._LockInstancesNodes()
5700
5701   def BuildHooksEnv(self):
5702     """Build hooks env.
5703
5704     This runs on the master, primary and secondaries.
5705
5706     """
5707     args = dict()
5708     if constants.BE_MEMORY in self.be_new:
5709       args['memory'] = self.be_new[constants.BE_MEMORY]
5710     if constants.BE_VCPUS in self.be_new:
5711       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5712     # FIXME: readd disk/nic changes
5713     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5714     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5715     return env, nl, nl
5716
5717   def CheckPrereq(self):
5718     """Check prerequisites.
5719
5720     This only checks the instance list against the existing names.
5721
5722     """
5723     force = self.force = self.op.force
5724
5725     # checking the new params on the primary/secondary nodes
5726
5727     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5728     assert self.instance is not None, \
5729       "Cannot retrieve locked instance %s" % self.op.instance_name
5730     pnode = instance.primary_node
5731     nodelist = list(instance.all_nodes)
5732
5733     # hvparams processing
5734     if self.op.hvparams:
5735       i_hvdict = copy.deepcopy(instance.hvparams)
5736       for key, val in self.op.hvparams.iteritems():
5737         if val == constants.VALUE_DEFAULT:
5738           try:
5739             del i_hvdict[key]
5740           except KeyError:
5741             pass
5742         else:
5743           i_hvdict[key] = val
5744       cluster = self.cfg.GetClusterInfo()
5745       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5746       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5747                                 i_hvdict)
5748       # local check
5749       hypervisor.GetHypervisor(
5750         instance.hypervisor).CheckParameterSyntax(hv_new)
5751       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5752       self.hv_new = hv_new # the new actual values
5753       self.hv_inst = i_hvdict # the new dict (without defaults)
5754     else:
5755       self.hv_new = self.hv_inst = {}
5756
5757     # beparams processing
5758     if self.op.beparams:
5759       i_bedict = copy.deepcopy(instance.beparams)
5760       for key, val in self.op.beparams.iteritems():
5761         if val == constants.VALUE_DEFAULT:
5762           try:
5763             del i_bedict[key]
5764           except KeyError:
5765             pass
5766         else:
5767           i_bedict[key] = val
5768       cluster = self.cfg.GetClusterInfo()
5769       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5770       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5771                                 i_bedict)
5772       self.be_new = be_new # the new actual values
5773       self.be_inst = i_bedict # the new dict (without defaults)
5774     else:
5775       self.be_new = self.be_inst = {}
5776
5777     self.warn = []
5778
5779     if constants.BE_MEMORY in self.op.beparams and not self.force:
5780       mem_check_list = [pnode]
5781       if be_new[constants.BE_AUTO_BALANCE]:
5782         # either we changed auto_balance to yes or it was from before
5783         mem_check_list.extend(instance.secondary_nodes)
5784       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5785                                                   instance.hypervisor)
5786       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5787                                          instance.hypervisor)
5788       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5789         # Assume the primary node is unreachable and go ahead
5790         self.warn.append("Can't get info from primary node %s" % pnode)
5791       else:
5792         if not instance_info.failed and instance_info.data:
5793           current_mem = instance_info.data['memory']
5794         else:
5795           # Assume instance not running
5796           # (there is a slight race condition here, but it's not very probable,
5797           # and we have no other way to check)
5798           current_mem = 0
5799         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5800                     nodeinfo[pnode].data['memory_free'])
5801         if miss_mem > 0:
5802           raise errors.OpPrereqError("This change will prevent the instance"
5803                                      " from starting, due to %d MB of memory"
5804                                      " missing on its primary node" % miss_mem)
5805
5806       if be_new[constants.BE_AUTO_BALANCE]:
5807         for node, nres in nodeinfo.iteritems():
5808           if node not in instance.secondary_nodes:
5809             continue
5810           if nres.failed or not isinstance(nres.data, dict):
5811             self.warn.append("Can't get info from secondary node %s" % node)
5812           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5813             self.warn.append("Not enough memory to failover instance to"
5814                              " secondary node %s" % node)
5815
5816     # NIC processing
5817     for nic_op, nic_dict in self.op.nics:
5818       if nic_op == constants.DDM_REMOVE:
5819         if not instance.nics:
5820           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5821         continue
5822       if nic_op != constants.DDM_ADD:
5823         # an existing nic
5824         if nic_op < 0 or nic_op >= len(instance.nics):
5825           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5826                                      " are 0 to %d" %
5827                                      (nic_op, len(instance.nics)))
5828       nic_bridge = nic_dict.get('bridge', None)
5829       if nic_bridge is not None:
5830         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5831           msg = ("Bridge '%s' doesn't exist on one of"
5832                  " the instance nodes" % nic_bridge)
5833           if self.force:
5834             self.warn.append(msg)
5835           else:
5836             raise errors.OpPrereqError(msg)
5837
5838     # DISK processing
5839     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5840       raise errors.OpPrereqError("Disk operations not supported for"
5841                                  " diskless instances")
5842     for disk_op, disk_dict in self.op.disks:
5843       if disk_op == constants.DDM_REMOVE:
5844         if len(instance.disks) == 1:
5845           raise errors.OpPrereqError("Cannot remove the last disk of"
5846                                      " an instance")
5847         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5848         ins_l = ins_l[pnode]
5849         if ins_l.failed or not isinstance(ins_l.data, list):
5850           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5851         if instance.name in ins_l.data:
5852           raise errors.OpPrereqError("Instance is running, can't remove"
5853                                      " disks.")
5854
5855       if (disk_op == constants.DDM_ADD and
5856           len(instance.nics) >= constants.MAX_DISKS):
5857         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5858                                    " add more" % constants.MAX_DISKS)
5859       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5860         # an existing disk
5861         if disk_op < 0 or disk_op >= len(instance.disks):
5862           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5863                                      " are 0 to %d" %
5864                                      (disk_op, len(instance.disks)))
5865
5866     return
5867
5868   def Exec(self, feedback_fn):
5869     """Modifies an instance.
5870
5871     All parameters take effect only at the next restart of the instance.
5872
5873     """
5874     # Process here the warnings from CheckPrereq, as we don't have a
5875     # feedback_fn there.
5876     for warn in self.warn:
5877       feedback_fn("WARNING: %s" % warn)
5878
5879     result = []
5880     instance = self.instance
5881     # disk changes
5882     for disk_op, disk_dict in self.op.disks:
5883       if disk_op == constants.DDM_REMOVE:
5884         # remove the last disk
5885         device = instance.disks.pop()
5886         device_idx = len(instance.disks)
5887         for node, disk in device.ComputeNodeTree(instance.primary_node):
5888           self.cfg.SetDiskID(disk, node)
5889           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5890           if msg:
5891             self.LogWarning("Could not remove disk/%d on node %s: %s,"
5892                             " continuing anyway", device_idx, node, msg)
5893         result.append(("disk/%d" % device_idx, "remove"))
5894       elif disk_op == constants.DDM_ADD:
5895         # add a new disk
5896         if instance.disk_template == constants.DT_FILE:
5897           file_driver, file_path = instance.disks[0].logical_id
5898           file_path = os.path.dirname(file_path)
5899         else:
5900           file_driver = file_path = None
5901         disk_idx_base = len(instance.disks)
5902         new_disk = _GenerateDiskTemplate(self,
5903                                          instance.disk_template,
5904                                          instance.name, instance.primary_node,
5905                                          instance.secondary_nodes,
5906                                          [disk_dict],
5907                                          file_path,
5908                                          file_driver,
5909                                          disk_idx_base)[0]
5910         instance.disks.append(new_disk)
5911         info = _GetInstanceInfoText(instance)
5912
5913         logging.info("Creating volume %s for instance %s",
5914                      new_disk.iv_name, instance.name)
5915         # Note: this needs to be kept in sync with _CreateDisks
5916         #HARDCODE
5917         for node in instance.all_nodes:
5918           f_create = node == instance.primary_node
5919           try:
5920             _CreateBlockDev(self, node, instance, new_disk,
5921                             f_create, info, f_create)
5922           except errors.OpExecError, err:
5923             self.LogWarning("Failed to create volume %s (%s) on"
5924                             " node %s: %s",
5925                             new_disk.iv_name, new_disk, node, err)
5926         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5927                        (new_disk.size, new_disk.mode)))
5928       else:
5929         # change a given disk
5930         instance.disks[disk_op].mode = disk_dict['mode']
5931         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5932     # NIC changes
5933     for nic_op, nic_dict in self.op.nics:
5934       if nic_op == constants.DDM_REMOVE:
5935         # remove the last nic
5936         del instance.nics[-1]
5937         result.append(("nic.%d" % len(instance.nics), "remove"))
5938       elif nic_op == constants.DDM_ADD:
5939         # add a new nic
5940         if 'mac' not in nic_dict:
5941           mac = constants.VALUE_GENERATE
5942         else:
5943           mac = nic_dict['mac']
5944         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5945           mac = self.cfg.GenerateMAC()
5946         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5947                               bridge=nic_dict.get('bridge', None))
5948         instance.nics.append(new_nic)
5949         result.append(("nic.%d" % (len(instance.nics) - 1),
5950                        "add:mac=%s,ip=%s,bridge=%s" %
5951                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5952       else:
5953         # change a given nic
5954         for key in 'mac', 'ip', 'bridge':
5955           if key in nic_dict:
5956             setattr(instance.nics[nic_op], key, nic_dict[key])
5957             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5958
5959     # hvparams changes
5960     if self.op.hvparams:
5961       instance.hvparams = self.hv_inst
5962       for key, val in self.op.hvparams.iteritems():
5963         result.append(("hv/%s" % key, val))
5964
5965     # beparams changes
5966     if self.op.beparams:
5967       instance.beparams = self.be_inst
5968       for key, val in self.op.beparams.iteritems():
5969         result.append(("be/%s" % key, val))
5970
5971     self.cfg.Update(instance)
5972
5973     return result
5974
5975
5976 class LUQueryExports(NoHooksLU):
5977   """Query the exports list
5978
5979   """
5980   _OP_REQP = ['nodes']
5981   REQ_BGL = False
5982
5983   def ExpandNames(self):
5984     self.needed_locks = {}
5985     self.share_locks[locking.LEVEL_NODE] = 1
5986     if not self.op.nodes:
5987       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5988     else:
5989       self.needed_locks[locking.LEVEL_NODE] = \
5990         _GetWantedNodes(self, self.op.nodes)
5991
5992   def CheckPrereq(self):
5993     """Check prerequisites.
5994
5995     """
5996     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5997
5998   def Exec(self, feedback_fn):
5999     """Compute the list of all the exported system images.
6000
6001     @rtype: dict
6002     @return: a dictionary with the structure node->(export-list)
6003         where export-list is a list of the instances exported on
6004         that node.
6005
6006     """
6007     rpcresult = self.rpc.call_export_list(self.nodes)
6008     result = {}
6009     for node in rpcresult:
6010       if rpcresult[node].failed:
6011         result[node] = False
6012       else:
6013         result[node] = rpcresult[node].data
6014
6015     return result
6016
6017
6018 class LUExportInstance(LogicalUnit):
6019   """Export an instance to an image in the cluster.
6020
6021   """
6022   HPATH = "instance-export"
6023   HTYPE = constants.HTYPE_INSTANCE
6024   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6025   REQ_BGL = False
6026
6027   def ExpandNames(self):
6028     self._ExpandAndLockInstance()
6029     # FIXME: lock only instance primary and destination node
6030     #
6031     # Sad but true, for now we have do lock all nodes, as we don't know where
6032     # the previous export might be, and and in this LU we search for it and
6033     # remove it from its current node. In the future we could fix this by:
6034     #  - making a tasklet to search (share-lock all), then create the new one,
6035     #    then one to remove, after
6036     #  - removing the removal operation altoghether
6037     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6038
6039   def DeclareLocks(self, level):
6040     """Last minute lock declaration."""
6041     # All nodes are locked anyway, so nothing to do here.
6042
6043   def BuildHooksEnv(self):
6044     """Build hooks env.
6045
6046     This will run on the master, primary node and target node.
6047
6048     """
6049     env = {
6050       "EXPORT_NODE": self.op.target_node,
6051       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6052       }
6053     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6054     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6055           self.op.target_node]
6056     return env, nl, nl
6057
6058   def CheckPrereq(self):
6059     """Check prerequisites.
6060
6061     This checks that the instance and node names are valid.
6062
6063     """
6064     instance_name = self.op.instance_name
6065     self.instance = self.cfg.GetInstanceInfo(instance_name)
6066     assert self.instance is not None, \
6067           "Cannot retrieve locked instance %s" % self.op.instance_name
6068     _CheckNodeOnline(self, self.instance.primary_node)
6069
6070     self.dst_node = self.cfg.GetNodeInfo(
6071       self.cfg.ExpandNodeName(self.op.target_node))
6072
6073     if self.dst_node is None:
6074       # This is wrong node name, not a non-locked node
6075       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6076     _CheckNodeOnline(self, self.dst_node.name)
6077     _CheckNodeNotDrained(self, self.dst_node.name)
6078
6079     # instance disk type verification
6080     for disk in self.instance.disks:
6081       if disk.dev_type == constants.LD_FILE:
6082         raise errors.OpPrereqError("Export not supported for instances with"
6083                                    " file-based disks")
6084
6085   def Exec(self, feedback_fn):
6086     """Export an instance to an image in the cluster.
6087
6088     """
6089     instance = self.instance
6090     dst_node = self.dst_node
6091     src_node = instance.primary_node
6092     if self.op.shutdown:
6093       # shutdown the instance, but not the disks
6094       result = self.rpc.call_instance_shutdown(src_node, instance)
6095       msg = result.RemoteFailMsg()
6096       if msg:
6097         raise errors.OpExecError("Could not shutdown instance %s on"
6098                                  " node %s: %s" %
6099                                  (instance.name, src_node, msg))
6100
6101     vgname = self.cfg.GetVGName()
6102
6103     snap_disks = []
6104
6105     # set the disks ID correctly since call_instance_start needs the
6106     # correct drbd minor to create the symlinks
6107     for disk in instance.disks:
6108       self.cfg.SetDiskID(disk, src_node)
6109
6110     try:
6111       for disk in instance.disks:
6112         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6113         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6114         if new_dev_name.failed or not new_dev_name.data:
6115           self.LogWarning("Could not snapshot block device %s on node %s",
6116                           disk.logical_id[1], src_node)
6117           snap_disks.append(False)
6118         else:
6119           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6120                                  logical_id=(vgname, new_dev_name.data),
6121                                  physical_id=(vgname, new_dev_name.data),
6122                                  iv_name=disk.iv_name)
6123           snap_disks.append(new_dev)
6124
6125     finally:
6126       if self.op.shutdown and instance.admin_up:
6127         result = self.rpc.call_instance_start(src_node, instance, None)
6128         msg = result.RemoteFailMsg()
6129         if msg:
6130           _ShutdownInstanceDisks(self, instance)
6131           raise errors.OpExecError("Could not start instance: %s" % msg)
6132
6133     # TODO: check for size
6134
6135     cluster_name = self.cfg.GetClusterName()
6136     for idx, dev in enumerate(snap_disks):
6137       if dev:
6138         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6139                                                instance, cluster_name, idx)
6140         if result.failed or not result.data:
6141           self.LogWarning("Could not export block device %s from node %s to"
6142                           " node %s", dev.logical_id[1], src_node,
6143                           dst_node.name)
6144         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6145         if msg:
6146           self.LogWarning("Could not remove snapshot block device %s from node"
6147                           " %s: %s", dev.logical_id[1], src_node, msg)
6148
6149     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6150     if result.failed or not result.data:
6151       self.LogWarning("Could not finalize export for instance %s on node %s",
6152                       instance.name, dst_node.name)
6153
6154     nodelist = self.cfg.GetNodeList()
6155     nodelist.remove(dst_node.name)
6156
6157     # on one-node clusters nodelist will be empty after the removal
6158     # if we proceed the backup would be removed because OpQueryExports
6159     # substitutes an empty list with the full cluster node list.
6160     if nodelist:
6161       exportlist = self.rpc.call_export_list(nodelist)
6162       for node in exportlist:
6163         if exportlist[node].failed:
6164           continue
6165         if instance.name in exportlist[node].data:
6166           if not self.rpc.call_export_remove(node, instance.name):
6167             self.LogWarning("Could not remove older export for instance %s"
6168                             " on node %s", instance.name, node)
6169
6170
6171 class LURemoveExport(NoHooksLU):
6172   """Remove exports related to the named instance.
6173
6174   """
6175   _OP_REQP = ["instance_name"]
6176   REQ_BGL = False
6177
6178   def ExpandNames(self):
6179     self.needed_locks = {}
6180     # We need all nodes to be locked in order for RemoveExport to work, but we
6181     # don't need to lock the instance itself, as nothing will happen to it (and
6182     # we can remove exports also for a removed instance)
6183     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6184
6185   def CheckPrereq(self):
6186     """Check prerequisites.
6187     """
6188     pass
6189
6190   def Exec(self, feedback_fn):
6191     """Remove any export.
6192
6193     """
6194     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6195     # If the instance was not found we'll try with the name that was passed in.
6196     # This will only work if it was an FQDN, though.
6197     fqdn_warn = False
6198     if not instance_name:
6199       fqdn_warn = True
6200       instance_name = self.op.instance_name
6201
6202     exportlist = self.rpc.call_export_list(self.acquired_locks[
6203       locking.LEVEL_NODE])
6204     found = False
6205     for node in exportlist:
6206       if exportlist[node].failed:
6207         self.LogWarning("Failed to query node %s, continuing" % node)
6208         continue
6209       if instance_name in exportlist[node].data:
6210         found = True
6211         result = self.rpc.call_export_remove(node, instance_name)
6212         if result.failed or not result.data:
6213           logging.error("Could not remove export for instance %s"
6214                         " on node %s", instance_name, node)
6215
6216     if fqdn_warn and not found:
6217       feedback_fn("Export not found. If trying to remove an export belonging"
6218                   " to a deleted instance please use its Fully Qualified"
6219                   " Domain Name.")
6220
6221
6222 class TagsLU(NoHooksLU):
6223   """Generic tags LU.
6224
6225   This is an abstract class which is the parent of all the other tags LUs.
6226
6227   """
6228
6229   def ExpandNames(self):
6230     self.needed_locks = {}
6231     if self.op.kind == constants.TAG_NODE:
6232       name = self.cfg.ExpandNodeName(self.op.name)
6233       if name is None:
6234         raise errors.OpPrereqError("Invalid node name (%s)" %
6235                                    (self.op.name,))
6236       self.op.name = name
6237       self.needed_locks[locking.LEVEL_NODE] = name
6238     elif self.op.kind == constants.TAG_INSTANCE:
6239       name = self.cfg.ExpandInstanceName(self.op.name)
6240       if name is None:
6241         raise errors.OpPrereqError("Invalid instance name (%s)" %
6242                                    (self.op.name,))
6243       self.op.name = name
6244       self.needed_locks[locking.LEVEL_INSTANCE] = name
6245
6246   def CheckPrereq(self):
6247     """Check prerequisites.
6248
6249     """
6250     if self.op.kind == constants.TAG_CLUSTER:
6251       self.target = self.cfg.GetClusterInfo()
6252     elif self.op.kind == constants.TAG_NODE:
6253       self.target = self.cfg.GetNodeInfo(self.op.name)
6254     elif self.op.kind == constants.TAG_INSTANCE:
6255       self.target = self.cfg.GetInstanceInfo(self.op.name)
6256     else:
6257       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6258                                  str(self.op.kind))
6259
6260
6261 class LUGetTags(TagsLU):
6262   """Returns the tags of a given object.
6263
6264   """
6265   _OP_REQP = ["kind", "name"]
6266   REQ_BGL = False
6267
6268   def Exec(self, feedback_fn):
6269     """Returns the tag list.
6270
6271     """
6272     return list(self.target.GetTags())
6273
6274
6275 class LUSearchTags(NoHooksLU):
6276   """Searches the tags for a given pattern.
6277
6278   """
6279   _OP_REQP = ["pattern"]
6280   REQ_BGL = False
6281
6282   def ExpandNames(self):
6283     self.needed_locks = {}
6284
6285   def CheckPrereq(self):
6286     """Check prerequisites.
6287
6288     This checks the pattern passed for validity by compiling it.
6289
6290     """
6291     try:
6292       self.re = re.compile(self.op.pattern)
6293     except re.error, err:
6294       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6295                                  (self.op.pattern, err))
6296
6297   def Exec(self, feedback_fn):
6298     """Returns the tag list.
6299
6300     """
6301     cfg = self.cfg
6302     tgts = [("/cluster", cfg.GetClusterInfo())]
6303     ilist = cfg.GetAllInstancesInfo().values()
6304     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6305     nlist = cfg.GetAllNodesInfo().values()
6306     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6307     results = []
6308     for path, target in tgts:
6309       for tag in target.GetTags():
6310         if self.re.search(tag):
6311           results.append((path, tag))
6312     return results
6313
6314
6315 class LUAddTags(TagsLU):
6316   """Sets a tag on a given object.
6317
6318   """
6319   _OP_REQP = ["kind", "name", "tags"]
6320   REQ_BGL = False
6321
6322   def CheckPrereq(self):
6323     """Check prerequisites.
6324
6325     This checks the type and length of the tag name and value.
6326
6327     """
6328     TagsLU.CheckPrereq(self)
6329     for tag in self.op.tags:
6330       objects.TaggableObject.ValidateTag(tag)
6331
6332   def Exec(self, feedback_fn):
6333     """Sets the tag.
6334
6335     """
6336     try:
6337       for tag in self.op.tags:
6338         self.target.AddTag(tag)
6339     except errors.TagError, err:
6340       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6341     try:
6342       self.cfg.Update(self.target)
6343     except errors.ConfigurationError:
6344       raise errors.OpRetryError("There has been a modification to the"
6345                                 " config file and the operation has been"
6346                                 " aborted. Please retry.")
6347
6348
6349 class LUDelTags(TagsLU):
6350   """Delete a list of tags from a given object.
6351
6352   """
6353   _OP_REQP = ["kind", "name", "tags"]
6354   REQ_BGL = False
6355
6356   def CheckPrereq(self):
6357     """Check prerequisites.
6358
6359     This checks that we have the given tag.
6360
6361     """
6362     TagsLU.CheckPrereq(self)
6363     for tag in self.op.tags:
6364       objects.TaggableObject.ValidateTag(tag)
6365     del_tags = frozenset(self.op.tags)
6366     cur_tags = self.target.GetTags()
6367     if not del_tags <= cur_tags:
6368       diff_tags = del_tags - cur_tags
6369       diff_names = ["'%s'" % tag for tag in diff_tags]
6370       diff_names.sort()
6371       raise errors.OpPrereqError("Tag(s) %s not found" %
6372                                  (",".join(diff_names)))
6373
6374   def Exec(self, feedback_fn):
6375     """Remove the tag from the object.
6376
6377     """
6378     for tag in self.op.tags:
6379       self.target.RemoveTag(tag)
6380     try:
6381       self.cfg.Update(self.target)
6382     except errors.ConfigurationError:
6383       raise errors.OpRetryError("There has been a modification to the"
6384                                 " config file and the operation has been"
6385                                 " aborted. Please retry.")
6386
6387
6388 class LUTestDelay(NoHooksLU):
6389   """Sleep for a specified amount of time.
6390
6391   This LU sleeps on the master and/or nodes for a specified amount of
6392   time.
6393
6394   """
6395   _OP_REQP = ["duration", "on_master", "on_nodes"]
6396   REQ_BGL = False
6397
6398   def ExpandNames(self):
6399     """Expand names and set required locks.
6400
6401     This expands the node list, if any.
6402
6403     """
6404     self.needed_locks = {}
6405     if self.op.on_nodes:
6406       # _GetWantedNodes can be used here, but is not always appropriate to use
6407       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6408       # more information.
6409       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6410       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6411
6412   def CheckPrereq(self):
6413     """Check prerequisites.
6414
6415     """
6416
6417   def Exec(self, feedback_fn):
6418     """Do the actual sleep.
6419
6420     """
6421     if self.op.on_master:
6422       if not utils.TestDelay(self.op.duration):
6423         raise errors.OpExecError("Error during master delay test")
6424     if self.op.on_nodes:
6425       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6426       if not result:
6427         raise errors.OpExecError("Complete failure from rpc call")
6428       for node, node_result in result.items():
6429         node_result.Raise()
6430         if not node_result.data:
6431           raise errors.OpExecError("Failure during rpc call to node %s,"
6432                                    " result: %s" % (node, node_result.data))
6433
6434
6435 class IAllocator(object):
6436   """IAllocator framework.
6437
6438   An IAllocator instance has three sets of attributes:
6439     - cfg that is needed to query the cluster
6440     - input data (all members of the _KEYS class attribute are required)
6441     - four buffer attributes (in|out_data|text), that represent the
6442       input (to the external script) in text and data structure format,
6443       and the output from it, again in two formats
6444     - the result variables from the script (success, info, nodes) for
6445       easy usage
6446
6447   """
6448   _ALLO_KEYS = [
6449     "mem_size", "disks", "disk_template",
6450     "os", "tags", "nics", "vcpus", "hypervisor",
6451     ]
6452   _RELO_KEYS = [
6453     "relocate_from",
6454     ]
6455
6456   def __init__(self, lu, mode, name, **kwargs):
6457     self.lu = lu
6458     # init buffer variables
6459     self.in_text = self.out_text = self.in_data = self.out_data = None
6460     # init all input fields so that pylint is happy
6461     self.mode = mode
6462     self.name = name
6463     self.mem_size = self.disks = self.disk_template = None
6464     self.os = self.tags = self.nics = self.vcpus = None
6465     self.hypervisor = None
6466     self.relocate_from = None
6467     # computed fields
6468     self.required_nodes = None
6469     # init result fields
6470     self.success = self.info = self.nodes = None
6471     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6472       keyset = self._ALLO_KEYS
6473     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6474       keyset = self._RELO_KEYS
6475     else:
6476       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6477                                    " IAllocator" % self.mode)
6478     for key in kwargs:
6479       if key not in keyset:
6480         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6481                                      " IAllocator" % key)
6482       setattr(self, key, kwargs[key])
6483     for key in keyset:
6484       if key not in kwargs:
6485         raise errors.ProgrammerError("Missing input parameter '%s' to"
6486                                      " IAllocator" % key)
6487     self._BuildInputData()
6488
6489   def _ComputeClusterData(self):
6490     """Compute the generic allocator input data.
6491
6492     This is the data that is independent of the actual operation.
6493
6494     """
6495     cfg = self.lu.cfg
6496     cluster_info = cfg.GetClusterInfo()
6497     # cluster data
6498     data = {
6499       "version": 1,
6500       "cluster_name": cfg.GetClusterName(),
6501       "cluster_tags": list(cluster_info.GetTags()),
6502       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6503       # we don't have job IDs
6504       }
6505     iinfo = cfg.GetAllInstancesInfo().values()
6506     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6507
6508     # node data
6509     node_results = {}
6510     node_list = cfg.GetNodeList()
6511
6512     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6513       hypervisor_name = self.hypervisor
6514     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6515       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6516
6517     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6518                                            hypervisor_name)
6519     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6520                        cluster_info.enabled_hypervisors)
6521     for nname, nresult in node_data.items():
6522       # first fill in static (config-based) values
6523       ninfo = cfg.GetNodeInfo(nname)
6524       pnr = {
6525         "tags": list(ninfo.GetTags()),
6526         "primary_ip": ninfo.primary_ip,
6527         "secondary_ip": ninfo.secondary_ip,
6528         "offline": ninfo.offline,
6529         "drained": ninfo.drained,
6530         "master_candidate": ninfo.master_candidate,
6531         }
6532
6533       if not ninfo.offline:
6534         nresult.Raise()
6535         if not isinstance(nresult.data, dict):
6536           raise errors.OpExecError("Can't get data for node %s" % nname)
6537         remote_info = nresult.data
6538         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6539                      'vg_size', 'vg_free', 'cpu_total']:
6540           if attr not in remote_info:
6541             raise errors.OpExecError("Node '%s' didn't return attribute"
6542                                      " '%s'" % (nname, attr))
6543           try:
6544             remote_info[attr] = int(remote_info[attr])
6545           except ValueError, err:
6546             raise errors.OpExecError("Node '%s' returned invalid value"
6547                                      " for '%s': %s" % (nname, attr, err))
6548         # compute memory used by primary instances
6549         i_p_mem = i_p_up_mem = 0
6550         for iinfo, beinfo in i_list:
6551           if iinfo.primary_node == nname:
6552             i_p_mem += beinfo[constants.BE_MEMORY]
6553             if iinfo.name not in node_iinfo[nname].data:
6554               i_used_mem = 0
6555             else:
6556               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6557             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6558             remote_info['memory_free'] -= max(0, i_mem_diff)
6559
6560             if iinfo.admin_up:
6561               i_p_up_mem += beinfo[constants.BE_MEMORY]
6562
6563         # compute memory used by instances
6564         pnr_dyn = {
6565           "total_memory": remote_info['memory_total'],
6566           "reserved_memory": remote_info['memory_dom0'],
6567           "free_memory": remote_info['memory_free'],
6568           "total_disk": remote_info['vg_size'],
6569           "free_disk": remote_info['vg_free'],
6570           "total_cpus": remote_info['cpu_total'],
6571           "i_pri_memory": i_p_mem,
6572           "i_pri_up_memory": i_p_up_mem,
6573           }
6574         pnr.update(pnr_dyn)
6575
6576       node_results[nname] = pnr
6577     data["nodes"] = node_results
6578
6579     # instance data
6580     instance_data = {}
6581     for iinfo, beinfo in i_list:
6582       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6583                   for n in iinfo.nics]
6584       pir = {
6585         "tags": list(iinfo.GetTags()),
6586         "admin_up": iinfo.admin_up,
6587         "vcpus": beinfo[constants.BE_VCPUS],
6588         "memory": beinfo[constants.BE_MEMORY],
6589         "os": iinfo.os,
6590         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6591         "nics": nic_data,
6592         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6593         "disk_template": iinfo.disk_template,
6594         "hypervisor": iinfo.hypervisor,
6595         }
6596       instance_data[iinfo.name] = pir
6597
6598     data["instances"] = instance_data
6599
6600     self.in_data = data
6601
6602   def _AddNewInstance(self):
6603     """Add new instance data to allocator structure.
6604
6605     This in combination with _AllocatorGetClusterData will create the
6606     correct structure needed as input for the allocator.
6607
6608     The checks for the completeness of the opcode must have already been
6609     done.
6610
6611     """
6612     data = self.in_data
6613
6614     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6615
6616     if self.disk_template in constants.DTS_NET_MIRROR:
6617       self.required_nodes = 2
6618     else:
6619       self.required_nodes = 1
6620     request = {
6621       "type": "allocate",
6622       "name": self.name,
6623       "disk_template": self.disk_template,
6624       "tags": self.tags,
6625       "os": self.os,
6626       "vcpus": self.vcpus,
6627       "memory": self.mem_size,
6628       "disks": self.disks,
6629       "disk_space_total": disk_space,
6630       "nics": self.nics,
6631       "required_nodes": self.required_nodes,
6632       }
6633     data["request"] = request
6634
6635   def _AddRelocateInstance(self):
6636     """Add relocate instance data to allocator structure.
6637
6638     This in combination with _IAllocatorGetClusterData will create the
6639     correct structure needed as input for the allocator.
6640
6641     The checks for the completeness of the opcode must have already been
6642     done.
6643
6644     """
6645     instance = self.lu.cfg.GetInstanceInfo(self.name)
6646     if instance is None:
6647       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6648                                    " IAllocator" % self.name)
6649
6650     if instance.disk_template not in constants.DTS_NET_MIRROR:
6651       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6652
6653     if len(instance.secondary_nodes) != 1:
6654       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6655
6656     self.required_nodes = 1
6657     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6658     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6659
6660     request = {
6661       "type": "relocate",
6662       "name": self.name,
6663       "disk_space_total": disk_space,
6664       "required_nodes": self.required_nodes,
6665       "relocate_from": self.relocate_from,
6666       }
6667     self.in_data["request"] = request
6668
6669   def _BuildInputData(self):
6670     """Build input data structures.
6671
6672     """
6673     self._ComputeClusterData()
6674
6675     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6676       self._AddNewInstance()
6677     else:
6678       self._AddRelocateInstance()
6679
6680     self.in_text = serializer.Dump(self.in_data)
6681
6682   def Run(self, name, validate=True, call_fn=None):
6683     """Run an instance allocator and return the results.
6684
6685     """
6686     if call_fn is None:
6687       call_fn = self.lu.rpc.call_iallocator_runner
6688     data = self.in_text
6689
6690     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6691     result.Raise()
6692
6693     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6694       raise errors.OpExecError("Invalid result from master iallocator runner")
6695
6696     rcode, stdout, stderr, fail = result.data
6697
6698     if rcode == constants.IARUN_NOTFOUND:
6699       raise errors.OpExecError("Can't find allocator '%s'" % name)
6700     elif rcode == constants.IARUN_FAILURE:
6701       raise errors.OpExecError("Instance allocator call failed: %s,"
6702                                " output: %s" % (fail, stdout+stderr))
6703     self.out_text = stdout
6704     if validate:
6705       self._ValidateResult()
6706
6707   def _ValidateResult(self):
6708     """Process the allocator results.
6709
6710     This will process and if successful save the result in
6711     self.out_data and the other parameters.
6712
6713     """
6714     try:
6715       rdict = serializer.Load(self.out_text)
6716     except Exception, err:
6717       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6718
6719     if not isinstance(rdict, dict):
6720       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6721
6722     for key in "success", "info", "nodes":
6723       if key not in rdict:
6724         raise errors.OpExecError("Can't parse iallocator results:"
6725                                  " missing key '%s'" % key)
6726       setattr(self, key, rdict[key])
6727
6728     if not isinstance(rdict["nodes"], list):
6729       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6730                                " is not a list")
6731     self.out_data = rdict
6732
6733
6734 class LUTestAllocator(NoHooksLU):
6735   """Run allocator tests.
6736
6737   This LU runs the allocator tests
6738
6739   """
6740   _OP_REQP = ["direction", "mode", "name"]
6741
6742   def CheckPrereq(self):
6743     """Check prerequisites.
6744
6745     This checks the opcode parameters depending on the director and mode test.
6746
6747     """
6748     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6749       for attr in ["name", "mem_size", "disks", "disk_template",
6750                    "os", "tags", "nics", "vcpus"]:
6751         if not hasattr(self.op, attr):
6752           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6753                                      attr)
6754       iname = self.cfg.ExpandInstanceName(self.op.name)
6755       if iname is not None:
6756         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6757                                    iname)
6758       if not isinstance(self.op.nics, list):
6759         raise errors.OpPrereqError("Invalid parameter 'nics'")
6760       for row in self.op.nics:
6761         if (not isinstance(row, dict) or
6762             "mac" not in row or
6763             "ip" not in row or
6764             "bridge" not in row):
6765           raise errors.OpPrereqError("Invalid contents of the"
6766                                      " 'nics' parameter")
6767       if not isinstance(self.op.disks, list):
6768         raise errors.OpPrereqError("Invalid parameter 'disks'")
6769       for row in self.op.disks:
6770         if (not isinstance(row, dict) or
6771             "size" not in row or
6772             not isinstance(row["size"], int) or
6773             "mode" not in row or
6774             row["mode"] not in ['r', 'w']):
6775           raise errors.OpPrereqError("Invalid contents of the"
6776                                      " 'disks' parameter")
6777       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6778         self.op.hypervisor = self.cfg.GetHypervisorType()
6779     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6780       if not hasattr(self.op, "name"):
6781         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6782       fname = self.cfg.ExpandInstanceName(self.op.name)
6783       if fname is None:
6784         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6785                                    self.op.name)
6786       self.op.name = fname
6787       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6788     else:
6789       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6790                                  self.op.mode)
6791
6792     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6793       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6794         raise errors.OpPrereqError("Missing allocator name")
6795     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6796       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6797                                  self.op.direction)
6798
6799   def Exec(self, feedback_fn):
6800     """Run the allocator test.
6801
6802     """
6803     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6804       ial = IAllocator(self,
6805                        mode=self.op.mode,
6806                        name=self.op.name,
6807                        mem_size=self.op.mem_size,
6808                        disks=self.op.disks,
6809                        disk_template=self.op.disk_template,
6810                        os=self.op.os,
6811                        tags=self.op.tags,
6812                        nics=self.op.nics,
6813                        vcpus=self.op.vcpus,
6814                        hypervisor=self.op.hypervisor,
6815                        )
6816     else:
6817       ial = IAllocator(self,
6818                        mode=self.op.mode,
6819                        name=self.op.name,
6820                        relocate_from=list(self.relocate_from),
6821                        )
6822
6823     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6824       result = ial.in_text
6825     else:
6826       ial.Run(self.op.allocator, validate=False)
6827       result = ial.out_text
6828     return result