Implement modification of the drained flag
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @rtype: dict
480   @return: the hook environment for this instance
481
482   """
483   if status:
484     str_status = "up"
485   else:
486     str_status = "down"
487   env = {
488     "OP_TARGET": name,
489     "INSTANCE_NAME": name,
490     "INSTANCE_PRIMARY": primary_node,
491     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
492     "INSTANCE_OS_TYPE": os_type,
493     "INSTANCE_STATUS": str_status,
494     "INSTANCE_MEMORY": memory,
495     "INSTANCE_VCPUS": vcpus,
496   }
497
498   if nics:
499     nic_count = len(nics)
500     for idx, (ip, bridge, mac) in enumerate(nics):
501       if ip is None:
502         ip = ""
503       env["INSTANCE_NIC%d_IP" % idx] = ip
504       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
505       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
506   else:
507     nic_count = 0
508
509   env["INSTANCE_NIC_COUNT"] = nic_count
510
511   return env
512
513
514 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
515   """Builds instance related env variables for hooks from an object.
516
517   @type lu: L{LogicalUnit}
518   @param lu: the logical unit on whose behalf we execute
519   @type instance: L{objects.Instance}
520   @param instance: the instance for which we should build the
521       environment
522   @type override: dict
523   @param override: dictionary with key/values that will override
524       our values
525   @rtype: dict
526   @return: the hook environment dictionary
527
528   """
529   bep = lu.cfg.GetClusterInfo().FillBE(instance)
530   args = {
531     'name': instance.name,
532     'primary_node': instance.primary_node,
533     'secondary_nodes': instance.secondary_nodes,
534     'os_type': instance.os,
535     'status': instance.admin_up,
536     'memory': bep[constants.BE_MEMORY],
537     'vcpus': bep[constants.BE_VCPUS],
538     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
539   }
540   if override:
541     args.update(override)
542   return _BuildInstanceHookEnv(**args)
543
544
545 def _AdjustCandidatePool(lu):
546   """Adjust the candidate pool after node operations.
547
548   """
549   mod_list = lu.cfg.MaintainCandidatePool()
550   if mod_list:
551     lu.LogInfo("Promoted nodes to master candidate role: %s",
552                ", ".join(node.name for node in mod_list))
553     for name in mod_list:
554       lu.context.ReaddNode(name)
555   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
556   if mc_now > mc_max:
557     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
558                (mc_now, mc_max))
559
560
561 def _CheckInstanceBridgesExist(lu, instance):
562   """Check that the brigdes needed by an instance exist.
563
564   """
565   # check bridges existance
566   brlist = [nic.bridge for nic in instance.nics]
567   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
568   result.Raise()
569   if not result.data:
570     raise errors.OpPrereqError("One or more target bridges %s does not"
571                                " exist on destination node '%s'" %
572                                (brlist, instance.primary_node))
573
574
575 class LUDestroyCluster(NoHooksLU):
576   """Logical unit for destroying the cluster.
577
578   """
579   _OP_REQP = []
580
581   def CheckPrereq(self):
582     """Check prerequisites.
583
584     This checks whether the cluster is empty.
585
586     Any errors are signalled by raising errors.OpPrereqError.
587
588     """
589     master = self.cfg.GetMasterNode()
590
591     nodelist = self.cfg.GetNodeList()
592     if len(nodelist) != 1 or nodelist[0] != master:
593       raise errors.OpPrereqError("There are still %d node(s) in"
594                                  " this cluster." % (len(nodelist) - 1))
595     instancelist = self.cfg.GetInstanceList()
596     if instancelist:
597       raise errors.OpPrereqError("There are still %d instance(s) in"
598                                  " this cluster." % len(instancelist))
599
600   def Exec(self, feedback_fn):
601     """Destroys the cluster.
602
603     """
604     master = self.cfg.GetMasterNode()
605     result = self.rpc.call_node_stop_master(master, False)
606     result.Raise()
607     if not result.data:
608       raise errors.OpExecError("Could not disable the master role")
609     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
610     utils.CreateBackup(priv_key)
611     utils.CreateBackup(pub_key)
612     return master
613
614
615 class LUVerifyCluster(LogicalUnit):
616   """Verifies the cluster status.
617
618   """
619   HPATH = "cluster-verify"
620   HTYPE = constants.HTYPE_CLUSTER
621   _OP_REQP = ["skip_checks"]
622   REQ_BGL = False
623
624   def ExpandNames(self):
625     self.needed_locks = {
626       locking.LEVEL_NODE: locking.ALL_SET,
627       locking.LEVEL_INSTANCE: locking.ALL_SET,
628     }
629     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
630
631   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
632                   node_result, feedback_fn, master_files,
633                   drbd_map):
634     """Run multiple tests against a node.
635
636     Test list:
637
638       - compares ganeti version
639       - checks vg existance and size > 20G
640       - checks config file checksum
641       - checks ssh to other nodes
642
643     @type nodeinfo: L{objects.Node}
644     @param nodeinfo: the node to check
645     @param file_list: required list of files
646     @param local_cksum: dictionary of local files and their checksums
647     @param node_result: the results from the node
648     @param feedback_fn: function used to accumulate results
649     @param master_files: list of files that only masters should have
650     @param drbd_map: the useddrbd minors for this node, in
651         form of minor: (instance, must_exist) which correspond to instances
652         and their running status
653
654     """
655     node = nodeinfo.name
656
657     # main result, node_result should be a non-empty dict
658     if not node_result or not isinstance(node_result, dict):
659       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
660       return True
661
662     # compares ganeti version
663     local_version = constants.PROTOCOL_VERSION
664     remote_version = node_result.get('version', None)
665     if not (remote_version and isinstance(remote_version, (list, tuple)) and
666             len(remote_version) == 2):
667       feedback_fn("  - ERROR: connection to %s failed" % (node))
668       return True
669
670     if local_version != remote_version[0]:
671       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
672                   " node %s %s" % (local_version, node, remote_version[0]))
673       return True
674
675     # node seems compatible, we can actually try to look into its results
676
677     bad = False
678
679     # full package version
680     if constants.RELEASE_VERSION != remote_version[1]:
681       feedback_fn("  - WARNING: software version mismatch: master %s,"
682                   " node %s %s" %
683                   (constants.RELEASE_VERSION, node, remote_version[1]))
684
685     # checks vg existence and size > 20G
686
687     vglist = node_result.get(constants.NV_VGLIST, None)
688     if not vglist:
689       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
690                       (node,))
691       bad = True
692     else:
693       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
694                                             constants.MIN_VG_SIZE)
695       if vgstatus:
696         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
697         bad = True
698
699     # checks config file checksum
700
701     remote_cksum = node_result.get(constants.NV_FILELIST, None)
702     if not isinstance(remote_cksum, dict):
703       bad = True
704       feedback_fn("  - ERROR: node hasn't returned file checksum data")
705     else:
706       for file_name in file_list:
707         node_is_mc = nodeinfo.master_candidate
708         must_have_file = file_name not in master_files
709         if file_name not in remote_cksum:
710           if node_is_mc or must_have_file:
711             bad = True
712             feedback_fn("  - ERROR: file '%s' missing" % file_name)
713         elif remote_cksum[file_name] != local_cksum[file_name]:
714           if node_is_mc or must_have_file:
715             bad = True
716             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
717           else:
718             # not candidate and this is not a must-have file
719             bad = True
720             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
721                         " '%s'" % file_name)
722         else:
723           # all good, except non-master/non-must have combination
724           if not node_is_mc and not must_have_file:
725             feedback_fn("  - ERROR: file '%s' should not exist on non master"
726                         " candidates" % file_name)
727
728     # checks ssh to any
729
730     if constants.NV_NODELIST not in node_result:
731       bad = True
732       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
733     else:
734       if node_result[constants.NV_NODELIST]:
735         bad = True
736         for node in node_result[constants.NV_NODELIST]:
737           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
738                           (node, node_result[constants.NV_NODELIST][node]))
739
740     if constants.NV_NODENETTEST not in node_result:
741       bad = True
742       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
743     else:
744       if node_result[constants.NV_NODENETTEST]:
745         bad = True
746         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
747         for node in nlist:
748           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
749                           (node, node_result[constants.NV_NODENETTEST][node]))
750
751     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
752     if isinstance(hyp_result, dict):
753       for hv_name, hv_result in hyp_result.iteritems():
754         if hv_result is not None:
755           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
756                       (hv_name, hv_result))
757
758     # check used drbd list
759     used_minors = node_result.get(constants.NV_DRBDLIST, [])
760     for minor, (iname, must_exist) in drbd_map.items():
761       if minor not in used_minors and must_exist:
762         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
763                     (minor, iname))
764         bad = True
765     for minor in used_minors:
766       if minor not in drbd_map:
767         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
768         bad = True
769
770     return bad
771
772   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
773                       node_instance, feedback_fn, n_offline):
774     """Verify an instance.
775
776     This function checks to see if the required block devices are
777     available on the instance's node.
778
779     """
780     bad = False
781
782     node_current = instanceconfig.primary_node
783
784     node_vol_should = {}
785     instanceconfig.MapLVsByNode(node_vol_should)
786
787     for node in node_vol_should:
788       if node in n_offline:
789         # ignore missing volumes on offline nodes
790         continue
791       for volume in node_vol_should[node]:
792         if node not in node_vol_is or volume not in node_vol_is[node]:
793           feedback_fn("  - ERROR: volume %s missing on node %s" %
794                           (volume, node))
795           bad = True
796
797     if instanceconfig.admin_up:
798       if ((node_current not in node_instance or
799           not instance in node_instance[node_current]) and
800           node_current not in n_offline):
801         feedback_fn("  - ERROR: instance %s not running on node %s" %
802                         (instance, node_current))
803         bad = True
804
805     for node in node_instance:
806       if (not node == node_current):
807         if instance in node_instance[node]:
808           feedback_fn("  - ERROR: instance %s should not run on node %s" %
809                           (instance, node))
810           bad = True
811
812     return bad
813
814   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
815     """Verify if there are any unknown volumes in the cluster.
816
817     The .os, .swap and backup volumes are ignored. All other volumes are
818     reported as unknown.
819
820     """
821     bad = False
822
823     for node in node_vol_is:
824       for volume in node_vol_is[node]:
825         if node not in node_vol_should or volume not in node_vol_should[node]:
826           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
827                       (volume, node))
828           bad = True
829     return bad
830
831   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
832     """Verify the list of running instances.
833
834     This checks what instances are running but unknown to the cluster.
835
836     """
837     bad = False
838     for node in node_instance:
839       for runninginstance in node_instance[node]:
840         if runninginstance not in instancelist:
841           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
842                           (runninginstance, node))
843           bad = True
844     return bad
845
846   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
847     """Verify N+1 Memory Resilience.
848
849     Check that if one single node dies we can still start all the instances it
850     was primary for.
851
852     """
853     bad = False
854
855     for node, nodeinfo in node_info.iteritems():
856       # This code checks that every node which is now listed as secondary has
857       # enough memory to host all instances it is supposed to should a single
858       # other node in the cluster fail.
859       # FIXME: not ready for failover to an arbitrary node
860       # FIXME: does not support file-backed instances
861       # WARNING: we currently take into account down instances as well as up
862       # ones, considering that even if they're down someone might want to start
863       # them even in the event of a node failure.
864       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
865         needed_mem = 0
866         for instance in instances:
867           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
868           if bep[constants.BE_AUTO_BALANCE]:
869             needed_mem += bep[constants.BE_MEMORY]
870         if nodeinfo['mfree'] < needed_mem:
871           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
872                       " failovers should node %s fail" % (node, prinode))
873           bad = True
874     return bad
875
876   def CheckPrereq(self):
877     """Check prerequisites.
878
879     Transform the list of checks we're going to skip into a set and check that
880     all its members are valid.
881
882     """
883     self.skip_set = frozenset(self.op.skip_checks)
884     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
885       raise errors.OpPrereqError("Invalid checks to be skipped specified")
886
887   def BuildHooksEnv(self):
888     """Build hooks env.
889
890     Cluster-Verify hooks just rone in the post phase and their failure makes
891     the output be logged in the verify output and the verification to fail.
892
893     """
894     all_nodes = self.cfg.GetNodeList()
895     # TODO: populate the environment with useful information for verify hooks
896     env = {}
897     return env, [], all_nodes
898
899   def Exec(self, feedback_fn):
900     """Verify integrity of cluster, performing various test on nodes.
901
902     """
903     bad = False
904     feedback_fn("* Verifying global settings")
905     for msg in self.cfg.VerifyConfig():
906       feedback_fn("  - ERROR: %s" % msg)
907
908     vg_name = self.cfg.GetVGName()
909     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
910     nodelist = utils.NiceSort(self.cfg.GetNodeList())
911     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
912     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
913     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
914                         for iname in instancelist)
915     i_non_redundant = [] # Non redundant instances
916     i_non_a_balanced = [] # Non auto-balanced instances
917     n_offline = [] # List of offline nodes
918     n_drained = [] # List of nodes being drained
919     node_volume = {}
920     node_instance = {}
921     node_info = {}
922     instance_cfg = {}
923
924     # FIXME: verify OS list
925     # do local checksums
926     master_files = [constants.CLUSTER_CONF_FILE]
927
928     file_names = ssconf.SimpleStore().GetFileList()
929     file_names.append(constants.SSL_CERT_FILE)
930     file_names.append(constants.RAPI_CERT_FILE)
931     file_names.extend(master_files)
932
933     local_checksums = utils.FingerprintFiles(file_names)
934
935     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
936     node_verify_param = {
937       constants.NV_FILELIST: file_names,
938       constants.NV_NODELIST: [node.name for node in nodeinfo
939                               if not node.offline],
940       constants.NV_HYPERVISOR: hypervisors,
941       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
942                                   node.secondary_ip) for node in nodeinfo
943                                  if not node.offline],
944       constants.NV_LVLIST: vg_name,
945       constants.NV_INSTANCELIST: hypervisors,
946       constants.NV_VGLIST: None,
947       constants.NV_VERSION: None,
948       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
949       constants.NV_DRBDLIST: None,
950       }
951     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
952                                            self.cfg.GetClusterName())
953
954     cluster = self.cfg.GetClusterInfo()
955     master_node = self.cfg.GetMasterNode()
956     all_drbd_map = self.cfg.ComputeDRBDMap()
957
958     for node_i in nodeinfo:
959       node = node_i.name
960       nresult = all_nvinfo[node].data
961
962       if node_i.offline:
963         feedback_fn("* Skipping offline node %s" % (node,))
964         n_offline.append(node)
965         continue
966
967       if node == master_node:
968         ntype = "master"
969       elif node_i.master_candidate:
970         ntype = "master candidate"
971       elif node_i.drained:
972         ntype = "drained"
973         n_drained.append(node)
974       else:
975         ntype = "regular"
976       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
977
978       if all_nvinfo[node].failed or not isinstance(nresult, dict):
979         feedback_fn("  - ERROR: connection to %s failed" % (node,))
980         bad = True
981         continue
982
983       node_drbd = {}
984       for minor, instance in all_drbd_map[node].items():
985         instance = instanceinfo[instance]
986         node_drbd[minor] = (instance.name, instance.admin_up)
987       result = self._VerifyNode(node_i, file_names, local_checksums,
988                                 nresult, feedback_fn, master_files,
989                                 node_drbd)
990       bad = bad or result
991
992       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
993       if isinstance(lvdata, basestring):
994         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
995                     (node, utils.SafeEncode(lvdata)))
996         bad = True
997         node_volume[node] = {}
998       elif not isinstance(lvdata, dict):
999         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1000         bad = True
1001         continue
1002       else:
1003         node_volume[node] = lvdata
1004
1005       # node_instance
1006       idata = nresult.get(constants.NV_INSTANCELIST, None)
1007       if not isinstance(idata, list):
1008         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1009                     (node,))
1010         bad = True
1011         continue
1012
1013       node_instance[node] = idata
1014
1015       # node_info
1016       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1017       if not isinstance(nodeinfo, dict):
1018         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1019         bad = True
1020         continue
1021
1022       try:
1023         node_info[node] = {
1024           "mfree": int(nodeinfo['memory_free']),
1025           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1026           "pinst": [],
1027           "sinst": [],
1028           # dictionary holding all instances this node is secondary for,
1029           # grouped by their primary node. Each key is a cluster node, and each
1030           # value is a list of instances which have the key as primary and the
1031           # current node as secondary.  this is handy to calculate N+1 memory
1032           # availability if you can only failover from a primary to its
1033           # secondary.
1034           "sinst-by-pnode": {},
1035         }
1036       except ValueError:
1037         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1038         bad = True
1039         continue
1040
1041     node_vol_should = {}
1042
1043     for instance in instancelist:
1044       feedback_fn("* Verifying instance %s" % instance)
1045       inst_config = instanceinfo[instance]
1046       result =  self._VerifyInstance(instance, inst_config, node_volume,
1047                                      node_instance, feedback_fn, n_offline)
1048       bad = bad or result
1049       inst_nodes_offline = []
1050
1051       inst_config.MapLVsByNode(node_vol_should)
1052
1053       instance_cfg[instance] = inst_config
1054
1055       pnode = inst_config.primary_node
1056       if pnode in node_info:
1057         node_info[pnode]['pinst'].append(instance)
1058       elif pnode not in n_offline:
1059         feedback_fn("  - ERROR: instance %s, connection to primary node"
1060                     " %s failed" % (instance, pnode))
1061         bad = True
1062
1063       if pnode in n_offline:
1064         inst_nodes_offline.append(pnode)
1065
1066       # If the instance is non-redundant we cannot survive losing its primary
1067       # node, so we are not N+1 compliant. On the other hand we have no disk
1068       # templates with more than one secondary so that situation is not well
1069       # supported either.
1070       # FIXME: does not support file-backed instances
1071       if len(inst_config.secondary_nodes) == 0:
1072         i_non_redundant.append(instance)
1073       elif len(inst_config.secondary_nodes) > 1:
1074         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1075                     % instance)
1076
1077       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1078         i_non_a_balanced.append(instance)
1079
1080       for snode in inst_config.secondary_nodes:
1081         if snode in node_info:
1082           node_info[snode]['sinst'].append(instance)
1083           if pnode not in node_info[snode]['sinst-by-pnode']:
1084             node_info[snode]['sinst-by-pnode'][pnode] = []
1085           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1086         elif snode not in n_offline:
1087           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1088                       " %s failed" % (instance, snode))
1089           bad = True
1090         if snode in n_offline:
1091           inst_nodes_offline.append(snode)
1092
1093       if inst_nodes_offline:
1094         # warn that the instance lives on offline nodes, and set bad=True
1095         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1096                     ", ".join(inst_nodes_offline))
1097         bad = True
1098
1099     feedback_fn("* Verifying orphan volumes")
1100     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1101                                        feedback_fn)
1102     bad = bad or result
1103
1104     feedback_fn("* Verifying remaining instances")
1105     result = self._VerifyOrphanInstances(instancelist, node_instance,
1106                                          feedback_fn)
1107     bad = bad or result
1108
1109     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1110       feedback_fn("* Verifying N+1 Memory redundancy")
1111       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1112       bad = bad or result
1113
1114     feedback_fn("* Other Notes")
1115     if i_non_redundant:
1116       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1117                   % len(i_non_redundant))
1118
1119     if i_non_a_balanced:
1120       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1121                   % len(i_non_a_balanced))
1122
1123     if n_offline:
1124       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1125
1126     if n_drained:
1127       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1128
1129     return not bad
1130
1131   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1132     """Analize the post-hooks' result
1133
1134     This method analyses the hook result, handles it, and sends some
1135     nicely-formatted feedback back to the user.
1136
1137     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1138         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1139     @param hooks_results: the results of the multi-node hooks rpc call
1140     @param feedback_fn: function used send feedback back to the caller
1141     @param lu_result: previous Exec result
1142     @return: the new Exec result, based on the previous result
1143         and hook results
1144
1145     """
1146     # We only really run POST phase hooks, and are only interested in
1147     # their results
1148     if phase == constants.HOOKS_PHASE_POST:
1149       # Used to change hooks' output to proper indentation
1150       indent_re = re.compile('^', re.M)
1151       feedback_fn("* Hooks Results")
1152       if not hooks_results:
1153         feedback_fn("  - ERROR: general communication failure")
1154         lu_result = 1
1155       else:
1156         for node_name in hooks_results:
1157           show_node_header = True
1158           res = hooks_results[node_name]
1159           if res.failed or res.data is False or not isinstance(res.data, list):
1160             if res.offline:
1161               # no need to warn or set fail return value
1162               continue
1163             feedback_fn("    Communication failure in hooks execution")
1164             lu_result = 1
1165             continue
1166           for script, hkr, output in res.data:
1167             if hkr == constants.HKR_FAIL:
1168               # The node header is only shown once, if there are
1169               # failing hooks on that node
1170               if show_node_header:
1171                 feedback_fn("  Node %s:" % node_name)
1172                 show_node_header = False
1173               feedback_fn("    ERROR: Script %s failed, output:" % script)
1174               output = indent_re.sub('      ', output)
1175               feedback_fn("%s" % output)
1176               lu_result = 1
1177
1178       return lu_result
1179
1180
1181 class LUVerifyDisks(NoHooksLU):
1182   """Verifies the cluster disks status.
1183
1184   """
1185   _OP_REQP = []
1186   REQ_BGL = False
1187
1188   def ExpandNames(self):
1189     self.needed_locks = {
1190       locking.LEVEL_NODE: locking.ALL_SET,
1191       locking.LEVEL_INSTANCE: locking.ALL_SET,
1192     }
1193     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1194
1195   def CheckPrereq(self):
1196     """Check prerequisites.
1197
1198     This has no prerequisites.
1199
1200     """
1201     pass
1202
1203   def Exec(self, feedback_fn):
1204     """Verify integrity of cluster disks.
1205
1206     """
1207     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1208
1209     vg_name = self.cfg.GetVGName()
1210     nodes = utils.NiceSort(self.cfg.GetNodeList())
1211     instances = [self.cfg.GetInstanceInfo(name)
1212                  for name in self.cfg.GetInstanceList()]
1213
1214     nv_dict = {}
1215     for inst in instances:
1216       inst_lvs = {}
1217       if (not inst.admin_up or
1218           inst.disk_template not in constants.DTS_NET_MIRROR):
1219         continue
1220       inst.MapLVsByNode(inst_lvs)
1221       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1222       for node, vol_list in inst_lvs.iteritems():
1223         for vol in vol_list:
1224           nv_dict[(node, vol)] = inst
1225
1226     if not nv_dict:
1227       return result
1228
1229     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1230
1231     to_act = set()
1232     for node in nodes:
1233       # node_volume
1234       lvs = node_lvs[node]
1235       if lvs.failed:
1236         if not lvs.offline:
1237           self.LogWarning("Connection to node %s failed: %s" %
1238                           (node, lvs.data))
1239         continue
1240       lvs = lvs.data
1241       if isinstance(lvs, basestring):
1242         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1243         res_nlvm[node] = lvs
1244       elif not isinstance(lvs, dict):
1245         logging.warning("Connection to node %s failed or invalid data"
1246                         " returned", node)
1247         res_nodes.append(node)
1248         continue
1249
1250       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1251         inst = nv_dict.pop((node, lv_name), None)
1252         if (not lv_online and inst is not None
1253             and inst.name not in res_instances):
1254           res_instances.append(inst.name)
1255
1256     # any leftover items in nv_dict are missing LVs, let's arrange the
1257     # data better
1258     for key, inst in nv_dict.iteritems():
1259       if inst.name not in res_missing:
1260         res_missing[inst.name] = []
1261       res_missing[inst.name].append(key)
1262
1263     return result
1264
1265
1266 class LURenameCluster(LogicalUnit):
1267   """Rename the cluster.
1268
1269   """
1270   HPATH = "cluster-rename"
1271   HTYPE = constants.HTYPE_CLUSTER
1272   _OP_REQP = ["name"]
1273
1274   def BuildHooksEnv(self):
1275     """Build hooks env.
1276
1277     """
1278     env = {
1279       "OP_TARGET": self.cfg.GetClusterName(),
1280       "NEW_NAME": self.op.name,
1281       }
1282     mn = self.cfg.GetMasterNode()
1283     return env, [mn], [mn]
1284
1285   def CheckPrereq(self):
1286     """Verify that the passed name is a valid one.
1287
1288     """
1289     hostname = utils.HostInfo(self.op.name)
1290
1291     new_name = hostname.name
1292     self.ip = new_ip = hostname.ip
1293     old_name = self.cfg.GetClusterName()
1294     old_ip = self.cfg.GetMasterIP()
1295     if new_name == old_name and new_ip == old_ip:
1296       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1297                                  " cluster has changed")
1298     if new_ip != old_ip:
1299       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1300         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1301                                    " reachable on the network. Aborting." %
1302                                    new_ip)
1303
1304     self.op.name = new_name
1305
1306   def Exec(self, feedback_fn):
1307     """Rename the cluster.
1308
1309     """
1310     clustername = self.op.name
1311     ip = self.ip
1312
1313     # shutdown the master IP
1314     master = self.cfg.GetMasterNode()
1315     result = self.rpc.call_node_stop_master(master, False)
1316     if result.failed or not result.data:
1317       raise errors.OpExecError("Could not disable the master role")
1318
1319     try:
1320       cluster = self.cfg.GetClusterInfo()
1321       cluster.cluster_name = clustername
1322       cluster.master_ip = ip
1323       self.cfg.Update(cluster)
1324
1325       # update the known hosts file
1326       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1327       node_list = self.cfg.GetNodeList()
1328       try:
1329         node_list.remove(master)
1330       except ValueError:
1331         pass
1332       result = self.rpc.call_upload_file(node_list,
1333                                          constants.SSH_KNOWN_HOSTS_FILE)
1334       for to_node, to_result in result.iteritems():
1335         if to_result.failed or not to_result.data:
1336           logging.error("Copy of file %s to node %s failed",
1337                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1338
1339     finally:
1340       result = self.rpc.call_node_start_master(master, False)
1341       if result.failed or not result.data:
1342         self.LogWarning("Could not re-enable the master role on"
1343                         " the master, please restart manually.")
1344
1345
1346 def _RecursiveCheckIfLVMBased(disk):
1347   """Check if the given disk or its children are lvm-based.
1348
1349   @type disk: L{objects.Disk}
1350   @param disk: the disk to check
1351   @rtype: booleean
1352   @return: boolean indicating whether a LD_LV dev_type was found or not
1353
1354   """
1355   if disk.children:
1356     for chdisk in disk.children:
1357       if _RecursiveCheckIfLVMBased(chdisk):
1358         return True
1359   return disk.dev_type == constants.LD_LV
1360
1361
1362 class LUSetClusterParams(LogicalUnit):
1363   """Change the parameters of the cluster.
1364
1365   """
1366   HPATH = "cluster-modify"
1367   HTYPE = constants.HTYPE_CLUSTER
1368   _OP_REQP = []
1369   REQ_BGL = False
1370
1371   def CheckParameters(self):
1372     """Check parameters
1373
1374     """
1375     if not hasattr(self.op, "candidate_pool_size"):
1376       self.op.candidate_pool_size = None
1377     if self.op.candidate_pool_size is not None:
1378       try:
1379         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1380       except ValueError, err:
1381         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1382                                    str(err))
1383       if self.op.candidate_pool_size < 1:
1384         raise errors.OpPrereqError("At least one master candidate needed")
1385
1386   def ExpandNames(self):
1387     # FIXME: in the future maybe other cluster params won't require checking on
1388     # all nodes to be modified.
1389     self.needed_locks = {
1390       locking.LEVEL_NODE: locking.ALL_SET,
1391     }
1392     self.share_locks[locking.LEVEL_NODE] = 1
1393
1394   def BuildHooksEnv(self):
1395     """Build hooks env.
1396
1397     """
1398     env = {
1399       "OP_TARGET": self.cfg.GetClusterName(),
1400       "NEW_VG_NAME": self.op.vg_name,
1401       }
1402     mn = self.cfg.GetMasterNode()
1403     return env, [mn], [mn]
1404
1405   def CheckPrereq(self):
1406     """Check prerequisites.
1407
1408     This checks whether the given params don't conflict and
1409     if the given volume group is valid.
1410
1411     """
1412     # FIXME: This only works because there is only one parameter that can be
1413     # changed or removed.
1414     if self.op.vg_name is not None and not self.op.vg_name:
1415       instances = self.cfg.GetAllInstancesInfo().values()
1416       for inst in instances:
1417         for disk in inst.disks:
1418           if _RecursiveCheckIfLVMBased(disk):
1419             raise errors.OpPrereqError("Cannot disable lvm storage while"
1420                                        " lvm-based instances exist")
1421
1422     node_list = self.acquired_locks[locking.LEVEL_NODE]
1423
1424     # if vg_name not None, checks given volume group on all nodes
1425     if self.op.vg_name:
1426       vglist = self.rpc.call_vg_list(node_list)
1427       for node in node_list:
1428         if vglist[node].failed:
1429           # ignoring down node
1430           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1431           continue
1432         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1433                                               self.op.vg_name,
1434                                               constants.MIN_VG_SIZE)
1435         if vgstatus:
1436           raise errors.OpPrereqError("Error on node '%s': %s" %
1437                                      (node, vgstatus))
1438
1439     self.cluster = cluster = self.cfg.GetClusterInfo()
1440     # validate beparams changes
1441     if self.op.beparams:
1442       utils.CheckBEParams(self.op.beparams)
1443       self.new_beparams = cluster.FillDict(
1444         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1445
1446     # hypervisor list/parameters
1447     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1448     if self.op.hvparams:
1449       if not isinstance(self.op.hvparams, dict):
1450         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1451       for hv_name, hv_dict in self.op.hvparams.items():
1452         if hv_name not in self.new_hvparams:
1453           self.new_hvparams[hv_name] = hv_dict
1454         else:
1455           self.new_hvparams[hv_name].update(hv_dict)
1456
1457     if self.op.enabled_hypervisors is not None:
1458       self.hv_list = self.op.enabled_hypervisors
1459     else:
1460       self.hv_list = cluster.enabled_hypervisors
1461
1462     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1463       # either the enabled list has changed, or the parameters have, validate
1464       for hv_name, hv_params in self.new_hvparams.items():
1465         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1466             (self.op.enabled_hypervisors and
1467              hv_name in self.op.enabled_hypervisors)):
1468           # either this is a new hypervisor, or its parameters have changed
1469           hv_class = hypervisor.GetHypervisor(hv_name)
1470           hv_class.CheckParameterSyntax(hv_params)
1471           _CheckHVParams(self, node_list, hv_name, hv_params)
1472
1473   def Exec(self, feedback_fn):
1474     """Change the parameters of the cluster.
1475
1476     """
1477     if self.op.vg_name is not None:
1478       if self.op.vg_name != self.cfg.GetVGName():
1479         self.cfg.SetVGName(self.op.vg_name)
1480       else:
1481         feedback_fn("Cluster LVM configuration already in desired"
1482                     " state, not changing")
1483     if self.op.hvparams:
1484       self.cluster.hvparams = self.new_hvparams
1485     if self.op.enabled_hypervisors is not None:
1486       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1487     if self.op.beparams:
1488       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1489     if self.op.candidate_pool_size is not None:
1490       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1491
1492     self.cfg.Update(self.cluster)
1493
1494     # we want to update nodes after the cluster so that if any errors
1495     # happen, we have recorded and saved the cluster info
1496     if self.op.candidate_pool_size is not None:
1497       _AdjustCandidatePool(self)
1498
1499
1500 class LURedistributeConfig(NoHooksLU):
1501   """Force the redistribution of cluster configuration.
1502
1503   This is a very simple LU.
1504
1505   """
1506   _OP_REQP = []
1507   REQ_BGL = False
1508
1509   def ExpandNames(self):
1510     self.needed_locks = {
1511       locking.LEVEL_NODE: locking.ALL_SET,
1512     }
1513     self.share_locks[locking.LEVEL_NODE] = 1
1514
1515   def CheckPrereq(self):
1516     """Check prerequisites.
1517
1518     """
1519
1520   def Exec(self, feedback_fn):
1521     """Redistribute the configuration.
1522
1523     """
1524     self.cfg.Update(self.cfg.GetClusterInfo())
1525
1526
1527 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1528   """Sleep and poll for an instance's disk to sync.
1529
1530   """
1531   if not instance.disks:
1532     return True
1533
1534   if not oneshot:
1535     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1536
1537   node = instance.primary_node
1538
1539   for dev in instance.disks:
1540     lu.cfg.SetDiskID(dev, node)
1541
1542   retries = 0
1543   while True:
1544     max_time = 0
1545     done = True
1546     cumul_degraded = False
1547     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1548     if rstats.failed or not rstats.data:
1549       lu.LogWarning("Can't get any data from node %s", node)
1550       retries += 1
1551       if retries >= 10:
1552         raise errors.RemoteError("Can't contact node %s for mirror data,"
1553                                  " aborting." % node)
1554       time.sleep(6)
1555       continue
1556     rstats = rstats.data
1557     retries = 0
1558     for i, mstat in enumerate(rstats):
1559       if mstat is None:
1560         lu.LogWarning("Can't compute data for node %s/%s",
1561                            node, instance.disks[i].iv_name)
1562         continue
1563       # we ignore the ldisk parameter
1564       perc_done, est_time, is_degraded, _ = mstat
1565       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1566       if perc_done is not None:
1567         done = False
1568         if est_time is not None:
1569           rem_time = "%d estimated seconds remaining" % est_time
1570           max_time = est_time
1571         else:
1572           rem_time = "no time estimate"
1573         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1574                         (instance.disks[i].iv_name, perc_done, rem_time))
1575     if done or oneshot:
1576       break
1577
1578     time.sleep(min(60, max_time))
1579
1580   if done:
1581     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1582   return not cumul_degraded
1583
1584
1585 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1586   """Check that mirrors are not degraded.
1587
1588   The ldisk parameter, if True, will change the test from the
1589   is_degraded attribute (which represents overall non-ok status for
1590   the device(s)) to the ldisk (representing the local storage status).
1591
1592   """
1593   lu.cfg.SetDiskID(dev, node)
1594   if ldisk:
1595     idx = 6
1596   else:
1597     idx = 5
1598
1599   result = True
1600   if on_primary or dev.AssembleOnSecondary():
1601     rstats = lu.rpc.call_blockdev_find(node, dev)
1602     msg = rstats.RemoteFailMsg()
1603     if msg:
1604       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1605       result = False
1606     elif not rstats.payload:
1607       lu.LogWarning("Can't find disk on node %s", node)
1608       result = False
1609     else:
1610       result = result and (not rstats.payload[idx])
1611   if dev.children:
1612     for child in dev.children:
1613       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1614
1615   return result
1616
1617
1618 class LUDiagnoseOS(NoHooksLU):
1619   """Logical unit for OS diagnose/query.
1620
1621   """
1622   _OP_REQP = ["output_fields", "names"]
1623   REQ_BGL = False
1624   _FIELDS_STATIC = utils.FieldSet()
1625   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1626
1627   def ExpandNames(self):
1628     if self.op.names:
1629       raise errors.OpPrereqError("Selective OS query not supported")
1630
1631     _CheckOutputFields(static=self._FIELDS_STATIC,
1632                        dynamic=self._FIELDS_DYNAMIC,
1633                        selected=self.op.output_fields)
1634
1635     # Lock all nodes, in shared mode
1636     self.needed_locks = {}
1637     self.share_locks[locking.LEVEL_NODE] = 1
1638     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1639
1640   def CheckPrereq(self):
1641     """Check prerequisites.
1642
1643     """
1644
1645   @staticmethod
1646   def _DiagnoseByOS(node_list, rlist):
1647     """Remaps a per-node return list into an a per-os per-node dictionary
1648
1649     @param node_list: a list with the names of all nodes
1650     @param rlist: a map with node names as keys and OS objects as values
1651
1652     @rtype: dict
1653     @returns: a dictionary with osnames as keys and as value another map, with
1654         nodes as keys and list of OS objects as values, eg::
1655
1656           {"debian-etch": {"node1": [<object>,...],
1657                            "node2": [<object>,]}
1658           }
1659
1660     """
1661     all_os = {}
1662     for node_name, nr in rlist.iteritems():
1663       if nr.failed or not nr.data:
1664         continue
1665       for os_obj in nr.data:
1666         if os_obj.name not in all_os:
1667           # build a list of nodes for this os containing empty lists
1668           # for each node in node_list
1669           all_os[os_obj.name] = {}
1670           for nname in node_list:
1671             all_os[os_obj.name][nname] = []
1672         all_os[os_obj.name][node_name].append(os_obj)
1673     return all_os
1674
1675   def Exec(self, feedback_fn):
1676     """Compute the list of OSes.
1677
1678     """
1679     node_list = self.acquired_locks[locking.LEVEL_NODE]
1680     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1681                    if node in node_list]
1682     node_data = self.rpc.call_os_diagnose(valid_nodes)
1683     if node_data == False:
1684       raise errors.OpExecError("Can't gather the list of OSes")
1685     pol = self._DiagnoseByOS(valid_nodes, node_data)
1686     output = []
1687     for os_name, os_data in pol.iteritems():
1688       row = []
1689       for field in self.op.output_fields:
1690         if field == "name":
1691           val = os_name
1692         elif field == "valid":
1693           val = utils.all([osl and osl[0] for osl in os_data.values()])
1694         elif field == "node_status":
1695           val = {}
1696           for node_name, nos_list in os_data.iteritems():
1697             val[node_name] = [(v.status, v.path) for v in nos_list]
1698         else:
1699           raise errors.ParameterError(field)
1700         row.append(val)
1701       output.append(row)
1702
1703     return output
1704
1705
1706 class LURemoveNode(LogicalUnit):
1707   """Logical unit for removing a node.
1708
1709   """
1710   HPATH = "node-remove"
1711   HTYPE = constants.HTYPE_NODE
1712   _OP_REQP = ["node_name"]
1713
1714   def BuildHooksEnv(self):
1715     """Build hooks env.
1716
1717     This doesn't run on the target node in the pre phase as a failed
1718     node would then be impossible to remove.
1719
1720     """
1721     env = {
1722       "OP_TARGET": self.op.node_name,
1723       "NODE_NAME": self.op.node_name,
1724       }
1725     all_nodes = self.cfg.GetNodeList()
1726     all_nodes.remove(self.op.node_name)
1727     return env, all_nodes, all_nodes
1728
1729   def CheckPrereq(self):
1730     """Check prerequisites.
1731
1732     This checks:
1733      - the node exists in the configuration
1734      - it does not have primary or secondary instances
1735      - it's not the master
1736
1737     Any errors are signalled by raising errors.OpPrereqError.
1738
1739     """
1740     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1741     if node is None:
1742       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1743
1744     instance_list = self.cfg.GetInstanceList()
1745
1746     masternode = self.cfg.GetMasterNode()
1747     if node.name == masternode:
1748       raise errors.OpPrereqError("Node is the master node,"
1749                                  " you need to failover first.")
1750
1751     for instance_name in instance_list:
1752       instance = self.cfg.GetInstanceInfo(instance_name)
1753       if node.name in instance.all_nodes:
1754         raise errors.OpPrereqError("Instance %s is still running on the node,"
1755                                    " please remove first." % instance_name)
1756     self.op.node_name = node.name
1757     self.node = node
1758
1759   def Exec(self, feedback_fn):
1760     """Removes the node from the cluster.
1761
1762     """
1763     node = self.node
1764     logging.info("Stopping the node daemon and removing configs from node %s",
1765                  node.name)
1766
1767     self.context.RemoveNode(node.name)
1768
1769     self.rpc.call_node_leave_cluster(node.name)
1770
1771     # Promote nodes to master candidate as needed
1772     _AdjustCandidatePool(self)
1773
1774
1775 class LUQueryNodes(NoHooksLU):
1776   """Logical unit for querying nodes.
1777
1778   """
1779   _OP_REQP = ["output_fields", "names", "use_locking"]
1780   REQ_BGL = False
1781   _FIELDS_DYNAMIC = utils.FieldSet(
1782     "dtotal", "dfree",
1783     "mtotal", "mnode", "mfree",
1784     "bootid",
1785     "ctotal", "cnodes", "csockets",
1786     )
1787
1788   _FIELDS_STATIC = utils.FieldSet(
1789     "name", "pinst_cnt", "sinst_cnt",
1790     "pinst_list", "sinst_list",
1791     "pip", "sip", "tags",
1792     "serial_no",
1793     "master_candidate",
1794     "master",
1795     "offline",
1796     "drained",
1797     )
1798
1799   def ExpandNames(self):
1800     _CheckOutputFields(static=self._FIELDS_STATIC,
1801                        dynamic=self._FIELDS_DYNAMIC,
1802                        selected=self.op.output_fields)
1803
1804     self.needed_locks = {}
1805     self.share_locks[locking.LEVEL_NODE] = 1
1806
1807     if self.op.names:
1808       self.wanted = _GetWantedNodes(self, self.op.names)
1809     else:
1810       self.wanted = locking.ALL_SET
1811
1812     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1813     self.do_locking = self.do_node_query and self.op.use_locking
1814     if self.do_locking:
1815       # if we don't request only static fields, we need to lock the nodes
1816       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1817
1818
1819   def CheckPrereq(self):
1820     """Check prerequisites.
1821
1822     """
1823     # The validation of the node list is done in the _GetWantedNodes,
1824     # if non empty, and if empty, there's no validation to do
1825     pass
1826
1827   def Exec(self, feedback_fn):
1828     """Computes the list of nodes and their attributes.
1829
1830     """
1831     all_info = self.cfg.GetAllNodesInfo()
1832     if self.do_locking:
1833       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1834     elif self.wanted != locking.ALL_SET:
1835       nodenames = self.wanted
1836       missing = set(nodenames).difference(all_info.keys())
1837       if missing:
1838         raise errors.OpExecError(
1839           "Some nodes were removed before retrieving their data: %s" % missing)
1840     else:
1841       nodenames = all_info.keys()
1842
1843     nodenames = utils.NiceSort(nodenames)
1844     nodelist = [all_info[name] for name in nodenames]
1845
1846     # begin data gathering
1847
1848     if self.do_node_query:
1849       live_data = {}
1850       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1851                                           self.cfg.GetHypervisorType())
1852       for name in nodenames:
1853         nodeinfo = node_data[name]
1854         if not nodeinfo.failed and nodeinfo.data:
1855           nodeinfo = nodeinfo.data
1856           fn = utils.TryConvert
1857           live_data[name] = {
1858             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1859             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1860             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1861             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1862             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1863             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1864             "bootid": nodeinfo.get('bootid', None),
1865             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1866             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1867             }
1868         else:
1869           live_data[name] = {}
1870     else:
1871       live_data = dict.fromkeys(nodenames, {})
1872
1873     node_to_primary = dict([(name, set()) for name in nodenames])
1874     node_to_secondary = dict([(name, set()) for name in nodenames])
1875
1876     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1877                              "sinst_cnt", "sinst_list"))
1878     if inst_fields & frozenset(self.op.output_fields):
1879       instancelist = self.cfg.GetInstanceList()
1880
1881       for instance_name in instancelist:
1882         inst = self.cfg.GetInstanceInfo(instance_name)
1883         if inst.primary_node in node_to_primary:
1884           node_to_primary[inst.primary_node].add(inst.name)
1885         for secnode in inst.secondary_nodes:
1886           if secnode in node_to_secondary:
1887             node_to_secondary[secnode].add(inst.name)
1888
1889     master_node = self.cfg.GetMasterNode()
1890
1891     # end data gathering
1892
1893     output = []
1894     for node in nodelist:
1895       node_output = []
1896       for field in self.op.output_fields:
1897         if field == "name":
1898           val = node.name
1899         elif field == "pinst_list":
1900           val = list(node_to_primary[node.name])
1901         elif field == "sinst_list":
1902           val = list(node_to_secondary[node.name])
1903         elif field == "pinst_cnt":
1904           val = len(node_to_primary[node.name])
1905         elif field == "sinst_cnt":
1906           val = len(node_to_secondary[node.name])
1907         elif field == "pip":
1908           val = node.primary_ip
1909         elif field == "sip":
1910           val = node.secondary_ip
1911         elif field == "tags":
1912           val = list(node.GetTags())
1913         elif field == "serial_no":
1914           val = node.serial_no
1915         elif field == "master_candidate":
1916           val = node.master_candidate
1917         elif field == "master":
1918           val = node.name == master_node
1919         elif field == "offline":
1920           val = node.offline
1921         elif field == "drained":
1922           val = node.drained
1923         elif self._FIELDS_DYNAMIC.Matches(field):
1924           val = live_data[node.name].get(field, None)
1925         else:
1926           raise errors.ParameterError(field)
1927         node_output.append(val)
1928       output.append(node_output)
1929
1930     return output
1931
1932
1933 class LUQueryNodeVolumes(NoHooksLU):
1934   """Logical unit for getting volumes on node(s).
1935
1936   """
1937   _OP_REQP = ["nodes", "output_fields"]
1938   REQ_BGL = False
1939   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1940   _FIELDS_STATIC = utils.FieldSet("node")
1941
1942   def ExpandNames(self):
1943     _CheckOutputFields(static=self._FIELDS_STATIC,
1944                        dynamic=self._FIELDS_DYNAMIC,
1945                        selected=self.op.output_fields)
1946
1947     self.needed_locks = {}
1948     self.share_locks[locking.LEVEL_NODE] = 1
1949     if not self.op.nodes:
1950       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1951     else:
1952       self.needed_locks[locking.LEVEL_NODE] = \
1953         _GetWantedNodes(self, self.op.nodes)
1954
1955   def CheckPrereq(self):
1956     """Check prerequisites.
1957
1958     This checks that the fields required are valid output fields.
1959
1960     """
1961     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1962
1963   def Exec(self, feedback_fn):
1964     """Computes the list of nodes and their attributes.
1965
1966     """
1967     nodenames = self.nodes
1968     volumes = self.rpc.call_node_volumes(nodenames)
1969
1970     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1971              in self.cfg.GetInstanceList()]
1972
1973     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1974
1975     output = []
1976     for node in nodenames:
1977       if node not in volumes or volumes[node].failed or not volumes[node].data:
1978         continue
1979
1980       node_vols = volumes[node].data[:]
1981       node_vols.sort(key=lambda vol: vol['dev'])
1982
1983       for vol in node_vols:
1984         node_output = []
1985         for field in self.op.output_fields:
1986           if field == "node":
1987             val = node
1988           elif field == "phys":
1989             val = vol['dev']
1990           elif field == "vg":
1991             val = vol['vg']
1992           elif field == "name":
1993             val = vol['name']
1994           elif field == "size":
1995             val = int(float(vol['size']))
1996           elif field == "instance":
1997             for inst in ilist:
1998               if node not in lv_by_node[inst]:
1999                 continue
2000               if vol['name'] in lv_by_node[inst][node]:
2001                 val = inst.name
2002                 break
2003             else:
2004               val = '-'
2005           else:
2006             raise errors.ParameterError(field)
2007           node_output.append(str(val))
2008
2009         output.append(node_output)
2010
2011     return output
2012
2013
2014 class LUAddNode(LogicalUnit):
2015   """Logical unit for adding node to the cluster.
2016
2017   """
2018   HPATH = "node-add"
2019   HTYPE = constants.HTYPE_NODE
2020   _OP_REQP = ["node_name"]
2021
2022   def BuildHooksEnv(self):
2023     """Build hooks env.
2024
2025     This will run on all nodes before, and on all nodes + the new node after.
2026
2027     """
2028     env = {
2029       "OP_TARGET": self.op.node_name,
2030       "NODE_NAME": self.op.node_name,
2031       "NODE_PIP": self.op.primary_ip,
2032       "NODE_SIP": self.op.secondary_ip,
2033       }
2034     nodes_0 = self.cfg.GetNodeList()
2035     nodes_1 = nodes_0 + [self.op.node_name, ]
2036     return env, nodes_0, nodes_1
2037
2038   def CheckPrereq(self):
2039     """Check prerequisites.
2040
2041     This checks:
2042      - the new node is not already in the config
2043      - it is resolvable
2044      - its parameters (single/dual homed) matches the cluster
2045
2046     Any errors are signalled by raising errors.OpPrereqError.
2047
2048     """
2049     node_name = self.op.node_name
2050     cfg = self.cfg
2051
2052     dns_data = utils.HostInfo(node_name)
2053
2054     node = dns_data.name
2055     primary_ip = self.op.primary_ip = dns_data.ip
2056     secondary_ip = getattr(self.op, "secondary_ip", None)
2057     if secondary_ip is None:
2058       secondary_ip = primary_ip
2059     if not utils.IsValidIP(secondary_ip):
2060       raise errors.OpPrereqError("Invalid secondary IP given")
2061     self.op.secondary_ip = secondary_ip
2062
2063     node_list = cfg.GetNodeList()
2064     if not self.op.readd and node in node_list:
2065       raise errors.OpPrereqError("Node %s is already in the configuration" %
2066                                  node)
2067     elif self.op.readd and node not in node_list:
2068       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2069
2070     for existing_node_name in node_list:
2071       existing_node = cfg.GetNodeInfo(existing_node_name)
2072
2073       if self.op.readd and node == existing_node_name:
2074         if (existing_node.primary_ip != primary_ip or
2075             existing_node.secondary_ip != secondary_ip):
2076           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2077                                      " address configuration as before")
2078         continue
2079
2080       if (existing_node.primary_ip == primary_ip or
2081           existing_node.secondary_ip == primary_ip or
2082           existing_node.primary_ip == secondary_ip or
2083           existing_node.secondary_ip == secondary_ip):
2084         raise errors.OpPrereqError("New node ip address(es) conflict with"
2085                                    " existing node %s" % existing_node.name)
2086
2087     # check that the type of the node (single versus dual homed) is the
2088     # same as for the master
2089     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2090     master_singlehomed = myself.secondary_ip == myself.primary_ip
2091     newbie_singlehomed = secondary_ip == primary_ip
2092     if master_singlehomed != newbie_singlehomed:
2093       if master_singlehomed:
2094         raise errors.OpPrereqError("The master has no private ip but the"
2095                                    " new node has one")
2096       else:
2097         raise errors.OpPrereqError("The master has a private ip but the"
2098                                    " new node doesn't have one")
2099
2100     # checks reachablity
2101     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2102       raise errors.OpPrereqError("Node not reachable by ping")
2103
2104     if not newbie_singlehomed:
2105       # check reachability from my secondary ip to newbie's secondary ip
2106       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2107                            source=myself.secondary_ip):
2108         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2109                                    " based ping to noded port")
2110
2111     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2112     mc_now, _ = self.cfg.GetMasterCandidateStats()
2113     master_candidate = mc_now < cp_size
2114
2115     self.new_node = objects.Node(name=node,
2116                                  primary_ip=primary_ip,
2117                                  secondary_ip=secondary_ip,
2118                                  master_candidate=master_candidate,
2119                                  offline=False, drained=False)
2120
2121   def Exec(self, feedback_fn):
2122     """Adds the new node to the cluster.
2123
2124     """
2125     new_node = self.new_node
2126     node = new_node.name
2127
2128     # check connectivity
2129     result = self.rpc.call_version([node])[node]
2130     result.Raise()
2131     if result.data:
2132       if constants.PROTOCOL_VERSION == result.data:
2133         logging.info("Communication to node %s fine, sw version %s match",
2134                      node, result.data)
2135       else:
2136         raise errors.OpExecError("Version mismatch master version %s,"
2137                                  " node version %s" %
2138                                  (constants.PROTOCOL_VERSION, result.data))
2139     else:
2140       raise errors.OpExecError("Cannot get version from the new node")
2141
2142     # setup ssh on node
2143     logging.info("Copy ssh key to node %s", node)
2144     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2145     keyarray = []
2146     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2147                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2148                 priv_key, pub_key]
2149
2150     for i in keyfiles:
2151       f = open(i, 'r')
2152       try:
2153         keyarray.append(f.read())
2154       finally:
2155         f.close()
2156
2157     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2158                                     keyarray[2],
2159                                     keyarray[3], keyarray[4], keyarray[5])
2160
2161     msg = result.RemoteFailMsg()
2162     if msg:
2163       raise errors.OpExecError("Cannot transfer ssh keys to the"
2164                                " new node: %s" % msg)
2165
2166     # Add node to our /etc/hosts, and add key to known_hosts
2167     utils.AddHostToEtcHosts(new_node.name)
2168
2169     if new_node.secondary_ip != new_node.primary_ip:
2170       result = self.rpc.call_node_has_ip_address(new_node.name,
2171                                                  new_node.secondary_ip)
2172       if result.failed or not result.data:
2173         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2174                                  " you gave (%s). Please fix and re-run this"
2175                                  " command." % new_node.secondary_ip)
2176
2177     node_verify_list = [self.cfg.GetMasterNode()]
2178     node_verify_param = {
2179       'nodelist': [node],
2180       # TODO: do a node-net-test as well?
2181     }
2182
2183     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2184                                        self.cfg.GetClusterName())
2185     for verifier in node_verify_list:
2186       if result[verifier].failed or not result[verifier].data:
2187         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2188                                  " for remote verification" % verifier)
2189       if result[verifier].data['nodelist']:
2190         for failed in result[verifier].data['nodelist']:
2191           feedback_fn("ssh/hostname verification failed %s -> %s" %
2192                       (verifier, result[verifier].data['nodelist'][failed]))
2193         raise errors.OpExecError("ssh/hostname verification failed.")
2194
2195     # Distribute updated /etc/hosts and known_hosts to all nodes,
2196     # including the node just added
2197     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2198     dist_nodes = self.cfg.GetNodeList()
2199     if not self.op.readd:
2200       dist_nodes.append(node)
2201     if myself.name in dist_nodes:
2202       dist_nodes.remove(myself.name)
2203
2204     logging.debug("Copying hosts and known_hosts to all nodes")
2205     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2206       result = self.rpc.call_upload_file(dist_nodes, fname)
2207       for to_node, to_result in result.iteritems():
2208         if to_result.failed or not to_result.data:
2209           logging.error("Copy of file %s to node %s failed", fname, to_node)
2210
2211     to_copy = []
2212     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2213     if constants.HTS_USE_VNC.intersection(enabled_hypervisors):
2214       to_copy.append(constants.VNC_PASSWORD_FILE)
2215
2216     for fname in to_copy:
2217       result = self.rpc.call_upload_file([node], fname)
2218       if result[node].failed or not result[node]:
2219         logging.error("Could not copy file %s to node %s", fname, node)
2220
2221     if self.op.readd:
2222       self.context.ReaddNode(new_node)
2223     else:
2224       self.context.AddNode(new_node)
2225
2226
2227 class LUSetNodeParams(LogicalUnit):
2228   """Modifies the parameters of a node.
2229
2230   """
2231   HPATH = "node-modify"
2232   HTYPE = constants.HTYPE_NODE
2233   _OP_REQP = ["node_name"]
2234   REQ_BGL = False
2235
2236   def CheckArguments(self):
2237     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2238     if node_name is None:
2239       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2240     self.op.node_name = node_name
2241     _CheckBooleanOpField(self.op, 'master_candidate')
2242     _CheckBooleanOpField(self.op, 'offline')
2243     _CheckBooleanOpField(self.op, 'drained')
2244     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2245     if all_mods.count(None) == 3:
2246       raise errors.OpPrereqError("Please pass at least one modification")
2247     if all_mods.count(True) > 1:
2248       raise errors.OpPrereqError("Can't set the node into more than one"
2249                                  " state at the same time")
2250
2251   def ExpandNames(self):
2252     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2253
2254   def BuildHooksEnv(self):
2255     """Build hooks env.
2256
2257     This runs on the master node.
2258
2259     """
2260     env = {
2261       "OP_TARGET": self.op.node_name,
2262       "MASTER_CANDIDATE": str(self.op.master_candidate),
2263       "OFFLINE": str(self.op.offline),
2264       "DRAINED": str(self.op.drained),
2265       }
2266     nl = [self.cfg.GetMasterNode(),
2267           self.op.node_name]
2268     return env, nl, nl
2269
2270   def CheckPrereq(self):
2271     """Check prerequisites.
2272
2273     This only checks the instance list against the existing names.
2274
2275     """
2276     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2277
2278     if ((self.op.master_candidate == False or self.op.offline == True or
2279          self.op.drained == True) and node.master_candidate):
2280       # we will demote the node from master_candidate
2281       if self.op.node_name == self.cfg.GetMasterNode():
2282         raise errors.OpPrereqError("The master node has to be a"
2283                                    " master candidate, online and not drained")
2284       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2285       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2286       if num_candidates <= cp_size:
2287         msg = ("Not enough master candidates (desired"
2288                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2289         if self.op.force:
2290           self.LogWarning(msg)
2291         else:
2292           raise errors.OpPrereqError(msg)
2293
2294     if (self.op.master_candidate == True and
2295         ((node.offline and not self.op.offline == False) or
2296          (node.drained and not self.op.drained == False))):
2297       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2298                                  " to master_candidate")
2299
2300     return
2301
2302   def Exec(self, feedback_fn):
2303     """Modifies a node.
2304
2305     """
2306     node = self.node
2307
2308     result = []
2309     changed_mc = False
2310
2311     if self.op.offline is not None:
2312       node.offline = self.op.offline
2313       result.append(("offline", str(self.op.offline)))
2314       if self.op.offline == True:
2315         if node.master_candidate:
2316           node.master_candidate = False
2317           changed_mc = True
2318           result.append(("master_candidate", "auto-demotion due to offline"))
2319         if node.drained:
2320           node.drained = False
2321           result.append(("drained", "clear drained status due to offline"))
2322
2323     if self.op.master_candidate is not None:
2324       node.master_candidate = self.op.master_candidate
2325       changed_mc = True
2326       result.append(("master_candidate", str(self.op.master_candidate)))
2327       if self.op.master_candidate == False:
2328         rrc = self.rpc.call_node_demote_from_mc(node.name)
2329         msg = rrc.RemoteFailMsg()
2330         if msg:
2331           self.LogWarning("Node failed to demote itself: %s" % msg)
2332
2333     if self.op.drained is not None:
2334       node.drained = self.op.drained
2335       if self.op.drained == True:
2336         if node.master_candidate:
2337           node.master_candidate = False
2338           changed_mc = True
2339           result.append(("master_candidate", "auto-demotion due to drain"))
2340         if node.offline:
2341           node.offline = False
2342           result.append(("offline", "clear offline status due to drain"))
2343
2344     # this will trigger configuration file update, if needed
2345     self.cfg.Update(node)
2346     # this will trigger job queue propagation or cleanup
2347     if changed_mc:
2348       self.context.ReaddNode(node)
2349
2350     return result
2351
2352
2353 class LUQueryClusterInfo(NoHooksLU):
2354   """Query cluster configuration.
2355
2356   """
2357   _OP_REQP = []
2358   REQ_BGL = False
2359
2360   def ExpandNames(self):
2361     self.needed_locks = {}
2362
2363   def CheckPrereq(self):
2364     """No prerequsites needed for this LU.
2365
2366     """
2367     pass
2368
2369   def Exec(self, feedback_fn):
2370     """Return cluster config.
2371
2372     """
2373     cluster = self.cfg.GetClusterInfo()
2374     result = {
2375       "software_version": constants.RELEASE_VERSION,
2376       "protocol_version": constants.PROTOCOL_VERSION,
2377       "config_version": constants.CONFIG_VERSION,
2378       "os_api_version": constants.OS_API_VERSION,
2379       "export_version": constants.EXPORT_VERSION,
2380       "architecture": (platform.architecture()[0], platform.machine()),
2381       "name": cluster.cluster_name,
2382       "master": cluster.master_node,
2383       "default_hypervisor": cluster.default_hypervisor,
2384       "enabled_hypervisors": cluster.enabled_hypervisors,
2385       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2386                         for hypervisor in cluster.enabled_hypervisors]),
2387       "beparams": cluster.beparams,
2388       "candidate_pool_size": cluster.candidate_pool_size,
2389       }
2390
2391     return result
2392
2393
2394 class LUQueryConfigValues(NoHooksLU):
2395   """Return configuration values.
2396
2397   """
2398   _OP_REQP = []
2399   REQ_BGL = False
2400   _FIELDS_DYNAMIC = utils.FieldSet()
2401   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2402
2403   def ExpandNames(self):
2404     self.needed_locks = {}
2405
2406     _CheckOutputFields(static=self._FIELDS_STATIC,
2407                        dynamic=self._FIELDS_DYNAMIC,
2408                        selected=self.op.output_fields)
2409
2410   def CheckPrereq(self):
2411     """No prerequisites.
2412
2413     """
2414     pass
2415
2416   def Exec(self, feedback_fn):
2417     """Dump a representation of the cluster config to the standard output.
2418
2419     """
2420     values = []
2421     for field in self.op.output_fields:
2422       if field == "cluster_name":
2423         entry = self.cfg.GetClusterName()
2424       elif field == "master_node":
2425         entry = self.cfg.GetMasterNode()
2426       elif field == "drain_flag":
2427         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2428       else:
2429         raise errors.ParameterError(field)
2430       values.append(entry)
2431     return values
2432
2433
2434 class LUActivateInstanceDisks(NoHooksLU):
2435   """Bring up an instance's disks.
2436
2437   """
2438   _OP_REQP = ["instance_name"]
2439   REQ_BGL = False
2440
2441   def ExpandNames(self):
2442     self._ExpandAndLockInstance()
2443     self.needed_locks[locking.LEVEL_NODE] = []
2444     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2445
2446   def DeclareLocks(self, level):
2447     if level == locking.LEVEL_NODE:
2448       self._LockInstancesNodes()
2449
2450   def CheckPrereq(self):
2451     """Check prerequisites.
2452
2453     This checks that the instance is in the cluster.
2454
2455     """
2456     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2457     assert self.instance is not None, \
2458       "Cannot retrieve locked instance %s" % self.op.instance_name
2459     _CheckNodeOnline(self, self.instance.primary_node)
2460
2461   def Exec(self, feedback_fn):
2462     """Activate the disks.
2463
2464     """
2465     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2466     if not disks_ok:
2467       raise errors.OpExecError("Cannot activate block devices")
2468
2469     return disks_info
2470
2471
2472 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2473   """Prepare the block devices for an instance.
2474
2475   This sets up the block devices on all nodes.
2476
2477   @type lu: L{LogicalUnit}
2478   @param lu: the logical unit on whose behalf we execute
2479   @type instance: L{objects.Instance}
2480   @param instance: the instance for whose disks we assemble
2481   @type ignore_secondaries: boolean
2482   @param ignore_secondaries: if true, errors on secondary nodes
2483       won't result in an error return from the function
2484   @return: False if the operation failed, otherwise a list of
2485       (host, instance_visible_name, node_visible_name)
2486       with the mapping from node devices to instance devices
2487
2488   """
2489   device_info = []
2490   disks_ok = True
2491   iname = instance.name
2492   # With the two passes mechanism we try to reduce the window of
2493   # opportunity for the race condition of switching DRBD to primary
2494   # before handshaking occured, but we do not eliminate it
2495
2496   # The proper fix would be to wait (with some limits) until the
2497   # connection has been made and drbd transitions from WFConnection
2498   # into any other network-connected state (Connected, SyncTarget,
2499   # SyncSource, etc.)
2500
2501   # 1st pass, assemble on all nodes in secondary mode
2502   for inst_disk in instance.disks:
2503     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2504       lu.cfg.SetDiskID(node_disk, node)
2505       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2506       msg = result.RemoteFailMsg()
2507       if msg:
2508         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2509                            " (is_primary=False, pass=1): %s",
2510                            inst_disk.iv_name, node, msg)
2511         if not ignore_secondaries:
2512           disks_ok = False
2513
2514   # FIXME: race condition on drbd migration to primary
2515
2516   # 2nd pass, do only the primary node
2517   for inst_disk in instance.disks:
2518     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2519       if node != instance.primary_node:
2520         continue
2521       lu.cfg.SetDiskID(node_disk, node)
2522       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2523       msg = result.RemoteFailMsg()
2524       if msg:
2525         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2526                            " (is_primary=True, pass=2): %s",
2527                            inst_disk.iv_name, node, msg)
2528         disks_ok = False
2529     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2530
2531   # leave the disks configured for the primary node
2532   # this is a workaround that would be fixed better by
2533   # improving the logical/physical id handling
2534   for disk in instance.disks:
2535     lu.cfg.SetDiskID(disk, instance.primary_node)
2536
2537   return disks_ok, device_info
2538
2539
2540 def _StartInstanceDisks(lu, instance, force):
2541   """Start the disks of an instance.
2542
2543   """
2544   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2545                                            ignore_secondaries=force)
2546   if not disks_ok:
2547     _ShutdownInstanceDisks(lu, instance)
2548     if force is not None and not force:
2549       lu.proc.LogWarning("", hint="If the message above refers to a"
2550                          " secondary node,"
2551                          " you can retry the operation using '--force'.")
2552     raise errors.OpExecError("Disk consistency error")
2553
2554
2555 class LUDeactivateInstanceDisks(NoHooksLU):
2556   """Shutdown an instance's disks.
2557
2558   """
2559   _OP_REQP = ["instance_name"]
2560   REQ_BGL = False
2561
2562   def ExpandNames(self):
2563     self._ExpandAndLockInstance()
2564     self.needed_locks[locking.LEVEL_NODE] = []
2565     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2566
2567   def DeclareLocks(self, level):
2568     if level == locking.LEVEL_NODE:
2569       self._LockInstancesNodes()
2570
2571   def CheckPrereq(self):
2572     """Check prerequisites.
2573
2574     This checks that the instance is in the cluster.
2575
2576     """
2577     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2578     assert self.instance is not None, \
2579       "Cannot retrieve locked instance %s" % self.op.instance_name
2580
2581   def Exec(self, feedback_fn):
2582     """Deactivate the disks
2583
2584     """
2585     instance = self.instance
2586     _SafeShutdownInstanceDisks(self, instance)
2587
2588
2589 def _SafeShutdownInstanceDisks(lu, instance):
2590   """Shutdown block devices of an instance.
2591
2592   This function checks if an instance is running, before calling
2593   _ShutdownInstanceDisks.
2594
2595   """
2596   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2597                                       [instance.hypervisor])
2598   ins_l = ins_l[instance.primary_node]
2599   if ins_l.failed or not isinstance(ins_l.data, list):
2600     raise errors.OpExecError("Can't contact node '%s'" %
2601                              instance.primary_node)
2602
2603   if instance.name in ins_l.data:
2604     raise errors.OpExecError("Instance is running, can't shutdown"
2605                              " block devices.")
2606
2607   _ShutdownInstanceDisks(lu, instance)
2608
2609
2610 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2611   """Shutdown block devices of an instance.
2612
2613   This does the shutdown on all nodes of the instance.
2614
2615   If the ignore_primary is false, errors on the primary node are
2616   ignored.
2617
2618   """
2619   all_result = True
2620   for disk in instance.disks:
2621     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2622       lu.cfg.SetDiskID(top_disk, node)
2623       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2624       msg = result.RemoteFailMsg()
2625       if msg:
2626         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2627                       disk.iv_name, node, msg)
2628         if not ignore_primary or node != instance.primary_node:
2629           all_result = False
2630   return all_result
2631
2632
2633 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2634   """Checks if a node has enough free memory.
2635
2636   This function check if a given node has the needed amount of free
2637   memory. In case the node has less memory or we cannot get the
2638   information from the node, this function raise an OpPrereqError
2639   exception.
2640
2641   @type lu: C{LogicalUnit}
2642   @param lu: a logical unit from which we get configuration data
2643   @type node: C{str}
2644   @param node: the node to check
2645   @type reason: C{str}
2646   @param reason: string to use in the error message
2647   @type requested: C{int}
2648   @param requested: the amount of memory in MiB to check for
2649   @type hypervisor_name: C{str}
2650   @param hypervisor_name: the hypervisor to ask for memory stats
2651   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2652       we cannot check the node
2653
2654   """
2655   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2656   nodeinfo[node].Raise()
2657   free_mem = nodeinfo[node].data.get('memory_free')
2658   if not isinstance(free_mem, int):
2659     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2660                              " was '%s'" % (node, free_mem))
2661   if requested > free_mem:
2662     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2663                              " needed %s MiB, available %s MiB" %
2664                              (node, reason, requested, free_mem))
2665
2666
2667 class LUStartupInstance(LogicalUnit):
2668   """Starts an instance.
2669
2670   """
2671   HPATH = "instance-start"
2672   HTYPE = constants.HTYPE_INSTANCE
2673   _OP_REQP = ["instance_name", "force"]
2674   REQ_BGL = False
2675
2676   def ExpandNames(self):
2677     self._ExpandAndLockInstance()
2678
2679   def BuildHooksEnv(self):
2680     """Build hooks env.
2681
2682     This runs on master, primary and secondary nodes of the instance.
2683
2684     """
2685     env = {
2686       "FORCE": self.op.force,
2687       }
2688     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2689     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2690     return env, nl, nl
2691
2692   def CheckPrereq(self):
2693     """Check prerequisites.
2694
2695     This checks that the instance is in the cluster.
2696
2697     """
2698     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2699     assert self.instance is not None, \
2700       "Cannot retrieve locked instance %s" % self.op.instance_name
2701
2702     _CheckNodeOnline(self, instance.primary_node)
2703
2704     bep = self.cfg.GetClusterInfo().FillBE(instance)
2705     # check bridges existance
2706     _CheckInstanceBridgesExist(self, instance)
2707
2708     _CheckNodeFreeMemory(self, instance.primary_node,
2709                          "starting instance %s" % instance.name,
2710                          bep[constants.BE_MEMORY], instance.hypervisor)
2711
2712   def Exec(self, feedback_fn):
2713     """Start the instance.
2714
2715     """
2716     instance = self.instance
2717     force = self.op.force
2718     extra_args = getattr(self.op, "extra_args", "")
2719
2720     self.cfg.MarkInstanceUp(instance.name)
2721
2722     node_current = instance.primary_node
2723
2724     _StartInstanceDisks(self, instance, force)
2725
2726     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2727     msg = result.RemoteFailMsg()
2728     if msg:
2729       _ShutdownInstanceDisks(self, instance)
2730       raise errors.OpExecError("Could not start instance: %s" % msg)
2731
2732
2733 class LURebootInstance(LogicalUnit):
2734   """Reboot an instance.
2735
2736   """
2737   HPATH = "instance-reboot"
2738   HTYPE = constants.HTYPE_INSTANCE
2739   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2740   REQ_BGL = False
2741
2742   def ExpandNames(self):
2743     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2744                                    constants.INSTANCE_REBOOT_HARD,
2745                                    constants.INSTANCE_REBOOT_FULL]:
2746       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2747                                   (constants.INSTANCE_REBOOT_SOFT,
2748                                    constants.INSTANCE_REBOOT_HARD,
2749                                    constants.INSTANCE_REBOOT_FULL))
2750     self._ExpandAndLockInstance()
2751
2752   def BuildHooksEnv(self):
2753     """Build hooks env.
2754
2755     This runs on master, primary and secondary nodes of the instance.
2756
2757     """
2758     env = {
2759       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2760       }
2761     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2762     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2763     return env, nl, nl
2764
2765   def CheckPrereq(self):
2766     """Check prerequisites.
2767
2768     This checks that the instance is in the cluster.
2769
2770     """
2771     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2772     assert self.instance is not None, \
2773       "Cannot retrieve locked instance %s" % self.op.instance_name
2774
2775     _CheckNodeOnline(self, instance.primary_node)
2776
2777     # check bridges existance
2778     _CheckInstanceBridgesExist(self, instance)
2779
2780   def Exec(self, feedback_fn):
2781     """Reboot the instance.
2782
2783     """
2784     instance = self.instance
2785     ignore_secondaries = self.op.ignore_secondaries
2786     reboot_type = self.op.reboot_type
2787     extra_args = getattr(self.op, "extra_args", "")
2788
2789     node_current = instance.primary_node
2790
2791     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2792                        constants.INSTANCE_REBOOT_HARD]:
2793       result = self.rpc.call_instance_reboot(node_current, instance,
2794                                              reboot_type, extra_args)
2795       if result.failed or not result.data:
2796         raise errors.OpExecError("Could not reboot instance")
2797     else:
2798       if not self.rpc.call_instance_shutdown(node_current, instance):
2799         raise errors.OpExecError("could not shutdown instance for full reboot")
2800       _ShutdownInstanceDisks(self, instance)
2801       _StartInstanceDisks(self, instance, ignore_secondaries)
2802       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2803       msg = result.RemoteFailMsg()
2804       if msg:
2805         _ShutdownInstanceDisks(self, instance)
2806         raise errors.OpExecError("Could not start instance for"
2807                                  " full reboot: %s" % msg)
2808
2809     self.cfg.MarkInstanceUp(instance.name)
2810
2811
2812 class LUShutdownInstance(LogicalUnit):
2813   """Shutdown an instance.
2814
2815   """
2816   HPATH = "instance-stop"
2817   HTYPE = constants.HTYPE_INSTANCE
2818   _OP_REQP = ["instance_name"]
2819   REQ_BGL = False
2820
2821   def ExpandNames(self):
2822     self._ExpandAndLockInstance()
2823
2824   def BuildHooksEnv(self):
2825     """Build hooks env.
2826
2827     This runs on master, primary and secondary nodes of the instance.
2828
2829     """
2830     env = _BuildInstanceHookEnvByObject(self, self.instance)
2831     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2832     return env, nl, nl
2833
2834   def CheckPrereq(self):
2835     """Check prerequisites.
2836
2837     This checks that the instance is in the cluster.
2838
2839     """
2840     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2841     assert self.instance is not None, \
2842       "Cannot retrieve locked instance %s" % self.op.instance_name
2843     _CheckNodeOnline(self, self.instance.primary_node)
2844
2845   def Exec(self, feedback_fn):
2846     """Shutdown the instance.
2847
2848     """
2849     instance = self.instance
2850     node_current = instance.primary_node
2851     self.cfg.MarkInstanceDown(instance.name)
2852     result = self.rpc.call_instance_shutdown(node_current, instance)
2853     if result.failed or not result.data:
2854       self.proc.LogWarning("Could not shutdown instance")
2855
2856     _ShutdownInstanceDisks(self, instance)
2857
2858
2859 class LUReinstallInstance(LogicalUnit):
2860   """Reinstall an instance.
2861
2862   """
2863   HPATH = "instance-reinstall"
2864   HTYPE = constants.HTYPE_INSTANCE
2865   _OP_REQP = ["instance_name"]
2866   REQ_BGL = False
2867
2868   def ExpandNames(self):
2869     self._ExpandAndLockInstance()
2870
2871   def BuildHooksEnv(self):
2872     """Build hooks env.
2873
2874     This runs on master, primary and secondary nodes of the instance.
2875
2876     """
2877     env = _BuildInstanceHookEnvByObject(self, self.instance)
2878     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2879     return env, nl, nl
2880
2881   def CheckPrereq(self):
2882     """Check prerequisites.
2883
2884     This checks that the instance is in the cluster and is not running.
2885
2886     """
2887     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2888     assert instance is not None, \
2889       "Cannot retrieve locked instance %s" % self.op.instance_name
2890     _CheckNodeOnline(self, instance.primary_node)
2891
2892     if instance.disk_template == constants.DT_DISKLESS:
2893       raise errors.OpPrereqError("Instance '%s' has no disks" %
2894                                  self.op.instance_name)
2895     if instance.admin_up:
2896       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2897                                  self.op.instance_name)
2898     remote_info = self.rpc.call_instance_info(instance.primary_node,
2899                                               instance.name,
2900                                               instance.hypervisor)
2901     if remote_info.failed or remote_info.data:
2902       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2903                                  (self.op.instance_name,
2904                                   instance.primary_node))
2905
2906     self.op.os_type = getattr(self.op, "os_type", None)
2907     if self.op.os_type is not None:
2908       # OS verification
2909       pnode = self.cfg.GetNodeInfo(
2910         self.cfg.ExpandNodeName(instance.primary_node))
2911       if pnode is None:
2912         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2913                                    self.op.pnode)
2914       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2915       result.Raise()
2916       if not isinstance(result.data, objects.OS):
2917         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2918                                    " primary node"  % self.op.os_type)
2919
2920     self.instance = instance
2921
2922   def Exec(self, feedback_fn):
2923     """Reinstall the instance.
2924
2925     """
2926     inst = self.instance
2927
2928     if self.op.os_type is not None:
2929       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2930       inst.os = self.op.os_type
2931       self.cfg.Update(inst)
2932
2933     _StartInstanceDisks(self, inst, None)
2934     try:
2935       feedback_fn("Running the instance OS create scripts...")
2936       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2937       msg = result.RemoteFailMsg()
2938       if msg:
2939         raise errors.OpExecError("Could not install OS for instance %s"
2940                                  " on node %s: %s" %
2941                                  (inst.name, inst.primary_node, msg))
2942     finally:
2943       _ShutdownInstanceDisks(self, inst)
2944
2945
2946 class LURenameInstance(LogicalUnit):
2947   """Rename an instance.
2948
2949   """
2950   HPATH = "instance-rename"
2951   HTYPE = constants.HTYPE_INSTANCE
2952   _OP_REQP = ["instance_name", "new_name"]
2953
2954   def BuildHooksEnv(self):
2955     """Build hooks env.
2956
2957     This runs on master, primary and secondary nodes of the instance.
2958
2959     """
2960     env = _BuildInstanceHookEnvByObject(self, self.instance)
2961     env["INSTANCE_NEW_NAME"] = self.op.new_name
2962     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2963     return env, nl, nl
2964
2965   def CheckPrereq(self):
2966     """Check prerequisites.
2967
2968     This checks that the instance is in the cluster and is not running.
2969
2970     """
2971     instance = self.cfg.GetInstanceInfo(
2972       self.cfg.ExpandInstanceName(self.op.instance_name))
2973     if instance is None:
2974       raise errors.OpPrereqError("Instance '%s' not known" %
2975                                  self.op.instance_name)
2976     _CheckNodeOnline(self, instance.primary_node)
2977
2978     if instance.admin_up:
2979       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2980                                  self.op.instance_name)
2981     remote_info = self.rpc.call_instance_info(instance.primary_node,
2982                                               instance.name,
2983                                               instance.hypervisor)
2984     remote_info.Raise()
2985     if remote_info.data:
2986       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2987                                  (self.op.instance_name,
2988                                   instance.primary_node))
2989     self.instance = instance
2990
2991     # new name verification
2992     name_info = utils.HostInfo(self.op.new_name)
2993
2994     self.op.new_name = new_name = name_info.name
2995     instance_list = self.cfg.GetInstanceList()
2996     if new_name in instance_list:
2997       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2998                                  new_name)
2999
3000     if not getattr(self.op, "ignore_ip", False):
3001       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3002         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3003                                    (name_info.ip, new_name))
3004
3005
3006   def Exec(self, feedback_fn):
3007     """Reinstall the instance.
3008
3009     """
3010     inst = self.instance
3011     old_name = inst.name
3012
3013     if inst.disk_template == constants.DT_FILE:
3014       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3015
3016     self.cfg.RenameInstance(inst.name, self.op.new_name)
3017     # Change the instance lock. This is definitely safe while we hold the BGL
3018     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3019     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3020
3021     # re-read the instance from the configuration after rename
3022     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3023
3024     if inst.disk_template == constants.DT_FILE:
3025       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3026       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3027                                                      old_file_storage_dir,
3028                                                      new_file_storage_dir)
3029       result.Raise()
3030       if not result.data:
3031         raise errors.OpExecError("Could not connect to node '%s' to rename"
3032                                  " directory '%s' to '%s' (but the instance"
3033                                  " has been renamed in Ganeti)" % (
3034                                  inst.primary_node, old_file_storage_dir,
3035                                  new_file_storage_dir))
3036
3037       if not result.data[0]:
3038         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3039                                  " (but the instance has been renamed in"
3040                                  " Ganeti)" % (old_file_storage_dir,
3041                                                new_file_storage_dir))
3042
3043     _StartInstanceDisks(self, inst, None)
3044     try:
3045       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3046                                                  old_name)
3047       msg = result.RemoteFailMsg()
3048       if msg:
3049         msg = ("Could not run OS rename script for instance %s on node %s"
3050                " (but the instance has been renamed in Ganeti): %s" %
3051                (inst.name, inst.primary_node, msg))
3052         self.proc.LogWarning(msg)
3053     finally:
3054       _ShutdownInstanceDisks(self, inst)
3055
3056
3057 class LURemoveInstance(LogicalUnit):
3058   """Remove an instance.
3059
3060   """
3061   HPATH = "instance-remove"
3062   HTYPE = constants.HTYPE_INSTANCE
3063   _OP_REQP = ["instance_name", "ignore_failures"]
3064   REQ_BGL = False
3065
3066   def ExpandNames(self):
3067     self._ExpandAndLockInstance()
3068     self.needed_locks[locking.LEVEL_NODE] = []
3069     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3070
3071   def DeclareLocks(self, level):
3072     if level == locking.LEVEL_NODE:
3073       self._LockInstancesNodes()
3074
3075   def BuildHooksEnv(self):
3076     """Build hooks env.
3077
3078     This runs on master, primary and secondary nodes of the instance.
3079
3080     """
3081     env = _BuildInstanceHookEnvByObject(self, self.instance)
3082     nl = [self.cfg.GetMasterNode()]
3083     return env, nl, nl
3084
3085   def CheckPrereq(self):
3086     """Check prerequisites.
3087
3088     This checks that the instance is in the cluster.
3089
3090     """
3091     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3092     assert self.instance is not None, \
3093       "Cannot retrieve locked instance %s" % self.op.instance_name
3094
3095   def Exec(self, feedback_fn):
3096     """Remove the instance.
3097
3098     """
3099     instance = self.instance
3100     logging.info("Shutting down instance %s on node %s",
3101                  instance.name, instance.primary_node)
3102
3103     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3104     if result.failed or not result.data:
3105       if self.op.ignore_failures:
3106         feedback_fn("Warning: can't shutdown instance")
3107       else:
3108         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3109                                  (instance.name, instance.primary_node))
3110
3111     logging.info("Removing block devices for instance %s", instance.name)
3112
3113     if not _RemoveDisks(self, instance):
3114       if self.op.ignore_failures:
3115         feedback_fn("Warning: can't remove instance's disks")
3116       else:
3117         raise errors.OpExecError("Can't remove instance's disks")
3118
3119     logging.info("Removing instance %s out of cluster config", instance.name)
3120
3121     self.cfg.RemoveInstance(instance.name)
3122     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3123
3124
3125 class LUQueryInstances(NoHooksLU):
3126   """Logical unit for querying instances.
3127
3128   """
3129   _OP_REQP = ["output_fields", "names", "use_locking"]
3130   REQ_BGL = False
3131   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3132                                     "admin_state", "admin_ram",
3133                                     "disk_template", "ip", "mac", "bridge",
3134                                     "sda_size", "sdb_size", "vcpus", "tags",
3135                                     "network_port", "beparams",
3136                                     "(disk).(size)/([0-9]+)",
3137                                     "(disk).(sizes)", "disk_usage",
3138                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3139                                     "(nic).(macs|ips|bridges)",
3140                                     "(disk|nic).(count)",
3141                                     "serial_no", "hypervisor", "hvparams",] +
3142                                   ["hv/%s" % name
3143                                    for name in constants.HVS_PARAMETERS] +
3144                                   ["be/%s" % name
3145                                    for name in constants.BES_PARAMETERS])
3146   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3147
3148
3149   def ExpandNames(self):
3150     _CheckOutputFields(static=self._FIELDS_STATIC,
3151                        dynamic=self._FIELDS_DYNAMIC,
3152                        selected=self.op.output_fields)
3153
3154     self.needed_locks = {}
3155     self.share_locks[locking.LEVEL_INSTANCE] = 1
3156     self.share_locks[locking.LEVEL_NODE] = 1
3157
3158     if self.op.names:
3159       self.wanted = _GetWantedInstances(self, self.op.names)
3160     else:
3161       self.wanted = locking.ALL_SET
3162
3163     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3164     self.do_locking = self.do_node_query and self.op.use_locking
3165     if self.do_locking:
3166       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3167       self.needed_locks[locking.LEVEL_NODE] = []
3168       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3169
3170   def DeclareLocks(self, level):
3171     if level == locking.LEVEL_NODE and self.do_locking:
3172       self._LockInstancesNodes()
3173
3174   def CheckPrereq(self):
3175     """Check prerequisites.
3176
3177     """
3178     pass
3179
3180   def Exec(self, feedback_fn):
3181     """Computes the list of nodes and their attributes.
3182
3183     """
3184     all_info = self.cfg.GetAllInstancesInfo()
3185     if self.wanted == locking.ALL_SET:
3186       # caller didn't specify instance names, so ordering is not important
3187       if self.do_locking:
3188         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3189       else:
3190         instance_names = all_info.keys()
3191       instance_names = utils.NiceSort(instance_names)
3192     else:
3193       # caller did specify names, so we must keep the ordering
3194       if self.do_locking:
3195         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3196       else:
3197         tgt_set = all_info.keys()
3198       missing = set(self.wanted).difference(tgt_set)
3199       if missing:
3200         raise errors.OpExecError("Some instances were removed before"
3201                                  " retrieving their data: %s" % missing)
3202       instance_names = self.wanted
3203
3204     instance_list = [all_info[iname] for iname in instance_names]
3205
3206     # begin data gathering
3207
3208     nodes = frozenset([inst.primary_node for inst in instance_list])
3209     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3210
3211     bad_nodes = []
3212     off_nodes = []
3213     if self.do_node_query:
3214       live_data = {}
3215       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3216       for name in nodes:
3217         result = node_data[name]
3218         if result.offline:
3219           # offline nodes will be in both lists
3220           off_nodes.append(name)
3221         if result.failed:
3222           bad_nodes.append(name)
3223         else:
3224           if result.data:
3225             live_data.update(result.data)
3226             # else no instance is alive
3227     else:
3228       live_data = dict([(name, {}) for name in instance_names])
3229
3230     # end data gathering
3231
3232     HVPREFIX = "hv/"
3233     BEPREFIX = "be/"
3234     output = []
3235     for instance in instance_list:
3236       iout = []
3237       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3238       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3239       for field in self.op.output_fields:
3240         st_match = self._FIELDS_STATIC.Matches(field)
3241         if field == "name":
3242           val = instance.name
3243         elif field == "os":
3244           val = instance.os
3245         elif field == "pnode":
3246           val = instance.primary_node
3247         elif field == "snodes":
3248           val = list(instance.secondary_nodes)
3249         elif field == "admin_state":
3250           val = instance.admin_up
3251         elif field == "oper_state":
3252           if instance.primary_node in bad_nodes:
3253             val = None
3254           else:
3255             val = bool(live_data.get(instance.name))
3256         elif field == "status":
3257           if instance.primary_node in off_nodes:
3258             val = "ERROR_nodeoffline"
3259           elif instance.primary_node in bad_nodes:
3260             val = "ERROR_nodedown"
3261           else:
3262             running = bool(live_data.get(instance.name))
3263             if running:
3264               if instance.admin_up:
3265                 val = "running"
3266               else:
3267                 val = "ERROR_up"
3268             else:
3269               if instance.admin_up:
3270                 val = "ERROR_down"
3271               else:
3272                 val = "ADMIN_down"
3273         elif field == "oper_ram":
3274           if instance.primary_node in bad_nodes:
3275             val = None
3276           elif instance.name in live_data:
3277             val = live_data[instance.name].get("memory", "?")
3278           else:
3279             val = "-"
3280         elif field == "disk_template":
3281           val = instance.disk_template
3282         elif field == "ip":
3283           val = instance.nics[0].ip
3284         elif field == "bridge":
3285           val = instance.nics[0].bridge
3286         elif field == "mac":
3287           val = instance.nics[0].mac
3288         elif field == "sda_size" or field == "sdb_size":
3289           idx = ord(field[2]) - ord('a')
3290           try:
3291             val = instance.FindDisk(idx).size
3292           except errors.OpPrereqError:
3293             val = None
3294         elif field == "disk_usage": # total disk usage per node
3295           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3296           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3297         elif field == "tags":
3298           val = list(instance.GetTags())
3299         elif field == "serial_no":
3300           val = instance.serial_no
3301         elif field == "network_port":
3302           val = instance.network_port
3303         elif field == "hypervisor":
3304           val = instance.hypervisor
3305         elif field == "hvparams":
3306           val = i_hv
3307         elif (field.startswith(HVPREFIX) and
3308               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3309           val = i_hv.get(field[len(HVPREFIX):], None)
3310         elif field == "beparams":
3311           val = i_be
3312         elif (field.startswith(BEPREFIX) and
3313               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3314           val = i_be.get(field[len(BEPREFIX):], None)
3315         elif st_match and st_match.groups():
3316           # matches a variable list
3317           st_groups = st_match.groups()
3318           if st_groups and st_groups[0] == "disk":
3319             if st_groups[1] == "count":
3320               val = len(instance.disks)
3321             elif st_groups[1] == "sizes":
3322               val = [disk.size for disk in instance.disks]
3323             elif st_groups[1] == "size":
3324               try:
3325                 val = instance.FindDisk(st_groups[2]).size
3326               except errors.OpPrereqError:
3327                 val = None
3328             else:
3329               assert False, "Unhandled disk parameter"
3330           elif st_groups[0] == "nic":
3331             if st_groups[1] == "count":
3332               val = len(instance.nics)
3333             elif st_groups[1] == "macs":
3334               val = [nic.mac for nic in instance.nics]
3335             elif st_groups[1] == "ips":
3336               val = [nic.ip for nic in instance.nics]
3337             elif st_groups[1] == "bridges":
3338               val = [nic.bridge for nic in instance.nics]
3339             else:
3340               # index-based item
3341               nic_idx = int(st_groups[2])
3342               if nic_idx >= len(instance.nics):
3343                 val = None
3344               else:
3345                 if st_groups[1] == "mac":
3346                   val = instance.nics[nic_idx].mac
3347                 elif st_groups[1] == "ip":
3348                   val = instance.nics[nic_idx].ip
3349                 elif st_groups[1] == "bridge":
3350                   val = instance.nics[nic_idx].bridge
3351                 else:
3352                   assert False, "Unhandled NIC parameter"
3353           else:
3354             assert False, "Unhandled variable parameter"
3355         else:
3356           raise errors.ParameterError(field)
3357         iout.append(val)
3358       output.append(iout)
3359
3360     return output
3361
3362
3363 class LUFailoverInstance(LogicalUnit):
3364   """Failover an instance.
3365
3366   """
3367   HPATH = "instance-failover"
3368   HTYPE = constants.HTYPE_INSTANCE
3369   _OP_REQP = ["instance_name", "ignore_consistency"]
3370   REQ_BGL = False
3371
3372   def ExpandNames(self):
3373     self._ExpandAndLockInstance()
3374     self.needed_locks[locking.LEVEL_NODE] = []
3375     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3376
3377   def DeclareLocks(self, level):
3378     if level == locking.LEVEL_NODE:
3379       self._LockInstancesNodes()
3380
3381   def BuildHooksEnv(self):
3382     """Build hooks env.
3383
3384     This runs on master, primary and secondary nodes of the instance.
3385
3386     """
3387     env = {
3388       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3389       }
3390     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3391     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3392     return env, nl, nl
3393
3394   def CheckPrereq(self):
3395     """Check prerequisites.
3396
3397     This checks that the instance is in the cluster.
3398
3399     """
3400     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3401     assert self.instance is not None, \
3402       "Cannot retrieve locked instance %s" % self.op.instance_name
3403
3404     bep = self.cfg.GetClusterInfo().FillBE(instance)
3405     if instance.disk_template not in constants.DTS_NET_MIRROR:
3406       raise errors.OpPrereqError("Instance's disk layout is not"
3407                                  " network mirrored, cannot failover.")
3408
3409     secondary_nodes = instance.secondary_nodes
3410     if not secondary_nodes:
3411       raise errors.ProgrammerError("no secondary node but using "
3412                                    "a mirrored disk template")
3413
3414     target_node = secondary_nodes[0]
3415     _CheckNodeOnline(self, target_node)
3416     _CheckNodeNotDrained(self, target_node)
3417     # check memory requirements on the secondary node
3418     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3419                          instance.name, bep[constants.BE_MEMORY],
3420                          instance.hypervisor)
3421
3422     # check bridge existance
3423     brlist = [nic.bridge for nic in instance.nics]
3424     result = self.rpc.call_bridges_exist(target_node, brlist)
3425     result.Raise()
3426     if not result.data:
3427       raise errors.OpPrereqError("One or more target bridges %s does not"
3428                                  " exist on destination node '%s'" %
3429                                  (brlist, target_node))
3430
3431   def Exec(self, feedback_fn):
3432     """Failover an instance.
3433
3434     The failover is done by shutting it down on its present node and
3435     starting it on the secondary.
3436
3437     """
3438     instance = self.instance
3439
3440     source_node = instance.primary_node
3441     target_node = instance.secondary_nodes[0]
3442
3443     feedback_fn("* checking disk consistency between source and target")
3444     for dev in instance.disks:
3445       # for drbd, these are drbd over lvm
3446       if not _CheckDiskConsistency(self, dev, target_node, False):
3447         if instance.admin_up and not self.op.ignore_consistency:
3448           raise errors.OpExecError("Disk %s is degraded on target node,"
3449                                    " aborting failover." % dev.iv_name)
3450
3451     feedback_fn("* shutting down instance on source node")
3452     logging.info("Shutting down instance %s on node %s",
3453                  instance.name, source_node)
3454
3455     result = self.rpc.call_instance_shutdown(source_node, instance)
3456     if result.failed or not result.data:
3457       if self.op.ignore_consistency:
3458         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3459                              " Proceeding"
3460                              " anyway. Please make sure node %s is down",
3461                              instance.name, source_node, source_node)
3462       else:
3463         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3464                                  (instance.name, source_node))
3465
3466     feedback_fn("* deactivating the instance's disks on source node")
3467     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3468       raise errors.OpExecError("Can't shut down the instance's disks.")
3469
3470     instance.primary_node = target_node
3471     # distribute new instance config to the other nodes
3472     self.cfg.Update(instance)
3473
3474     # Only start the instance if it's marked as up
3475     if instance.admin_up:
3476       feedback_fn("* activating the instance's disks on target node")
3477       logging.info("Starting instance %s on node %s",
3478                    instance.name, target_node)
3479
3480       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3481                                                ignore_secondaries=True)
3482       if not disks_ok:
3483         _ShutdownInstanceDisks(self, instance)
3484         raise errors.OpExecError("Can't activate the instance's disks")
3485
3486       feedback_fn("* starting the instance on the target node")
3487       result = self.rpc.call_instance_start(target_node, instance, None)
3488       msg = result.RemoteFailMsg()
3489       if msg:
3490         _ShutdownInstanceDisks(self, instance)
3491         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3492                                  (instance.name, target_node, msg))
3493
3494
3495 class LUMigrateInstance(LogicalUnit):
3496   """Migrate an instance.
3497
3498   This is migration without shutting down, compared to the failover,
3499   which is done with shutdown.
3500
3501   """
3502   HPATH = "instance-migrate"
3503   HTYPE = constants.HTYPE_INSTANCE
3504   _OP_REQP = ["instance_name", "live", "cleanup"]
3505
3506   REQ_BGL = False
3507
3508   def ExpandNames(self):
3509     self._ExpandAndLockInstance()
3510     self.needed_locks[locking.LEVEL_NODE] = []
3511     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3512
3513   def DeclareLocks(self, level):
3514     if level == locking.LEVEL_NODE:
3515       self._LockInstancesNodes()
3516
3517   def BuildHooksEnv(self):
3518     """Build hooks env.
3519
3520     This runs on master, primary and secondary nodes of the instance.
3521
3522     """
3523     env = _BuildInstanceHookEnvByObject(self, self.instance)
3524     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3525     return env, nl, nl
3526
3527   def CheckPrereq(self):
3528     """Check prerequisites.
3529
3530     This checks that the instance is in the cluster.
3531
3532     """
3533     instance = self.cfg.GetInstanceInfo(
3534       self.cfg.ExpandInstanceName(self.op.instance_name))
3535     if instance is None:
3536       raise errors.OpPrereqError("Instance '%s' not known" %
3537                                  self.op.instance_name)
3538
3539     if instance.disk_template != constants.DT_DRBD8:
3540       raise errors.OpPrereqError("Instance's disk layout is not"
3541                                  " drbd8, cannot migrate.")
3542
3543     secondary_nodes = instance.secondary_nodes
3544     if not secondary_nodes:
3545       raise errors.ConfigurationError("No secondary node but using"
3546                                       " drbd8 disk template")
3547
3548     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3549
3550     target_node = secondary_nodes[0]
3551     # check memory requirements on the secondary node
3552     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3553                          instance.name, i_be[constants.BE_MEMORY],
3554                          instance.hypervisor)
3555
3556     # check bridge existance
3557     brlist = [nic.bridge for nic in instance.nics]
3558     result = self.rpc.call_bridges_exist(target_node, brlist)
3559     if result.failed or not result.data:
3560       raise errors.OpPrereqError("One or more target bridges %s does not"
3561                                  " exist on destination node '%s'" %
3562                                  (brlist, target_node))
3563
3564     if not self.op.cleanup:
3565       _CheckNodeNotDrained(self, target_node)
3566       result = self.rpc.call_instance_migratable(instance.primary_node,
3567                                                  instance)
3568       msg = result.RemoteFailMsg()
3569       if msg:
3570         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3571                                    msg)
3572
3573     self.instance = instance
3574
3575   def _WaitUntilSync(self):
3576     """Poll with custom rpc for disk sync.
3577
3578     This uses our own step-based rpc call.
3579
3580     """
3581     self.feedback_fn("* wait until resync is done")
3582     all_done = False
3583     while not all_done:
3584       all_done = True
3585       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3586                                             self.nodes_ip,
3587                                             self.instance.disks)
3588       min_percent = 100
3589       for node, nres in result.items():
3590         msg = nres.RemoteFailMsg()
3591         if msg:
3592           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3593                                    (node, msg))
3594         node_done, node_percent = nres.payload
3595         all_done = all_done and node_done
3596         if node_percent is not None:
3597           min_percent = min(min_percent, node_percent)
3598       if not all_done:
3599         if min_percent < 100:
3600           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3601         time.sleep(2)
3602
3603   def _EnsureSecondary(self, node):
3604     """Demote a node to secondary.
3605
3606     """
3607     self.feedback_fn("* switching node %s to secondary mode" % node)
3608
3609     for dev in self.instance.disks:
3610       self.cfg.SetDiskID(dev, node)
3611
3612     result = self.rpc.call_blockdev_close(node, self.instance.name,
3613                                           self.instance.disks)
3614     msg = result.RemoteFailMsg()
3615     if msg:
3616       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3617                                " error %s" % (node, msg))
3618
3619   def _GoStandalone(self):
3620     """Disconnect from the network.
3621
3622     """
3623     self.feedback_fn("* changing into standalone mode")
3624     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3625                                                self.instance.disks)
3626     for node, nres in result.items():
3627       msg = nres.RemoteFailMsg()
3628       if msg:
3629         raise errors.OpExecError("Cannot disconnect disks node %s,"
3630                                  " error %s" % (node, msg))
3631
3632   def _GoReconnect(self, multimaster):
3633     """Reconnect to the network.
3634
3635     """
3636     if multimaster:
3637       msg = "dual-master"
3638     else:
3639       msg = "single-master"
3640     self.feedback_fn("* changing disks into %s mode" % msg)
3641     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3642                                            self.instance.disks,
3643                                            self.instance.name, multimaster)
3644     for node, nres in result.items():
3645       msg = nres.RemoteFailMsg()
3646       if msg:
3647         raise errors.OpExecError("Cannot change disks config on node %s,"
3648                                  " error: %s" % (node, msg))
3649
3650   def _ExecCleanup(self):
3651     """Try to cleanup after a failed migration.
3652
3653     The cleanup is done by:
3654       - check that the instance is running only on one node
3655         (and update the config if needed)
3656       - change disks on its secondary node to secondary
3657       - wait until disks are fully synchronized
3658       - disconnect from the network
3659       - change disks into single-master mode
3660       - wait again until disks are fully synchronized
3661
3662     """
3663     instance = self.instance
3664     target_node = self.target_node
3665     source_node = self.source_node
3666
3667     # check running on only one node
3668     self.feedback_fn("* checking where the instance actually runs"
3669                      " (if this hangs, the hypervisor might be in"
3670                      " a bad state)")
3671     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3672     for node, result in ins_l.items():
3673       result.Raise()
3674       if not isinstance(result.data, list):
3675         raise errors.OpExecError("Can't contact node '%s'" % node)
3676
3677     runningon_source = instance.name in ins_l[source_node].data
3678     runningon_target = instance.name in ins_l[target_node].data
3679
3680     if runningon_source and runningon_target:
3681       raise errors.OpExecError("Instance seems to be running on two nodes,"
3682                                " or the hypervisor is confused. You will have"
3683                                " to ensure manually that it runs only on one"
3684                                " and restart this operation.")
3685
3686     if not (runningon_source or runningon_target):
3687       raise errors.OpExecError("Instance does not seem to be running at all."
3688                                " In this case, it's safer to repair by"
3689                                " running 'gnt-instance stop' to ensure disk"
3690                                " shutdown, and then restarting it.")
3691
3692     if runningon_target:
3693       # the migration has actually succeeded, we need to update the config
3694       self.feedback_fn("* instance running on secondary node (%s),"
3695                        " updating config" % target_node)
3696       instance.primary_node = target_node
3697       self.cfg.Update(instance)
3698       demoted_node = source_node
3699     else:
3700       self.feedback_fn("* instance confirmed to be running on its"
3701                        " primary node (%s)" % source_node)
3702       demoted_node = target_node
3703
3704     self._EnsureSecondary(demoted_node)
3705     try:
3706       self._WaitUntilSync()
3707     except errors.OpExecError:
3708       # we ignore here errors, since if the device is standalone, it
3709       # won't be able to sync
3710       pass
3711     self._GoStandalone()
3712     self._GoReconnect(False)
3713     self._WaitUntilSync()
3714
3715     self.feedback_fn("* done")
3716
3717   def _RevertDiskStatus(self):
3718     """Try to revert the disk status after a failed migration.
3719
3720     """
3721     target_node = self.target_node
3722     try:
3723       self._EnsureSecondary(target_node)
3724       self._GoStandalone()
3725       self._GoReconnect(False)
3726       self._WaitUntilSync()
3727     except errors.OpExecError, err:
3728       self.LogWarning("Migration failed and I can't reconnect the"
3729                       " drives: error '%s'\n"
3730                       "Please look and recover the instance status" %
3731                       str(err))
3732
3733   def _AbortMigration(self):
3734     """Call the hypervisor code to abort a started migration.
3735
3736     """
3737     instance = self.instance
3738     target_node = self.target_node
3739     migration_info = self.migration_info
3740
3741     abort_result = self.rpc.call_finalize_migration(target_node,
3742                                                     instance,
3743                                                     migration_info,
3744                                                     False)
3745     abort_msg = abort_result.RemoteFailMsg()
3746     if abort_msg:
3747       logging.error("Aborting migration failed on target node %s: %s" %
3748                     (target_node, abort_msg))
3749       # Don't raise an exception here, as we stil have to try to revert the
3750       # disk status, even if this step failed.
3751
3752   def _ExecMigration(self):
3753     """Migrate an instance.
3754
3755     The migrate is done by:
3756       - change the disks into dual-master mode
3757       - wait until disks are fully synchronized again
3758       - migrate the instance
3759       - change disks on the new secondary node (the old primary) to secondary
3760       - wait until disks are fully synchronized
3761       - change disks into single-master mode
3762
3763     """
3764     instance = self.instance
3765     target_node = self.target_node
3766     source_node = self.source_node
3767
3768     self.feedback_fn("* checking disk consistency between source and target")
3769     for dev in instance.disks:
3770       if not _CheckDiskConsistency(self, dev, target_node, False):
3771         raise errors.OpExecError("Disk %s is degraded or not fully"
3772                                  " synchronized on target node,"
3773                                  " aborting migrate." % dev.iv_name)
3774
3775     # First get the migration information from the remote node
3776     result = self.rpc.call_migration_info(source_node, instance)
3777     msg = result.RemoteFailMsg()
3778     if msg:
3779       log_err = ("Failed fetching source migration information from %s: %s" %
3780                  (source_node, msg))
3781       logging.error(log_err)
3782       raise errors.OpExecError(log_err)
3783
3784     self.migration_info = migration_info = result.payload
3785
3786     # Then switch the disks to master/master mode
3787     self._EnsureSecondary(target_node)
3788     self._GoStandalone()
3789     self._GoReconnect(True)
3790     self._WaitUntilSync()
3791
3792     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3793     result = self.rpc.call_accept_instance(target_node,
3794                                            instance,
3795                                            migration_info,
3796                                            self.nodes_ip[target_node])
3797
3798     msg = result.RemoteFailMsg()
3799     if msg:
3800       logging.error("Instance pre-migration failed, trying to revert"
3801                     " disk status: %s", msg)
3802       self._AbortMigration()
3803       self._RevertDiskStatus()
3804       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3805                                (instance.name, msg))
3806
3807     self.feedback_fn("* migrating instance to %s" % target_node)
3808     time.sleep(10)
3809     result = self.rpc.call_instance_migrate(source_node, instance,
3810                                             self.nodes_ip[target_node],
3811                                             self.op.live)
3812     msg = result.RemoteFailMsg()
3813     if msg:
3814       logging.error("Instance migration failed, trying to revert"
3815                     " disk status: %s", msg)
3816       self._AbortMigration()
3817       self._RevertDiskStatus()
3818       raise errors.OpExecError("Could not migrate instance %s: %s" %
3819                                (instance.name, msg))
3820     time.sleep(10)
3821
3822     instance.primary_node = target_node
3823     # distribute new instance config to the other nodes
3824     self.cfg.Update(instance)
3825
3826     result = self.rpc.call_finalize_migration(target_node,
3827                                               instance,
3828                                               migration_info,
3829                                               True)
3830     msg = result.RemoteFailMsg()
3831     if msg:
3832       logging.error("Instance migration succeeded, but finalization failed:"
3833                     " %s" % msg)
3834       raise errors.OpExecError("Could not finalize instance migration: %s" %
3835                                msg)
3836
3837     self._EnsureSecondary(source_node)
3838     self._WaitUntilSync()
3839     self._GoStandalone()
3840     self._GoReconnect(False)
3841     self._WaitUntilSync()
3842
3843     self.feedback_fn("* done")
3844
3845   def Exec(self, feedback_fn):
3846     """Perform the migration.
3847
3848     """
3849     self.feedback_fn = feedback_fn
3850
3851     self.source_node = self.instance.primary_node
3852     self.target_node = self.instance.secondary_nodes[0]
3853     self.all_nodes = [self.source_node, self.target_node]
3854     self.nodes_ip = {
3855       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3856       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3857       }
3858     if self.op.cleanup:
3859       return self._ExecCleanup()
3860     else:
3861       return self._ExecMigration()
3862
3863
3864 def _CreateBlockDev(lu, node, instance, device, force_create,
3865                     info, force_open):
3866   """Create a tree of block devices on a given node.
3867
3868   If this device type has to be created on secondaries, create it and
3869   all its children.
3870
3871   If not, just recurse to children keeping the same 'force' value.
3872
3873   @param lu: the lu on whose behalf we execute
3874   @param node: the node on which to create the device
3875   @type instance: L{objects.Instance}
3876   @param instance: the instance which owns the device
3877   @type device: L{objects.Disk}
3878   @param device: the device to create
3879   @type force_create: boolean
3880   @param force_create: whether to force creation of this device; this
3881       will be change to True whenever we find a device which has
3882       CreateOnSecondary() attribute
3883   @param info: the extra 'metadata' we should attach to the device
3884       (this will be represented as a LVM tag)
3885   @type force_open: boolean
3886   @param force_open: this parameter will be passes to the
3887       L{backend.BlockdevCreate} function where it specifies
3888       whether we run on primary or not, and it affects both
3889       the child assembly and the device own Open() execution
3890
3891   """
3892   if device.CreateOnSecondary():
3893     force_create = True
3894
3895   if device.children:
3896     for child in device.children:
3897       _CreateBlockDev(lu, node, instance, child, force_create,
3898                       info, force_open)
3899
3900   if not force_create:
3901     return
3902
3903   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3904
3905
3906 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3907   """Create a single block device on a given node.
3908
3909   This will not recurse over children of the device, so they must be
3910   created in advance.
3911
3912   @param lu: the lu on whose behalf we execute
3913   @param node: the node on which to create the device
3914   @type instance: L{objects.Instance}
3915   @param instance: the instance which owns the device
3916   @type device: L{objects.Disk}
3917   @param device: the device to create
3918   @param info: the extra 'metadata' we should attach to the device
3919       (this will be represented as a LVM tag)
3920   @type force_open: boolean
3921   @param force_open: this parameter will be passes to the
3922       L{backend.BlockdevCreate} function where it specifies
3923       whether we run on primary or not, and it affects both
3924       the child assembly and the device own Open() execution
3925
3926   """
3927   lu.cfg.SetDiskID(device, node)
3928   result = lu.rpc.call_blockdev_create(node, device, device.size,
3929                                        instance.name, force_open, info)
3930   msg = result.RemoteFailMsg()
3931   if msg:
3932     raise errors.OpExecError("Can't create block device %s on"
3933                              " node %s for instance %s: %s" %
3934                              (device, node, instance.name, msg))
3935   if device.physical_id is None:
3936     device.physical_id = result.payload
3937
3938
3939 def _GenerateUniqueNames(lu, exts):
3940   """Generate a suitable LV name.
3941
3942   This will generate a logical volume name for the given instance.
3943
3944   """
3945   results = []
3946   for val in exts:
3947     new_id = lu.cfg.GenerateUniqueID()
3948     results.append("%s%s" % (new_id, val))
3949   return results
3950
3951
3952 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3953                          p_minor, s_minor):
3954   """Generate a drbd8 device complete with its children.
3955
3956   """
3957   port = lu.cfg.AllocatePort()
3958   vgname = lu.cfg.GetVGName()
3959   shared_secret = lu.cfg.GenerateDRBDSecret()
3960   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3961                           logical_id=(vgname, names[0]))
3962   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3963                           logical_id=(vgname, names[1]))
3964   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3965                           logical_id=(primary, secondary, port,
3966                                       p_minor, s_minor,
3967                                       shared_secret),
3968                           children=[dev_data, dev_meta],
3969                           iv_name=iv_name)
3970   return drbd_dev
3971
3972
3973 def _GenerateDiskTemplate(lu, template_name,
3974                           instance_name, primary_node,
3975                           secondary_nodes, disk_info,
3976                           file_storage_dir, file_driver,
3977                           base_index):
3978   """Generate the entire disk layout for a given template type.
3979
3980   """
3981   #TODO: compute space requirements
3982
3983   vgname = lu.cfg.GetVGName()
3984   disk_count = len(disk_info)
3985   disks = []
3986   if template_name == constants.DT_DISKLESS:
3987     pass
3988   elif template_name == constants.DT_PLAIN:
3989     if len(secondary_nodes) != 0:
3990       raise errors.ProgrammerError("Wrong template configuration")
3991
3992     names = _GenerateUniqueNames(lu, [".disk%d" % i
3993                                       for i in range(disk_count)])
3994     for idx, disk in enumerate(disk_info):
3995       disk_index = idx + base_index
3996       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3997                               logical_id=(vgname, names[idx]),
3998                               iv_name="disk/%d" % disk_index,
3999                               mode=disk["mode"])
4000       disks.append(disk_dev)
4001   elif template_name == constants.DT_DRBD8:
4002     if len(secondary_nodes) != 1:
4003       raise errors.ProgrammerError("Wrong template configuration")
4004     remote_node = secondary_nodes[0]
4005     minors = lu.cfg.AllocateDRBDMinor(
4006       [primary_node, remote_node] * len(disk_info), instance_name)
4007
4008     names = []
4009     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4010                                                for i in range(disk_count)]):
4011       names.append(lv_prefix + "_data")
4012       names.append(lv_prefix + "_meta")
4013     for idx, disk in enumerate(disk_info):
4014       disk_index = idx + base_index
4015       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4016                                       disk["size"], names[idx*2:idx*2+2],
4017                                       "disk/%d" % disk_index,
4018                                       minors[idx*2], minors[idx*2+1])
4019       disk_dev.mode = disk["mode"]
4020       disks.append(disk_dev)
4021   elif template_name == constants.DT_FILE:
4022     if len(secondary_nodes) != 0:
4023       raise errors.ProgrammerError("Wrong template configuration")
4024
4025     for idx, disk in enumerate(disk_info):
4026       disk_index = idx + base_index
4027       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4028                               iv_name="disk/%d" % disk_index,
4029                               logical_id=(file_driver,
4030                                           "%s/disk%d" % (file_storage_dir,
4031                                                          idx)),
4032                               mode=disk["mode"])
4033       disks.append(disk_dev)
4034   else:
4035     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4036   return disks
4037
4038
4039 def _GetInstanceInfoText(instance):
4040   """Compute that text that should be added to the disk's metadata.
4041
4042   """
4043   return "originstname+%s" % instance.name
4044
4045
4046 def _CreateDisks(lu, instance):
4047   """Create all disks for an instance.
4048
4049   This abstracts away some work from AddInstance.
4050
4051   @type lu: L{LogicalUnit}
4052   @param lu: the logical unit on whose behalf we execute
4053   @type instance: L{objects.Instance}
4054   @param instance: the instance whose disks we should create
4055   @rtype: boolean
4056   @return: the success of the creation
4057
4058   """
4059   info = _GetInstanceInfoText(instance)
4060   pnode = instance.primary_node
4061
4062   if instance.disk_template == constants.DT_FILE:
4063     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4064     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4065
4066     if result.failed or not result.data:
4067       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4068
4069     if not result.data[0]:
4070       raise errors.OpExecError("Failed to create directory '%s'" %
4071                                file_storage_dir)
4072
4073   # Note: this needs to be kept in sync with adding of disks in
4074   # LUSetInstanceParams
4075   for device in instance.disks:
4076     logging.info("Creating volume %s for instance %s",
4077                  device.iv_name, instance.name)
4078     #HARDCODE
4079     for node in instance.all_nodes:
4080       f_create = node == pnode
4081       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4082
4083
4084 def _RemoveDisks(lu, instance):
4085   """Remove all disks for an instance.
4086
4087   This abstracts away some work from `AddInstance()` and
4088   `RemoveInstance()`. Note that in case some of the devices couldn't
4089   be removed, the removal will continue with the other ones (compare
4090   with `_CreateDisks()`).
4091
4092   @type lu: L{LogicalUnit}
4093   @param lu: the logical unit on whose behalf we execute
4094   @type instance: L{objects.Instance}
4095   @param instance: the instance whose disks we should remove
4096   @rtype: boolean
4097   @return: the success of the removal
4098
4099   """
4100   logging.info("Removing block devices for instance %s", instance.name)
4101
4102   all_result = True
4103   for device in instance.disks:
4104     for node, disk in device.ComputeNodeTree(instance.primary_node):
4105       lu.cfg.SetDiskID(disk, node)
4106       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4107       if msg:
4108         lu.LogWarning("Could not remove block device %s on node %s,"
4109                       " continuing anyway: %s", device.iv_name, node, msg)
4110         all_result = False
4111
4112   if instance.disk_template == constants.DT_FILE:
4113     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4114     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4115                                                  file_storage_dir)
4116     if result.failed or not result.data:
4117       logging.error("Could not remove directory '%s'", file_storage_dir)
4118       all_result = False
4119
4120   return all_result
4121
4122
4123 def _ComputeDiskSize(disk_template, disks):
4124   """Compute disk size requirements in the volume group
4125
4126   """
4127   # Required free disk space as a function of disk and swap space
4128   req_size_dict = {
4129     constants.DT_DISKLESS: None,
4130     constants.DT_PLAIN: sum(d["size"] for d in disks),
4131     # 128 MB are added for drbd metadata for each disk
4132     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4133     constants.DT_FILE: None,
4134   }
4135
4136   if disk_template not in req_size_dict:
4137     raise errors.ProgrammerError("Disk template '%s' size requirement"
4138                                  " is unknown" %  disk_template)
4139
4140   return req_size_dict[disk_template]
4141
4142
4143 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4144   """Hypervisor parameter validation.
4145
4146   This function abstract the hypervisor parameter validation to be
4147   used in both instance create and instance modify.
4148
4149   @type lu: L{LogicalUnit}
4150   @param lu: the logical unit for which we check
4151   @type nodenames: list
4152   @param nodenames: the list of nodes on which we should check
4153   @type hvname: string
4154   @param hvname: the name of the hypervisor we should use
4155   @type hvparams: dict
4156   @param hvparams: the parameters which we need to check
4157   @raise errors.OpPrereqError: if the parameters are not valid
4158
4159   """
4160   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4161                                                   hvname,
4162                                                   hvparams)
4163   for node in nodenames:
4164     info = hvinfo[node]
4165     if info.offline:
4166       continue
4167     msg = info.RemoteFailMsg()
4168     if msg:
4169       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4170                                  " %s" % msg)
4171
4172
4173 class LUCreateInstance(LogicalUnit):
4174   """Create an instance.
4175
4176   """
4177   HPATH = "instance-add"
4178   HTYPE = constants.HTYPE_INSTANCE
4179   _OP_REQP = ["instance_name", "disks", "disk_template",
4180               "mode", "start",
4181               "wait_for_sync", "ip_check", "nics",
4182               "hvparams", "beparams"]
4183   REQ_BGL = False
4184
4185   def _ExpandNode(self, node):
4186     """Expands and checks one node name.
4187
4188     """
4189     node_full = self.cfg.ExpandNodeName(node)
4190     if node_full is None:
4191       raise errors.OpPrereqError("Unknown node %s" % node)
4192     return node_full
4193
4194   def ExpandNames(self):
4195     """ExpandNames for CreateInstance.
4196
4197     Figure out the right locks for instance creation.
4198
4199     """
4200     self.needed_locks = {}
4201
4202     # set optional parameters to none if they don't exist
4203     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4204       if not hasattr(self.op, attr):
4205         setattr(self.op, attr, None)
4206
4207     # cheap checks, mostly valid constants given
4208
4209     # verify creation mode
4210     if self.op.mode not in (constants.INSTANCE_CREATE,
4211                             constants.INSTANCE_IMPORT):
4212       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4213                                  self.op.mode)
4214
4215     # disk template and mirror node verification
4216     if self.op.disk_template not in constants.DISK_TEMPLATES:
4217       raise errors.OpPrereqError("Invalid disk template name")
4218
4219     if self.op.hypervisor is None:
4220       self.op.hypervisor = self.cfg.GetHypervisorType()
4221
4222     cluster = self.cfg.GetClusterInfo()
4223     enabled_hvs = cluster.enabled_hypervisors
4224     if self.op.hypervisor not in enabled_hvs:
4225       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4226                                  " cluster (%s)" % (self.op.hypervisor,
4227                                   ",".join(enabled_hvs)))
4228
4229     # check hypervisor parameter syntax (locally)
4230
4231     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4232                                   self.op.hvparams)
4233     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4234     hv_type.CheckParameterSyntax(filled_hvp)
4235
4236     # fill and remember the beparams dict
4237     utils.CheckBEParams(self.op.beparams)
4238     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4239                                     self.op.beparams)
4240
4241     #### instance parameters check
4242
4243     # instance name verification
4244     hostname1 = utils.HostInfo(self.op.instance_name)
4245     self.op.instance_name = instance_name = hostname1.name
4246
4247     # this is just a preventive check, but someone might still add this
4248     # instance in the meantime, and creation will fail at lock-add time
4249     if instance_name in self.cfg.GetInstanceList():
4250       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4251                                  instance_name)
4252
4253     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4254
4255     # NIC buildup
4256     self.nics = []
4257     for nic in self.op.nics:
4258       # ip validity checks
4259       ip = nic.get("ip", None)
4260       if ip is None or ip.lower() == "none":
4261         nic_ip = None
4262       elif ip.lower() == constants.VALUE_AUTO:
4263         nic_ip = hostname1.ip
4264       else:
4265         if not utils.IsValidIP(ip):
4266           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4267                                      " like a valid IP" % ip)
4268         nic_ip = ip
4269
4270       # MAC address verification
4271       mac = nic.get("mac", constants.VALUE_AUTO)
4272       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4273         if not utils.IsValidMac(mac.lower()):
4274           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4275                                      mac)
4276       # bridge verification
4277       bridge = nic.get("bridge", None)
4278       if bridge is None:
4279         bridge = self.cfg.GetDefBridge()
4280       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4281
4282     # disk checks/pre-build
4283     self.disks = []
4284     for disk in self.op.disks:
4285       mode = disk.get("mode", constants.DISK_RDWR)
4286       if mode not in constants.DISK_ACCESS_SET:
4287         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4288                                    mode)
4289       size = disk.get("size", None)
4290       if size is None:
4291         raise errors.OpPrereqError("Missing disk size")
4292       try:
4293         size = int(size)
4294       except ValueError:
4295         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4296       self.disks.append({"size": size, "mode": mode})
4297
4298     # used in CheckPrereq for ip ping check
4299     self.check_ip = hostname1.ip
4300
4301     # file storage checks
4302     if (self.op.file_driver and
4303         not self.op.file_driver in constants.FILE_DRIVER):
4304       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4305                                  self.op.file_driver)
4306
4307     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4308       raise errors.OpPrereqError("File storage directory path not absolute")
4309
4310     ### Node/iallocator related checks
4311     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4312       raise errors.OpPrereqError("One and only one of iallocator and primary"
4313                                  " node must be given")
4314
4315     if self.op.iallocator:
4316       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4317     else:
4318       self.op.pnode = self._ExpandNode(self.op.pnode)
4319       nodelist = [self.op.pnode]
4320       if self.op.snode is not None:
4321         self.op.snode = self._ExpandNode(self.op.snode)
4322         nodelist.append(self.op.snode)
4323       self.needed_locks[locking.LEVEL_NODE] = nodelist
4324
4325     # in case of import lock the source node too
4326     if self.op.mode == constants.INSTANCE_IMPORT:
4327       src_node = getattr(self.op, "src_node", None)
4328       src_path = getattr(self.op, "src_path", None)
4329
4330       if src_path is None:
4331         self.op.src_path = src_path = self.op.instance_name
4332
4333       if src_node is None:
4334         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4335         self.op.src_node = None
4336         if os.path.isabs(src_path):
4337           raise errors.OpPrereqError("Importing an instance from an absolute"
4338                                      " path requires a source node option.")
4339       else:
4340         self.op.src_node = src_node = self._ExpandNode(src_node)
4341         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4342           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4343         if not os.path.isabs(src_path):
4344           self.op.src_path = src_path = \
4345             os.path.join(constants.EXPORT_DIR, src_path)
4346
4347     else: # INSTANCE_CREATE
4348       if getattr(self.op, "os_type", None) is None:
4349         raise errors.OpPrereqError("No guest OS specified")
4350
4351   def _RunAllocator(self):
4352     """Run the allocator based on input opcode.
4353
4354     """
4355     nics = [n.ToDict() for n in self.nics]
4356     ial = IAllocator(self,
4357                      mode=constants.IALLOCATOR_MODE_ALLOC,
4358                      name=self.op.instance_name,
4359                      disk_template=self.op.disk_template,
4360                      tags=[],
4361                      os=self.op.os_type,
4362                      vcpus=self.be_full[constants.BE_VCPUS],
4363                      mem_size=self.be_full[constants.BE_MEMORY],
4364                      disks=self.disks,
4365                      nics=nics,
4366                      hypervisor=self.op.hypervisor,
4367                      )
4368
4369     ial.Run(self.op.iallocator)
4370
4371     if not ial.success:
4372       raise errors.OpPrereqError("Can't compute nodes using"
4373                                  " iallocator '%s': %s" % (self.op.iallocator,
4374                                                            ial.info))
4375     if len(ial.nodes) != ial.required_nodes:
4376       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4377                                  " of nodes (%s), required %s" %
4378                                  (self.op.iallocator, len(ial.nodes),
4379                                   ial.required_nodes))
4380     self.op.pnode = ial.nodes[0]
4381     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4382                  self.op.instance_name, self.op.iallocator,
4383                  ", ".join(ial.nodes))
4384     if ial.required_nodes == 2:
4385       self.op.snode = ial.nodes[1]
4386
4387   def BuildHooksEnv(self):
4388     """Build hooks env.
4389
4390     This runs on master, primary and secondary nodes of the instance.
4391
4392     """
4393     env = {
4394       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4395       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4396       "INSTANCE_ADD_MODE": self.op.mode,
4397       }
4398     if self.op.mode == constants.INSTANCE_IMPORT:
4399       env["INSTANCE_SRC_NODE"] = self.op.src_node
4400       env["INSTANCE_SRC_PATH"] = self.op.src_path
4401       env["INSTANCE_SRC_IMAGES"] = self.src_images
4402
4403     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4404       primary_node=self.op.pnode,
4405       secondary_nodes=self.secondaries,
4406       status=self.op.start,
4407       os_type=self.op.os_type,
4408       memory=self.be_full[constants.BE_MEMORY],
4409       vcpus=self.be_full[constants.BE_VCPUS],
4410       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4411     ))
4412
4413     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4414           self.secondaries)
4415     return env, nl, nl
4416
4417
4418   def CheckPrereq(self):
4419     """Check prerequisites.
4420
4421     """
4422     if (not self.cfg.GetVGName() and
4423         self.op.disk_template not in constants.DTS_NOT_LVM):
4424       raise errors.OpPrereqError("Cluster does not support lvm-based"
4425                                  " instances")
4426
4427
4428     if self.op.mode == constants.INSTANCE_IMPORT:
4429       src_node = self.op.src_node
4430       src_path = self.op.src_path
4431
4432       if src_node is None:
4433         exp_list = self.rpc.call_export_list(
4434           self.acquired_locks[locking.LEVEL_NODE])
4435         found = False
4436         for node in exp_list:
4437           if not exp_list[node].failed and src_path in exp_list[node].data:
4438             found = True
4439             self.op.src_node = src_node = node
4440             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4441                                                        src_path)
4442             break
4443         if not found:
4444           raise errors.OpPrereqError("No export found for relative path %s" %
4445                                       src_path)
4446
4447       _CheckNodeOnline(self, src_node)
4448       result = self.rpc.call_export_info(src_node, src_path)
4449       result.Raise()
4450       if not result.data:
4451         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4452
4453       export_info = result.data
4454       if not export_info.has_section(constants.INISECT_EXP):
4455         raise errors.ProgrammerError("Corrupted export config")
4456
4457       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4458       if (int(ei_version) != constants.EXPORT_VERSION):
4459         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4460                                    (ei_version, constants.EXPORT_VERSION))
4461
4462       # Check that the new instance doesn't have less disks than the export
4463       instance_disks = len(self.disks)
4464       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4465       if instance_disks < export_disks:
4466         raise errors.OpPrereqError("Not enough disks to import."
4467                                    " (instance: %d, export: %d)" %
4468                                    (instance_disks, export_disks))
4469
4470       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4471       disk_images = []
4472       for idx in range(export_disks):
4473         option = 'disk%d_dump' % idx
4474         if export_info.has_option(constants.INISECT_INS, option):
4475           # FIXME: are the old os-es, disk sizes, etc. useful?
4476           export_name = export_info.get(constants.INISECT_INS, option)
4477           image = os.path.join(src_path, export_name)
4478           disk_images.append(image)
4479         else:
4480           disk_images.append(False)
4481
4482       self.src_images = disk_images
4483
4484       old_name = export_info.get(constants.INISECT_INS, 'name')
4485       # FIXME: int() here could throw a ValueError on broken exports
4486       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4487       if self.op.instance_name == old_name:
4488         for idx, nic in enumerate(self.nics):
4489           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4490             nic_mac_ini = 'nic%d_mac' % idx
4491             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4492
4493     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4494     if self.op.start and not self.op.ip_check:
4495       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4496                                  " adding an instance in start mode")
4497
4498     if self.op.ip_check:
4499       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4500         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4501                                    (self.check_ip, self.op.instance_name))
4502
4503     #### allocator run
4504
4505     if self.op.iallocator is not None:
4506       self._RunAllocator()
4507
4508     #### node related checks
4509
4510     # check primary node
4511     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4512     assert self.pnode is not None, \
4513       "Cannot retrieve locked node %s" % self.op.pnode
4514     if pnode.offline:
4515       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4516                                  pnode.name)
4517     if pnode.drained:
4518       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4519                                  pnode.name)
4520
4521     self.secondaries = []
4522
4523     # mirror node verification
4524     if self.op.disk_template in constants.DTS_NET_MIRROR:
4525       if self.op.snode is None:
4526         raise errors.OpPrereqError("The networked disk templates need"
4527                                    " a mirror node")
4528       if self.op.snode == pnode.name:
4529         raise errors.OpPrereqError("The secondary node cannot be"
4530                                    " the primary node.")
4531       _CheckNodeOnline(self, self.op.snode)
4532       _CheckNodeNotDrained(self, self.op.snode)
4533       self.secondaries.append(self.op.snode)
4534
4535     nodenames = [pnode.name] + self.secondaries
4536
4537     req_size = _ComputeDiskSize(self.op.disk_template,
4538                                 self.disks)
4539
4540     # Check lv size requirements
4541     if req_size is not None:
4542       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4543                                          self.op.hypervisor)
4544       for node in nodenames:
4545         info = nodeinfo[node]
4546         info.Raise()
4547         info = info.data
4548         if not info:
4549           raise errors.OpPrereqError("Cannot get current information"
4550                                      " from node '%s'" % node)
4551         vg_free = info.get('vg_free', None)
4552         if not isinstance(vg_free, int):
4553           raise errors.OpPrereqError("Can't compute free disk space on"
4554                                      " node %s" % node)
4555         if req_size > info['vg_free']:
4556           raise errors.OpPrereqError("Not enough disk space on target node %s."
4557                                      " %d MB available, %d MB required" %
4558                                      (node, info['vg_free'], req_size))
4559
4560     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4561
4562     # os verification
4563     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4564     result.Raise()
4565     if not isinstance(result.data, objects.OS):
4566       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4567                                  " primary node"  % self.op.os_type)
4568
4569     # bridge check on primary node
4570     bridges = [n.bridge for n in self.nics]
4571     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4572     result.Raise()
4573     if not result.data:
4574       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4575                                  " exist on destination node '%s'" %
4576                                  (",".join(bridges), pnode.name))
4577
4578     # memory check on primary node
4579     if self.op.start:
4580       _CheckNodeFreeMemory(self, self.pnode.name,
4581                            "creating instance %s" % self.op.instance_name,
4582                            self.be_full[constants.BE_MEMORY],
4583                            self.op.hypervisor)
4584
4585   def Exec(self, feedback_fn):
4586     """Create and add the instance to the cluster.
4587
4588     """
4589     instance = self.op.instance_name
4590     pnode_name = self.pnode.name
4591
4592     for nic in self.nics:
4593       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4594         nic.mac = self.cfg.GenerateMAC()
4595
4596     ht_kind = self.op.hypervisor
4597     if ht_kind in constants.HTS_REQ_PORT:
4598       network_port = self.cfg.AllocatePort()
4599     else:
4600       network_port = None
4601
4602     ##if self.op.vnc_bind_address is None:
4603     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4604
4605     # this is needed because os.path.join does not accept None arguments
4606     if self.op.file_storage_dir is None:
4607       string_file_storage_dir = ""
4608     else:
4609       string_file_storage_dir = self.op.file_storage_dir
4610
4611     # build the full file storage dir path
4612     file_storage_dir = os.path.normpath(os.path.join(
4613                                         self.cfg.GetFileStorageDir(),
4614                                         string_file_storage_dir, instance))
4615
4616
4617     disks = _GenerateDiskTemplate(self,
4618                                   self.op.disk_template,
4619                                   instance, pnode_name,
4620                                   self.secondaries,
4621                                   self.disks,
4622                                   file_storage_dir,
4623                                   self.op.file_driver,
4624                                   0)
4625
4626     iobj = objects.Instance(name=instance, os=self.op.os_type,
4627                             primary_node=pnode_name,
4628                             nics=self.nics, disks=disks,
4629                             disk_template=self.op.disk_template,
4630                             admin_up=False,
4631                             network_port=network_port,
4632                             beparams=self.op.beparams,
4633                             hvparams=self.op.hvparams,
4634                             hypervisor=self.op.hypervisor,
4635                             )
4636
4637     feedback_fn("* creating instance disks...")
4638     try:
4639       _CreateDisks(self, iobj)
4640     except errors.OpExecError:
4641       self.LogWarning("Device creation failed, reverting...")
4642       try:
4643         _RemoveDisks(self, iobj)
4644       finally:
4645         self.cfg.ReleaseDRBDMinors(instance)
4646         raise
4647
4648     feedback_fn("adding instance %s to cluster config" % instance)
4649
4650     self.cfg.AddInstance(iobj)
4651     # Declare that we don't want to remove the instance lock anymore, as we've
4652     # added the instance to the config
4653     del self.remove_locks[locking.LEVEL_INSTANCE]
4654     # Unlock all the nodes
4655     if self.op.mode == constants.INSTANCE_IMPORT:
4656       nodes_keep = [self.op.src_node]
4657       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4658                        if node != self.op.src_node]
4659       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4660       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4661     else:
4662       self.context.glm.release(locking.LEVEL_NODE)
4663       del self.acquired_locks[locking.LEVEL_NODE]
4664
4665     if self.op.wait_for_sync:
4666       disk_abort = not _WaitForSync(self, iobj)
4667     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4668       # make sure the disks are not degraded (still sync-ing is ok)
4669       time.sleep(15)
4670       feedback_fn("* checking mirrors status")
4671       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4672     else:
4673       disk_abort = False
4674
4675     if disk_abort:
4676       _RemoveDisks(self, iobj)
4677       self.cfg.RemoveInstance(iobj.name)
4678       # Make sure the instance lock gets removed
4679       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4680       raise errors.OpExecError("There are some degraded disks for"
4681                                " this instance")
4682
4683     feedback_fn("creating os for instance %s on node %s" %
4684                 (instance, pnode_name))
4685
4686     if iobj.disk_template != constants.DT_DISKLESS:
4687       if self.op.mode == constants.INSTANCE_CREATE:
4688         feedback_fn("* running the instance OS create scripts...")
4689         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4690         msg = result.RemoteFailMsg()
4691         if msg:
4692           raise errors.OpExecError("Could not add os for instance %s"
4693                                    " on node %s: %s" %
4694                                    (instance, pnode_name, msg))
4695
4696       elif self.op.mode == constants.INSTANCE_IMPORT:
4697         feedback_fn("* running the instance OS import scripts...")
4698         src_node = self.op.src_node
4699         src_images = self.src_images
4700         cluster_name = self.cfg.GetClusterName()
4701         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4702                                                          src_node, src_images,
4703                                                          cluster_name)
4704         import_result.Raise()
4705         for idx, result in enumerate(import_result.data):
4706           if not result:
4707             self.LogWarning("Could not import the image %s for instance"
4708                             " %s, disk %d, on node %s" %
4709                             (src_images[idx], instance, idx, pnode_name))
4710       else:
4711         # also checked in the prereq part
4712         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4713                                      % self.op.mode)
4714
4715     if self.op.start:
4716       iobj.admin_up = True
4717       self.cfg.Update(iobj)
4718       logging.info("Starting instance %s on node %s", instance, pnode_name)
4719       feedback_fn("* starting instance...")
4720       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4721       msg = result.RemoteFailMsg()
4722       if msg:
4723         raise errors.OpExecError("Could not start instance: %s" % msg)
4724
4725
4726 class LUConnectConsole(NoHooksLU):
4727   """Connect to an instance's console.
4728
4729   This is somewhat special in that it returns the command line that
4730   you need to run on the master node in order to connect to the
4731   console.
4732
4733   """
4734   _OP_REQP = ["instance_name"]
4735   REQ_BGL = False
4736
4737   def ExpandNames(self):
4738     self._ExpandAndLockInstance()
4739
4740   def CheckPrereq(self):
4741     """Check prerequisites.
4742
4743     This checks that the instance is in the cluster.
4744
4745     """
4746     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4747     assert self.instance is not None, \
4748       "Cannot retrieve locked instance %s" % self.op.instance_name
4749     _CheckNodeOnline(self, self.instance.primary_node)
4750
4751   def Exec(self, feedback_fn):
4752     """Connect to the console of an instance
4753
4754     """
4755     instance = self.instance
4756     node = instance.primary_node
4757
4758     node_insts = self.rpc.call_instance_list([node],
4759                                              [instance.hypervisor])[node]
4760     node_insts.Raise()
4761
4762     if instance.name not in node_insts.data:
4763       raise errors.OpExecError("Instance %s is not running." % instance.name)
4764
4765     logging.debug("Connecting to console of %s on %s", instance.name, node)
4766
4767     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4768     cluster = self.cfg.GetClusterInfo()
4769     # beparams and hvparams are passed separately, to avoid editing the
4770     # instance and then saving the defaults in the instance itself.
4771     hvparams = cluster.FillHV(instance)
4772     beparams = cluster.FillBE(instance)
4773     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4774
4775     # build ssh cmdline
4776     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4777
4778
4779 class LUReplaceDisks(LogicalUnit):
4780   """Replace the disks of an instance.
4781
4782   """
4783   HPATH = "mirrors-replace"
4784   HTYPE = constants.HTYPE_INSTANCE
4785   _OP_REQP = ["instance_name", "mode", "disks"]
4786   REQ_BGL = False
4787
4788   def CheckArguments(self):
4789     if not hasattr(self.op, "remote_node"):
4790       self.op.remote_node = None
4791     if not hasattr(self.op, "iallocator"):
4792       self.op.iallocator = None
4793
4794     # check for valid parameter combination
4795     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4796     if self.op.mode == constants.REPLACE_DISK_CHG:
4797       if cnt == 2:
4798         raise errors.OpPrereqError("When changing the secondary either an"
4799                                    " iallocator script must be used or the"
4800                                    " new node given")
4801       elif cnt == 0:
4802         raise errors.OpPrereqError("Give either the iallocator or the new"
4803                                    " secondary, not both")
4804     else: # not replacing the secondary
4805       if cnt != 2:
4806         raise errors.OpPrereqError("The iallocator and new node options can"
4807                                    " be used only when changing the"
4808                                    " secondary node")
4809
4810   def ExpandNames(self):
4811     self._ExpandAndLockInstance()
4812
4813     if self.op.iallocator is not None:
4814       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4815     elif self.op.remote_node is not None:
4816       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4817       if remote_node is None:
4818         raise errors.OpPrereqError("Node '%s' not known" %
4819                                    self.op.remote_node)
4820       self.op.remote_node = remote_node
4821       # Warning: do not remove the locking of the new secondary here
4822       # unless DRBD8.AddChildren is changed to work in parallel;
4823       # currently it doesn't since parallel invocations of
4824       # FindUnusedMinor will conflict
4825       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4826       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4827     else:
4828       self.needed_locks[locking.LEVEL_NODE] = []
4829       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4830
4831   def DeclareLocks(self, level):
4832     # If we're not already locking all nodes in the set we have to declare the
4833     # instance's primary/secondary nodes.
4834     if (level == locking.LEVEL_NODE and
4835         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4836       self._LockInstancesNodes()
4837
4838   def _RunAllocator(self):
4839     """Compute a new secondary node using an IAllocator.
4840
4841     """
4842     ial = IAllocator(self,
4843                      mode=constants.IALLOCATOR_MODE_RELOC,
4844                      name=self.op.instance_name,
4845                      relocate_from=[self.sec_node])
4846
4847     ial.Run(self.op.iallocator)
4848
4849     if not ial.success:
4850       raise errors.OpPrereqError("Can't compute nodes using"
4851                                  " iallocator '%s': %s" % (self.op.iallocator,
4852                                                            ial.info))
4853     if len(ial.nodes) != ial.required_nodes:
4854       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4855                                  " of nodes (%s), required %s" %
4856                                  (len(ial.nodes), ial.required_nodes))
4857     self.op.remote_node = ial.nodes[0]
4858     self.LogInfo("Selected new secondary for the instance: %s",
4859                  self.op.remote_node)
4860
4861   def BuildHooksEnv(self):
4862     """Build hooks env.
4863
4864     This runs on the master, the primary and all the secondaries.
4865
4866     """
4867     env = {
4868       "MODE": self.op.mode,
4869       "NEW_SECONDARY": self.op.remote_node,
4870       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4871       }
4872     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4873     nl = [
4874       self.cfg.GetMasterNode(),
4875       self.instance.primary_node,
4876       ]
4877     if self.op.remote_node is not None:
4878       nl.append(self.op.remote_node)
4879     return env, nl, nl
4880
4881   def CheckPrereq(self):
4882     """Check prerequisites.
4883
4884     This checks that the instance is in the cluster.
4885
4886     """
4887     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4888     assert instance is not None, \
4889       "Cannot retrieve locked instance %s" % self.op.instance_name
4890     self.instance = instance
4891
4892     if instance.disk_template != constants.DT_DRBD8:
4893       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4894                                  " instances")
4895
4896     if len(instance.secondary_nodes) != 1:
4897       raise errors.OpPrereqError("The instance has a strange layout,"
4898                                  " expected one secondary but found %d" %
4899                                  len(instance.secondary_nodes))
4900
4901     self.sec_node = instance.secondary_nodes[0]
4902
4903     if self.op.iallocator is not None:
4904       self._RunAllocator()
4905
4906     remote_node = self.op.remote_node
4907     if remote_node is not None:
4908       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4909       assert self.remote_node_info is not None, \
4910         "Cannot retrieve locked node %s" % remote_node
4911     else:
4912       self.remote_node_info = None
4913     if remote_node == instance.primary_node:
4914       raise errors.OpPrereqError("The specified node is the primary node of"
4915                                  " the instance.")
4916     elif remote_node == self.sec_node:
4917       raise errors.OpPrereqError("The specified node is already the"
4918                                  " secondary node of the instance.")
4919
4920     if self.op.mode == constants.REPLACE_DISK_PRI:
4921       n1 = self.tgt_node = instance.primary_node
4922       n2 = self.oth_node = self.sec_node
4923     elif self.op.mode == constants.REPLACE_DISK_SEC:
4924       n1 = self.tgt_node = self.sec_node
4925       n2 = self.oth_node = instance.primary_node
4926     elif self.op.mode == constants.REPLACE_DISK_CHG:
4927       n1 = self.new_node = remote_node
4928       n2 = self.oth_node = instance.primary_node
4929       self.tgt_node = self.sec_node
4930       _CheckNodeNotDrained(self, remote_node)
4931     else:
4932       raise errors.ProgrammerError("Unhandled disk replace mode")
4933
4934     _CheckNodeOnline(self, n1)
4935     _CheckNodeOnline(self, n2)
4936
4937     if not self.op.disks:
4938       self.op.disks = range(len(instance.disks))
4939
4940     for disk_idx in self.op.disks:
4941       instance.FindDisk(disk_idx)
4942
4943   def _ExecD8DiskOnly(self, feedback_fn):
4944     """Replace a disk on the primary or secondary for dbrd8.
4945
4946     The algorithm for replace is quite complicated:
4947
4948       1. for each disk to be replaced:
4949
4950         1. create new LVs on the target node with unique names
4951         1. detach old LVs from the drbd device
4952         1. rename old LVs to name_replaced.<time_t>
4953         1. rename new LVs to old LVs
4954         1. attach the new LVs (with the old names now) to the drbd device
4955
4956       1. wait for sync across all devices
4957
4958       1. for each modified disk:
4959
4960         1. remove old LVs (which have the name name_replaces.<time_t>)
4961
4962     Failures are not very well handled.
4963
4964     """
4965     steps_total = 6
4966     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4967     instance = self.instance
4968     iv_names = {}
4969     vgname = self.cfg.GetVGName()
4970     # start of work
4971     cfg = self.cfg
4972     tgt_node = self.tgt_node
4973     oth_node = self.oth_node
4974
4975     # Step: check device activation
4976     self.proc.LogStep(1, steps_total, "check device existence")
4977     info("checking volume groups")
4978     my_vg = cfg.GetVGName()
4979     results = self.rpc.call_vg_list([oth_node, tgt_node])
4980     if not results:
4981       raise errors.OpExecError("Can't list volume groups on the nodes")
4982     for node in oth_node, tgt_node:
4983       res = results[node]
4984       if res.failed or not res.data or my_vg not in res.data:
4985         raise errors.OpExecError("Volume group '%s' not found on %s" %
4986                                  (my_vg, node))
4987     for idx, dev in enumerate(instance.disks):
4988       if idx not in self.op.disks:
4989         continue
4990       for node in tgt_node, oth_node:
4991         info("checking disk/%d on %s" % (idx, node))
4992         cfg.SetDiskID(dev, node)
4993         result = self.rpc.call_blockdev_find(node, dev)
4994         msg = result.RemoteFailMsg()
4995         if not msg and not result.payload:
4996           msg = "disk not found"
4997         if msg:
4998           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
4999                                    (idx, node, msg))
5000
5001     # Step: check other node consistency
5002     self.proc.LogStep(2, steps_total, "check peer consistency")
5003     for idx, dev in enumerate(instance.disks):
5004       if idx not in self.op.disks:
5005         continue
5006       info("checking disk/%d consistency on %s" % (idx, oth_node))
5007       if not _CheckDiskConsistency(self, dev, oth_node,
5008                                    oth_node==instance.primary_node):
5009         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5010                                  " to replace disks on this node (%s)" %
5011                                  (oth_node, tgt_node))
5012
5013     # Step: create new storage
5014     self.proc.LogStep(3, steps_total, "allocate new storage")
5015     for idx, dev in enumerate(instance.disks):
5016       if idx not in self.op.disks:
5017         continue
5018       size = dev.size
5019       cfg.SetDiskID(dev, tgt_node)
5020       lv_names = [".disk%d_%s" % (idx, suf)
5021                   for suf in ["data", "meta"]]
5022       names = _GenerateUniqueNames(self, lv_names)
5023       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5024                              logical_id=(vgname, names[0]))
5025       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5026                              logical_id=(vgname, names[1]))
5027       new_lvs = [lv_data, lv_meta]
5028       old_lvs = dev.children
5029       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5030       info("creating new local storage on %s for %s" %
5031            (tgt_node, dev.iv_name))
5032       # we pass force_create=True to force the LVM creation
5033       for new_lv in new_lvs:
5034         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5035                         _GetInstanceInfoText(instance), False)
5036
5037     # Step: for each lv, detach+rename*2+attach
5038     self.proc.LogStep(4, steps_total, "change drbd configuration")
5039     for dev, old_lvs, new_lvs in iv_names.itervalues():
5040       info("detaching %s drbd from local storage" % dev.iv_name)
5041       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5042       result.Raise()
5043       if not result.data:
5044         raise errors.OpExecError("Can't detach drbd from local storage on node"
5045                                  " %s for device %s" % (tgt_node, dev.iv_name))
5046       #dev.children = []
5047       #cfg.Update(instance)
5048
5049       # ok, we created the new LVs, so now we know we have the needed
5050       # storage; as such, we proceed on the target node to rename
5051       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5052       # using the assumption that logical_id == physical_id (which in
5053       # turn is the unique_id on that node)
5054
5055       # FIXME(iustin): use a better name for the replaced LVs
5056       temp_suffix = int(time.time())
5057       ren_fn = lambda d, suff: (d.physical_id[0],
5058                                 d.physical_id[1] + "_replaced-%s" % suff)
5059       # build the rename list based on what LVs exist on the node
5060       rlist = []
5061       for to_ren in old_lvs:
5062         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5063         if not result.RemoteFailMsg() and result.payload:
5064           # device exists
5065           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5066
5067       info("renaming the old LVs on the target node")
5068       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5069       result.Raise()
5070       if not result.data:
5071         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5072       # now we rename the new LVs to the old LVs
5073       info("renaming the new LVs on the target node")
5074       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5075       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5076       result.Raise()
5077       if not result.data:
5078         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5079
5080       for old, new in zip(old_lvs, new_lvs):
5081         new.logical_id = old.logical_id
5082         cfg.SetDiskID(new, tgt_node)
5083
5084       for disk in old_lvs:
5085         disk.logical_id = ren_fn(disk, temp_suffix)
5086         cfg.SetDiskID(disk, tgt_node)
5087
5088       # now that the new lvs have the old name, we can add them to the device
5089       info("adding new mirror component on %s" % tgt_node)
5090       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5091       if result.failed or not result.data:
5092         for new_lv in new_lvs:
5093           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5094           if msg:
5095             warning("Can't rollback device %s: %s", dev, msg,
5096                     hint="cleanup manually the unused logical volumes")
5097         raise errors.OpExecError("Can't add local storage to drbd")
5098
5099       dev.children = new_lvs
5100       cfg.Update(instance)
5101
5102     # Step: wait for sync
5103
5104     # this can fail as the old devices are degraded and _WaitForSync
5105     # does a combined result over all disks, so we don't check its
5106     # return value
5107     self.proc.LogStep(5, steps_total, "sync devices")
5108     _WaitForSync(self, instance, unlock=True)
5109
5110     # so check manually all the devices
5111     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5112       cfg.SetDiskID(dev, instance.primary_node)
5113       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5114       msg = result.RemoteFailMsg()
5115       if not msg and not result.payload:
5116         msg = "disk not found"
5117       if msg:
5118         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5119                                  (name, msg))
5120       if result.payload[5]:
5121         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5122
5123     # Step: remove old storage
5124     self.proc.LogStep(6, steps_total, "removing old storage")
5125     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5126       info("remove logical volumes for %s" % name)
5127       for lv in old_lvs:
5128         cfg.SetDiskID(lv, tgt_node)
5129         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5130         if msg:
5131           warning("Can't remove old LV: %s" % msg,
5132                   hint="manually remove unused LVs")
5133           continue
5134
5135   def _ExecD8Secondary(self, feedback_fn):
5136     """Replace the secondary node for drbd8.
5137
5138     The algorithm for replace is quite complicated:
5139       - for all disks of the instance:
5140         - create new LVs on the new node with same names
5141         - shutdown the drbd device on the old secondary
5142         - disconnect the drbd network on the primary
5143         - create the drbd device on the new secondary
5144         - network attach the drbd on the primary, using an artifice:
5145           the drbd code for Attach() will connect to the network if it
5146           finds a device which is connected to the good local disks but
5147           not network enabled
5148       - wait for sync across all devices
5149       - remove all disks from the old secondary
5150
5151     Failures are not very well handled.
5152
5153     """
5154     steps_total = 6
5155     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5156     instance = self.instance
5157     iv_names = {}
5158     # start of work
5159     cfg = self.cfg
5160     old_node = self.tgt_node
5161     new_node = self.new_node
5162     pri_node = instance.primary_node
5163     nodes_ip = {
5164       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5165       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5166       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5167       }
5168
5169     # Step: check device activation
5170     self.proc.LogStep(1, steps_total, "check device existence")
5171     info("checking volume groups")
5172     my_vg = cfg.GetVGName()
5173     results = self.rpc.call_vg_list([pri_node, new_node])
5174     for node in pri_node, new_node:
5175       res = results[node]
5176       if res.failed or not res.data or my_vg not in res.data:
5177         raise errors.OpExecError("Volume group '%s' not found on %s" %
5178                                  (my_vg, node))
5179     for idx, dev in enumerate(instance.disks):
5180       if idx not in self.op.disks:
5181         continue
5182       info("checking disk/%d on %s" % (idx, pri_node))
5183       cfg.SetDiskID(dev, pri_node)
5184       result = self.rpc.call_blockdev_find(pri_node, dev)
5185       msg = result.RemoteFailMsg()
5186       if not msg and not result.payload:
5187         msg = "disk not found"
5188       if msg:
5189         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5190                                  (idx, pri_node, msg))
5191
5192     # Step: check other node consistency
5193     self.proc.LogStep(2, steps_total, "check peer consistency")
5194     for idx, dev in enumerate(instance.disks):
5195       if idx not in self.op.disks:
5196         continue
5197       info("checking disk/%d consistency on %s" % (idx, pri_node))
5198       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5199         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5200                                  " unsafe to replace the secondary" %
5201                                  pri_node)
5202
5203     # Step: create new storage
5204     self.proc.LogStep(3, steps_total, "allocate new storage")
5205     for idx, dev in enumerate(instance.disks):
5206       info("adding new local storage on %s for disk/%d" %
5207            (new_node, idx))
5208       # we pass force_create=True to force LVM creation
5209       for new_lv in dev.children:
5210         _CreateBlockDev(self, new_node, instance, new_lv, True,
5211                         _GetInstanceInfoText(instance), False)
5212
5213     # Step 4: dbrd minors and drbd setups changes
5214     # after this, we must manually remove the drbd minors on both the
5215     # error and the success paths
5216     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5217                                    instance.name)
5218     logging.debug("Allocated minors %s" % (minors,))
5219     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5220     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5221       size = dev.size
5222       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5223       # create new devices on new_node; note that we create two IDs:
5224       # one without port, so the drbd will be activated without
5225       # networking information on the new node at this stage, and one
5226       # with network, for the latter activation in step 4
5227       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5228       if pri_node == o_node1:
5229         p_minor = o_minor1
5230       else:
5231         p_minor = o_minor2
5232
5233       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5234       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5235
5236       iv_names[idx] = (dev, dev.children, new_net_id)
5237       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5238                     new_net_id)
5239       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5240                               logical_id=new_alone_id,
5241                               children=dev.children)
5242       try:
5243         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5244                               _GetInstanceInfoText(instance), False)
5245       except errors.BlockDeviceError:
5246         self.cfg.ReleaseDRBDMinors(instance.name)
5247         raise
5248
5249     for idx, dev in enumerate(instance.disks):
5250       # we have new devices, shutdown the drbd on the old secondary
5251       info("shutting down drbd for disk/%d on old node" % idx)
5252       cfg.SetDiskID(dev, old_node)
5253       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5254       if msg:
5255         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5256                 (idx, msg),
5257                 hint="Please cleanup this device manually as soon as possible")
5258
5259     info("detaching primary drbds from the network (=> standalone)")
5260     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5261                                                instance.disks)[pri_node]
5262
5263     msg = result.RemoteFailMsg()
5264     if msg:
5265       # detaches didn't succeed (unlikely)
5266       self.cfg.ReleaseDRBDMinors(instance.name)
5267       raise errors.OpExecError("Can't detach the disks from the network on"
5268                                " old node: %s" % (msg,))
5269
5270     # if we managed to detach at least one, we update all the disks of
5271     # the instance to point to the new secondary
5272     info("updating instance configuration")
5273     for dev, _, new_logical_id in iv_names.itervalues():
5274       dev.logical_id = new_logical_id
5275       cfg.SetDiskID(dev, pri_node)
5276     cfg.Update(instance)
5277
5278     # and now perform the drbd attach
5279     info("attaching primary drbds to new secondary (standalone => connected)")
5280     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5281                                            instance.disks, instance.name,
5282                                            False)
5283     for to_node, to_result in result.items():
5284       msg = to_result.RemoteFailMsg()
5285       if msg:
5286         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5287                 hint="please do a gnt-instance info to see the"
5288                 " status of disks")
5289
5290     # this can fail as the old devices are degraded and _WaitForSync
5291     # does a combined result over all disks, so we don't check its
5292     # return value
5293     self.proc.LogStep(5, steps_total, "sync devices")
5294     _WaitForSync(self, instance, unlock=True)
5295
5296     # so check manually all the devices
5297     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5298       cfg.SetDiskID(dev, pri_node)
5299       result = self.rpc.call_blockdev_find(pri_node, dev)
5300       msg = result.RemoteFailMsg()
5301       if not msg and not result.payload:
5302         msg = "disk not found"
5303       if msg:
5304         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5305                                  (idx, msg))
5306       if result.payload[5]:
5307         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5308
5309     self.proc.LogStep(6, steps_total, "removing old storage")
5310     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5311       info("remove logical volumes for disk/%d" % idx)
5312       for lv in old_lvs:
5313         cfg.SetDiskID(lv, old_node)
5314         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5315         if msg:
5316           warning("Can't remove LV on old secondary: %s", msg,
5317                   hint="Cleanup stale volumes by hand")
5318
5319   def Exec(self, feedback_fn):
5320     """Execute disk replacement.
5321
5322     This dispatches the disk replacement to the appropriate handler.
5323
5324     """
5325     instance = self.instance
5326
5327     # Activate the instance disks if we're replacing them on a down instance
5328     if not instance.admin_up:
5329       _StartInstanceDisks(self, instance, True)
5330
5331     if self.op.mode == constants.REPLACE_DISK_CHG:
5332       fn = self._ExecD8Secondary
5333     else:
5334       fn = self._ExecD8DiskOnly
5335
5336     ret = fn(feedback_fn)
5337
5338     # Deactivate the instance disks if we're replacing them on a down instance
5339     if not instance.admin_up:
5340       _SafeShutdownInstanceDisks(self, instance)
5341
5342     return ret
5343
5344
5345 class LUGrowDisk(LogicalUnit):
5346   """Grow a disk of an instance.
5347
5348   """
5349   HPATH = "disk-grow"
5350   HTYPE = constants.HTYPE_INSTANCE
5351   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5352   REQ_BGL = False
5353
5354   def ExpandNames(self):
5355     self._ExpandAndLockInstance()
5356     self.needed_locks[locking.LEVEL_NODE] = []
5357     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5358
5359   def DeclareLocks(self, level):
5360     if level == locking.LEVEL_NODE:
5361       self._LockInstancesNodes()
5362
5363   def BuildHooksEnv(self):
5364     """Build hooks env.
5365
5366     This runs on the master, the primary and all the secondaries.
5367
5368     """
5369     env = {
5370       "DISK": self.op.disk,
5371       "AMOUNT": self.op.amount,
5372       }
5373     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5374     nl = [
5375       self.cfg.GetMasterNode(),
5376       self.instance.primary_node,
5377       ]
5378     return env, nl, nl
5379
5380   def CheckPrereq(self):
5381     """Check prerequisites.
5382
5383     This checks that the instance is in the cluster.
5384
5385     """
5386     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5387     assert instance is not None, \
5388       "Cannot retrieve locked instance %s" % self.op.instance_name
5389     nodenames = list(instance.all_nodes)
5390     for node in nodenames:
5391       _CheckNodeOnline(self, node)
5392
5393
5394     self.instance = instance
5395
5396     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5397       raise errors.OpPrereqError("Instance's disk layout does not support"
5398                                  " growing.")
5399
5400     self.disk = instance.FindDisk(self.op.disk)
5401
5402     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5403                                        instance.hypervisor)
5404     for node in nodenames:
5405       info = nodeinfo[node]
5406       if info.failed or not info.data:
5407         raise errors.OpPrereqError("Cannot get current information"
5408                                    " from node '%s'" % node)
5409       vg_free = info.data.get('vg_free', None)
5410       if not isinstance(vg_free, int):
5411         raise errors.OpPrereqError("Can't compute free disk space on"
5412                                    " node %s" % node)
5413       if self.op.amount > vg_free:
5414         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5415                                    " %d MiB available, %d MiB required" %
5416                                    (node, vg_free, self.op.amount))
5417
5418   def Exec(self, feedback_fn):
5419     """Execute disk grow.
5420
5421     """
5422     instance = self.instance
5423     disk = self.disk
5424     for node in instance.all_nodes:
5425       self.cfg.SetDiskID(disk, node)
5426       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5427       msg = result.RemoteFailMsg()
5428       if msg:
5429         raise errors.OpExecError("Grow request failed to node %s: %s" %
5430                                  (node, msg))
5431     disk.RecordGrow(self.op.amount)
5432     self.cfg.Update(instance)
5433     if self.op.wait_for_sync:
5434       disk_abort = not _WaitForSync(self, instance)
5435       if disk_abort:
5436         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5437                              " status.\nPlease check the instance.")
5438
5439
5440 class LUQueryInstanceData(NoHooksLU):
5441   """Query runtime instance data.
5442
5443   """
5444   _OP_REQP = ["instances", "static"]
5445   REQ_BGL = False
5446
5447   def ExpandNames(self):
5448     self.needed_locks = {}
5449     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5450
5451     if not isinstance(self.op.instances, list):
5452       raise errors.OpPrereqError("Invalid argument type 'instances'")
5453
5454     if self.op.instances:
5455       self.wanted_names = []
5456       for name in self.op.instances:
5457         full_name = self.cfg.ExpandInstanceName(name)
5458         if full_name is None:
5459           raise errors.OpPrereqError("Instance '%s' not known" % name)
5460         self.wanted_names.append(full_name)
5461       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5462     else:
5463       self.wanted_names = None
5464       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5465
5466     self.needed_locks[locking.LEVEL_NODE] = []
5467     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5468
5469   def DeclareLocks(self, level):
5470     if level == locking.LEVEL_NODE:
5471       self._LockInstancesNodes()
5472
5473   def CheckPrereq(self):
5474     """Check prerequisites.
5475
5476     This only checks the optional instance list against the existing names.
5477
5478     """
5479     if self.wanted_names is None:
5480       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5481
5482     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5483                              in self.wanted_names]
5484     return
5485
5486   def _ComputeDiskStatus(self, instance, snode, dev):
5487     """Compute block device status.
5488
5489     """
5490     static = self.op.static
5491     if not static:
5492       self.cfg.SetDiskID(dev, instance.primary_node)
5493       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5494       msg = dev_pstatus.RemoteFailMsg()
5495       if msg:
5496         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5497                                  (instance.name, msg))
5498       dev_pstatus = dev_pstatus.payload
5499     else:
5500       dev_pstatus = None
5501
5502     if dev.dev_type in constants.LDS_DRBD:
5503       # we change the snode then (otherwise we use the one passed in)
5504       if dev.logical_id[0] == instance.primary_node:
5505         snode = dev.logical_id[1]
5506       else:
5507         snode = dev.logical_id[0]
5508
5509     if snode and not static:
5510       self.cfg.SetDiskID(dev, snode)
5511       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5512       msg = dev_sstatus.RemoteFailMsg()
5513       if msg:
5514         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5515                                  (instance.name, msg))
5516       dev_sstatus = dev_sstatus.payload
5517     else:
5518       dev_sstatus = None
5519
5520     if dev.children:
5521       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5522                       for child in dev.children]
5523     else:
5524       dev_children = []
5525
5526     data = {
5527       "iv_name": dev.iv_name,
5528       "dev_type": dev.dev_type,
5529       "logical_id": dev.logical_id,
5530       "physical_id": dev.physical_id,
5531       "pstatus": dev_pstatus,
5532       "sstatus": dev_sstatus,
5533       "children": dev_children,
5534       "mode": dev.mode,
5535       }
5536
5537     return data
5538
5539   def Exec(self, feedback_fn):
5540     """Gather and return data"""
5541     result = {}
5542
5543     cluster = self.cfg.GetClusterInfo()
5544
5545     for instance in self.wanted_instances:
5546       if not self.op.static:
5547         remote_info = self.rpc.call_instance_info(instance.primary_node,
5548                                                   instance.name,
5549                                                   instance.hypervisor)
5550         remote_info.Raise()
5551         remote_info = remote_info.data
5552         if remote_info and "state" in remote_info:
5553           remote_state = "up"
5554         else:
5555           remote_state = "down"
5556       else:
5557         remote_state = None
5558       if instance.admin_up:
5559         config_state = "up"
5560       else:
5561         config_state = "down"
5562
5563       disks = [self._ComputeDiskStatus(instance, None, device)
5564                for device in instance.disks]
5565
5566       idict = {
5567         "name": instance.name,
5568         "config_state": config_state,
5569         "run_state": remote_state,
5570         "pnode": instance.primary_node,
5571         "snodes": instance.secondary_nodes,
5572         "os": instance.os,
5573         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5574         "disks": disks,
5575         "hypervisor": instance.hypervisor,
5576         "network_port": instance.network_port,
5577         "hv_instance": instance.hvparams,
5578         "hv_actual": cluster.FillHV(instance),
5579         "be_instance": instance.beparams,
5580         "be_actual": cluster.FillBE(instance),
5581         }
5582
5583       result[instance.name] = idict
5584
5585     return result
5586
5587
5588 class LUSetInstanceParams(LogicalUnit):
5589   """Modifies an instances's parameters.
5590
5591   """
5592   HPATH = "instance-modify"
5593   HTYPE = constants.HTYPE_INSTANCE
5594   _OP_REQP = ["instance_name"]
5595   REQ_BGL = False
5596
5597   def CheckArguments(self):
5598     if not hasattr(self.op, 'nics'):
5599       self.op.nics = []
5600     if not hasattr(self.op, 'disks'):
5601       self.op.disks = []
5602     if not hasattr(self.op, 'beparams'):
5603       self.op.beparams = {}
5604     if not hasattr(self.op, 'hvparams'):
5605       self.op.hvparams = {}
5606     self.op.force = getattr(self.op, "force", False)
5607     if not (self.op.nics or self.op.disks or
5608             self.op.hvparams or self.op.beparams):
5609       raise errors.OpPrereqError("No changes submitted")
5610
5611     utils.CheckBEParams(self.op.beparams)
5612
5613     # Disk validation
5614     disk_addremove = 0
5615     for disk_op, disk_dict in self.op.disks:
5616       if disk_op == constants.DDM_REMOVE:
5617         disk_addremove += 1
5618         continue
5619       elif disk_op == constants.DDM_ADD:
5620         disk_addremove += 1
5621       else:
5622         if not isinstance(disk_op, int):
5623           raise errors.OpPrereqError("Invalid disk index")
5624       if disk_op == constants.DDM_ADD:
5625         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5626         if mode not in constants.DISK_ACCESS_SET:
5627           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5628         size = disk_dict.get('size', None)
5629         if size is None:
5630           raise errors.OpPrereqError("Required disk parameter size missing")
5631         try:
5632           size = int(size)
5633         except ValueError, err:
5634           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5635                                      str(err))
5636         disk_dict['size'] = size
5637       else:
5638         # modification of disk
5639         if 'size' in disk_dict:
5640           raise errors.OpPrereqError("Disk size change not possible, use"
5641                                      " grow-disk")
5642
5643     if disk_addremove > 1:
5644       raise errors.OpPrereqError("Only one disk add or remove operation"
5645                                  " supported at a time")
5646
5647     # NIC validation
5648     nic_addremove = 0
5649     for nic_op, nic_dict in self.op.nics:
5650       if nic_op == constants.DDM_REMOVE:
5651         nic_addremove += 1
5652         continue
5653       elif nic_op == constants.DDM_ADD:
5654         nic_addremove += 1
5655       else:
5656         if not isinstance(nic_op, int):
5657           raise errors.OpPrereqError("Invalid nic index")
5658
5659       # nic_dict should be a dict
5660       nic_ip = nic_dict.get('ip', None)
5661       if nic_ip is not None:
5662         if nic_ip.lower() == "none":
5663           nic_dict['ip'] = None
5664         else:
5665           if not utils.IsValidIP(nic_ip):
5666             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5667       # we can only check None bridges and assign the default one
5668       nic_bridge = nic_dict.get('bridge', None)
5669       if nic_bridge is None:
5670         nic_dict['bridge'] = self.cfg.GetDefBridge()
5671       # but we can validate MACs
5672       nic_mac = nic_dict.get('mac', None)
5673       if nic_mac is not None:
5674         if self.cfg.IsMacInUse(nic_mac):
5675           raise errors.OpPrereqError("MAC address %s already in use"
5676                                      " in cluster" % nic_mac)
5677         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5678           if not utils.IsValidMac(nic_mac):
5679             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5680     if nic_addremove > 1:
5681       raise errors.OpPrereqError("Only one NIC add or remove operation"
5682                                  " supported at a time")
5683
5684   def ExpandNames(self):
5685     self._ExpandAndLockInstance()
5686     self.needed_locks[locking.LEVEL_NODE] = []
5687     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5688
5689   def DeclareLocks(self, level):
5690     if level == locking.LEVEL_NODE:
5691       self._LockInstancesNodes()
5692
5693   def BuildHooksEnv(self):
5694     """Build hooks env.
5695
5696     This runs on the master, primary and secondaries.
5697
5698     """
5699     args = dict()
5700     if constants.BE_MEMORY in self.be_new:
5701       args['memory'] = self.be_new[constants.BE_MEMORY]
5702     if constants.BE_VCPUS in self.be_new:
5703       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5704     # FIXME: readd disk/nic changes
5705     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5706     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5707     return env, nl, nl
5708
5709   def CheckPrereq(self):
5710     """Check prerequisites.
5711
5712     This only checks the instance list against the existing names.
5713
5714     """
5715     force = self.force = self.op.force
5716
5717     # checking the new params on the primary/secondary nodes
5718
5719     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5720     assert self.instance is not None, \
5721       "Cannot retrieve locked instance %s" % self.op.instance_name
5722     pnode = instance.primary_node
5723     nodelist = list(instance.all_nodes)
5724
5725     # hvparams processing
5726     if self.op.hvparams:
5727       i_hvdict = copy.deepcopy(instance.hvparams)
5728       for key, val in self.op.hvparams.iteritems():
5729         if val == constants.VALUE_DEFAULT:
5730           try:
5731             del i_hvdict[key]
5732           except KeyError:
5733             pass
5734         elif val == constants.VALUE_NONE:
5735           i_hvdict[key] = None
5736         else:
5737           i_hvdict[key] = val
5738       cluster = self.cfg.GetClusterInfo()
5739       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5740                                 i_hvdict)
5741       # local check
5742       hypervisor.GetHypervisor(
5743         instance.hypervisor).CheckParameterSyntax(hv_new)
5744       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5745       self.hv_new = hv_new # the new actual values
5746       self.hv_inst = i_hvdict # the new dict (without defaults)
5747     else:
5748       self.hv_new = self.hv_inst = {}
5749
5750     # beparams processing
5751     if self.op.beparams:
5752       i_bedict = copy.deepcopy(instance.beparams)
5753       for key, val in self.op.beparams.iteritems():
5754         if val == constants.VALUE_DEFAULT:
5755           try:
5756             del i_bedict[key]
5757           except KeyError:
5758             pass
5759         else:
5760           i_bedict[key] = val
5761       cluster = self.cfg.GetClusterInfo()
5762       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5763                                 i_bedict)
5764       self.be_new = be_new # the new actual values
5765       self.be_inst = i_bedict # the new dict (without defaults)
5766     else:
5767       self.be_new = self.be_inst = {}
5768
5769     self.warn = []
5770
5771     if constants.BE_MEMORY in self.op.beparams and not self.force:
5772       mem_check_list = [pnode]
5773       if be_new[constants.BE_AUTO_BALANCE]:
5774         # either we changed auto_balance to yes or it was from before
5775         mem_check_list.extend(instance.secondary_nodes)
5776       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5777                                                   instance.hypervisor)
5778       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5779                                          instance.hypervisor)
5780       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5781         # Assume the primary node is unreachable and go ahead
5782         self.warn.append("Can't get info from primary node %s" % pnode)
5783       else:
5784         if not instance_info.failed and instance_info.data:
5785           current_mem = instance_info.data['memory']
5786         else:
5787           # Assume instance not running
5788           # (there is a slight race condition here, but it's not very probable,
5789           # and we have no other way to check)
5790           current_mem = 0
5791         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5792                     nodeinfo[pnode].data['memory_free'])
5793         if miss_mem > 0:
5794           raise errors.OpPrereqError("This change will prevent the instance"
5795                                      " from starting, due to %d MB of memory"
5796                                      " missing on its primary node" % miss_mem)
5797
5798       if be_new[constants.BE_AUTO_BALANCE]:
5799         for node, nres in nodeinfo.iteritems():
5800           if node not in instance.secondary_nodes:
5801             continue
5802           if nres.failed or not isinstance(nres.data, dict):
5803             self.warn.append("Can't get info from secondary node %s" % node)
5804           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5805             self.warn.append("Not enough memory to failover instance to"
5806                              " secondary node %s" % node)
5807
5808     # NIC processing
5809     for nic_op, nic_dict in self.op.nics:
5810       if nic_op == constants.DDM_REMOVE:
5811         if not instance.nics:
5812           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5813         continue
5814       if nic_op != constants.DDM_ADD:
5815         # an existing nic
5816         if nic_op < 0 or nic_op >= len(instance.nics):
5817           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5818                                      " are 0 to %d" %
5819                                      (nic_op, len(instance.nics)))
5820       nic_bridge = nic_dict.get('bridge', None)
5821       if nic_bridge is not None:
5822         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5823           msg = ("Bridge '%s' doesn't exist on one of"
5824                  " the instance nodes" % nic_bridge)
5825           if self.force:
5826             self.warn.append(msg)
5827           else:
5828             raise errors.OpPrereqError(msg)
5829
5830     # DISK processing
5831     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5832       raise errors.OpPrereqError("Disk operations not supported for"
5833                                  " diskless instances")
5834     for disk_op, disk_dict in self.op.disks:
5835       if disk_op == constants.DDM_REMOVE:
5836         if len(instance.disks) == 1:
5837           raise errors.OpPrereqError("Cannot remove the last disk of"
5838                                      " an instance")
5839         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5840         ins_l = ins_l[pnode]
5841         if ins_l.failed or not isinstance(ins_l.data, list):
5842           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5843         if instance.name in ins_l.data:
5844           raise errors.OpPrereqError("Instance is running, can't remove"
5845                                      " disks.")
5846
5847       if (disk_op == constants.DDM_ADD and
5848           len(instance.nics) >= constants.MAX_DISKS):
5849         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5850                                    " add more" % constants.MAX_DISKS)
5851       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5852         # an existing disk
5853         if disk_op < 0 or disk_op >= len(instance.disks):
5854           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5855                                      " are 0 to %d" %
5856                                      (disk_op, len(instance.disks)))
5857
5858     return
5859
5860   def Exec(self, feedback_fn):
5861     """Modifies an instance.
5862
5863     All parameters take effect only at the next restart of the instance.
5864
5865     """
5866     # Process here the warnings from CheckPrereq, as we don't have a
5867     # feedback_fn there.
5868     for warn in self.warn:
5869       feedback_fn("WARNING: %s" % warn)
5870
5871     result = []
5872     instance = self.instance
5873     # disk changes
5874     for disk_op, disk_dict in self.op.disks:
5875       if disk_op == constants.DDM_REMOVE:
5876         # remove the last disk
5877         device = instance.disks.pop()
5878         device_idx = len(instance.disks)
5879         for node, disk in device.ComputeNodeTree(instance.primary_node):
5880           self.cfg.SetDiskID(disk, node)
5881           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5882           if msg:
5883             self.LogWarning("Could not remove disk/%d on node %s: %s,"
5884                             " continuing anyway", device_idx, node, msg)
5885         result.append(("disk/%d" % device_idx, "remove"))
5886       elif disk_op == constants.DDM_ADD:
5887         # add a new disk
5888         if instance.disk_template == constants.DT_FILE:
5889           file_driver, file_path = instance.disks[0].logical_id
5890           file_path = os.path.dirname(file_path)
5891         else:
5892           file_driver = file_path = None
5893         disk_idx_base = len(instance.disks)
5894         new_disk = _GenerateDiskTemplate(self,
5895                                          instance.disk_template,
5896                                          instance.name, instance.primary_node,
5897                                          instance.secondary_nodes,
5898                                          [disk_dict],
5899                                          file_path,
5900                                          file_driver,
5901                                          disk_idx_base)[0]
5902         instance.disks.append(new_disk)
5903         info = _GetInstanceInfoText(instance)
5904
5905         logging.info("Creating volume %s for instance %s",
5906                      new_disk.iv_name, instance.name)
5907         # Note: this needs to be kept in sync with _CreateDisks
5908         #HARDCODE
5909         for node in instance.all_nodes:
5910           f_create = node == instance.primary_node
5911           try:
5912             _CreateBlockDev(self, node, instance, new_disk,
5913                             f_create, info, f_create)
5914           except errors.OpExecError, err:
5915             self.LogWarning("Failed to create volume %s (%s) on"
5916                             " node %s: %s",
5917                             new_disk.iv_name, new_disk, node, err)
5918         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5919                        (new_disk.size, new_disk.mode)))
5920       else:
5921         # change a given disk
5922         instance.disks[disk_op].mode = disk_dict['mode']
5923         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5924     # NIC changes
5925     for nic_op, nic_dict in self.op.nics:
5926       if nic_op == constants.DDM_REMOVE:
5927         # remove the last nic
5928         del instance.nics[-1]
5929         result.append(("nic.%d" % len(instance.nics), "remove"))
5930       elif nic_op == constants.DDM_ADD:
5931         # add a new nic
5932         if 'mac' not in nic_dict:
5933           mac = constants.VALUE_GENERATE
5934         else:
5935           mac = nic_dict['mac']
5936         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5937           mac = self.cfg.GenerateMAC()
5938         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5939                               bridge=nic_dict.get('bridge', None))
5940         instance.nics.append(new_nic)
5941         result.append(("nic.%d" % (len(instance.nics) - 1),
5942                        "add:mac=%s,ip=%s,bridge=%s" %
5943                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5944       else:
5945         # change a given nic
5946         for key in 'mac', 'ip', 'bridge':
5947           if key in nic_dict:
5948             setattr(instance.nics[nic_op], key, nic_dict[key])
5949             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5950
5951     # hvparams changes
5952     if self.op.hvparams:
5953       instance.hvparams = self.hv_inst
5954       for key, val in self.op.hvparams.iteritems():
5955         result.append(("hv/%s" % key, val))
5956
5957     # beparams changes
5958     if self.op.beparams:
5959       instance.beparams = self.be_inst
5960       for key, val in self.op.beparams.iteritems():
5961         result.append(("be/%s" % key, val))
5962
5963     self.cfg.Update(instance)
5964
5965     return result
5966
5967
5968 class LUQueryExports(NoHooksLU):
5969   """Query the exports list
5970
5971   """
5972   _OP_REQP = ['nodes']
5973   REQ_BGL = False
5974
5975   def ExpandNames(self):
5976     self.needed_locks = {}
5977     self.share_locks[locking.LEVEL_NODE] = 1
5978     if not self.op.nodes:
5979       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5980     else:
5981       self.needed_locks[locking.LEVEL_NODE] = \
5982         _GetWantedNodes(self, self.op.nodes)
5983
5984   def CheckPrereq(self):
5985     """Check prerequisites.
5986
5987     """
5988     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5989
5990   def Exec(self, feedback_fn):
5991     """Compute the list of all the exported system images.
5992
5993     @rtype: dict
5994     @return: a dictionary with the structure node->(export-list)
5995         where export-list is a list of the instances exported on
5996         that node.
5997
5998     """
5999     rpcresult = self.rpc.call_export_list(self.nodes)
6000     result = {}
6001     for node in rpcresult:
6002       if rpcresult[node].failed:
6003         result[node] = False
6004       else:
6005         result[node] = rpcresult[node].data
6006
6007     return result
6008
6009
6010 class LUExportInstance(LogicalUnit):
6011   """Export an instance to an image in the cluster.
6012
6013   """
6014   HPATH = "instance-export"
6015   HTYPE = constants.HTYPE_INSTANCE
6016   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6017   REQ_BGL = False
6018
6019   def ExpandNames(self):
6020     self._ExpandAndLockInstance()
6021     # FIXME: lock only instance primary and destination node
6022     #
6023     # Sad but true, for now we have do lock all nodes, as we don't know where
6024     # the previous export might be, and and in this LU we search for it and
6025     # remove it from its current node. In the future we could fix this by:
6026     #  - making a tasklet to search (share-lock all), then create the new one,
6027     #    then one to remove, after
6028     #  - removing the removal operation altoghether
6029     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6030
6031   def DeclareLocks(self, level):
6032     """Last minute lock declaration."""
6033     # All nodes are locked anyway, so nothing to do here.
6034
6035   def BuildHooksEnv(self):
6036     """Build hooks env.
6037
6038     This will run on the master, primary node and target node.
6039
6040     """
6041     env = {
6042       "EXPORT_NODE": self.op.target_node,
6043       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6044       }
6045     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6046     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6047           self.op.target_node]
6048     return env, nl, nl
6049
6050   def CheckPrereq(self):
6051     """Check prerequisites.
6052
6053     This checks that the instance and node names are valid.
6054
6055     """
6056     instance_name = self.op.instance_name
6057     self.instance = self.cfg.GetInstanceInfo(instance_name)
6058     assert self.instance is not None, \
6059           "Cannot retrieve locked instance %s" % self.op.instance_name
6060     _CheckNodeOnline(self, self.instance.primary_node)
6061
6062     self.dst_node = self.cfg.GetNodeInfo(
6063       self.cfg.ExpandNodeName(self.op.target_node))
6064
6065     if self.dst_node is None:
6066       # This is wrong node name, not a non-locked node
6067       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6068     _CheckNodeOnline(self, self.dst_node.name)
6069     _CheckNodeNotDrained(self, self.dst_node.name)
6070
6071     # instance disk type verification
6072     for disk in self.instance.disks:
6073       if disk.dev_type == constants.LD_FILE:
6074         raise errors.OpPrereqError("Export not supported for instances with"
6075                                    " file-based disks")
6076
6077   def Exec(self, feedback_fn):
6078     """Export an instance to an image in the cluster.
6079
6080     """
6081     instance = self.instance
6082     dst_node = self.dst_node
6083     src_node = instance.primary_node
6084     if self.op.shutdown:
6085       # shutdown the instance, but not the disks
6086       result = self.rpc.call_instance_shutdown(src_node, instance)
6087       result.Raise()
6088       if not result.data:
6089         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
6090                                  (instance.name, src_node))
6091
6092     vgname = self.cfg.GetVGName()
6093
6094     snap_disks = []
6095
6096     # set the disks ID correctly since call_instance_start needs the
6097     # correct drbd minor to create the symlinks
6098     for disk in instance.disks:
6099       self.cfg.SetDiskID(disk, src_node)
6100
6101     try:
6102       for disk in instance.disks:
6103         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6104         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6105         if new_dev_name.failed or not new_dev_name.data:
6106           self.LogWarning("Could not snapshot block device %s on node %s",
6107                           disk.logical_id[1], src_node)
6108           snap_disks.append(False)
6109         else:
6110           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6111                                  logical_id=(vgname, new_dev_name.data),
6112                                  physical_id=(vgname, new_dev_name.data),
6113                                  iv_name=disk.iv_name)
6114           snap_disks.append(new_dev)
6115
6116     finally:
6117       if self.op.shutdown and instance.admin_up:
6118         result = self.rpc.call_instance_start(src_node, instance, None)
6119         msg = result.RemoteFailMsg()
6120         if msg:
6121           _ShutdownInstanceDisks(self, instance)
6122           raise errors.OpExecError("Could not start instance: %s" % msg)
6123
6124     # TODO: check for size
6125
6126     cluster_name = self.cfg.GetClusterName()
6127     for idx, dev in enumerate(snap_disks):
6128       if dev:
6129         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6130                                                instance, cluster_name, idx)
6131         if result.failed or not result.data:
6132           self.LogWarning("Could not export block device %s from node %s to"
6133                           " node %s", dev.logical_id[1], src_node,
6134                           dst_node.name)
6135         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6136         if msg:
6137           self.LogWarning("Could not remove snapshot block device %s from node"
6138                           " %s: %s", dev.logical_id[1], src_node, msg)
6139
6140     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6141     if result.failed or not result.data:
6142       self.LogWarning("Could not finalize export for instance %s on node %s",
6143                       instance.name, dst_node.name)
6144
6145     nodelist = self.cfg.GetNodeList()
6146     nodelist.remove(dst_node.name)
6147
6148     # on one-node clusters nodelist will be empty after the removal
6149     # if we proceed the backup would be removed because OpQueryExports
6150     # substitutes an empty list with the full cluster node list.
6151     if nodelist:
6152       exportlist = self.rpc.call_export_list(nodelist)
6153       for node in exportlist:
6154         if exportlist[node].failed:
6155           continue
6156         if instance.name in exportlist[node].data:
6157           if not self.rpc.call_export_remove(node, instance.name):
6158             self.LogWarning("Could not remove older export for instance %s"
6159                             " on node %s", instance.name, node)
6160
6161
6162 class LURemoveExport(NoHooksLU):
6163   """Remove exports related to the named instance.
6164
6165   """
6166   _OP_REQP = ["instance_name"]
6167   REQ_BGL = False
6168
6169   def ExpandNames(self):
6170     self.needed_locks = {}
6171     # We need all nodes to be locked in order for RemoveExport to work, but we
6172     # don't need to lock the instance itself, as nothing will happen to it (and
6173     # we can remove exports also for a removed instance)
6174     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6175
6176   def CheckPrereq(self):
6177     """Check prerequisites.
6178     """
6179     pass
6180
6181   def Exec(self, feedback_fn):
6182     """Remove any export.
6183
6184     """
6185     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6186     # If the instance was not found we'll try with the name that was passed in.
6187     # This will only work if it was an FQDN, though.
6188     fqdn_warn = False
6189     if not instance_name:
6190       fqdn_warn = True
6191       instance_name = self.op.instance_name
6192
6193     exportlist = self.rpc.call_export_list(self.acquired_locks[
6194       locking.LEVEL_NODE])
6195     found = False
6196     for node in exportlist:
6197       if exportlist[node].failed:
6198         self.LogWarning("Failed to query node %s, continuing" % node)
6199         continue
6200       if instance_name in exportlist[node].data:
6201         found = True
6202         result = self.rpc.call_export_remove(node, instance_name)
6203         if result.failed or not result.data:
6204           logging.error("Could not remove export for instance %s"
6205                         " on node %s", instance_name, node)
6206
6207     if fqdn_warn and not found:
6208       feedback_fn("Export not found. If trying to remove an export belonging"
6209                   " to a deleted instance please use its Fully Qualified"
6210                   " Domain Name.")
6211
6212
6213 class TagsLU(NoHooksLU):
6214   """Generic tags LU.
6215
6216   This is an abstract class which is the parent of all the other tags LUs.
6217
6218   """
6219
6220   def ExpandNames(self):
6221     self.needed_locks = {}
6222     if self.op.kind == constants.TAG_NODE:
6223       name = self.cfg.ExpandNodeName(self.op.name)
6224       if name is None:
6225         raise errors.OpPrereqError("Invalid node name (%s)" %
6226                                    (self.op.name,))
6227       self.op.name = name
6228       self.needed_locks[locking.LEVEL_NODE] = name
6229     elif self.op.kind == constants.TAG_INSTANCE:
6230       name = self.cfg.ExpandInstanceName(self.op.name)
6231       if name is None:
6232         raise errors.OpPrereqError("Invalid instance name (%s)" %
6233                                    (self.op.name,))
6234       self.op.name = name
6235       self.needed_locks[locking.LEVEL_INSTANCE] = name
6236
6237   def CheckPrereq(self):
6238     """Check prerequisites.
6239
6240     """
6241     if self.op.kind == constants.TAG_CLUSTER:
6242       self.target = self.cfg.GetClusterInfo()
6243     elif self.op.kind == constants.TAG_NODE:
6244       self.target = self.cfg.GetNodeInfo(self.op.name)
6245     elif self.op.kind == constants.TAG_INSTANCE:
6246       self.target = self.cfg.GetInstanceInfo(self.op.name)
6247     else:
6248       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6249                                  str(self.op.kind))
6250
6251
6252 class LUGetTags(TagsLU):
6253   """Returns the tags of a given object.
6254
6255   """
6256   _OP_REQP = ["kind", "name"]
6257   REQ_BGL = False
6258
6259   def Exec(self, feedback_fn):
6260     """Returns the tag list.
6261
6262     """
6263     return list(self.target.GetTags())
6264
6265
6266 class LUSearchTags(NoHooksLU):
6267   """Searches the tags for a given pattern.
6268
6269   """
6270   _OP_REQP = ["pattern"]
6271   REQ_BGL = False
6272
6273   def ExpandNames(self):
6274     self.needed_locks = {}
6275
6276   def CheckPrereq(self):
6277     """Check prerequisites.
6278
6279     This checks the pattern passed for validity by compiling it.
6280
6281     """
6282     try:
6283       self.re = re.compile(self.op.pattern)
6284     except re.error, err:
6285       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6286                                  (self.op.pattern, err))
6287
6288   def Exec(self, feedback_fn):
6289     """Returns the tag list.
6290
6291     """
6292     cfg = self.cfg
6293     tgts = [("/cluster", cfg.GetClusterInfo())]
6294     ilist = cfg.GetAllInstancesInfo().values()
6295     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6296     nlist = cfg.GetAllNodesInfo().values()
6297     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6298     results = []
6299     for path, target in tgts:
6300       for tag in target.GetTags():
6301         if self.re.search(tag):
6302           results.append((path, tag))
6303     return results
6304
6305
6306 class LUAddTags(TagsLU):
6307   """Sets a tag on a given object.
6308
6309   """
6310   _OP_REQP = ["kind", "name", "tags"]
6311   REQ_BGL = False
6312
6313   def CheckPrereq(self):
6314     """Check prerequisites.
6315
6316     This checks the type and length of the tag name and value.
6317
6318     """
6319     TagsLU.CheckPrereq(self)
6320     for tag in self.op.tags:
6321       objects.TaggableObject.ValidateTag(tag)
6322
6323   def Exec(self, feedback_fn):
6324     """Sets the tag.
6325
6326     """
6327     try:
6328       for tag in self.op.tags:
6329         self.target.AddTag(tag)
6330     except errors.TagError, err:
6331       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6332     try:
6333       self.cfg.Update(self.target)
6334     except errors.ConfigurationError:
6335       raise errors.OpRetryError("There has been a modification to the"
6336                                 " config file and the operation has been"
6337                                 " aborted. Please retry.")
6338
6339
6340 class LUDelTags(TagsLU):
6341   """Delete a list of tags from a given object.
6342
6343   """
6344   _OP_REQP = ["kind", "name", "tags"]
6345   REQ_BGL = False
6346
6347   def CheckPrereq(self):
6348     """Check prerequisites.
6349
6350     This checks that we have the given tag.
6351
6352     """
6353     TagsLU.CheckPrereq(self)
6354     for tag in self.op.tags:
6355       objects.TaggableObject.ValidateTag(tag)
6356     del_tags = frozenset(self.op.tags)
6357     cur_tags = self.target.GetTags()
6358     if not del_tags <= cur_tags:
6359       diff_tags = del_tags - cur_tags
6360       diff_names = ["'%s'" % tag for tag in diff_tags]
6361       diff_names.sort()
6362       raise errors.OpPrereqError("Tag(s) %s not found" %
6363                                  (",".join(diff_names)))
6364
6365   def Exec(self, feedback_fn):
6366     """Remove the tag from the object.
6367
6368     """
6369     for tag in self.op.tags:
6370       self.target.RemoveTag(tag)
6371     try:
6372       self.cfg.Update(self.target)
6373     except errors.ConfigurationError:
6374       raise errors.OpRetryError("There has been a modification to the"
6375                                 " config file and the operation has been"
6376                                 " aborted. Please retry.")
6377
6378
6379 class LUTestDelay(NoHooksLU):
6380   """Sleep for a specified amount of time.
6381
6382   This LU sleeps on the master and/or nodes for a specified amount of
6383   time.
6384
6385   """
6386   _OP_REQP = ["duration", "on_master", "on_nodes"]
6387   REQ_BGL = False
6388
6389   def ExpandNames(self):
6390     """Expand names and set required locks.
6391
6392     This expands the node list, if any.
6393
6394     """
6395     self.needed_locks = {}
6396     if self.op.on_nodes:
6397       # _GetWantedNodes can be used here, but is not always appropriate to use
6398       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6399       # more information.
6400       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6401       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6402
6403   def CheckPrereq(self):
6404     """Check prerequisites.
6405
6406     """
6407
6408   def Exec(self, feedback_fn):
6409     """Do the actual sleep.
6410
6411     """
6412     if self.op.on_master:
6413       if not utils.TestDelay(self.op.duration):
6414         raise errors.OpExecError("Error during master delay test")
6415     if self.op.on_nodes:
6416       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6417       if not result:
6418         raise errors.OpExecError("Complete failure from rpc call")
6419       for node, node_result in result.items():
6420         node_result.Raise()
6421         if not node_result.data:
6422           raise errors.OpExecError("Failure during rpc call to node %s,"
6423                                    " result: %s" % (node, node_result.data))
6424
6425
6426 class IAllocator(object):
6427   """IAllocator framework.
6428
6429   An IAllocator instance has three sets of attributes:
6430     - cfg that is needed to query the cluster
6431     - input data (all members of the _KEYS class attribute are required)
6432     - four buffer attributes (in|out_data|text), that represent the
6433       input (to the external script) in text and data structure format,
6434       and the output from it, again in two formats
6435     - the result variables from the script (success, info, nodes) for
6436       easy usage
6437
6438   """
6439   _ALLO_KEYS = [
6440     "mem_size", "disks", "disk_template",
6441     "os", "tags", "nics", "vcpus", "hypervisor",
6442     ]
6443   _RELO_KEYS = [
6444     "relocate_from",
6445     ]
6446
6447   def __init__(self, lu, mode, name, **kwargs):
6448     self.lu = lu
6449     # init buffer variables
6450     self.in_text = self.out_text = self.in_data = self.out_data = None
6451     # init all input fields so that pylint is happy
6452     self.mode = mode
6453     self.name = name
6454     self.mem_size = self.disks = self.disk_template = None
6455     self.os = self.tags = self.nics = self.vcpus = None
6456     self.hypervisor = None
6457     self.relocate_from = None
6458     # computed fields
6459     self.required_nodes = None
6460     # init result fields
6461     self.success = self.info = self.nodes = None
6462     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6463       keyset = self._ALLO_KEYS
6464     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6465       keyset = self._RELO_KEYS
6466     else:
6467       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6468                                    " IAllocator" % self.mode)
6469     for key in kwargs:
6470       if key not in keyset:
6471         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6472                                      " IAllocator" % key)
6473       setattr(self, key, kwargs[key])
6474     for key in keyset:
6475       if key not in kwargs:
6476         raise errors.ProgrammerError("Missing input parameter '%s' to"
6477                                      " IAllocator" % key)
6478     self._BuildInputData()
6479
6480   def _ComputeClusterData(self):
6481     """Compute the generic allocator input data.
6482
6483     This is the data that is independent of the actual operation.
6484
6485     """
6486     cfg = self.lu.cfg
6487     cluster_info = cfg.GetClusterInfo()
6488     # cluster data
6489     data = {
6490       "version": 1,
6491       "cluster_name": cfg.GetClusterName(),
6492       "cluster_tags": list(cluster_info.GetTags()),
6493       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6494       # we don't have job IDs
6495       }
6496     iinfo = cfg.GetAllInstancesInfo().values()
6497     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6498
6499     # node data
6500     node_results = {}
6501     node_list = cfg.GetNodeList()
6502
6503     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6504       hypervisor_name = self.hypervisor
6505     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6506       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6507
6508     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6509                                            hypervisor_name)
6510     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6511                        cluster_info.enabled_hypervisors)
6512     for nname, nresult in node_data.items():
6513       # first fill in static (config-based) values
6514       ninfo = cfg.GetNodeInfo(nname)
6515       pnr = {
6516         "tags": list(ninfo.GetTags()),
6517         "primary_ip": ninfo.primary_ip,
6518         "secondary_ip": ninfo.secondary_ip,
6519         "offline": ninfo.offline,
6520         "drained": ninfo.drained,
6521         "master_candidate": ninfo.master_candidate,
6522         }
6523
6524       if not ninfo.offline:
6525         nresult.Raise()
6526         if not isinstance(nresult.data, dict):
6527           raise errors.OpExecError("Can't get data for node %s" % nname)
6528         remote_info = nresult.data
6529         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6530                      'vg_size', 'vg_free', 'cpu_total']:
6531           if attr not in remote_info:
6532             raise errors.OpExecError("Node '%s' didn't return attribute"
6533                                      " '%s'" % (nname, attr))
6534           try:
6535             remote_info[attr] = int(remote_info[attr])
6536           except ValueError, err:
6537             raise errors.OpExecError("Node '%s' returned invalid value"
6538                                      " for '%s': %s" % (nname, attr, err))
6539         # compute memory used by primary instances
6540         i_p_mem = i_p_up_mem = 0
6541         for iinfo, beinfo in i_list:
6542           if iinfo.primary_node == nname:
6543             i_p_mem += beinfo[constants.BE_MEMORY]
6544             if iinfo.name not in node_iinfo[nname].data:
6545               i_used_mem = 0
6546             else:
6547               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6548             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6549             remote_info['memory_free'] -= max(0, i_mem_diff)
6550
6551             if iinfo.admin_up:
6552               i_p_up_mem += beinfo[constants.BE_MEMORY]
6553
6554         # compute memory used by instances
6555         pnr_dyn = {
6556           "total_memory": remote_info['memory_total'],
6557           "reserved_memory": remote_info['memory_dom0'],
6558           "free_memory": remote_info['memory_free'],
6559           "total_disk": remote_info['vg_size'],
6560           "free_disk": remote_info['vg_free'],
6561           "total_cpus": remote_info['cpu_total'],
6562           "i_pri_memory": i_p_mem,
6563           "i_pri_up_memory": i_p_up_mem,
6564           }
6565         pnr.update(pnr_dyn)
6566
6567       node_results[nname] = pnr
6568     data["nodes"] = node_results
6569
6570     # instance data
6571     instance_data = {}
6572     for iinfo, beinfo in i_list:
6573       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6574                   for n in iinfo.nics]
6575       pir = {
6576         "tags": list(iinfo.GetTags()),
6577         "admin_up": iinfo.admin_up,
6578         "vcpus": beinfo[constants.BE_VCPUS],
6579         "memory": beinfo[constants.BE_MEMORY],
6580         "os": iinfo.os,
6581         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6582         "nics": nic_data,
6583         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6584         "disk_template": iinfo.disk_template,
6585         "hypervisor": iinfo.hypervisor,
6586         }
6587       instance_data[iinfo.name] = pir
6588
6589     data["instances"] = instance_data
6590
6591     self.in_data = data
6592
6593   def _AddNewInstance(self):
6594     """Add new instance data to allocator structure.
6595
6596     This in combination with _AllocatorGetClusterData will create the
6597     correct structure needed as input for the allocator.
6598
6599     The checks for the completeness of the opcode must have already been
6600     done.
6601
6602     """
6603     data = self.in_data
6604
6605     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6606
6607     if self.disk_template in constants.DTS_NET_MIRROR:
6608       self.required_nodes = 2
6609     else:
6610       self.required_nodes = 1
6611     request = {
6612       "type": "allocate",
6613       "name": self.name,
6614       "disk_template": self.disk_template,
6615       "tags": self.tags,
6616       "os": self.os,
6617       "vcpus": self.vcpus,
6618       "memory": self.mem_size,
6619       "disks": self.disks,
6620       "disk_space_total": disk_space,
6621       "nics": self.nics,
6622       "required_nodes": self.required_nodes,
6623       }
6624     data["request"] = request
6625
6626   def _AddRelocateInstance(self):
6627     """Add relocate instance data to allocator structure.
6628
6629     This in combination with _IAllocatorGetClusterData will create the
6630     correct structure needed as input for the allocator.
6631
6632     The checks for the completeness of the opcode must have already been
6633     done.
6634
6635     """
6636     instance = self.lu.cfg.GetInstanceInfo(self.name)
6637     if instance is None:
6638       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6639                                    " IAllocator" % self.name)
6640
6641     if instance.disk_template not in constants.DTS_NET_MIRROR:
6642       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6643
6644     if len(instance.secondary_nodes) != 1:
6645       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6646
6647     self.required_nodes = 1
6648     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6649     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6650
6651     request = {
6652       "type": "relocate",
6653       "name": self.name,
6654       "disk_space_total": disk_space,
6655       "required_nodes": self.required_nodes,
6656       "relocate_from": self.relocate_from,
6657       }
6658     self.in_data["request"] = request
6659
6660   def _BuildInputData(self):
6661     """Build input data structures.
6662
6663     """
6664     self._ComputeClusterData()
6665
6666     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6667       self._AddNewInstance()
6668     else:
6669       self._AddRelocateInstance()
6670
6671     self.in_text = serializer.Dump(self.in_data)
6672
6673   def Run(self, name, validate=True, call_fn=None):
6674     """Run an instance allocator and return the results.
6675
6676     """
6677     if call_fn is None:
6678       call_fn = self.lu.rpc.call_iallocator_runner
6679     data = self.in_text
6680
6681     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6682     result.Raise()
6683
6684     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6685       raise errors.OpExecError("Invalid result from master iallocator runner")
6686
6687     rcode, stdout, stderr, fail = result.data
6688
6689     if rcode == constants.IARUN_NOTFOUND:
6690       raise errors.OpExecError("Can't find allocator '%s'" % name)
6691     elif rcode == constants.IARUN_FAILURE:
6692       raise errors.OpExecError("Instance allocator call failed: %s,"
6693                                " output: %s" % (fail, stdout+stderr))
6694     self.out_text = stdout
6695     if validate:
6696       self._ValidateResult()
6697
6698   def _ValidateResult(self):
6699     """Process the allocator results.
6700
6701     This will process and if successful save the result in
6702     self.out_data and the other parameters.
6703
6704     """
6705     try:
6706       rdict = serializer.Load(self.out_text)
6707     except Exception, err:
6708       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6709
6710     if not isinstance(rdict, dict):
6711       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6712
6713     for key in "success", "info", "nodes":
6714       if key not in rdict:
6715         raise errors.OpExecError("Can't parse iallocator results:"
6716                                  " missing key '%s'" % key)
6717       setattr(self, key, rdict[key])
6718
6719     if not isinstance(rdict["nodes"], list):
6720       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6721                                " is not a list")
6722     self.out_data = rdict
6723
6724
6725 class LUTestAllocator(NoHooksLU):
6726   """Run allocator tests.
6727
6728   This LU runs the allocator tests
6729
6730   """
6731   _OP_REQP = ["direction", "mode", "name"]
6732
6733   def CheckPrereq(self):
6734     """Check prerequisites.
6735
6736     This checks the opcode parameters depending on the director and mode test.
6737
6738     """
6739     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6740       for attr in ["name", "mem_size", "disks", "disk_template",
6741                    "os", "tags", "nics", "vcpus"]:
6742         if not hasattr(self.op, attr):
6743           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6744                                      attr)
6745       iname = self.cfg.ExpandInstanceName(self.op.name)
6746       if iname is not None:
6747         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6748                                    iname)
6749       if not isinstance(self.op.nics, list):
6750         raise errors.OpPrereqError("Invalid parameter 'nics'")
6751       for row in self.op.nics:
6752         if (not isinstance(row, dict) or
6753             "mac" not in row or
6754             "ip" not in row or
6755             "bridge" not in row):
6756           raise errors.OpPrereqError("Invalid contents of the"
6757                                      " 'nics' parameter")
6758       if not isinstance(self.op.disks, list):
6759         raise errors.OpPrereqError("Invalid parameter 'disks'")
6760       for row in self.op.disks:
6761         if (not isinstance(row, dict) or
6762             "size" not in row or
6763             not isinstance(row["size"], int) or
6764             "mode" not in row or
6765             row["mode"] not in ['r', 'w']):
6766           raise errors.OpPrereqError("Invalid contents of the"
6767                                      " 'disks' parameter")
6768       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6769         self.op.hypervisor = self.cfg.GetHypervisorType()
6770     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6771       if not hasattr(self.op, "name"):
6772         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6773       fname = self.cfg.ExpandInstanceName(self.op.name)
6774       if fname is None:
6775         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6776                                    self.op.name)
6777       self.op.name = fname
6778       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6779     else:
6780       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6781                                  self.op.mode)
6782
6783     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6784       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6785         raise errors.OpPrereqError("Missing allocator name")
6786     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6787       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6788                                  self.op.direction)
6789
6790   def Exec(self, feedback_fn):
6791     """Run the allocator test.
6792
6793     """
6794     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6795       ial = IAllocator(self,
6796                        mode=self.op.mode,
6797                        name=self.op.name,
6798                        mem_size=self.op.mem_size,
6799                        disks=self.op.disks,
6800                        disk_template=self.op.disk_template,
6801                        os=self.op.os,
6802                        tags=self.op.tags,
6803                        nics=self.op.nics,
6804                        vcpus=self.op.vcpus,
6805                        hypervisor=self.op.hypervisor,
6806                        )
6807     else:
6808       ial = IAllocator(self,
6809                        mode=self.op.mode,
6810                        name=self.op.name,
6811                        relocate_from=list(self.relocate_from),
6812                        )
6813
6814     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6815       result = ial.in_text
6816     else:
6817       ial.Run(self.op.allocator, validate=False)
6818       result = ial.out_text
6819     return result