a06decbc59ab0ac27b6e74a429caa956c4028ac4
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @rtype: dict
480   @return: the hook environment for this instance
481
482   """
483   if status:
484     str_status = "up"
485   else:
486     str_status = "down"
487   env = {
488     "OP_TARGET": name,
489     "INSTANCE_NAME": name,
490     "INSTANCE_PRIMARY": primary_node,
491     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
492     "INSTANCE_OS_TYPE": os_type,
493     "INSTANCE_STATUS": str_status,
494     "INSTANCE_MEMORY": memory,
495     "INSTANCE_VCPUS": vcpus,
496   }
497
498   if nics:
499     nic_count = len(nics)
500     for idx, (ip, bridge, mac) in enumerate(nics):
501       if ip is None:
502         ip = ""
503       env["INSTANCE_NIC%d_IP" % idx] = ip
504       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
505       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
506   else:
507     nic_count = 0
508
509   env["INSTANCE_NIC_COUNT"] = nic_count
510
511   return env
512
513
514 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
515   """Builds instance related env variables for hooks from an object.
516
517   @type lu: L{LogicalUnit}
518   @param lu: the logical unit on whose behalf we execute
519   @type instance: L{objects.Instance}
520   @param instance: the instance for which we should build the
521       environment
522   @type override: dict
523   @param override: dictionary with key/values that will override
524       our values
525   @rtype: dict
526   @return: the hook environment dictionary
527
528   """
529   bep = lu.cfg.GetClusterInfo().FillBE(instance)
530   args = {
531     'name': instance.name,
532     'primary_node': instance.primary_node,
533     'secondary_nodes': instance.secondary_nodes,
534     'os_type': instance.os,
535     'status': instance.admin_up,
536     'memory': bep[constants.BE_MEMORY],
537     'vcpus': bep[constants.BE_VCPUS],
538     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
539   }
540   if override:
541     args.update(override)
542   return _BuildInstanceHookEnv(**args)
543
544
545 def _AdjustCandidatePool(lu):
546   """Adjust the candidate pool after node operations.
547
548   """
549   mod_list = lu.cfg.MaintainCandidatePool()
550   if mod_list:
551     lu.LogInfo("Promoted nodes to master candidate role: %s",
552                ", ".join(node.name for node in mod_list))
553     for name in mod_list:
554       lu.context.ReaddNode(name)
555   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
556   if mc_now > mc_max:
557     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
558                (mc_now, mc_max))
559
560
561 def _CheckInstanceBridgesExist(lu, instance):
562   """Check that the brigdes needed by an instance exist.
563
564   """
565   # check bridges existance
566   brlist = [nic.bridge for nic in instance.nics]
567   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
568   result.Raise()
569   if not result.data:
570     raise errors.OpPrereqError("One or more target bridges %s does not"
571                                " exist on destination node '%s'" %
572                                (brlist, instance.primary_node))
573
574
575 class LUDestroyCluster(NoHooksLU):
576   """Logical unit for destroying the cluster.
577
578   """
579   _OP_REQP = []
580
581   def CheckPrereq(self):
582     """Check prerequisites.
583
584     This checks whether the cluster is empty.
585
586     Any errors are signalled by raising errors.OpPrereqError.
587
588     """
589     master = self.cfg.GetMasterNode()
590
591     nodelist = self.cfg.GetNodeList()
592     if len(nodelist) != 1 or nodelist[0] != master:
593       raise errors.OpPrereqError("There are still %d node(s) in"
594                                  " this cluster." % (len(nodelist) - 1))
595     instancelist = self.cfg.GetInstanceList()
596     if instancelist:
597       raise errors.OpPrereqError("There are still %d instance(s) in"
598                                  " this cluster." % len(instancelist))
599
600   def Exec(self, feedback_fn):
601     """Destroys the cluster.
602
603     """
604     master = self.cfg.GetMasterNode()
605     result = self.rpc.call_node_stop_master(master, False)
606     result.Raise()
607     if not result.data:
608       raise errors.OpExecError("Could not disable the master role")
609     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
610     utils.CreateBackup(priv_key)
611     utils.CreateBackup(pub_key)
612     return master
613
614
615 class LUVerifyCluster(LogicalUnit):
616   """Verifies the cluster status.
617
618   """
619   HPATH = "cluster-verify"
620   HTYPE = constants.HTYPE_CLUSTER
621   _OP_REQP = ["skip_checks"]
622   REQ_BGL = False
623
624   def ExpandNames(self):
625     self.needed_locks = {
626       locking.LEVEL_NODE: locking.ALL_SET,
627       locking.LEVEL_INSTANCE: locking.ALL_SET,
628     }
629     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
630
631   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
632                   node_result, feedback_fn, master_files,
633                   drbd_map):
634     """Run multiple tests against a node.
635
636     Test list:
637
638       - compares ganeti version
639       - checks vg existance and size > 20G
640       - checks config file checksum
641       - checks ssh to other nodes
642
643     @type nodeinfo: L{objects.Node}
644     @param nodeinfo: the node to check
645     @param file_list: required list of files
646     @param local_cksum: dictionary of local files and their checksums
647     @param node_result: the results from the node
648     @param feedback_fn: function used to accumulate results
649     @param master_files: list of files that only masters should have
650     @param drbd_map: the useddrbd minors for this node, in
651         form of minor: (instance, must_exist) which correspond to instances
652         and their running status
653
654     """
655     node = nodeinfo.name
656
657     # main result, node_result should be a non-empty dict
658     if not node_result or not isinstance(node_result, dict):
659       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
660       return True
661
662     # compares ganeti version
663     local_version = constants.PROTOCOL_VERSION
664     remote_version = node_result.get('version', None)
665     if not (remote_version and isinstance(remote_version, (list, tuple)) and
666             len(remote_version) == 2):
667       feedback_fn("  - ERROR: connection to %s failed" % (node))
668       return True
669
670     if local_version != remote_version[0]:
671       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
672                   " node %s %s" % (local_version, node, remote_version[0]))
673       return True
674
675     # node seems compatible, we can actually try to look into its results
676
677     bad = False
678
679     # full package version
680     if constants.RELEASE_VERSION != remote_version[1]:
681       feedback_fn("  - WARNING: software version mismatch: master %s,"
682                   " node %s %s" %
683                   (constants.RELEASE_VERSION, node, remote_version[1]))
684
685     # checks vg existence and size > 20G
686
687     vglist = node_result.get(constants.NV_VGLIST, None)
688     if not vglist:
689       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
690                       (node,))
691       bad = True
692     else:
693       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
694                                             constants.MIN_VG_SIZE)
695       if vgstatus:
696         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
697         bad = True
698
699     # checks config file checksum
700
701     remote_cksum = node_result.get(constants.NV_FILELIST, None)
702     if not isinstance(remote_cksum, dict):
703       bad = True
704       feedback_fn("  - ERROR: node hasn't returned file checksum data")
705     else:
706       for file_name in file_list:
707         node_is_mc = nodeinfo.master_candidate
708         must_have_file = file_name not in master_files
709         if file_name not in remote_cksum:
710           if node_is_mc or must_have_file:
711             bad = True
712             feedback_fn("  - ERROR: file '%s' missing" % file_name)
713         elif remote_cksum[file_name] != local_cksum[file_name]:
714           if node_is_mc or must_have_file:
715             bad = True
716             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
717           else:
718             # not candidate and this is not a must-have file
719             bad = True
720             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
721                         " '%s'" % file_name)
722         else:
723           # all good, except non-master/non-must have combination
724           if not node_is_mc and not must_have_file:
725             feedback_fn("  - ERROR: file '%s' should not exist on non master"
726                         " candidates" % file_name)
727
728     # checks ssh to any
729
730     if constants.NV_NODELIST not in node_result:
731       bad = True
732       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
733     else:
734       if node_result[constants.NV_NODELIST]:
735         bad = True
736         for node in node_result[constants.NV_NODELIST]:
737           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
738                           (node, node_result[constants.NV_NODELIST][node]))
739
740     if constants.NV_NODENETTEST not in node_result:
741       bad = True
742       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
743     else:
744       if node_result[constants.NV_NODENETTEST]:
745         bad = True
746         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
747         for node in nlist:
748           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
749                           (node, node_result[constants.NV_NODENETTEST][node]))
750
751     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
752     if isinstance(hyp_result, dict):
753       for hv_name, hv_result in hyp_result.iteritems():
754         if hv_result is not None:
755           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
756                       (hv_name, hv_result))
757
758     # check used drbd list
759     used_minors = node_result.get(constants.NV_DRBDLIST, [])
760     for minor, (iname, must_exist) in drbd_map.items():
761       if minor not in used_minors and must_exist:
762         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
763                     (minor, iname))
764         bad = True
765     for minor in used_minors:
766       if minor not in drbd_map:
767         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
768         bad = True
769
770     return bad
771
772   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
773                       node_instance, feedback_fn, n_offline):
774     """Verify an instance.
775
776     This function checks to see if the required block devices are
777     available on the instance's node.
778
779     """
780     bad = False
781
782     node_current = instanceconfig.primary_node
783
784     node_vol_should = {}
785     instanceconfig.MapLVsByNode(node_vol_should)
786
787     for node in node_vol_should:
788       if node in n_offline:
789         # ignore missing volumes on offline nodes
790         continue
791       for volume in node_vol_should[node]:
792         if node not in node_vol_is or volume not in node_vol_is[node]:
793           feedback_fn("  - ERROR: volume %s missing on node %s" %
794                           (volume, node))
795           bad = True
796
797     if instanceconfig.admin_up:
798       if ((node_current not in node_instance or
799           not instance in node_instance[node_current]) and
800           node_current not in n_offline):
801         feedback_fn("  - ERROR: instance %s not running on node %s" %
802                         (instance, node_current))
803         bad = True
804
805     for node in node_instance:
806       if (not node == node_current):
807         if instance in node_instance[node]:
808           feedback_fn("  - ERROR: instance %s should not run on node %s" %
809                           (instance, node))
810           bad = True
811
812     return bad
813
814   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
815     """Verify if there are any unknown volumes in the cluster.
816
817     The .os, .swap and backup volumes are ignored. All other volumes are
818     reported as unknown.
819
820     """
821     bad = False
822
823     for node in node_vol_is:
824       for volume in node_vol_is[node]:
825         if node not in node_vol_should or volume not in node_vol_should[node]:
826           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
827                       (volume, node))
828           bad = True
829     return bad
830
831   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
832     """Verify the list of running instances.
833
834     This checks what instances are running but unknown to the cluster.
835
836     """
837     bad = False
838     for node in node_instance:
839       for runninginstance in node_instance[node]:
840         if runninginstance not in instancelist:
841           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
842                           (runninginstance, node))
843           bad = True
844     return bad
845
846   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
847     """Verify N+1 Memory Resilience.
848
849     Check that if one single node dies we can still start all the instances it
850     was primary for.
851
852     """
853     bad = False
854
855     for node, nodeinfo in node_info.iteritems():
856       # This code checks that every node which is now listed as secondary has
857       # enough memory to host all instances it is supposed to should a single
858       # other node in the cluster fail.
859       # FIXME: not ready for failover to an arbitrary node
860       # FIXME: does not support file-backed instances
861       # WARNING: we currently take into account down instances as well as up
862       # ones, considering that even if they're down someone might want to start
863       # them even in the event of a node failure.
864       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
865         needed_mem = 0
866         for instance in instances:
867           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
868           if bep[constants.BE_AUTO_BALANCE]:
869             needed_mem += bep[constants.BE_MEMORY]
870         if nodeinfo['mfree'] < needed_mem:
871           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
872                       " failovers should node %s fail" % (node, prinode))
873           bad = True
874     return bad
875
876   def CheckPrereq(self):
877     """Check prerequisites.
878
879     Transform the list of checks we're going to skip into a set and check that
880     all its members are valid.
881
882     """
883     self.skip_set = frozenset(self.op.skip_checks)
884     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
885       raise errors.OpPrereqError("Invalid checks to be skipped specified")
886
887   def BuildHooksEnv(self):
888     """Build hooks env.
889
890     Cluster-Verify hooks just rone in the post phase and their failure makes
891     the output be logged in the verify output and the verification to fail.
892
893     """
894     all_nodes = self.cfg.GetNodeList()
895     # TODO: populate the environment with useful information for verify hooks
896     env = {}
897     return env, [], all_nodes
898
899   def Exec(self, feedback_fn):
900     """Verify integrity of cluster, performing various test on nodes.
901
902     """
903     bad = False
904     feedback_fn("* Verifying global settings")
905     for msg in self.cfg.VerifyConfig():
906       feedback_fn("  - ERROR: %s" % msg)
907
908     vg_name = self.cfg.GetVGName()
909     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
910     nodelist = utils.NiceSort(self.cfg.GetNodeList())
911     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
912     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
913     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
914                         for iname in instancelist)
915     i_non_redundant = [] # Non redundant instances
916     i_non_a_balanced = [] # Non auto-balanced instances
917     n_offline = [] # List of offline nodes
918     n_drained = [] # List of nodes being drained
919     node_volume = {}
920     node_instance = {}
921     node_info = {}
922     instance_cfg = {}
923
924     # FIXME: verify OS list
925     # do local checksums
926     master_files = [constants.CLUSTER_CONF_FILE]
927
928     file_names = ssconf.SimpleStore().GetFileList()
929     file_names.append(constants.SSL_CERT_FILE)
930     file_names.append(constants.RAPI_CERT_FILE)
931     file_names.extend(master_files)
932
933     local_checksums = utils.FingerprintFiles(file_names)
934
935     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
936     node_verify_param = {
937       constants.NV_FILELIST: file_names,
938       constants.NV_NODELIST: [node.name for node in nodeinfo
939                               if not node.offline],
940       constants.NV_HYPERVISOR: hypervisors,
941       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
942                                   node.secondary_ip) for node in nodeinfo
943                                  if not node.offline],
944       constants.NV_LVLIST: vg_name,
945       constants.NV_INSTANCELIST: hypervisors,
946       constants.NV_VGLIST: None,
947       constants.NV_VERSION: None,
948       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
949       constants.NV_DRBDLIST: None,
950       }
951     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
952                                            self.cfg.GetClusterName())
953
954     cluster = self.cfg.GetClusterInfo()
955     master_node = self.cfg.GetMasterNode()
956     all_drbd_map = self.cfg.ComputeDRBDMap()
957
958     for node_i in nodeinfo:
959       node = node_i.name
960       nresult = all_nvinfo[node].data
961
962       if node_i.offline:
963         feedback_fn("* Skipping offline node %s" % (node,))
964         n_offline.append(node)
965         continue
966
967       if node == master_node:
968         ntype = "master"
969       elif node_i.master_candidate:
970         ntype = "master candidate"
971       elif node_i.drained:
972         ntype = "drained"
973         n_drained.append(node)
974       else:
975         ntype = "regular"
976       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
977
978       if all_nvinfo[node].failed or not isinstance(nresult, dict):
979         feedback_fn("  - ERROR: connection to %s failed" % (node,))
980         bad = True
981         continue
982
983       node_drbd = {}
984       for minor, instance in all_drbd_map[node].items():
985         instance = instanceinfo[instance]
986         node_drbd[minor] = (instance.name, instance.admin_up)
987       result = self._VerifyNode(node_i, file_names, local_checksums,
988                                 nresult, feedback_fn, master_files,
989                                 node_drbd)
990       bad = bad or result
991
992       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
993       if isinstance(lvdata, basestring):
994         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
995                     (node, utils.SafeEncode(lvdata)))
996         bad = True
997         node_volume[node] = {}
998       elif not isinstance(lvdata, dict):
999         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1000         bad = True
1001         continue
1002       else:
1003         node_volume[node] = lvdata
1004
1005       # node_instance
1006       idata = nresult.get(constants.NV_INSTANCELIST, None)
1007       if not isinstance(idata, list):
1008         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1009                     (node,))
1010         bad = True
1011         continue
1012
1013       node_instance[node] = idata
1014
1015       # node_info
1016       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1017       if not isinstance(nodeinfo, dict):
1018         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1019         bad = True
1020         continue
1021
1022       try:
1023         node_info[node] = {
1024           "mfree": int(nodeinfo['memory_free']),
1025           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1026           "pinst": [],
1027           "sinst": [],
1028           # dictionary holding all instances this node is secondary for,
1029           # grouped by their primary node. Each key is a cluster node, and each
1030           # value is a list of instances which have the key as primary and the
1031           # current node as secondary.  this is handy to calculate N+1 memory
1032           # availability if you can only failover from a primary to its
1033           # secondary.
1034           "sinst-by-pnode": {},
1035         }
1036       except ValueError:
1037         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1038         bad = True
1039         continue
1040
1041     node_vol_should = {}
1042
1043     for instance in instancelist:
1044       feedback_fn("* Verifying instance %s" % instance)
1045       inst_config = instanceinfo[instance]
1046       result =  self._VerifyInstance(instance, inst_config, node_volume,
1047                                      node_instance, feedback_fn, n_offline)
1048       bad = bad or result
1049       inst_nodes_offline = []
1050
1051       inst_config.MapLVsByNode(node_vol_should)
1052
1053       instance_cfg[instance] = inst_config
1054
1055       pnode = inst_config.primary_node
1056       if pnode in node_info:
1057         node_info[pnode]['pinst'].append(instance)
1058       elif pnode not in n_offline:
1059         feedback_fn("  - ERROR: instance %s, connection to primary node"
1060                     " %s failed" % (instance, pnode))
1061         bad = True
1062
1063       if pnode in n_offline:
1064         inst_nodes_offline.append(pnode)
1065
1066       # If the instance is non-redundant we cannot survive losing its primary
1067       # node, so we are not N+1 compliant. On the other hand we have no disk
1068       # templates with more than one secondary so that situation is not well
1069       # supported either.
1070       # FIXME: does not support file-backed instances
1071       if len(inst_config.secondary_nodes) == 0:
1072         i_non_redundant.append(instance)
1073       elif len(inst_config.secondary_nodes) > 1:
1074         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1075                     % instance)
1076
1077       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1078         i_non_a_balanced.append(instance)
1079
1080       for snode in inst_config.secondary_nodes:
1081         if snode in node_info:
1082           node_info[snode]['sinst'].append(instance)
1083           if pnode not in node_info[snode]['sinst-by-pnode']:
1084             node_info[snode]['sinst-by-pnode'][pnode] = []
1085           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1086         elif snode not in n_offline:
1087           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1088                       " %s failed" % (instance, snode))
1089           bad = True
1090         if snode in n_offline:
1091           inst_nodes_offline.append(snode)
1092
1093       if inst_nodes_offline:
1094         # warn that the instance lives on offline nodes, and set bad=True
1095         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1096                     ", ".join(inst_nodes_offline))
1097         bad = True
1098
1099     feedback_fn("* Verifying orphan volumes")
1100     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1101                                        feedback_fn)
1102     bad = bad or result
1103
1104     feedback_fn("* Verifying remaining instances")
1105     result = self._VerifyOrphanInstances(instancelist, node_instance,
1106                                          feedback_fn)
1107     bad = bad or result
1108
1109     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1110       feedback_fn("* Verifying N+1 Memory redundancy")
1111       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1112       bad = bad or result
1113
1114     feedback_fn("* Other Notes")
1115     if i_non_redundant:
1116       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1117                   % len(i_non_redundant))
1118
1119     if i_non_a_balanced:
1120       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1121                   % len(i_non_a_balanced))
1122
1123     if n_offline:
1124       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1125
1126     if n_drained:
1127       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1128
1129     return not bad
1130
1131   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1132     """Analize the post-hooks' result
1133
1134     This method analyses the hook result, handles it, and sends some
1135     nicely-formatted feedback back to the user.
1136
1137     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1138         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1139     @param hooks_results: the results of the multi-node hooks rpc call
1140     @param feedback_fn: function used send feedback back to the caller
1141     @param lu_result: previous Exec result
1142     @return: the new Exec result, based on the previous result
1143         and hook results
1144
1145     """
1146     # We only really run POST phase hooks, and are only interested in
1147     # their results
1148     if phase == constants.HOOKS_PHASE_POST:
1149       # Used to change hooks' output to proper indentation
1150       indent_re = re.compile('^', re.M)
1151       feedback_fn("* Hooks Results")
1152       if not hooks_results:
1153         feedback_fn("  - ERROR: general communication failure")
1154         lu_result = 1
1155       else:
1156         for node_name in hooks_results:
1157           show_node_header = True
1158           res = hooks_results[node_name]
1159           if res.failed or res.data is False or not isinstance(res.data, list):
1160             if res.offline:
1161               # no need to warn or set fail return value
1162               continue
1163             feedback_fn("    Communication failure in hooks execution")
1164             lu_result = 1
1165             continue
1166           for script, hkr, output in res.data:
1167             if hkr == constants.HKR_FAIL:
1168               # The node header is only shown once, if there are
1169               # failing hooks on that node
1170               if show_node_header:
1171                 feedback_fn("  Node %s:" % node_name)
1172                 show_node_header = False
1173               feedback_fn("    ERROR: Script %s failed, output:" % script)
1174               output = indent_re.sub('      ', output)
1175               feedback_fn("%s" % output)
1176               lu_result = 1
1177
1178       return lu_result
1179
1180
1181 class LUVerifyDisks(NoHooksLU):
1182   """Verifies the cluster disks status.
1183
1184   """
1185   _OP_REQP = []
1186   REQ_BGL = False
1187
1188   def ExpandNames(self):
1189     self.needed_locks = {
1190       locking.LEVEL_NODE: locking.ALL_SET,
1191       locking.LEVEL_INSTANCE: locking.ALL_SET,
1192     }
1193     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1194
1195   def CheckPrereq(self):
1196     """Check prerequisites.
1197
1198     This has no prerequisites.
1199
1200     """
1201     pass
1202
1203   def Exec(self, feedback_fn):
1204     """Verify integrity of cluster disks.
1205
1206     """
1207     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1208
1209     vg_name = self.cfg.GetVGName()
1210     nodes = utils.NiceSort(self.cfg.GetNodeList())
1211     instances = [self.cfg.GetInstanceInfo(name)
1212                  for name in self.cfg.GetInstanceList()]
1213
1214     nv_dict = {}
1215     for inst in instances:
1216       inst_lvs = {}
1217       if (not inst.admin_up or
1218           inst.disk_template not in constants.DTS_NET_MIRROR):
1219         continue
1220       inst.MapLVsByNode(inst_lvs)
1221       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1222       for node, vol_list in inst_lvs.iteritems():
1223         for vol in vol_list:
1224           nv_dict[(node, vol)] = inst
1225
1226     if not nv_dict:
1227       return result
1228
1229     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1230
1231     to_act = set()
1232     for node in nodes:
1233       # node_volume
1234       lvs = node_lvs[node]
1235       if lvs.failed:
1236         if not lvs.offline:
1237           self.LogWarning("Connection to node %s failed: %s" %
1238                           (node, lvs.data))
1239         continue
1240       lvs = lvs.data
1241       if isinstance(lvs, basestring):
1242         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1243         res_nlvm[node] = lvs
1244       elif not isinstance(lvs, dict):
1245         logging.warning("Connection to node %s failed or invalid data"
1246                         " returned", node)
1247         res_nodes.append(node)
1248         continue
1249
1250       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1251         inst = nv_dict.pop((node, lv_name), None)
1252         if (not lv_online and inst is not None
1253             and inst.name not in res_instances):
1254           res_instances.append(inst.name)
1255
1256     # any leftover items in nv_dict are missing LVs, let's arrange the
1257     # data better
1258     for key, inst in nv_dict.iteritems():
1259       if inst.name not in res_missing:
1260         res_missing[inst.name] = []
1261       res_missing[inst.name].append(key)
1262
1263     return result
1264
1265
1266 class LURenameCluster(LogicalUnit):
1267   """Rename the cluster.
1268
1269   """
1270   HPATH = "cluster-rename"
1271   HTYPE = constants.HTYPE_CLUSTER
1272   _OP_REQP = ["name"]
1273
1274   def BuildHooksEnv(self):
1275     """Build hooks env.
1276
1277     """
1278     env = {
1279       "OP_TARGET": self.cfg.GetClusterName(),
1280       "NEW_NAME": self.op.name,
1281       }
1282     mn = self.cfg.GetMasterNode()
1283     return env, [mn], [mn]
1284
1285   def CheckPrereq(self):
1286     """Verify that the passed name is a valid one.
1287
1288     """
1289     hostname = utils.HostInfo(self.op.name)
1290
1291     new_name = hostname.name
1292     self.ip = new_ip = hostname.ip
1293     old_name = self.cfg.GetClusterName()
1294     old_ip = self.cfg.GetMasterIP()
1295     if new_name == old_name and new_ip == old_ip:
1296       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1297                                  " cluster has changed")
1298     if new_ip != old_ip:
1299       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1300         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1301                                    " reachable on the network. Aborting." %
1302                                    new_ip)
1303
1304     self.op.name = new_name
1305
1306   def Exec(self, feedback_fn):
1307     """Rename the cluster.
1308
1309     """
1310     clustername = self.op.name
1311     ip = self.ip
1312
1313     # shutdown the master IP
1314     master = self.cfg.GetMasterNode()
1315     result = self.rpc.call_node_stop_master(master, False)
1316     if result.failed or not result.data:
1317       raise errors.OpExecError("Could not disable the master role")
1318
1319     try:
1320       cluster = self.cfg.GetClusterInfo()
1321       cluster.cluster_name = clustername
1322       cluster.master_ip = ip
1323       self.cfg.Update(cluster)
1324
1325       # update the known hosts file
1326       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1327       node_list = self.cfg.GetNodeList()
1328       try:
1329         node_list.remove(master)
1330       except ValueError:
1331         pass
1332       result = self.rpc.call_upload_file(node_list,
1333                                          constants.SSH_KNOWN_HOSTS_FILE)
1334       for to_node, to_result in result.iteritems():
1335         if to_result.failed or not to_result.data:
1336           logging.error("Copy of file %s to node %s failed",
1337                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1338
1339     finally:
1340       result = self.rpc.call_node_start_master(master, False)
1341       if result.failed or not result.data:
1342         self.LogWarning("Could not re-enable the master role on"
1343                         " the master, please restart manually.")
1344
1345
1346 def _RecursiveCheckIfLVMBased(disk):
1347   """Check if the given disk or its children are lvm-based.
1348
1349   @type disk: L{objects.Disk}
1350   @param disk: the disk to check
1351   @rtype: booleean
1352   @return: boolean indicating whether a LD_LV dev_type was found or not
1353
1354   """
1355   if disk.children:
1356     for chdisk in disk.children:
1357       if _RecursiveCheckIfLVMBased(chdisk):
1358         return True
1359   return disk.dev_type == constants.LD_LV
1360
1361
1362 class LUSetClusterParams(LogicalUnit):
1363   """Change the parameters of the cluster.
1364
1365   """
1366   HPATH = "cluster-modify"
1367   HTYPE = constants.HTYPE_CLUSTER
1368   _OP_REQP = []
1369   REQ_BGL = False
1370
1371   def CheckParameters(self):
1372     """Check parameters
1373
1374     """
1375     if not hasattr(self.op, "candidate_pool_size"):
1376       self.op.candidate_pool_size = None
1377     if self.op.candidate_pool_size is not None:
1378       try:
1379         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1380       except ValueError, err:
1381         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1382                                    str(err))
1383       if self.op.candidate_pool_size < 1:
1384         raise errors.OpPrereqError("At least one master candidate needed")
1385
1386   def ExpandNames(self):
1387     # FIXME: in the future maybe other cluster params won't require checking on
1388     # all nodes to be modified.
1389     self.needed_locks = {
1390       locking.LEVEL_NODE: locking.ALL_SET,
1391     }
1392     self.share_locks[locking.LEVEL_NODE] = 1
1393
1394   def BuildHooksEnv(self):
1395     """Build hooks env.
1396
1397     """
1398     env = {
1399       "OP_TARGET": self.cfg.GetClusterName(),
1400       "NEW_VG_NAME": self.op.vg_name,
1401       }
1402     mn = self.cfg.GetMasterNode()
1403     return env, [mn], [mn]
1404
1405   def CheckPrereq(self):
1406     """Check prerequisites.
1407
1408     This checks whether the given params don't conflict and
1409     if the given volume group is valid.
1410
1411     """
1412     if self.op.vg_name is not None and not self.op.vg_name:
1413       instances = self.cfg.GetAllInstancesInfo().values()
1414       for inst in instances:
1415         for disk in inst.disks:
1416           if _RecursiveCheckIfLVMBased(disk):
1417             raise errors.OpPrereqError("Cannot disable lvm storage while"
1418                                        " lvm-based instances exist")
1419
1420     node_list = self.acquired_locks[locking.LEVEL_NODE]
1421
1422     # if vg_name not None, checks given volume group on all nodes
1423     if self.op.vg_name:
1424       vglist = self.rpc.call_vg_list(node_list)
1425       for node in node_list:
1426         if vglist[node].failed:
1427           # ignoring down node
1428           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1429           continue
1430         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1431                                               self.op.vg_name,
1432                                               constants.MIN_VG_SIZE)
1433         if vgstatus:
1434           raise errors.OpPrereqError("Error on node '%s': %s" %
1435                                      (node, vgstatus))
1436
1437     self.cluster = cluster = self.cfg.GetClusterInfo()
1438     # validate beparams changes
1439     if self.op.beparams:
1440       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1441       self.new_beparams = cluster.FillDict(
1442         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1443
1444     # hypervisor list/parameters
1445     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1446     if self.op.hvparams:
1447       if not isinstance(self.op.hvparams, dict):
1448         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1449       for hv_name, hv_dict in self.op.hvparams.items():
1450         if hv_name not in self.new_hvparams:
1451           self.new_hvparams[hv_name] = hv_dict
1452         else:
1453           self.new_hvparams[hv_name].update(hv_dict)
1454
1455     if self.op.enabled_hypervisors is not None:
1456       self.hv_list = self.op.enabled_hypervisors
1457     else:
1458       self.hv_list = cluster.enabled_hypervisors
1459
1460     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1461       # either the enabled list has changed, or the parameters have, validate
1462       for hv_name, hv_params in self.new_hvparams.items():
1463         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1464             (self.op.enabled_hypervisors and
1465              hv_name in self.op.enabled_hypervisors)):
1466           # either this is a new hypervisor, or its parameters have changed
1467           hv_class = hypervisor.GetHypervisor(hv_name)
1468           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1469           hv_class.CheckParameterSyntax(hv_params)
1470           _CheckHVParams(self, node_list, hv_name, hv_params)
1471
1472   def Exec(self, feedback_fn):
1473     """Change the parameters of the cluster.
1474
1475     """
1476     if self.op.vg_name is not None:
1477       if self.op.vg_name != self.cfg.GetVGName():
1478         self.cfg.SetVGName(self.op.vg_name)
1479       else:
1480         feedback_fn("Cluster LVM configuration already in desired"
1481                     " state, not changing")
1482     if self.op.hvparams:
1483       self.cluster.hvparams = self.new_hvparams
1484     if self.op.enabled_hypervisors is not None:
1485       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1486     if self.op.beparams:
1487       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1488     if self.op.candidate_pool_size is not None:
1489       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1490
1491     self.cfg.Update(self.cluster)
1492
1493     # we want to update nodes after the cluster so that if any errors
1494     # happen, we have recorded and saved the cluster info
1495     if self.op.candidate_pool_size is not None:
1496       _AdjustCandidatePool(self)
1497
1498
1499 class LURedistributeConfig(NoHooksLU):
1500   """Force the redistribution of cluster configuration.
1501
1502   This is a very simple LU.
1503
1504   """
1505   _OP_REQP = []
1506   REQ_BGL = False
1507
1508   def ExpandNames(self):
1509     self.needed_locks = {
1510       locking.LEVEL_NODE: locking.ALL_SET,
1511     }
1512     self.share_locks[locking.LEVEL_NODE] = 1
1513
1514   def CheckPrereq(self):
1515     """Check prerequisites.
1516
1517     """
1518
1519   def Exec(self, feedback_fn):
1520     """Redistribute the configuration.
1521
1522     """
1523     self.cfg.Update(self.cfg.GetClusterInfo())
1524
1525
1526 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1527   """Sleep and poll for an instance's disk to sync.
1528
1529   """
1530   if not instance.disks:
1531     return True
1532
1533   if not oneshot:
1534     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1535
1536   node = instance.primary_node
1537
1538   for dev in instance.disks:
1539     lu.cfg.SetDiskID(dev, node)
1540
1541   retries = 0
1542   while True:
1543     max_time = 0
1544     done = True
1545     cumul_degraded = False
1546     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1547     if rstats.failed or not rstats.data:
1548       lu.LogWarning("Can't get any data from node %s", node)
1549       retries += 1
1550       if retries >= 10:
1551         raise errors.RemoteError("Can't contact node %s for mirror data,"
1552                                  " aborting." % node)
1553       time.sleep(6)
1554       continue
1555     rstats = rstats.data
1556     retries = 0
1557     for i, mstat in enumerate(rstats):
1558       if mstat is None:
1559         lu.LogWarning("Can't compute data for node %s/%s",
1560                            node, instance.disks[i].iv_name)
1561         continue
1562       # we ignore the ldisk parameter
1563       perc_done, est_time, is_degraded, _ = mstat
1564       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1565       if perc_done is not None:
1566         done = False
1567         if est_time is not None:
1568           rem_time = "%d estimated seconds remaining" % est_time
1569           max_time = est_time
1570         else:
1571           rem_time = "no time estimate"
1572         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1573                         (instance.disks[i].iv_name, perc_done, rem_time))
1574     if done or oneshot:
1575       break
1576
1577     time.sleep(min(60, max_time))
1578
1579   if done:
1580     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1581   return not cumul_degraded
1582
1583
1584 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1585   """Check that mirrors are not degraded.
1586
1587   The ldisk parameter, if True, will change the test from the
1588   is_degraded attribute (which represents overall non-ok status for
1589   the device(s)) to the ldisk (representing the local storage status).
1590
1591   """
1592   lu.cfg.SetDiskID(dev, node)
1593   if ldisk:
1594     idx = 6
1595   else:
1596     idx = 5
1597
1598   result = True
1599   if on_primary or dev.AssembleOnSecondary():
1600     rstats = lu.rpc.call_blockdev_find(node, dev)
1601     msg = rstats.RemoteFailMsg()
1602     if msg:
1603       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1604       result = False
1605     elif not rstats.payload:
1606       lu.LogWarning("Can't find disk on node %s", node)
1607       result = False
1608     else:
1609       result = result and (not rstats.payload[idx])
1610   if dev.children:
1611     for child in dev.children:
1612       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1613
1614   return result
1615
1616
1617 class LUDiagnoseOS(NoHooksLU):
1618   """Logical unit for OS diagnose/query.
1619
1620   """
1621   _OP_REQP = ["output_fields", "names"]
1622   REQ_BGL = False
1623   _FIELDS_STATIC = utils.FieldSet()
1624   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1625
1626   def ExpandNames(self):
1627     if self.op.names:
1628       raise errors.OpPrereqError("Selective OS query not supported")
1629
1630     _CheckOutputFields(static=self._FIELDS_STATIC,
1631                        dynamic=self._FIELDS_DYNAMIC,
1632                        selected=self.op.output_fields)
1633
1634     # Lock all nodes, in shared mode
1635     self.needed_locks = {}
1636     self.share_locks[locking.LEVEL_NODE] = 1
1637     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1638
1639   def CheckPrereq(self):
1640     """Check prerequisites.
1641
1642     """
1643
1644   @staticmethod
1645   def _DiagnoseByOS(node_list, rlist):
1646     """Remaps a per-node return list into an a per-os per-node dictionary
1647
1648     @param node_list: a list with the names of all nodes
1649     @param rlist: a map with node names as keys and OS objects as values
1650
1651     @rtype: dict
1652     @returns: a dictionary with osnames as keys and as value another map, with
1653         nodes as keys and list of OS objects as values, eg::
1654
1655           {"debian-etch": {"node1": [<object>,...],
1656                            "node2": [<object>,]}
1657           }
1658
1659     """
1660     all_os = {}
1661     for node_name, nr in rlist.iteritems():
1662       if nr.failed or not nr.data:
1663         continue
1664       for os_obj in nr.data:
1665         if os_obj.name not in all_os:
1666           # build a list of nodes for this os containing empty lists
1667           # for each node in node_list
1668           all_os[os_obj.name] = {}
1669           for nname in node_list:
1670             all_os[os_obj.name][nname] = []
1671         all_os[os_obj.name][node_name].append(os_obj)
1672     return all_os
1673
1674   def Exec(self, feedback_fn):
1675     """Compute the list of OSes.
1676
1677     """
1678     node_list = self.acquired_locks[locking.LEVEL_NODE]
1679     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1680                    if node in node_list]
1681     node_data = self.rpc.call_os_diagnose(valid_nodes)
1682     if node_data == False:
1683       raise errors.OpExecError("Can't gather the list of OSes")
1684     pol = self._DiagnoseByOS(valid_nodes, node_data)
1685     output = []
1686     for os_name, os_data in pol.iteritems():
1687       row = []
1688       for field in self.op.output_fields:
1689         if field == "name":
1690           val = os_name
1691         elif field == "valid":
1692           val = utils.all([osl and osl[0] for osl in os_data.values()])
1693         elif field == "node_status":
1694           val = {}
1695           for node_name, nos_list in os_data.iteritems():
1696             val[node_name] = [(v.status, v.path) for v in nos_list]
1697         else:
1698           raise errors.ParameterError(field)
1699         row.append(val)
1700       output.append(row)
1701
1702     return output
1703
1704
1705 class LURemoveNode(LogicalUnit):
1706   """Logical unit for removing a node.
1707
1708   """
1709   HPATH = "node-remove"
1710   HTYPE = constants.HTYPE_NODE
1711   _OP_REQP = ["node_name"]
1712
1713   def BuildHooksEnv(self):
1714     """Build hooks env.
1715
1716     This doesn't run on the target node in the pre phase as a failed
1717     node would then be impossible to remove.
1718
1719     """
1720     env = {
1721       "OP_TARGET": self.op.node_name,
1722       "NODE_NAME": self.op.node_name,
1723       }
1724     all_nodes = self.cfg.GetNodeList()
1725     all_nodes.remove(self.op.node_name)
1726     return env, all_nodes, all_nodes
1727
1728   def CheckPrereq(self):
1729     """Check prerequisites.
1730
1731     This checks:
1732      - the node exists in the configuration
1733      - it does not have primary or secondary instances
1734      - it's not the master
1735
1736     Any errors are signalled by raising errors.OpPrereqError.
1737
1738     """
1739     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1740     if node is None:
1741       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1742
1743     instance_list = self.cfg.GetInstanceList()
1744
1745     masternode = self.cfg.GetMasterNode()
1746     if node.name == masternode:
1747       raise errors.OpPrereqError("Node is the master node,"
1748                                  " you need to failover first.")
1749
1750     for instance_name in instance_list:
1751       instance = self.cfg.GetInstanceInfo(instance_name)
1752       if node.name in instance.all_nodes:
1753         raise errors.OpPrereqError("Instance %s is still running on the node,"
1754                                    " please remove first." % instance_name)
1755     self.op.node_name = node.name
1756     self.node = node
1757
1758   def Exec(self, feedback_fn):
1759     """Removes the node from the cluster.
1760
1761     """
1762     node = self.node
1763     logging.info("Stopping the node daemon and removing configs from node %s",
1764                  node.name)
1765
1766     self.context.RemoveNode(node.name)
1767
1768     self.rpc.call_node_leave_cluster(node.name)
1769
1770     # Promote nodes to master candidate as needed
1771     _AdjustCandidatePool(self)
1772
1773
1774 class LUQueryNodes(NoHooksLU):
1775   """Logical unit for querying nodes.
1776
1777   """
1778   _OP_REQP = ["output_fields", "names", "use_locking"]
1779   REQ_BGL = False
1780   _FIELDS_DYNAMIC = utils.FieldSet(
1781     "dtotal", "dfree",
1782     "mtotal", "mnode", "mfree",
1783     "bootid",
1784     "ctotal", "cnodes", "csockets",
1785     )
1786
1787   _FIELDS_STATIC = utils.FieldSet(
1788     "name", "pinst_cnt", "sinst_cnt",
1789     "pinst_list", "sinst_list",
1790     "pip", "sip", "tags",
1791     "serial_no",
1792     "master_candidate",
1793     "master",
1794     "offline",
1795     "drained",
1796     )
1797
1798   def ExpandNames(self):
1799     _CheckOutputFields(static=self._FIELDS_STATIC,
1800                        dynamic=self._FIELDS_DYNAMIC,
1801                        selected=self.op.output_fields)
1802
1803     self.needed_locks = {}
1804     self.share_locks[locking.LEVEL_NODE] = 1
1805
1806     if self.op.names:
1807       self.wanted = _GetWantedNodes(self, self.op.names)
1808     else:
1809       self.wanted = locking.ALL_SET
1810
1811     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1812     self.do_locking = self.do_node_query and self.op.use_locking
1813     if self.do_locking:
1814       # if we don't request only static fields, we need to lock the nodes
1815       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1816
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     """
1822     # The validation of the node list is done in the _GetWantedNodes,
1823     # if non empty, and if empty, there's no validation to do
1824     pass
1825
1826   def Exec(self, feedback_fn):
1827     """Computes the list of nodes and their attributes.
1828
1829     """
1830     all_info = self.cfg.GetAllNodesInfo()
1831     if self.do_locking:
1832       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1833     elif self.wanted != locking.ALL_SET:
1834       nodenames = self.wanted
1835       missing = set(nodenames).difference(all_info.keys())
1836       if missing:
1837         raise errors.OpExecError(
1838           "Some nodes were removed before retrieving their data: %s" % missing)
1839     else:
1840       nodenames = all_info.keys()
1841
1842     nodenames = utils.NiceSort(nodenames)
1843     nodelist = [all_info[name] for name in nodenames]
1844
1845     # begin data gathering
1846
1847     if self.do_node_query:
1848       live_data = {}
1849       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1850                                           self.cfg.GetHypervisorType())
1851       for name in nodenames:
1852         nodeinfo = node_data[name]
1853         if not nodeinfo.failed and nodeinfo.data:
1854           nodeinfo = nodeinfo.data
1855           fn = utils.TryConvert
1856           live_data[name] = {
1857             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1858             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1859             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1860             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1861             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1862             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1863             "bootid": nodeinfo.get('bootid', None),
1864             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1865             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1866             }
1867         else:
1868           live_data[name] = {}
1869     else:
1870       live_data = dict.fromkeys(nodenames, {})
1871
1872     node_to_primary = dict([(name, set()) for name in nodenames])
1873     node_to_secondary = dict([(name, set()) for name in nodenames])
1874
1875     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1876                              "sinst_cnt", "sinst_list"))
1877     if inst_fields & frozenset(self.op.output_fields):
1878       instancelist = self.cfg.GetInstanceList()
1879
1880       for instance_name in instancelist:
1881         inst = self.cfg.GetInstanceInfo(instance_name)
1882         if inst.primary_node in node_to_primary:
1883           node_to_primary[inst.primary_node].add(inst.name)
1884         for secnode in inst.secondary_nodes:
1885           if secnode in node_to_secondary:
1886             node_to_secondary[secnode].add(inst.name)
1887
1888     master_node = self.cfg.GetMasterNode()
1889
1890     # end data gathering
1891
1892     output = []
1893     for node in nodelist:
1894       node_output = []
1895       for field in self.op.output_fields:
1896         if field == "name":
1897           val = node.name
1898         elif field == "pinst_list":
1899           val = list(node_to_primary[node.name])
1900         elif field == "sinst_list":
1901           val = list(node_to_secondary[node.name])
1902         elif field == "pinst_cnt":
1903           val = len(node_to_primary[node.name])
1904         elif field == "sinst_cnt":
1905           val = len(node_to_secondary[node.name])
1906         elif field == "pip":
1907           val = node.primary_ip
1908         elif field == "sip":
1909           val = node.secondary_ip
1910         elif field == "tags":
1911           val = list(node.GetTags())
1912         elif field == "serial_no":
1913           val = node.serial_no
1914         elif field == "master_candidate":
1915           val = node.master_candidate
1916         elif field == "master":
1917           val = node.name == master_node
1918         elif field == "offline":
1919           val = node.offline
1920         elif field == "drained":
1921           val = node.drained
1922         elif self._FIELDS_DYNAMIC.Matches(field):
1923           val = live_data[node.name].get(field, None)
1924         else:
1925           raise errors.ParameterError(field)
1926         node_output.append(val)
1927       output.append(node_output)
1928
1929     return output
1930
1931
1932 class LUQueryNodeVolumes(NoHooksLU):
1933   """Logical unit for getting volumes on node(s).
1934
1935   """
1936   _OP_REQP = ["nodes", "output_fields"]
1937   REQ_BGL = False
1938   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1939   _FIELDS_STATIC = utils.FieldSet("node")
1940
1941   def ExpandNames(self):
1942     _CheckOutputFields(static=self._FIELDS_STATIC,
1943                        dynamic=self._FIELDS_DYNAMIC,
1944                        selected=self.op.output_fields)
1945
1946     self.needed_locks = {}
1947     self.share_locks[locking.LEVEL_NODE] = 1
1948     if not self.op.nodes:
1949       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1950     else:
1951       self.needed_locks[locking.LEVEL_NODE] = \
1952         _GetWantedNodes(self, self.op.nodes)
1953
1954   def CheckPrereq(self):
1955     """Check prerequisites.
1956
1957     This checks that the fields required are valid output fields.
1958
1959     """
1960     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1961
1962   def Exec(self, feedback_fn):
1963     """Computes the list of nodes and their attributes.
1964
1965     """
1966     nodenames = self.nodes
1967     volumes = self.rpc.call_node_volumes(nodenames)
1968
1969     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1970              in self.cfg.GetInstanceList()]
1971
1972     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1973
1974     output = []
1975     for node in nodenames:
1976       if node not in volumes or volumes[node].failed or not volumes[node].data:
1977         continue
1978
1979       node_vols = volumes[node].data[:]
1980       node_vols.sort(key=lambda vol: vol['dev'])
1981
1982       for vol in node_vols:
1983         node_output = []
1984         for field in self.op.output_fields:
1985           if field == "node":
1986             val = node
1987           elif field == "phys":
1988             val = vol['dev']
1989           elif field == "vg":
1990             val = vol['vg']
1991           elif field == "name":
1992             val = vol['name']
1993           elif field == "size":
1994             val = int(float(vol['size']))
1995           elif field == "instance":
1996             for inst in ilist:
1997               if node not in lv_by_node[inst]:
1998                 continue
1999               if vol['name'] in lv_by_node[inst][node]:
2000                 val = inst.name
2001                 break
2002             else:
2003               val = '-'
2004           else:
2005             raise errors.ParameterError(field)
2006           node_output.append(str(val))
2007
2008         output.append(node_output)
2009
2010     return output
2011
2012
2013 class LUAddNode(LogicalUnit):
2014   """Logical unit for adding node to the cluster.
2015
2016   """
2017   HPATH = "node-add"
2018   HTYPE = constants.HTYPE_NODE
2019   _OP_REQP = ["node_name"]
2020
2021   def BuildHooksEnv(self):
2022     """Build hooks env.
2023
2024     This will run on all nodes before, and on all nodes + the new node after.
2025
2026     """
2027     env = {
2028       "OP_TARGET": self.op.node_name,
2029       "NODE_NAME": self.op.node_name,
2030       "NODE_PIP": self.op.primary_ip,
2031       "NODE_SIP": self.op.secondary_ip,
2032       }
2033     nodes_0 = self.cfg.GetNodeList()
2034     nodes_1 = nodes_0 + [self.op.node_name, ]
2035     return env, nodes_0, nodes_1
2036
2037   def CheckPrereq(self):
2038     """Check prerequisites.
2039
2040     This checks:
2041      - the new node is not already in the config
2042      - it is resolvable
2043      - its parameters (single/dual homed) matches the cluster
2044
2045     Any errors are signalled by raising errors.OpPrereqError.
2046
2047     """
2048     node_name = self.op.node_name
2049     cfg = self.cfg
2050
2051     dns_data = utils.HostInfo(node_name)
2052
2053     node = dns_data.name
2054     primary_ip = self.op.primary_ip = dns_data.ip
2055     secondary_ip = getattr(self.op, "secondary_ip", None)
2056     if secondary_ip is None:
2057       secondary_ip = primary_ip
2058     if not utils.IsValidIP(secondary_ip):
2059       raise errors.OpPrereqError("Invalid secondary IP given")
2060     self.op.secondary_ip = secondary_ip
2061
2062     node_list = cfg.GetNodeList()
2063     if not self.op.readd and node in node_list:
2064       raise errors.OpPrereqError("Node %s is already in the configuration" %
2065                                  node)
2066     elif self.op.readd and node not in node_list:
2067       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2068
2069     for existing_node_name in node_list:
2070       existing_node = cfg.GetNodeInfo(existing_node_name)
2071
2072       if self.op.readd and node == existing_node_name:
2073         if (existing_node.primary_ip != primary_ip or
2074             existing_node.secondary_ip != secondary_ip):
2075           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2076                                      " address configuration as before")
2077         continue
2078
2079       if (existing_node.primary_ip == primary_ip or
2080           existing_node.secondary_ip == primary_ip or
2081           existing_node.primary_ip == secondary_ip or
2082           existing_node.secondary_ip == secondary_ip):
2083         raise errors.OpPrereqError("New node ip address(es) conflict with"
2084                                    " existing node %s" % existing_node.name)
2085
2086     # check that the type of the node (single versus dual homed) is the
2087     # same as for the master
2088     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2089     master_singlehomed = myself.secondary_ip == myself.primary_ip
2090     newbie_singlehomed = secondary_ip == primary_ip
2091     if master_singlehomed != newbie_singlehomed:
2092       if master_singlehomed:
2093         raise errors.OpPrereqError("The master has no private ip but the"
2094                                    " new node has one")
2095       else:
2096         raise errors.OpPrereqError("The master has a private ip but the"
2097                                    " new node doesn't have one")
2098
2099     # checks reachablity
2100     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2101       raise errors.OpPrereqError("Node not reachable by ping")
2102
2103     if not newbie_singlehomed:
2104       # check reachability from my secondary ip to newbie's secondary ip
2105       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2106                            source=myself.secondary_ip):
2107         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2108                                    " based ping to noded port")
2109
2110     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2111     mc_now, _ = self.cfg.GetMasterCandidateStats()
2112     master_candidate = mc_now < cp_size
2113
2114     self.new_node = objects.Node(name=node,
2115                                  primary_ip=primary_ip,
2116                                  secondary_ip=secondary_ip,
2117                                  master_candidate=master_candidate,
2118                                  offline=False, drained=False)
2119
2120   def Exec(self, feedback_fn):
2121     """Adds the new node to the cluster.
2122
2123     """
2124     new_node = self.new_node
2125     node = new_node.name
2126
2127     # check connectivity
2128     result = self.rpc.call_version([node])[node]
2129     result.Raise()
2130     if result.data:
2131       if constants.PROTOCOL_VERSION == result.data:
2132         logging.info("Communication to node %s fine, sw version %s match",
2133                      node, result.data)
2134       else:
2135         raise errors.OpExecError("Version mismatch master version %s,"
2136                                  " node version %s" %
2137                                  (constants.PROTOCOL_VERSION, result.data))
2138     else:
2139       raise errors.OpExecError("Cannot get version from the new node")
2140
2141     # setup ssh on node
2142     logging.info("Copy ssh key to node %s", node)
2143     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2144     keyarray = []
2145     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2146                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2147                 priv_key, pub_key]
2148
2149     for i in keyfiles:
2150       f = open(i, 'r')
2151       try:
2152         keyarray.append(f.read())
2153       finally:
2154         f.close()
2155
2156     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2157                                     keyarray[2],
2158                                     keyarray[3], keyarray[4], keyarray[5])
2159
2160     msg = result.RemoteFailMsg()
2161     if msg:
2162       raise errors.OpExecError("Cannot transfer ssh keys to the"
2163                                " new node: %s" % msg)
2164
2165     # Add node to our /etc/hosts, and add key to known_hosts
2166     utils.AddHostToEtcHosts(new_node.name)
2167
2168     if new_node.secondary_ip != new_node.primary_ip:
2169       result = self.rpc.call_node_has_ip_address(new_node.name,
2170                                                  new_node.secondary_ip)
2171       if result.failed or not result.data:
2172         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2173                                  " you gave (%s). Please fix and re-run this"
2174                                  " command." % new_node.secondary_ip)
2175
2176     node_verify_list = [self.cfg.GetMasterNode()]
2177     node_verify_param = {
2178       'nodelist': [node],
2179       # TODO: do a node-net-test as well?
2180     }
2181
2182     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2183                                        self.cfg.GetClusterName())
2184     for verifier in node_verify_list:
2185       if result[verifier].failed or not result[verifier].data:
2186         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2187                                  " for remote verification" % verifier)
2188       if result[verifier].data['nodelist']:
2189         for failed in result[verifier].data['nodelist']:
2190           feedback_fn("ssh/hostname verification failed %s -> %s" %
2191                       (verifier, result[verifier].data['nodelist'][failed]))
2192         raise errors.OpExecError("ssh/hostname verification failed.")
2193
2194     # Distribute updated /etc/hosts and known_hosts to all nodes,
2195     # including the node just added
2196     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2197     dist_nodes = self.cfg.GetNodeList()
2198     if not self.op.readd:
2199       dist_nodes.append(node)
2200     if myself.name in dist_nodes:
2201       dist_nodes.remove(myself.name)
2202
2203     logging.debug("Copying hosts and known_hosts to all nodes")
2204     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2205       result = self.rpc.call_upload_file(dist_nodes, fname)
2206       for to_node, to_result in result.iteritems():
2207         if to_result.failed or not to_result.data:
2208           logging.error("Copy of file %s to node %s failed", fname, to_node)
2209
2210     to_copy = []
2211     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2212     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2213       to_copy.append(constants.VNC_PASSWORD_FILE)
2214
2215     for fname in to_copy:
2216       result = self.rpc.call_upload_file([node], fname)
2217       if result[node].failed or not result[node]:
2218         logging.error("Could not copy file %s to node %s", fname, node)
2219
2220     if self.op.readd:
2221       self.context.ReaddNode(new_node)
2222     else:
2223       self.context.AddNode(new_node)
2224
2225
2226 class LUSetNodeParams(LogicalUnit):
2227   """Modifies the parameters of a node.
2228
2229   """
2230   HPATH = "node-modify"
2231   HTYPE = constants.HTYPE_NODE
2232   _OP_REQP = ["node_name"]
2233   REQ_BGL = False
2234
2235   def CheckArguments(self):
2236     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2237     if node_name is None:
2238       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2239     self.op.node_name = node_name
2240     _CheckBooleanOpField(self.op, 'master_candidate')
2241     _CheckBooleanOpField(self.op, 'offline')
2242     _CheckBooleanOpField(self.op, 'drained')
2243     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2244     if all_mods.count(None) == 3:
2245       raise errors.OpPrereqError("Please pass at least one modification")
2246     if all_mods.count(True) > 1:
2247       raise errors.OpPrereqError("Can't set the node into more than one"
2248                                  " state at the same time")
2249
2250   def ExpandNames(self):
2251     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2252
2253   def BuildHooksEnv(self):
2254     """Build hooks env.
2255
2256     This runs on the master node.
2257
2258     """
2259     env = {
2260       "OP_TARGET": self.op.node_name,
2261       "MASTER_CANDIDATE": str(self.op.master_candidate),
2262       "OFFLINE": str(self.op.offline),
2263       "DRAINED": str(self.op.drained),
2264       }
2265     nl = [self.cfg.GetMasterNode(),
2266           self.op.node_name]
2267     return env, nl, nl
2268
2269   def CheckPrereq(self):
2270     """Check prerequisites.
2271
2272     This only checks the instance list against the existing names.
2273
2274     """
2275     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2276
2277     if ((self.op.master_candidate == False or self.op.offline == True or
2278          self.op.drained == True) and node.master_candidate):
2279       # we will demote the node from master_candidate
2280       if self.op.node_name == self.cfg.GetMasterNode():
2281         raise errors.OpPrereqError("The master node has to be a"
2282                                    " master candidate, online and not drained")
2283       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2284       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2285       if num_candidates <= cp_size:
2286         msg = ("Not enough master candidates (desired"
2287                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2288         if self.op.force:
2289           self.LogWarning(msg)
2290         else:
2291           raise errors.OpPrereqError(msg)
2292
2293     if (self.op.master_candidate == True and
2294         ((node.offline and not self.op.offline == False) or
2295          (node.drained and not self.op.drained == False))):
2296       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2297                                  " to master_candidate")
2298
2299     return
2300
2301   def Exec(self, feedback_fn):
2302     """Modifies a node.
2303
2304     """
2305     node = self.node
2306
2307     result = []
2308     changed_mc = False
2309
2310     if self.op.offline is not None:
2311       node.offline = self.op.offline
2312       result.append(("offline", str(self.op.offline)))
2313       if self.op.offline == True:
2314         if node.master_candidate:
2315           node.master_candidate = False
2316           changed_mc = True
2317           result.append(("master_candidate", "auto-demotion due to offline"))
2318         if node.drained:
2319           node.drained = False
2320           result.append(("drained", "clear drained status due to offline"))
2321
2322     if self.op.master_candidate is not None:
2323       node.master_candidate = self.op.master_candidate
2324       changed_mc = True
2325       result.append(("master_candidate", str(self.op.master_candidate)))
2326       if self.op.master_candidate == False:
2327         rrc = self.rpc.call_node_demote_from_mc(node.name)
2328         msg = rrc.RemoteFailMsg()
2329         if msg:
2330           self.LogWarning("Node failed to demote itself: %s" % msg)
2331
2332     if self.op.drained is not None:
2333       node.drained = self.op.drained
2334       result.append(("drained", str(self.op.drained)))
2335       if self.op.drained == True:
2336         if node.master_candidate:
2337           node.master_candidate = False
2338           changed_mc = True
2339           result.append(("master_candidate", "auto-demotion due to drain"))
2340         if node.offline:
2341           node.offline = False
2342           result.append(("offline", "clear offline status due to drain"))
2343
2344     # this will trigger configuration file update, if needed
2345     self.cfg.Update(node)
2346     # this will trigger job queue propagation or cleanup
2347     if changed_mc:
2348       self.context.ReaddNode(node)
2349
2350     return result
2351
2352
2353 class LUQueryClusterInfo(NoHooksLU):
2354   """Query cluster configuration.
2355
2356   """
2357   _OP_REQP = []
2358   REQ_BGL = False
2359
2360   def ExpandNames(self):
2361     self.needed_locks = {}
2362
2363   def CheckPrereq(self):
2364     """No prerequsites needed for this LU.
2365
2366     """
2367     pass
2368
2369   def Exec(self, feedback_fn):
2370     """Return cluster config.
2371
2372     """
2373     cluster = self.cfg.GetClusterInfo()
2374     result = {
2375       "software_version": constants.RELEASE_VERSION,
2376       "protocol_version": constants.PROTOCOL_VERSION,
2377       "config_version": constants.CONFIG_VERSION,
2378       "os_api_version": constants.OS_API_VERSION,
2379       "export_version": constants.EXPORT_VERSION,
2380       "architecture": (platform.architecture()[0], platform.machine()),
2381       "name": cluster.cluster_name,
2382       "master": cluster.master_node,
2383       "default_hypervisor": cluster.default_hypervisor,
2384       "enabled_hypervisors": cluster.enabled_hypervisors,
2385       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2386                         for hypervisor in cluster.enabled_hypervisors]),
2387       "beparams": cluster.beparams,
2388       "candidate_pool_size": cluster.candidate_pool_size,
2389       }
2390
2391     return result
2392
2393
2394 class LUQueryConfigValues(NoHooksLU):
2395   """Return configuration values.
2396
2397   """
2398   _OP_REQP = []
2399   REQ_BGL = False
2400   _FIELDS_DYNAMIC = utils.FieldSet()
2401   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2402
2403   def ExpandNames(self):
2404     self.needed_locks = {}
2405
2406     _CheckOutputFields(static=self._FIELDS_STATIC,
2407                        dynamic=self._FIELDS_DYNAMIC,
2408                        selected=self.op.output_fields)
2409
2410   def CheckPrereq(self):
2411     """No prerequisites.
2412
2413     """
2414     pass
2415
2416   def Exec(self, feedback_fn):
2417     """Dump a representation of the cluster config to the standard output.
2418
2419     """
2420     values = []
2421     for field in self.op.output_fields:
2422       if field == "cluster_name":
2423         entry = self.cfg.GetClusterName()
2424       elif field == "master_node":
2425         entry = self.cfg.GetMasterNode()
2426       elif field == "drain_flag":
2427         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2428       else:
2429         raise errors.ParameterError(field)
2430       values.append(entry)
2431     return values
2432
2433
2434 class LUActivateInstanceDisks(NoHooksLU):
2435   """Bring up an instance's disks.
2436
2437   """
2438   _OP_REQP = ["instance_name"]
2439   REQ_BGL = False
2440
2441   def ExpandNames(self):
2442     self._ExpandAndLockInstance()
2443     self.needed_locks[locking.LEVEL_NODE] = []
2444     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2445
2446   def DeclareLocks(self, level):
2447     if level == locking.LEVEL_NODE:
2448       self._LockInstancesNodes()
2449
2450   def CheckPrereq(self):
2451     """Check prerequisites.
2452
2453     This checks that the instance is in the cluster.
2454
2455     """
2456     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2457     assert self.instance is not None, \
2458       "Cannot retrieve locked instance %s" % self.op.instance_name
2459     _CheckNodeOnline(self, self.instance.primary_node)
2460
2461   def Exec(self, feedback_fn):
2462     """Activate the disks.
2463
2464     """
2465     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2466     if not disks_ok:
2467       raise errors.OpExecError("Cannot activate block devices")
2468
2469     return disks_info
2470
2471
2472 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2473   """Prepare the block devices for an instance.
2474
2475   This sets up the block devices on all nodes.
2476
2477   @type lu: L{LogicalUnit}
2478   @param lu: the logical unit on whose behalf we execute
2479   @type instance: L{objects.Instance}
2480   @param instance: the instance for whose disks we assemble
2481   @type ignore_secondaries: boolean
2482   @param ignore_secondaries: if true, errors on secondary nodes
2483       won't result in an error return from the function
2484   @return: False if the operation failed, otherwise a list of
2485       (host, instance_visible_name, node_visible_name)
2486       with the mapping from node devices to instance devices
2487
2488   """
2489   device_info = []
2490   disks_ok = True
2491   iname = instance.name
2492   # With the two passes mechanism we try to reduce the window of
2493   # opportunity for the race condition of switching DRBD to primary
2494   # before handshaking occured, but we do not eliminate it
2495
2496   # The proper fix would be to wait (with some limits) until the
2497   # connection has been made and drbd transitions from WFConnection
2498   # into any other network-connected state (Connected, SyncTarget,
2499   # SyncSource, etc.)
2500
2501   # 1st pass, assemble on all nodes in secondary mode
2502   for inst_disk in instance.disks:
2503     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2504       lu.cfg.SetDiskID(node_disk, node)
2505       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2506       msg = result.RemoteFailMsg()
2507       if msg:
2508         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2509                            " (is_primary=False, pass=1): %s",
2510                            inst_disk.iv_name, node, msg)
2511         if not ignore_secondaries:
2512           disks_ok = False
2513
2514   # FIXME: race condition on drbd migration to primary
2515
2516   # 2nd pass, do only the primary node
2517   for inst_disk in instance.disks:
2518     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2519       if node != instance.primary_node:
2520         continue
2521       lu.cfg.SetDiskID(node_disk, node)
2522       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2523       msg = result.RemoteFailMsg()
2524       if msg:
2525         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2526                            " (is_primary=True, pass=2): %s",
2527                            inst_disk.iv_name, node, msg)
2528         disks_ok = False
2529     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2530
2531   # leave the disks configured for the primary node
2532   # this is a workaround that would be fixed better by
2533   # improving the logical/physical id handling
2534   for disk in instance.disks:
2535     lu.cfg.SetDiskID(disk, instance.primary_node)
2536
2537   return disks_ok, device_info
2538
2539
2540 def _StartInstanceDisks(lu, instance, force):
2541   """Start the disks of an instance.
2542
2543   """
2544   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2545                                            ignore_secondaries=force)
2546   if not disks_ok:
2547     _ShutdownInstanceDisks(lu, instance)
2548     if force is not None and not force:
2549       lu.proc.LogWarning("", hint="If the message above refers to a"
2550                          " secondary node,"
2551                          " you can retry the operation using '--force'.")
2552     raise errors.OpExecError("Disk consistency error")
2553
2554
2555 class LUDeactivateInstanceDisks(NoHooksLU):
2556   """Shutdown an instance's disks.
2557
2558   """
2559   _OP_REQP = ["instance_name"]
2560   REQ_BGL = False
2561
2562   def ExpandNames(self):
2563     self._ExpandAndLockInstance()
2564     self.needed_locks[locking.LEVEL_NODE] = []
2565     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2566
2567   def DeclareLocks(self, level):
2568     if level == locking.LEVEL_NODE:
2569       self._LockInstancesNodes()
2570
2571   def CheckPrereq(self):
2572     """Check prerequisites.
2573
2574     This checks that the instance is in the cluster.
2575
2576     """
2577     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2578     assert self.instance is not None, \
2579       "Cannot retrieve locked instance %s" % self.op.instance_name
2580
2581   def Exec(self, feedback_fn):
2582     """Deactivate the disks
2583
2584     """
2585     instance = self.instance
2586     _SafeShutdownInstanceDisks(self, instance)
2587
2588
2589 def _SafeShutdownInstanceDisks(lu, instance):
2590   """Shutdown block devices of an instance.
2591
2592   This function checks if an instance is running, before calling
2593   _ShutdownInstanceDisks.
2594
2595   """
2596   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2597                                       [instance.hypervisor])
2598   ins_l = ins_l[instance.primary_node]
2599   if ins_l.failed or not isinstance(ins_l.data, list):
2600     raise errors.OpExecError("Can't contact node '%s'" %
2601                              instance.primary_node)
2602
2603   if instance.name in ins_l.data:
2604     raise errors.OpExecError("Instance is running, can't shutdown"
2605                              " block devices.")
2606
2607   _ShutdownInstanceDisks(lu, instance)
2608
2609
2610 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2611   """Shutdown block devices of an instance.
2612
2613   This does the shutdown on all nodes of the instance.
2614
2615   If the ignore_primary is false, errors on the primary node are
2616   ignored.
2617
2618   """
2619   all_result = True
2620   for disk in instance.disks:
2621     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2622       lu.cfg.SetDiskID(top_disk, node)
2623       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2624       msg = result.RemoteFailMsg()
2625       if msg:
2626         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2627                       disk.iv_name, node, msg)
2628         if not ignore_primary or node != instance.primary_node:
2629           all_result = False
2630   return all_result
2631
2632
2633 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2634   """Checks if a node has enough free memory.
2635
2636   This function check if a given node has the needed amount of free
2637   memory. In case the node has less memory or we cannot get the
2638   information from the node, this function raise an OpPrereqError
2639   exception.
2640
2641   @type lu: C{LogicalUnit}
2642   @param lu: a logical unit from which we get configuration data
2643   @type node: C{str}
2644   @param node: the node to check
2645   @type reason: C{str}
2646   @param reason: string to use in the error message
2647   @type requested: C{int}
2648   @param requested: the amount of memory in MiB to check for
2649   @type hypervisor_name: C{str}
2650   @param hypervisor_name: the hypervisor to ask for memory stats
2651   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2652       we cannot check the node
2653
2654   """
2655   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2656   nodeinfo[node].Raise()
2657   free_mem = nodeinfo[node].data.get('memory_free')
2658   if not isinstance(free_mem, int):
2659     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2660                              " was '%s'" % (node, free_mem))
2661   if requested > free_mem:
2662     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2663                              " needed %s MiB, available %s MiB" %
2664                              (node, reason, requested, free_mem))
2665
2666
2667 class LUStartupInstance(LogicalUnit):
2668   """Starts an instance.
2669
2670   """
2671   HPATH = "instance-start"
2672   HTYPE = constants.HTYPE_INSTANCE
2673   _OP_REQP = ["instance_name", "force"]
2674   REQ_BGL = False
2675
2676   def ExpandNames(self):
2677     self._ExpandAndLockInstance()
2678
2679   def BuildHooksEnv(self):
2680     """Build hooks env.
2681
2682     This runs on master, primary and secondary nodes of the instance.
2683
2684     """
2685     env = {
2686       "FORCE": self.op.force,
2687       }
2688     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2689     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2690     return env, nl, nl
2691
2692   def CheckPrereq(self):
2693     """Check prerequisites.
2694
2695     This checks that the instance is in the cluster.
2696
2697     """
2698     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2699     assert self.instance is not None, \
2700       "Cannot retrieve locked instance %s" % self.op.instance_name
2701
2702     _CheckNodeOnline(self, instance.primary_node)
2703
2704     bep = self.cfg.GetClusterInfo().FillBE(instance)
2705     # check bridges existance
2706     _CheckInstanceBridgesExist(self, instance)
2707
2708     _CheckNodeFreeMemory(self, instance.primary_node,
2709                          "starting instance %s" % instance.name,
2710                          bep[constants.BE_MEMORY], instance.hypervisor)
2711
2712   def Exec(self, feedback_fn):
2713     """Start the instance.
2714
2715     """
2716     instance = self.instance
2717     force = self.op.force
2718     extra_args = getattr(self.op, "extra_args", "")
2719
2720     self.cfg.MarkInstanceUp(instance.name)
2721
2722     node_current = instance.primary_node
2723
2724     _StartInstanceDisks(self, instance, force)
2725
2726     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2727     msg = result.RemoteFailMsg()
2728     if msg:
2729       _ShutdownInstanceDisks(self, instance)
2730       raise errors.OpExecError("Could not start instance: %s" % msg)
2731
2732
2733 class LURebootInstance(LogicalUnit):
2734   """Reboot an instance.
2735
2736   """
2737   HPATH = "instance-reboot"
2738   HTYPE = constants.HTYPE_INSTANCE
2739   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2740   REQ_BGL = False
2741
2742   def ExpandNames(self):
2743     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2744                                    constants.INSTANCE_REBOOT_HARD,
2745                                    constants.INSTANCE_REBOOT_FULL]:
2746       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2747                                   (constants.INSTANCE_REBOOT_SOFT,
2748                                    constants.INSTANCE_REBOOT_HARD,
2749                                    constants.INSTANCE_REBOOT_FULL))
2750     self._ExpandAndLockInstance()
2751
2752   def BuildHooksEnv(self):
2753     """Build hooks env.
2754
2755     This runs on master, primary and secondary nodes of the instance.
2756
2757     """
2758     env = {
2759       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2760       }
2761     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2762     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2763     return env, nl, nl
2764
2765   def CheckPrereq(self):
2766     """Check prerequisites.
2767
2768     This checks that the instance is in the cluster.
2769
2770     """
2771     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2772     assert self.instance is not None, \
2773       "Cannot retrieve locked instance %s" % self.op.instance_name
2774
2775     _CheckNodeOnline(self, instance.primary_node)
2776
2777     # check bridges existance
2778     _CheckInstanceBridgesExist(self, instance)
2779
2780   def Exec(self, feedback_fn):
2781     """Reboot the instance.
2782
2783     """
2784     instance = self.instance
2785     ignore_secondaries = self.op.ignore_secondaries
2786     reboot_type = self.op.reboot_type
2787     extra_args = getattr(self.op, "extra_args", "")
2788
2789     node_current = instance.primary_node
2790
2791     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2792                        constants.INSTANCE_REBOOT_HARD]:
2793       result = self.rpc.call_instance_reboot(node_current, instance,
2794                                              reboot_type, extra_args)
2795       msg = result.RemoteFailMsg()
2796       if msg:
2797         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2798     else:
2799       result = self.rpc.call_instance_shutdown(node_current, instance)
2800       msg = result.RemoteFailMsg()
2801       if msg:
2802         raise errors.OpExecError("Could not shutdown instance for"
2803                                  " full reboot: %s" % msg)
2804       _ShutdownInstanceDisks(self, instance)
2805       _StartInstanceDisks(self, instance, ignore_secondaries)
2806       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2807       msg = result.RemoteFailMsg()
2808       if msg:
2809         _ShutdownInstanceDisks(self, instance)
2810         raise errors.OpExecError("Could not start instance for"
2811                                  " full reboot: %s" % msg)
2812
2813     self.cfg.MarkInstanceUp(instance.name)
2814
2815
2816 class LUShutdownInstance(LogicalUnit):
2817   """Shutdown an instance.
2818
2819   """
2820   HPATH = "instance-stop"
2821   HTYPE = constants.HTYPE_INSTANCE
2822   _OP_REQP = ["instance_name"]
2823   REQ_BGL = False
2824
2825   def ExpandNames(self):
2826     self._ExpandAndLockInstance()
2827
2828   def BuildHooksEnv(self):
2829     """Build hooks env.
2830
2831     This runs on master, primary and secondary nodes of the instance.
2832
2833     """
2834     env = _BuildInstanceHookEnvByObject(self, self.instance)
2835     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2836     return env, nl, nl
2837
2838   def CheckPrereq(self):
2839     """Check prerequisites.
2840
2841     This checks that the instance is in the cluster.
2842
2843     """
2844     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2845     assert self.instance is not None, \
2846       "Cannot retrieve locked instance %s" % self.op.instance_name
2847     _CheckNodeOnline(self, self.instance.primary_node)
2848
2849   def Exec(self, feedback_fn):
2850     """Shutdown the instance.
2851
2852     """
2853     instance = self.instance
2854     node_current = instance.primary_node
2855     self.cfg.MarkInstanceDown(instance.name)
2856     result = self.rpc.call_instance_shutdown(node_current, instance)
2857     msg = result.RemoteFailMsg()
2858     if msg:
2859       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2860
2861     _ShutdownInstanceDisks(self, instance)
2862
2863
2864 class LUReinstallInstance(LogicalUnit):
2865   """Reinstall an instance.
2866
2867   """
2868   HPATH = "instance-reinstall"
2869   HTYPE = constants.HTYPE_INSTANCE
2870   _OP_REQP = ["instance_name"]
2871   REQ_BGL = False
2872
2873   def ExpandNames(self):
2874     self._ExpandAndLockInstance()
2875
2876   def BuildHooksEnv(self):
2877     """Build hooks env.
2878
2879     This runs on master, primary and secondary nodes of the instance.
2880
2881     """
2882     env = _BuildInstanceHookEnvByObject(self, self.instance)
2883     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2884     return env, nl, nl
2885
2886   def CheckPrereq(self):
2887     """Check prerequisites.
2888
2889     This checks that the instance is in the cluster and is not running.
2890
2891     """
2892     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2893     assert instance is not None, \
2894       "Cannot retrieve locked instance %s" % self.op.instance_name
2895     _CheckNodeOnline(self, instance.primary_node)
2896
2897     if instance.disk_template == constants.DT_DISKLESS:
2898       raise errors.OpPrereqError("Instance '%s' has no disks" %
2899                                  self.op.instance_name)
2900     if instance.admin_up:
2901       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2902                                  self.op.instance_name)
2903     remote_info = self.rpc.call_instance_info(instance.primary_node,
2904                                               instance.name,
2905                                               instance.hypervisor)
2906     if remote_info.failed or remote_info.data:
2907       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2908                                  (self.op.instance_name,
2909                                   instance.primary_node))
2910
2911     self.op.os_type = getattr(self.op, "os_type", None)
2912     if self.op.os_type is not None:
2913       # OS verification
2914       pnode = self.cfg.GetNodeInfo(
2915         self.cfg.ExpandNodeName(instance.primary_node))
2916       if pnode is None:
2917         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2918                                    self.op.pnode)
2919       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2920       result.Raise()
2921       if not isinstance(result.data, objects.OS):
2922         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2923                                    " primary node"  % self.op.os_type)
2924
2925     self.instance = instance
2926
2927   def Exec(self, feedback_fn):
2928     """Reinstall the instance.
2929
2930     """
2931     inst = self.instance
2932
2933     if self.op.os_type is not None:
2934       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2935       inst.os = self.op.os_type
2936       self.cfg.Update(inst)
2937
2938     _StartInstanceDisks(self, inst, None)
2939     try:
2940       feedback_fn("Running the instance OS create scripts...")
2941       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2942       msg = result.RemoteFailMsg()
2943       if msg:
2944         raise errors.OpExecError("Could not install OS for instance %s"
2945                                  " on node %s: %s" %
2946                                  (inst.name, inst.primary_node, msg))
2947     finally:
2948       _ShutdownInstanceDisks(self, inst)
2949
2950
2951 class LURenameInstance(LogicalUnit):
2952   """Rename an instance.
2953
2954   """
2955   HPATH = "instance-rename"
2956   HTYPE = constants.HTYPE_INSTANCE
2957   _OP_REQP = ["instance_name", "new_name"]
2958
2959   def BuildHooksEnv(self):
2960     """Build hooks env.
2961
2962     This runs on master, primary and secondary nodes of the instance.
2963
2964     """
2965     env = _BuildInstanceHookEnvByObject(self, self.instance)
2966     env["INSTANCE_NEW_NAME"] = self.op.new_name
2967     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2968     return env, nl, nl
2969
2970   def CheckPrereq(self):
2971     """Check prerequisites.
2972
2973     This checks that the instance is in the cluster and is not running.
2974
2975     """
2976     instance = self.cfg.GetInstanceInfo(
2977       self.cfg.ExpandInstanceName(self.op.instance_name))
2978     if instance is None:
2979       raise errors.OpPrereqError("Instance '%s' not known" %
2980                                  self.op.instance_name)
2981     _CheckNodeOnline(self, instance.primary_node)
2982
2983     if instance.admin_up:
2984       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2985                                  self.op.instance_name)
2986     remote_info = self.rpc.call_instance_info(instance.primary_node,
2987                                               instance.name,
2988                                               instance.hypervisor)
2989     remote_info.Raise()
2990     if remote_info.data:
2991       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2992                                  (self.op.instance_name,
2993                                   instance.primary_node))
2994     self.instance = instance
2995
2996     # new name verification
2997     name_info = utils.HostInfo(self.op.new_name)
2998
2999     self.op.new_name = new_name = name_info.name
3000     instance_list = self.cfg.GetInstanceList()
3001     if new_name in instance_list:
3002       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3003                                  new_name)
3004
3005     if not getattr(self.op, "ignore_ip", False):
3006       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3007         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3008                                    (name_info.ip, new_name))
3009
3010
3011   def Exec(self, feedback_fn):
3012     """Reinstall the instance.
3013
3014     """
3015     inst = self.instance
3016     old_name = inst.name
3017
3018     if inst.disk_template == constants.DT_FILE:
3019       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3020
3021     self.cfg.RenameInstance(inst.name, self.op.new_name)
3022     # Change the instance lock. This is definitely safe while we hold the BGL
3023     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3024     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3025
3026     # re-read the instance from the configuration after rename
3027     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3028
3029     if inst.disk_template == constants.DT_FILE:
3030       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3031       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3032                                                      old_file_storage_dir,
3033                                                      new_file_storage_dir)
3034       result.Raise()
3035       if not result.data:
3036         raise errors.OpExecError("Could not connect to node '%s' to rename"
3037                                  " directory '%s' to '%s' (but the instance"
3038                                  " has been renamed in Ganeti)" % (
3039                                  inst.primary_node, old_file_storage_dir,
3040                                  new_file_storage_dir))
3041
3042       if not result.data[0]:
3043         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3044                                  " (but the instance has been renamed in"
3045                                  " Ganeti)" % (old_file_storage_dir,
3046                                                new_file_storage_dir))
3047
3048     _StartInstanceDisks(self, inst, None)
3049     try:
3050       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3051                                                  old_name)
3052       msg = result.RemoteFailMsg()
3053       if msg:
3054         msg = ("Could not run OS rename script for instance %s on node %s"
3055                " (but the instance has been renamed in Ganeti): %s" %
3056                (inst.name, inst.primary_node, msg))
3057         self.proc.LogWarning(msg)
3058     finally:
3059       _ShutdownInstanceDisks(self, inst)
3060
3061
3062 class LURemoveInstance(LogicalUnit):
3063   """Remove an instance.
3064
3065   """
3066   HPATH = "instance-remove"
3067   HTYPE = constants.HTYPE_INSTANCE
3068   _OP_REQP = ["instance_name", "ignore_failures"]
3069   REQ_BGL = False
3070
3071   def ExpandNames(self):
3072     self._ExpandAndLockInstance()
3073     self.needed_locks[locking.LEVEL_NODE] = []
3074     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3075
3076   def DeclareLocks(self, level):
3077     if level == locking.LEVEL_NODE:
3078       self._LockInstancesNodes()
3079
3080   def BuildHooksEnv(self):
3081     """Build hooks env.
3082
3083     This runs on master, primary and secondary nodes of the instance.
3084
3085     """
3086     env = _BuildInstanceHookEnvByObject(self, self.instance)
3087     nl = [self.cfg.GetMasterNode()]
3088     return env, nl, nl
3089
3090   def CheckPrereq(self):
3091     """Check prerequisites.
3092
3093     This checks that the instance is in the cluster.
3094
3095     """
3096     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3097     assert self.instance is not None, \
3098       "Cannot retrieve locked instance %s" % self.op.instance_name
3099
3100   def Exec(self, feedback_fn):
3101     """Remove the instance.
3102
3103     """
3104     instance = self.instance
3105     logging.info("Shutting down instance %s on node %s",
3106                  instance.name, instance.primary_node)
3107
3108     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3109     msg = result.RemoteFailMsg()
3110     if msg:
3111       if self.op.ignore_failures:
3112         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3113       else:
3114         raise errors.OpExecError("Could not shutdown instance %s on"
3115                                  " node %s: %s" %
3116                                  (instance.name, instance.primary_node, msg))
3117
3118     logging.info("Removing block devices for instance %s", instance.name)
3119
3120     if not _RemoveDisks(self, instance):
3121       if self.op.ignore_failures:
3122         feedback_fn("Warning: can't remove instance's disks")
3123       else:
3124         raise errors.OpExecError("Can't remove instance's disks")
3125
3126     logging.info("Removing instance %s out of cluster config", instance.name)
3127
3128     self.cfg.RemoveInstance(instance.name)
3129     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3130
3131
3132 class LUQueryInstances(NoHooksLU):
3133   """Logical unit for querying instances.
3134
3135   """
3136   _OP_REQP = ["output_fields", "names", "use_locking"]
3137   REQ_BGL = False
3138   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3139                                     "admin_state", "admin_ram",
3140                                     "disk_template", "ip", "mac", "bridge",
3141                                     "sda_size", "sdb_size", "vcpus", "tags",
3142                                     "network_port", "beparams",
3143                                     "(disk).(size)/([0-9]+)",
3144                                     "(disk).(sizes)", "disk_usage",
3145                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3146                                     "(nic).(macs|ips|bridges)",
3147                                     "(disk|nic).(count)",
3148                                     "serial_no", "hypervisor", "hvparams",] +
3149                                   ["hv/%s" % name
3150                                    for name in constants.HVS_PARAMETERS] +
3151                                   ["be/%s" % name
3152                                    for name in constants.BES_PARAMETERS])
3153   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3154
3155
3156   def ExpandNames(self):
3157     _CheckOutputFields(static=self._FIELDS_STATIC,
3158                        dynamic=self._FIELDS_DYNAMIC,
3159                        selected=self.op.output_fields)
3160
3161     self.needed_locks = {}
3162     self.share_locks[locking.LEVEL_INSTANCE] = 1
3163     self.share_locks[locking.LEVEL_NODE] = 1
3164
3165     if self.op.names:
3166       self.wanted = _GetWantedInstances(self, self.op.names)
3167     else:
3168       self.wanted = locking.ALL_SET
3169
3170     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3171     self.do_locking = self.do_node_query and self.op.use_locking
3172     if self.do_locking:
3173       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3174       self.needed_locks[locking.LEVEL_NODE] = []
3175       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3176
3177   def DeclareLocks(self, level):
3178     if level == locking.LEVEL_NODE and self.do_locking:
3179       self._LockInstancesNodes()
3180
3181   def CheckPrereq(self):
3182     """Check prerequisites.
3183
3184     """
3185     pass
3186
3187   def Exec(self, feedback_fn):
3188     """Computes the list of nodes and their attributes.
3189
3190     """
3191     all_info = self.cfg.GetAllInstancesInfo()
3192     if self.wanted == locking.ALL_SET:
3193       # caller didn't specify instance names, so ordering is not important
3194       if self.do_locking:
3195         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3196       else:
3197         instance_names = all_info.keys()
3198       instance_names = utils.NiceSort(instance_names)
3199     else:
3200       # caller did specify names, so we must keep the ordering
3201       if self.do_locking:
3202         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3203       else:
3204         tgt_set = all_info.keys()
3205       missing = set(self.wanted).difference(tgt_set)
3206       if missing:
3207         raise errors.OpExecError("Some instances were removed before"
3208                                  " retrieving their data: %s" % missing)
3209       instance_names = self.wanted
3210
3211     instance_list = [all_info[iname] for iname in instance_names]
3212
3213     # begin data gathering
3214
3215     nodes = frozenset([inst.primary_node for inst in instance_list])
3216     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3217
3218     bad_nodes = []
3219     off_nodes = []
3220     if self.do_node_query:
3221       live_data = {}
3222       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3223       for name in nodes:
3224         result = node_data[name]
3225         if result.offline:
3226           # offline nodes will be in both lists
3227           off_nodes.append(name)
3228         if result.failed:
3229           bad_nodes.append(name)
3230         else:
3231           if result.data:
3232             live_data.update(result.data)
3233             # else no instance is alive
3234     else:
3235       live_data = dict([(name, {}) for name in instance_names])
3236
3237     # end data gathering
3238
3239     HVPREFIX = "hv/"
3240     BEPREFIX = "be/"
3241     output = []
3242     for instance in instance_list:
3243       iout = []
3244       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3245       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3246       for field in self.op.output_fields:
3247         st_match = self._FIELDS_STATIC.Matches(field)
3248         if field == "name":
3249           val = instance.name
3250         elif field == "os":
3251           val = instance.os
3252         elif field == "pnode":
3253           val = instance.primary_node
3254         elif field == "snodes":
3255           val = list(instance.secondary_nodes)
3256         elif field == "admin_state":
3257           val = instance.admin_up
3258         elif field == "oper_state":
3259           if instance.primary_node in bad_nodes:
3260             val = None
3261           else:
3262             val = bool(live_data.get(instance.name))
3263         elif field == "status":
3264           if instance.primary_node in off_nodes:
3265             val = "ERROR_nodeoffline"
3266           elif instance.primary_node in bad_nodes:
3267             val = "ERROR_nodedown"
3268           else:
3269             running = bool(live_data.get(instance.name))
3270             if running:
3271               if instance.admin_up:
3272                 val = "running"
3273               else:
3274                 val = "ERROR_up"
3275             else:
3276               if instance.admin_up:
3277                 val = "ERROR_down"
3278               else:
3279                 val = "ADMIN_down"
3280         elif field == "oper_ram":
3281           if instance.primary_node in bad_nodes:
3282             val = None
3283           elif instance.name in live_data:
3284             val = live_data[instance.name].get("memory", "?")
3285           else:
3286             val = "-"
3287         elif field == "disk_template":
3288           val = instance.disk_template
3289         elif field == "ip":
3290           val = instance.nics[0].ip
3291         elif field == "bridge":
3292           val = instance.nics[0].bridge
3293         elif field == "mac":
3294           val = instance.nics[0].mac
3295         elif field == "sda_size" or field == "sdb_size":
3296           idx = ord(field[2]) - ord('a')
3297           try:
3298             val = instance.FindDisk(idx).size
3299           except errors.OpPrereqError:
3300             val = None
3301         elif field == "disk_usage": # total disk usage per node
3302           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3303           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3304         elif field == "tags":
3305           val = list(instance.GetTags())
3306         elif field == "serial_no":
3307           val = instance.serial_no
3308         elif field == "network_port":
3309           val = instance.network_port
3310         elif field == "hypervisor":
3311           val = instance.hypervisor
3312         elif field == "hvparams":
3313           val = i_hv
3314         elif (field.startswith(HVPREFIX) and
3315               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3316           val = i_hv.get(field[len(HVPREFIX):], None)
3317         elif field == "beparams":
3318           val = i_be
3319         elif (field.startswith(BEPREFIX) and
3320               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3321           val = i_be.get(field[len(BEPREFIX):], None)
3322         elif st_match and st_match.groups():
3323           # matches a variable list
3324           st_groups = st_match.groups()
3325           if st_groups and st_groups[0] == "disk":
3326             if st_groups[1] == "count":
3327               val = len(instance.disks)
3328             elif st_groups[1] == "sizes":
3329               val = [disk.size for disk in instance.disks]
3330             elif st_groups[1] == "size":
3331               try:
3332                 val = instance.FindDisk(st_groups[2]).size
3333               except errors.OpPrereqError:
3334                 val = None
3335             else:
3336               assert False, "Unhandled disk parameter"
3337           elif st_groups[0] == "nic":
3338             if st_groups[1] == "count":
3339               val = len(instance.nics)
3340             elif st_groups[1] == "macs":
3341               val = [nic.mac for nic in instance.nics]
3342             elif st_groups[1] == "ips":
3343               val = [nic.ip for nic in instance.nics]
3344             elif st_groups[1] == "bridges":
3345               val = [nic.bridge for nic in instance.nics]
3346             else:
3347               # index-based item
3348               nic_idx = int(st_groups[2])
3349               if nic_idx >= len(instance.nics):
3350                 val = None
3351               else:
3352                 if st_groups[1] == "mac":
3353                   val = instance.nics[nic_idx].mac
3354                 elif st_groups[1] == "ip":
3355                   val = instance.nics[nic_idx].ip
3356                 elif st_groups[1] == "bridge":
3357                   val = instance.nics[nic_idx].bridge
3358                 else:
3359                   assert False, "Unhandled NIC parameter"
3360           else:
3361             assert False, "Unhandled variable parameter"
3362         else:
3363           raise errors.ParameterError(field)
3364         iout.append(val)
3365       output.append(iout)
3366
3367     return output
3368
3369
3370 class LUFailoverInstance(LogicalUnit):
3371   """Failover an instance.
3372
3373   """
3374   HPATH = "instance-failover"
3375   HTYPE = constants.HTYPE_INSTANCE
3376   _OP_REQP = ["instance_name", "ignore_consistency"]
3377   REQ_BGL = False
3378
3379   def ExpandNames(self):
3380     self._ExpandAndLockInstance()
3381     self.needed_locks[locking.LEVEL_NODE] = []
3382     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3383
3384   def DeclareLocks(self, level):
3385     if level == locking.LEVEL_NODE:
3386       self._LockInstancesNodes()
3387
3388   def BuildHooksEnv(self):
3389     """Build hooks env.
3390
3391     This runs on master, primary and secondary nodes of the instance.
3392
3393     """
3394     env = {
3395       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3396       }
3397     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3398     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3399     return env, nl, nl
3400
3401   def CheckPrereq(self):
3402     """Check prerequisites.
3403
3404     This checks that the instance is in the cluster.
3405
3406     """
3407     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3408     assert self.instance is not None, \
3409       "Cannot retrieve locked instance %s" % self.op.instance_name
3410
3411     bep = self.cfg.GetClusterInfo().FillBE(instance)
3412     if instance.disk_template not in constants.DTS_NET_MIRROR:
3413       raise errors.OpPrereqError("Instance's disk layout is not"
3414                                  " network mirrored, cannot failover.")
3415
3416     secondary_nodes = instance.secondary_nodes
3417     if not secondary_nodes:
3418       raise errors.ProgrammerError("no secondary node but using "
3419                                    "a mirrored disk template")
3420
3421     target_node = secondary_nodes[0]
3422     _CheckNodeOnline(self, target_node)
3423     _CheckNodeNotDrained(self, target_node)
3424     # check memory requirements on the secondary node
3425     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3426                          instance.name, bep[constants.BE_MEMORY],
3427                          instance.hypervisor)
3428
3429     # check bridge existance
3430     brlist = [nic.bridge for nic in instance.nics]
3431     result = self.rpc.call_bridges_exist(target_node, brlist)
3432     result.Raise()
3433     if not result.data:
3434       raise errors.OpPrereqError("One or more target bridges %s does not"
3435                                  " exist on destination node '%s'" %
3436                                  (brlist, target_node))
3437
3438   def Exec(self, feedback_fn):
3439     """Failover an instance.
3440
3441     The failover is done by shutting it down on its present node and
3442     starting it on the secondary.
3443
3444     """
3445     instance = self.instance
3446
3447     source_node = instance.primary_node
3448     target_node = instance.secondary_nodes[0]
3449
3450     feedback_fn("* checking disk consistency between source and target")
3451     for dev in instance.disks:
3452       # for drbd, these are drbd over lvm
3453       if not _CheckDiskConsistency(self, dev, target_node, False):
3454         if instance.admin_up and not self.op.ignore_consistency:
3455           raise errors.OpExecError("Disk %s is degraded on target node,"
3456                                    " aborting failover." % dev.iv_name)
3457
3458     feedback_fn("* shutting down instance on source node")
3459     logging.info("Shutting down instance %s on node %s",
3460                  instance.name, source_node)
3461
3462     result = self.rpc.call_instance_shutdown(source_node, instance)
3463     msg = result.RemoteFailMsg()
3464     if msg:
3465       if self.op.ignore_consistency:
3466         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3467                              " Proceeding anyway. Please make sure node"
3468                              " %s is down. Error details: %s",
3469                              instance.name, source_node, source_node, msg)
3470       else:
3471         raise errors.OpExecError("Could not shutdown instance %s on"
3472                                  " node %s: %s" %
3473                                  (instance.name, source_node, msg))
3474
3475     feedback_fn("* deactivating the instance's disks on source node")
3476     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3477       raise errors.OpExecError("Can't shut down the instance's disks.")
3478
3479     instance.primary_node = target_node
3480     # distribute new instance config to the other nodes
3481     self.cfg.Update(instance)
3482
3483     # Only start the instance if it's marked as up
3484     if instance.admin_up:
3485       feedback_fn("* activating the instance's disks on target node")
3486       logging.info("Starting instance %s on node %s",
3487                    instance.name, target_node)
3488
3489       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3490                                                ignore_secondaries=True)
3491       if not disks_ok:
3492         _ShutdownInstanceDisks(self, instance)
3493         raise errors.OpExecError("Can't activate the instance's disks")
3494
3495       feedback_fn("* starting the instance on the target node")
3496       result = self.rpc.call_instance_start(target_node, instance, None)
3497       msg = result.RemoteFailMsg()
3498       if msg:
3499         _ShutdownInstanceDisks(self, instance)
3500         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3501                                  (instance.name, target_node, msg))
3502
3503
3504 class LUMigrateInstance(LogicalUnit):
3505   """Migrate an instance.
3506
3507   This is migration without shutting down, compared to the failover,
3508   which is done with shutdown.
3509
3510   """
3511   HPATH = "instance-migrate"
3512   HTYPE = constants.HTYPE_INSTANCE
3513   _OP_REQP = ["instance_name", "live", "cleanup"]
3514
3515   REQ_BGL = False
3516
3517   def ExpandNames(self):
3518     self._ExpandAndLockInstance()
3519     self.needed_locks[locking.LEVEL_NODE] = []
3520     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3521
3522   def DeclareLocks(self, level):
3523     if level == locking.LEVEL_NODE:
3524       self._LockInstancesNodes()
3525
3526   def BuildHooksEnv(self):
3527     """Build hooks env.
3528
3529     This runs on master, primary and secondary nodes of the instance.
3530
3531     """
3532     env = _BuildInstanceHookEnvByObject(self, self.instance)
3533     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3534     return env, nl, nl
3535
3536   def CheckPrereq(self):
3537     """Check prerequisites.
3538
3539     This checks that the instance is in the cluster.
3540
3541     """
3542     instance = self.cfg.GetInstanceInfo(
3543       self.cfg.ExpandInstanceName(self.op.instance_name))
3544     if instance is None:
3545       raise errors.OpPrereqError("Instance '%s' not known" %
3546                                  self.op.instance_name)
3547
3548     if instance.disk_template != constants.DT_DRBD8:
3549       raise errors.OpPrereqError("Instance's disk layout is not"
3550                                  " drbd8, cannot migrate.")
3551
3552     secondary_nodes = instance.secondary_nodes
3553     if not secondary_nodes:
3554       raise errors.ConfigurationError("No secondary node but using"
3555                                       " drbd8 disk template")
3556
3557     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3558
3559     target_node = secondary_nodes[0]
3560     # check memory requirements on the secondary node
3561     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3562                          instance.name, i_be[constants.BE_MEMORY],
3563                          instance.hypervisor)
3564
3565     # check bridge existance
3566     brlist = [nic.bridge for nic in instance.nics]
3567     result = self.rpc.call_bridges_exist(target_node, brlist)
3568     if result.failed or not result.data:
3569       raise errors.OpPrereqError("One or more target bridges %s does not"
3570                                  " exist on destination node '%s'" %
3571                                  (brlist, target_node))
3572
3573     if not self.op.cleanup:
3574       _CheckNodeNotDrained(self, target_node)
3575       result = self.rpc.call_instance_migratable(instance.primary_node,
3576                                                  instance)
3577       msg = result.RemoteFailMsg()
3578       if msg:
3579         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3580                                    msg)
3581
3582     self.instance = instance
3583
3584   def _WaitUntilSync(self):
3585     """Poll with custom rpc for disk sync.
3586
3587     This uses our own step-based rpc call.
3588
3589     """
3590     self.feedback_fn("* wait until resync is done")
3591     all_done = False
3592     while not all_done:
3593       all_done = True
3594       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3595                                             self.nodes_ip,
3596                                             self.instance.disks)
3597       min_percent = 100
3598       for node, nres in result.items():
3599         msg = nres.RemoteFailMsg()
3600         if msg:
3601           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3602                                    (node, msg))
3603         node_done, node_percent = nres.payload
3604         all_done = all_done and node_done
3605         if node_percent is not None:
3606           min_percent = min(min_percent, node_percent)
3607       if not all_done:
3608         if min_percent < 100:
3609           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3610         time.sleep(2)
3611
3612   def _EnsureSecondary(self, node):
3613     """Demote a node to secondary.
3614
3615     """
3616     self.feedback_fn("* switching node %s to secondary mode" % node)
3617
3618     for dev in self.instance.disks:
3619       self.cfg.SetDiskID(dev, node)
3620
3621     result = self.rpc.call_blockdev_close(node, self.instance.name,
3622                                           self.instance.disks)
3623     msg = result.RemoteFailMsg()
3624     if msg:
3625       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3626                                " error %s" % (node, msg))
3627
3628   def _GoStandalone(self):
3629     """Disconnect from the network.
3630
3631     """
3632     self.feedback_fn("* changing into standalone mode")
3633     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3634                                                self.instance.disks)
3635     for node, nres in result.items():
3636       msg = nres.RemoteFailMsg()
3637       if msg:
3638         raise errors.OpExecError("Cannot disconnect disks node %s,"
3639                                  " error %s" % (node, msg))
3640
3641   def _GoReconnect(self, multimaster):
3642     """Reconnect to the network.
3643
3644     """
3645     if multimaster:
3646       msg = "dual-master"
3647     else:
3648       msg = "single-master"
3649     self.feedback_fn("* changing disks into %s mode" % msg)
3650     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3651                                            self.instance.disks,
3652                                            self.instance.name, multimaster)
3653     for node, nres in result.items():
3654       msg = nres.RemoteFailMsg()
3655       if msg:
3656         raise errors.OpExecError("Cannot change disks config on node %s,"
3657                                  " error: %s" % (node, msg))
3658
3659   def _ExecCleanup(self):
3660     """Try to cleanup after a failed migration.
3661
3662     The cleanup is done by:
3663       - check that the instance is running only on one node
3664         (and update the config if needed)
3665       - change disks on its secondary node to secondary
3666       - wait until disks are fully synchronized
3667       - disconnect from the network
3668       - change disks into single-master mode
3669       - wait again until disks are fully synchronized
3670
3671     """
3672     instance = self.instance
3673     target_node = self.target_node
3674     source_node = self.source_node
3675
3676     # check running on only one node
3677     self.feedback_fn("* checking where the instance actually runs"
3678                      " (if this hangs, the hypervisor might be in"
3679                      " a bad state)")
3680     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3681     for node, result in ins_l.items():
3682       result.Raise()
3683       if not isinstance(result.data, list):
3684         raise errors.OpExecError("Can't contact node '%s'" % node)
3685
3686     runningon_source = instance.name in ins_l[source_node].data
3687     runningon_target = instance.name in ins_l[target_node].data
3688
3689     if runningon_source and runningon_target:
3690       raise errors.OpExecError("Instance seems to be running on two nodes,"
3691                                " or the hypervisor is confused. You will have"
3692                                " to ensure manually that it runs only on one"
3693                                " and restart this operation.")
3694
3695     if not (runningon_source or runningon_target):
3696       raise errors.OpExecError("Instance does not seem to be running at all."
3697                                " In this case, it's safer to repair by"
3698                                " running 'gnt-instance stop' to ensure disk"
3699                                " shutdown, and then restarting it.")
3700
3701     if runningon_target:
3702       # the migration has actually succeeded, we need to update the config
3703       self.feedback_fn("* instance running on secondary node (%s),"
3704                        " updating config" % target_node)
3705       instance.primary_node = target_node
3706       self.cfg.Update(instance)
3707       demoted_node = source_node
3708     else:
3709       self.feedback_fn("* instance confirmed to be running on its"
3710                        " primary node (%s)" % source_node)
3711       demoted_node = target_node
3712
3713     self._EnsureSecondary(demoted_node)
3714     try:
3715       self._WaitUntilSync()
3716     except errors.OpExecError:
3717       # we ignore here errors, since if the device is standalone, it
3718       # won't be able to sync
3719       pass
3720     self._GoStandalone()
3721     self._GoReconnect(False)
3722     self._WaitUntilSync()
3723
3724     self.feedback_fn("* done")
3725
3726   def _RevertDiskStatus(self):
3727     """Try to revert the disk status after a failed migration.
3728
3729     """
3730     target_node = self.target_node
3731     try:
3732       self._EnsureSecondary(target_node)
3733       self._GoStandalone()
3734       self._GoReconnect(False)
3735       self._WaitUntilSync()
3736     except errors.OpExecError, err:
3737       self.LogWarning("Migration failed and I can't reconnect the"
3738                       " drives: error '%s'\n"
3739                       "Please look and recover the instance status" %
3740                       str(err))
3741
3742   def _AbortMigration(self):
3743     """Call the hypervisor code to abort a started migration.
3744
3745     """
3746     instance = self.instance
3747     target_node = self.target_node
3748     migration_info = self.migration_info
3749
3750     abort_result = self.rpc.call_finalize_migration(target_node,
3751                                                     instance,
3752                                                     migration_info,
3753                                                     False)
3754     abort_msg = abort_result.RemoteFailMsg()
3755     if abort_msg:
3756       logging.error("Aborting migration failed on target node %s: %s" %
3757                     (target_node, abort_msg))
3758       # Don't raise an exception here, as we stil have to try to revert the
3759       # disk status, even if this step failed.
3760
3761   def _ExecMigration(self):
3762     """Migrate an instance.
3763
3764     The migrate is done by:
3765       - change the disks into dual-master mode
3766       - wait until disks are fully synchronized again
3767       - migrate the instance
3768       - change disks on the new secondary node (the old primary) to secondary
3769       - wait until disks are fully synchronized
3770       - change disks into single-master mode
3771
3772     """
3773     instance = self.instance
3774     target_node = self.target_node
3775     source_node = self.source_node
3776
3777     self.feedback_fn("* checking disk consistency between source and target")
3778     for dev in instance.disks:
3779       if not _CheckDiskConsistency(self, dev, target_node, False):
3780         raise errors.OpExecError("Disk %s is degraded or not fully"
3781                                  " synchronized on target node,"
3782                                  " aborting migrate." % dev.iv_name)
3783
3784     # First get the migration information from the remote node
3785     result = self.rpc.call_migration_info(source_node, instance)
3786     msg = result.RemoteFailMsg()
3787     if msg:
3788       log_err = ("Failed fetching source migration information from %s: %s" %
3789                  (source_node, msg))
3790       logging.error(log_err)
3791       raise errors.OpExecError(log_err)
3792
3793     self.migration_info = migration_info = result.payload
3794
3795     # Then switch the disks to master/master mode
3796     self._EnsureSecondary(target_node)
3797     self._GoStandalone()
3798     self._GoReconnect(True)
3799     self._WaitUntilSync()
3800
3801     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3802     result = self.rpc.call_accept_instance(target_node,
3803                                            instance,
3804                                            migration_info,
3805                                            self.nodes_ip[target_node])
3806
3807     msg = result.RemoteFailMsg()
3808     if msg:
3809       logging.error("Instance pre-migration failed, trying to revert"
3810                     " disk status: %s", msg)
3811       self._AbortMigration()
3812       self._RevertDiskStatus()
3813       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3814                                (instance.name, msg))
3815
3816     self.feedback_fn("* migrating instance to %s" % target_node)
3817     time.sleep(10)
3818     result = self.rpc.call_instance_migrate(source_node, instance,
3819                                             self.nodes_ip[target_node],
3820                                             self.op.live)
3821     msg = result.RemoteFailMsg()
3822     if msg:
3823       logging.error("Instance migration failed, trying to revert"
3824                     " disk status: %s", msg)
3825       self._AbortMigration()
3826       self._RevertDiskStatus()
3827       raise errors.OpExecError("Could not migrate instance %s: %s" %
3828                                (instance.name, msg))
3829     time.sleep(10)
3830
3831     instance.primary_node = target_node
3832     # distribute new instance config to the other nodes
3833     self.cfg.Update(instance)
3834
3835     result = self.rpc.call_finalize_migration(target_node,
3836                                               instance,
3837                                               migration_info,
3838                                               True)
3839     msg = result.RemoteFailMsg()
3840     if msg:
3841       logging.error("Instance migration succeeded, but finalization failed:"
3842                     " %s" % msg)
3843       raise errors.OpExecError("Could not finalize instance migration: %s" %
3844                                msg)
3845
3846     self._EnsureSecondary(source_node)
3847     self._WaitUntilSync()
3848     self._GoStandalone()
3849     self._GoReconnect(False)
3850     self._WaitUntilSync()
3851
3852     self.feedback_fn("* done")
3853
3854   def Exec(self, feedback_fn):
3855     """Perform the migration.
3856
3857     """
3858     self.feedback_fn = feedback_fn
3859
3860     self.source_node = self.instance.primary_node
3861     self.target_node = self.instance.secondary_nodes[0]
3862     self.all_nodes = [self.source_node, self.target_node]
3863     self.nodes_ip = {
3864       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3865       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3866       }
3867     if self.op.cleanup:
3868       return self._ExecCleanup()
3869     else:
3870       return self._ExecMigration()
3871
3872
3873 def _CreateBlockDev(lu, node, instance, device, force_create,
3874                     info, force_open):
3875   """Create a tree of block devices on a given node.
3876
3877   If this device type has to be created on secondaries, create it and
3878   all its children.
3879
3880   If not, just recurse to children keeping the same 'force' value.
3881
3882   @param lu: the lu on whose behalf we execute
3883   @param node: the node on which to create the device
3884   @type instance: L{objects.Instance}
3885   @param instance: the instance which owns the device
3886   @type device: L{objects.Disk}
3887   @param device: the device to create
3888   @type force_create: boolean
3889   @param force_create: whether to force creation of this device; this
3890       will be change to True whenever we find a device which has
3891       CreateOnSecondary() attribute
3892   @param info: the extra 'metadata' we should attach to the device
3893       (this will be represented as a LVM tag)
3894   @type force_open: boolean
3895   @param force_open: this parameter will be passes to the
3896       L{backend.BlockdevCreate} function where it specifies
3897       whether we run on primary or not, and it affects both
3898       the child assembly and the device own Open() execution
3899
3900   """
3901   if device.CreateOnSecondary():
3902     force_create = True
3903
3904   if device.children:
3905     for child in device.children:
3906       _CreateBlockDev(lu, node, instance, child, force_create,
3907                       info, force_open)
3908
3909   if not force_create:
3910     return
3911
3912   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3913
3914
3915 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3916   """Create a single block device on a given node.
3917
3918   This will not recurse over children of the device, so they must be
3919   created in advance.
3920
3921   @param lu: the lu on whose behalf we execute
3922   @param node: the node on which to create the device
3923   @type instance: L{objects.Instance}
3924   @param instance: the instance which owns the device
3925   @type device: L{objects.Disk}
3926   @param device: the device to create
3927   @param info: the extra 'metadata' we should attach to the device
3928       (this will be represented as a LVM tag)
3929   @type force_open: boolean
3930   @param force_open: this parameter will be passes to the
3931       L{backend.BlockdevCreate} function where it specifies
3932       whether we run on primary or not, and it affects both
3933       the child assembly and the device own Open() execution
3934
3935   """
3936   lu.cfg.SetDiskID(device, node)
3937   result = lu.rpc.call_blockdev_create(node, device, device.size,
3938                                        instance.name, force_open, info)
3939   msg = result.RemoteFailMsg()
3940   if msg:
3941     raise errors.OpExecError("Can't create block device %s on"
3942                              " node %s for instance %s: %s" %
3943                              (device, node, instance.name, msg))
3944   if device.physical_id is None:
3945     device.physical_id = result.payload
3946
3947
3948 def _GenerateUniqueNames(lu, exts):
3949   """Generate a suitable LV name.
3950
3951   This will generate a logical volume name for the given instance.
3952
3953   """
3954   results = []
3955   for val in exts:
3956     new_id = lu.cfg.GenerateUniqueID()
3957     results.append("%s%s" % (new_id, val))
3958   return results
3959
3960
3961 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3962                          p_minor, s_minor):
3963   """Generate a drbd8 device complete with its children.
3964
3965   """
3966   port = lu.cfg.AllocatePort()
3967   vgname = lu.cfg.GetVGName()
3968   shared_secret = lu.cfg.GenerateDRBDSecret()
3969   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3970                           logical_id=(vgname, names[0]))
3971   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3972                           logical_id=(vgname, names[1]))
3973   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3974                           logical_id=(primary, secondary, port,
3975                                       p_minor, s_minor,
3976                                       shared_secret),
3977                           children=[dev_data, dev_meta],
3978                           iv_name=iv_name)
3979   return drbd_dev
3980
3981
3982 def _GenerateDiskTemplate(lu, template_name,
3983                           instance_name, primary_node,
3984                           secondary_nodes, disk_info,
3985                           file_storage_dir, file_driver,
3986                           base_index):
3987   """Generate the entire disk layout for a given template type.
3988
3989   """
3990   #TODO: compute space requirements
3991
3992   vgname = lu.cfg.GetVGName()
3993   disk_count = len(disk_info)
3994   disks = []
3995   if template_name == constants.DT_DISKLESS:
3996     pass
3997   elif template_name == constants.DT_PLAIN:
3998     if len(secondary_nodes) != 0:
3999       raise errors.ProgrammerError("Wrong template configuration")
4000
4001     names = _GenerateUniqueNames(lu, [".disk%d" % i
4002                                       for i in range(disk_count)])
4003     for idx, disk in enumerate(disk_info):
4004       disk_index = idx + base_index
4005       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4006                               logical_id=(vgname, names[idx]),
4007                               iv_name="disk/%d" % disk_index,
4008                               mode=disk["mode"])
4009       disks.append(disk_dev)
4010   elif template_name == constants.DT_DRBD8:
4011     if len(secondary_nodes) != 1:
4012       raise errors.ProgrammerError("Wrong template configuration")
4013     remote_node = secondary_nodes[0]
4014     minors = lu.cfg.AllocateDRBDMinor(
4015       [primary_node, remote_node] * len(disk_info), instance_name)
4016
4017     names = []
4018     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4019                                                for i in range(disk_count)]):
4020       names.append(lv_prefix + "_data")
4021       names.append(lv_prefix + "_meta")
4022     for idx, disk in enumerate(disk_info):
4023       disk_index = idx + base_index
4024       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4025                                       disk["size"], names[idx*2:idx*2+2],
4026                                       "disk/%d" % disk_index,
4027                                       minors[idx*2], minors[idx*2+1])
4028       disk_dev.mode = disk["mode"]
4029       disks.append(disk_dev)
4030   elif template_name == constants.DT_FILE:
4031     if len(secondary_nodes) != 0:
4032       raise errors.ProgrammerError("Wrong template configuration")
4033
4034     for idx, disk in enumerate(disk_info):
4035       disk_index = idx + base_index
4036       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4037                               iv_name="disk/%d" % disk_index,
4038                               logical_id=(file_driver,
4039                                           "%s/disk%d" % (file_storage_dir,
4040                                                          disk_index)),
4041                               mode=disk["mode"])
4042       disks.append(disk_dev)
4043   else:
4044     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4045   return disks
4046
4047
4048 def _GetInstanceInfoText(instance):
4049   """Compute that text that should be added to the disk's metadata.
4050
4051   """
4052   return "originstname+%s" % instance.name
4053
4054
4055 def _CreateDisks(lu, instance):
4056   """Create all disks for an instance.
4057
4058   This abstracts away some work from AddInstance.
4059
4060   @type lu: L{LogicalUnit}
4061   @param lu: the logical unit on whose behalf we execute
4062   @type instance: L{objects.Instance}
4063   @param instance: the instance whose disks we should create
4064   @rtype: boolean
4065   @return: the success of the creation
4066
4067   """
4068   info = _GetInstanceInfoText(instance)
4069   pnode = instance.primary_node
4070
4071   if instance.disk_template == constants.DT_FILE:
4072     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4073     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4074
4075     if result.failed or not result.data:
4076       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4077
4078     if not result.data[0]:
4079       raise errors.OpExecError("Failed to create directory '%s'" %
4080                                file_storage_dir)
4081
4082   # Note: this needs to be kept in sync with adding of disks in
4083   # LUSetInstanceParams
4084   for device in instance.disks:
4085     logging.info("Creating volume %s for instance %s",
4086                  device.iv_name, instance.name)
4087     #HARDCODE
4088     for node in instance.all_nodes:
4089       f_create = node == pnode
4090       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4091
4092
4093 def _RemoveDisks(lu, instance):
4094   """Remove all disks for an instance.
4095
4096   This abstracts away some work from `AddInstance()` and
4097   `RemoveInstance()`. Note that in case some of the devices couldn't
4098   be removed, the removal will continue with the other ones (compare
4099   with `_CreateDisks()`).
4100
4101   @type lu: L{LogicalUnit}
4102   @param lu: the logical unit on whose behalf we execute
4103   @type instance: L{objects.Instance}
4104   @param instance: the instance whose disks we should remove
4105   @rtype: boolean
4106   @return: the success of the removal
4107
4108   """
4109   logging.info("Removing block devices for instance %s", instance.name)
4110
4111   all_result = True
4112   for device in instance.disks:
4113     for node, disk in device.ComputeNodeTree(instance.primary_node):
4114       lu.cfg.SetDiskID(disk, node)
4115       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4116       if msg:
4117         lu.LogWarning("Could not remove block device %s on node %s,"
4118                       " continuing anyway: %s", device.iv_name, node, msg)
4119         all_result = False
4120
4121   if instance.disk_template == constants.DT_FILE:
4122     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4123     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4124                                                  file_storage_dir)
4125     if result.failed or not result.data:
4126       logging.error("Could not remove directory '%s'", file_storage_dir)
4127       all_result = False
4128
4129   return all_result
4130
4131
4132 def _ComputeDiskSize(disk_template, disks):
4133   """Compute disk size requirements in the volume group
4134
4135   """
4136   # Required free disk space as a function of disk and swap space
4137   req_size_dict = {
4138     constants.DT_DISKLESS: None,
4139     constants.DT_PLAIN: sum(d["size"] for d in disks),
4140     # 128 MB are added for drbd metadata for each disk
4141     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4142     constants.DT_FILE: None,
4143   }
4144
4145   if disk_template not in req_size_dict:
4146     raise errors.ProgrammerError("Disk template '%s' size requirement"
4147                                  " is unknown" %  disk_template)
4148
4149   return req_size_dict[disk_template]
4150
4151
4152 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4153   """Hypervisor parameter validation.
4154
4155   This function abstract the hypervisor parameter validation to be
4156   used in both instance create and instance modify.
4157
4158   @type lu: L{LogicalUnit}
4159   @param lu: the logical unit for which we check
4160   @type nodenames: list
4161   @param nodenames: the list of nodes on which we should check
4162   @type hvname: string
4163   @param hvname: the name of the hypervisor we should use
4164   @type hvparams: dict
4165   @param hvparams: the parameters which we need to check
4166   @raise errors.OpPrereqError: if the parameters are not valid
4167
4168   """
4169   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4170                                                   hvname,
4171                                                   hvparams)
4172   for node in nodenames:
4173     info = hvinfo[node]
4174     if info.offline:
4175       continue
4176     msg = info.RemoteFailMsg()
4177     if msg:
4178       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4179                                  " %s" % msg)
4180
4181
4182 class LUCreateInstance(LogicalUnit):
4183   """Create an instance.
4184
4185   """
4186   HPATH = "instance-add"
4187   HTYPE = constants.HTYPE_INSTANCE
4188   _OP_REQP = ["instance_name", "disks", "disk_template",
4189               "mode", "start",
4190               "wait_for_sync", "ip_check", "nics",
4191               "hvparams", "beparams"]
4192   REQ_BGL = False
4193
4194   def _ExpandNode(self, node):
4195     """Expands and checks one node name.
4196
4197     """
4198     node_full = self.cfg.ExpandNodeName(node)
4199     if node_full is None:
4200       raise errors.OpPrereqError("Unknown node %s" % node)
4201     return node_full
4202
4203   def ExpandNames(self):
4204     """ExpandNames for CreateInstance.
4205
4206     Figure out the right locks for instance creation.
4207
4208     """
4209     self.needed_locks = {}
4210
4211     # set optional parameters to none if they don't exist
4212     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4213       if not hasattr(self.op, attr):
4214         setattr(self.op, attr, None)
4215
4216     # cheap checks, mostly valid constants given
4217
4218     # verify creation mode
4219     if self.op.mode not in (constants.INSTANCE_CREATE,
4220                             constants.INSTANCE_IMPORT):
4221       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4222                                  self.op.mode)
4223
4224     # disk template and mirror node verification
4225     if self.op.disk_template not in constants.DISK_TEMPLATES:
4226       raise errors.OpPrereqError("Invalid disk template name")
4227
4228     if self.op.hypervisor is None:
4229       self.op.hypervisor = self.cfg.GetHypervisorType()
4230
4231     cluster = self.cfg.GetClusterInfo()
4232     enabled_hvs = cluster.enabled_hypervisors
4233     if self.op.hypervisor not in enabled_hvs:
4234       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4235                                  " cluster (%s)" % (self.op.hypervisor,
4236                                   ",".join(enabled_hvs)))
4237
4238     # check hypervisor parameter syntax (locally)
4239     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4240     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4241                                   self.op.hvparams)
4242     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4243     hv_type.CheckParameterSyntax(filled_hvp)
4244
4245     # fill and remember the beparams dict
4246     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4247     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4248                                     self.op.beparams)
4249
4250     #### instance parameters check
4251
4252     # instance name verification
4253     hostname1 = utils.HostInfo(self.op.instance_name)
4254     self.op.instance_name = instance_name = hostname1.name
4255
4256     # this is just a preventive check, but someone might still add this
4257     # instance in the meantime, and creation will fail at lock-add time
4258     if instance_name in self.cfg.GetInstanceList():
4259       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4260                                  instance_name)
4261
4262     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4263
4264     # NIC buildup
4265     self.nics = []
4266     for nic in self.op.nics:
4267       # ip validity checks
4268       ip = nic.get("ip", None)
4269       if ip is None or ip.lower() == "none":
4270         nic_ip = None
4271       elif ip.lower() == constants.VALUE_AUTO:
4272         nic_ip = hostname1.ip
4273       else:
4274         if not utils.IsValidIP(ip):
4275           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4276                                      " like a valid IP" % ip)
4277         nic_ip = ip
4278
4279       # MAC address verification
4280       mac = nic.get("mac", constants.VALUE_AUTO)
4281       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4282         if not utils.IsValidMac(mac.lower()):
4283           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4284                                      mac)
4285       # bridge verification
4286       bridge = nic.get("bridge", None)
4287       if bridge is None:
4288         bridge = self.cfg.GetDefBridge()
4289       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4290
4291     # disk checks/pre-build
4292     self.disks = []
4293     for disk in self.op.disks:
4294       mode = disk.get("mode", constants.DISK_RDWR)
4295       if mode not in constants.DISK_ACCESS_SET:
4296         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4297                                    mode)
4298       size = disk.get("size", None)
4299       if size is None:
4300         raise errors.OpPrereqError("Missing disk size")
4301       try:
4302         size = int(size)
4303       except ValueError:
4304         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4305       self.disks.append({"size": size, "mode": mode})
4306
4307     # used in CheckPrereq for ip ping check
4308     self.check_ip = hostname1.ip
4309
4310     # file storage checks
4311     if (self.op.file_driver and
4312         not self.op.file_driver in constants.FILE_DRIVER):
4313       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4314                                  self.op.file_driver)
4315
4316     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4317       raise errors.OpPrereqError("File storage directory path not absolute")
4318
4319     ### Node/iallocator related checks
4320     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4321       raise errors.OpPrereqError("One and only one of iallocator and primary"
4322                                  " node must be given")
4323
4324     if self.op.iallocator:
4325       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4326     else:
4327       self.op.pnode = self._ExpandNode(self.op.pnode)
4328       nodelist = [self.op.pnode]
4329       if self.op.snode is not None:
4330         self.op.snode = self._ExpandNode(self.op.snode)
4331         nodelist.append(self.op.snode)
4332       self.needed_locks[locking.LEVEL_NODE] = nodelist
4333
4334     # in case of import lock the source node too
4335     if self.op.mode == constants.INSTANCE_IMPORT:
4336       src_node = getattr(self.op, "src_node", None)
4337       src_path = getattr(self.op, "src_path", None)
4338
4339       if src_path is None:
4340         self.op.src_path = src_path = self.op.instance_name
4341
4342       if src_node is None:
4343         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4344         self.op.src_node = None
4345         if os.path.isabs(src_path):
4346           raise errors.OpPrereqError("Importing an instance from an absolute"
4347                                      " path requires a source node option.")
4348       else:
4349         self.op.src_node = src_node = self._ExpandNode(src_node)
4350         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4351           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4352         if not os.path.isabs(src_path):
4353           self.op.src_path = src_path = \
4354             os.path.join(constants.EXPORT_DIR, src_path)
4355
4356     else: # INSTANCE_CREATE
4357       if getattr(self.op, "os_type", None) is None:
4358         raise errors.OpPrereqError("No guest OS specified")
4359
4360   def _RunAllocator(self):
4361     """Run the allocator based on input opcode.
4362
4363     """
4364     nics = [n.ToDict() for n in self.nics]
4365     ial = IAllocator(self,
4366                      mode=constants.IALLOCATOR_MODE_ALLOC,
4367                      name=self.op.instance_name,
4368                      disk_template=self.op.disk_template,
4369                      tags=[],
4370                      os=self.op.os_type,
4371                      vcpus=self.be_full[constants.BE_VCPUS],
4372                      mem_size=self.be_full[constants.BE_MEMORY],
4373                      disks=self.disks,
4374                      nics=nics,
4375                      hypervisor=self.op.hypervisor,
4376                      )
4377
4378     ial.Run(self.op.iallocator)
4379
4380     if not ial.success:
4381       raise errors.OpPrereqError("Can't compute nodes using"
4382                                  " iallocator '%s': %s" % (self.op.iallocator,
4383                                                            ial.info))
4384     if len(ial.nodes) != ial.required_nodes:
4385       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4386                                  " of nodes (%s), required %s" %
4387                                  (self.op.iallocator, len(ial.nodes),
4388                                   ial.required_nodes))
4389     self.op.pnode = ial.nodes[0]
4390     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4391                  self.op.instance_name, self.op.iallocator,
4392                  ", ".join(ial.nodes))
4393     if ial.required_nodes == 2:
4394       self.op.snode = ial.nodes[1]
4395
4396   def BuildHooksEnv(self):
4397     """Build hooks env.
4398
4399     This runs on master, primary and secondary nodes of the instance.
4400
4401     """
4402     env = {
4403       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4404       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4405       "INSTANCE_ADD_MODE": self.op.mode,
4406       }
4407     if self.op.mode == constants.INSTANCE_IMPORT:
4408       env["INSTANCE_SRC_NODE"] = self.op.src_node
4409       env["INSTANCE_SRC_PATH"] = self.op.src_path
4410       env["INSTANCE_SRC_IMAGES"] = self.src_images
4411
4412     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4413       primary_node=self.op.pnode,
4414       secondary_nodes=self.secondaries,
4415       status=self.op.start,
4416       os_type=self.op.os_type,
4417       memory=self.be_full[constants.BE_MEMORY],
4418       vcpus=self.be_full[constants.BE_VCPUS],
4419       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4420     ))
4421
4422     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4423           self.secondaries)
4424     return env, nl, nl
4425
4426
4427   def CheckPrereq(self):
4428     """Check prerequisites.
4429
4430     """
4431     if (not self.cfg.GetVGName() and
4432         self.op.disk_template not in constants.DTS_NOT_LVM):
4433       raise errors.OpPrereqError("Cluster does not support lvm-based"
4434                                  " instances")
4435
4436
4437     if self.op.mode == constants.INSTANCE_IMPORT:
4438       src_node = self.op.src_node
4439       src_path = self.op.src_path
4440
4441       if src_node is None:
4442         exp_list = self.rpc.call_export_list(
4443           self.acquired_locks[locking.LEVEL_NODE])
4444         found = False
4445         for node in exp_list:
4446           if not exp_list[node].failed and src_path in exp_list[node].data:
4447             found = True
4448             self.op.src_node = src_node = node
4449             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4450                                                        src_path)
4451             break
4452         if not found:
4453           raise errors.OpPrereqError("No export found for relative path %s" %
4454                                       src_path)
4455
4456       _CheckNodeOnline(self, src_node)
4457       result = self.rpc.call_export_info(src_node, src_path)
4458       result.Raise()
4459       if not result.data:
4460         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4461
4462       export_info = result.data
4463       if not export_info.has_section(constants.INISECT_EXP):
4464         raise errors.ProgrammerError("Corrupted export config")
4465
4466       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4467       if (int(ei_version) != constants.EXPORT_VERSION):
4468         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4469                                    (ei_version, constants.EXPORT_VERSION))
4470
4471       # Check that the new instance doesn't have less disks than the export
4472       instance_disks = len(self.disks)
4473       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4474       if instance_disks < export_disks:
4475         raise errors.OpPrereqError("Not enough disks to import."
4476                                    " (instance: %d, export: %d)" %
4477                                    (instance_disks, export_disks))
4478
4479       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4480       disk_images = []
4481       for idx in range(export_disks):
4482         option = 'disk%d_dump' % idx
4483         if export_info.has_option(constants.INISECT_INS, option):
4484           # FIXME: are the old os-es, disk sizes, etc. useful?
4485           export_name = export_info.get(constants.INISECT_INS, option)
4486           image = os.path.join(src_path, export_name)
4487           disk_images.append(image)
4488         else:
4489           disk_images.append(False)
4490
4491       self.src_images = disk_images
4492
4493       old_name = export_info.get(constants.INISECT_INS, 'name')
4494       # FIXME: int() here could throw a ValueError on broken exports
4495       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4496       if self.op.instance_name == old_name:
4497         for idx, nic in enumerate(self.nics):
4498           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4499             nic_mac_ini = 'nic%d_mac' % idx
4500             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4501
4502     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4503     if self.op.start and not self.op.ip_check:
4504       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4505                                  " adding an instance in start mode")
4506
4507     if self.op.ip_check:
4508       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4509         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4510                                    (self.check_ip, self.op.instance_name))
4511
4512     #### allocator run
4513
4514     if self.op.iallocator is not None:
4515       self._RunAllocator()
4516
4517     #### node related checks
4518
4519     # check primary node
4520     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4521     assert self.pnode is not None, \
4522       "Cannot retrieve locked node %s" % self.op.pnode
4523     if pnode.offline:
4524       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4525                                  pnode.name)
4526     if pnode.drained:
4527       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4528                                  pnode.name)
4529
4530     self.secondaries = []
4531
4532     # mirror node verification
4533     if self.op.disk_template in constants.DTS_NET_MIRROR:
4534       if self.op.snode is None:
4535         raise errors.OpPrereqError("The networked disk templates need"
4536                                    " a mirror node")
4537       if self.op.snode == pnode.name:
4538         raise errors.OpPrereqError("The secondary node cannot be"
4539                                    " the primary node.")
4540       _CheckNodeOnline(self, self.op.snode)
4541       _CheckNodeNotDrained(self, self.op.snode)
4542       self.secondaries.append(self.op.snode)
4543
4544     nodenames = [pnode.name] + self.secondaries
4545
4546     req_size = _ComputeDiskSize(self.op.disk_template,
4547                                 self.disks)
4548
4549     # Check lv size requirements
4550     if req_size is not None:
4551       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4552                                          self.op.hypervisor)
4553       for node in nodenames:
4554         info = nodeinfo[node]
4555         info.Raise()
4556         info = info.data
4557         if not info:
4558           raise errors.OpPrereqError("Cannot get current information"
4559                                      " from node '%s'" % node)
4560         vg_free = info.get('vg_free', None)
4561         if not isinstance(vg_free, int):
4562           raise errors.OpPrereqError("Can't compute free disk space on"
4563                                      " node %s" % node)
4564         if req_size > info['vg_free']:
4565           raise errors.OpPrereqError("Not enough disk space on target node %s."
4566                                      " %d MB available, %d MB required" %
4567                                      (node, info['vg_free'], req_size))
4568
4569     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4570
4571     # os verification
4572     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4573     result.Raise()
4574     if not isinstance(result.data, objects.OS):
4575       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4576                                  " primary node"  % self.op.os_type)
4577
4578     # bridge check on primary node
4579     bridges = [n.bridge for n in self.nics]
4580     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4581     result.Raise()
4582     if not result.data:
4583       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4584                                  " exist on destination node '%s'" %
4585                                  (",".join(bridges), pnode.name))
4586
4587     # memory check on primary node
4588     if self.op.start:
4589       _CheckNodeFreeMemory(self, self.pnode.name,
4590                            "creating instance %s" % self.op.instance_name,
4591                            self.be_full[constants.BE_MEMORY],
4592                            self.op.hypervisor)
4593
4594   def Exec(self, feedback_fn):
4595     """Create and add the instance to the cluster.
4596
4597     """
4598     instance = self.op.instance_name
4599     pnode_name = self.pnode.name
4600
4601     for nic in self.nics:
4602       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4603         nic.mac = self.cfg.GenerateMAC()
4604
4605     ht_kind = self.op.hypervisor
4606     if ht_kind in constants.HTS_REQ_PORT:
4607       network_port = self.cfg.AllocatePort()
4608     else:
4609       network_port = None
4610
4611     ##if self.op.vnc_bind_address is None:
4612     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4613
4614     # this is needed because os.path.join does not accept None arguments
4615     if self.op.file_storage_dir is None:
4616       string_file_storage_dir = ""
4617     else:
4618       string_file_storage_dir = self.op.file_storage_dir
4619
4620     # build the full file storage dir path
4621     file_storage_dir = os.path.normpath(os.path.join(
4622                                         self.cfg.GetFileStorageDir(),
4623                                         string_file_storage_dir, instance))
4624
4625
4626     disks = _GenerateDiskTemplate(self,
4627                                   self.op.disk_template,
4628                                   instance, pnode_name,
4629                                   self.secondaries,
4630                                   self.disks,
4631                                   file_storage_dir,
4632                                   self.op.file_driver,
4633                                   0)
4634
4635     iobj = objects.Instance(name=instance, os=self.op.os_type,
4636                             primary_node=pnode_name,
4637                             nics=self.nics, disks=disks,
4638                             disk_template=self.op.disk_template,
4639                             admin_up=False,
4640                             network_port=network_port,
4641                             beparams=self.op.beparams,
4642                             hvparams=self.op.hvparams,
4643                             hypervisor=self.op.hypervisor,
4644                             )
4645
4646     feedback_fn("* creating instance disks...")
4647     try:
4648       _CreateDisks(self, iobj)
4649     except errors.OpExecError:
4650       self.LogWarning("Device creation failed, reverting...")
4651       try:
4652         _RemoveDisks(self, iobj)
4653       finally:
4654         self.cfg.ReleaseDRBDMinors(instance)
4655         raise
4656
4657     feedback_fn("adding instance %s to cluster config" % instance)
4658
4659     self.cfg.AddInstance(iobj)
4660     # Declare that we don't want to remove the instance lock anymore, as we've
4661     # added the instance to the config
4662     del self.remove_locks[locking.LEVEL_INSTANCE]
4663     # Unlock all the nodes
4664     if self.op.mode == constants.INSTANCE_IMPORT:
4665       nodes_keep = [self.op.src_node]
4666       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4667                        if node != self.op.src_node]
4668       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4669       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4670     else:
4671       self.context.glm.release(locking.LEVEL_NODE)
4672       del self.acquired_locks[locking.LEVEL_NODE]
4673
4674     if self.op.wait_for_sync:
4675       disk_abort = not _WaitForSync(self, iobj)
4676     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4677       # make sure the disks are not degraded (still sync-ing is ok)
4678       time.sleep(15)
4679       feedback_fn("* checking mirrors status")
4680       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4681     else:
4682       disk_abort = False
4683
4684     if disk_abort:
4685       _RemoveDisks(self, iobj)
4686       self.cfg.RemoveInstance(iobj.name)
4687       # Make sure the instance lock gets removed
4688       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4689       raise errors.OpExecError("There are some degraded disks for"
4690                                " this instance")
4691
4692     feedback_fn("creating os for instance %s on node %s" %
4693                 (instance, pnode_name))
4694
4695     if iobj.disk_template != constants.DT_DISKLESS:
4696       if self.op.mode == constants.INSTANCE_CREATE:
4697         feedback_fn("* running the instance OS create scripts...")
4698         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4699         msg = result.RemoteFailMsg()
4700         if msg:
4701           raise errors.OpExecError("Could not add os for instance %s"
4702                                    " on node %s: %s" %
4703                                    (instance, pnode_name, msg))
4704
4705       elif self.op.mode == constants.INSTANCE_IMPORT:
4706         feedback_fn("* running the instance OS import scripts...")
4707         src_node = self.op.src_node
4708         src_images = self.src_images
4709         cluster_name = self.cfg.GetClusterName()
4710         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4711                                                          src_node, src_images,
4712                                                          cluster_name)
4713         import_result.Raise()
4714         for idx, result in enumerate(import_result.data):
4715           if not result:
4716             self.LogWarning("Could not import the image %s for instance"
4717                             " %s, disk %d, on node %s" %
4718                             (src_images[idx], instance, idx, pnode_name))
4719       else:
4720         # also checked in the prereq part
4721         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4722                                      % self.op.mode)
4723
4724     if self.op.start:
4725       iobj.admin_up = True
4726       self.cfg.Update(iobj)
4727       logging.info("Starting instance %s on node %s", instance, pnode_name)
4728       feedback_fn("* starting instance...")
4729       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4730       msg = result.RemoteFailMsg()
4731       if msg:
4732         raise errors.OpExecError("Could not start instance: %s" % msg)
4733
4734
4735 class LUConnectConsole(NoHooksLU):
4736   """Connect to an instance's console.
4737
4738   This is somewhat special in that it returns the command line that
4739   you need to run on the master node in order to connect to the
4740   console.
4741
4742   """
4743   _OP_REQP = ["instance_name"]
4744   REQ_BGL = False
4745
4746   def ExpandNames(self):
4747     self._ExpandAndLockInstance()
4748
4749   def CheckPrereq(self):
4750     """Check prerequisites.
4751
4752     This checks that the instance is in the cluster.
4753
4754     """
4755     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4756     assert self.instance is not None, \
4757       "Cannot retrieve locked instance %s" % self.op.instance_name
4758     _CheckNodeOnline(self, self.instance.primary_node)
4759
4760   def Exec(self, feedback_fn):
4761     """Connect to the console of an instance
4762
4763     """
4764     instance = self.instance
4765     node = instance.primary_node
4766
4767     node_insts = self.rpc.call_instance_list([node],
4768                                              [instance.hypervisor])[node]
4769     node_insts.Raise()
4770
4771     if instance.name not in node_insts.data:
4772       raise errors.OpExecError("Instance %s is not running." % instance.name)
4773
4774     logging.debug("Connecting to console of %s on %s", instance.name, node)
4775
4776     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4777     cluster = self.cfg.GetClusterInfo()
4778     # beparams and hvparams are passed separately, to avoid editing the
4779     # instance and then saving the defaults in the instance itself.
4780     hvparams = cluster.FillHV(instance)
4781     beparams = cluster.FillBE(instance)
4782     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4783
4784     # build ssh cmdline
4785     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4786
4787
4788 class LUReplaceDisks(LogicalUnit):
4789   """Replace the disks of an instance.
4790
4791   """
4792   HPATH = "mirrors-replace"
4793   HTYPE = constants.HTYPE_INSTANCE
4794   _OP_REQP = ["instance_name", "mode", "disks"]
4795   REQ_BGL = False
4796
4797   def CheckArguments(self):
4798     if not hasattr(self.op, "remote_node"):
4799       self.op.remote_node = None
4800     if not hasattr(self.op, "iallocator"):
4801       self.op.iallocator = None
4802
4803     # check for valid parameter combination
4804     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4805     if self.op.mode == constants.REPLACE_DISK_CHG:
4806       if cnt == 2:
4807         raise errors.OpPrereqError("When changing the secondary either an"
4808                                    " iallocator script must be used or the"
4809                                    " new node given")
4810       elif cnt == 0:
4811         raise errors.OpPrereqError("Give either the iallocator or the new"
4812                                    " secondary, not both")
4813     else: # not replacing the secondary
4814       if cnt != 2:
4815         raise errors.OpPrereqError("The iallocator and new node options can"
4816                                    " be used only when changing the"
4817                                    " secondary node")
4818
4819   def ExpandNames(self):
4820     self._ExpandAndLockInstance()
4821
4822     if self.op.iallocator is not None:
4823       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4824     elif self.op.remote_node is not None:
4825       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4826       if remote_node is None:
4827         raise errors.OpPrereqError("Node '%s' not known" %
4828                                    self.op.remote_node)
4829       self.op.remote_node = remote_node
4830       # Warning: do not remove the locking of the new secondary here
4831       # unless DRBD8.AddChildren is changed to work in parallel;
4832       # currently it doesn't since parallel invocations of
4833       # FindUnusedMinor will conflict
4834       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4835       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4836     else:
4837       self.needed_locks[locking.LEVEL_NODE] = []
4838       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4839
4840   def DeclareLocks(self, level):
4841     # If we're not already locking all nodes in the set we have to declare the
4842     # instance's primary/secondary nodes.
4843     if (level == locking.LEVEL_NODE and
4844         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4845       self._LockInstancesNodes()
4846
4847   def _RunAllocator(self):
4848     """Compute a new secondary node using an IAllocator.
4849
4850     """
4851     ial = IAllocator(self,
4852                      mode=constants.IALLOCATOR_MODE_RELOC,
4853                      name=self.op.instance_name,
4854                      relocate_from=[self.sec_node])
4855
4856     ial.Run(self.op.iallocator)
4857
4858     if not ial.success:
4859       raise errors.OpPrereqError("Can't compute nodes using"
4860                                  " iallocator '%s': %s" % (self.op.iallocator,
4861                                                            ial.info))
4862     if len(ial.nodes) != ial.required_nodes:
4863       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4864                                  " of nodes (%s), required %s" %
4865                                  (len(ial.nodes), ial.required_nodes))
4866     self.op.remote_node = ial.nodes[0]
4867     self.LogInfo("Selected new secondary for the instance: %s",
4868                  self.op.remote_node)
4869
4870   def BuildHooksEnv(self):
4871     """Build hooks env.
4872
4873     This runs on the master, the primary and all the secondaries.
4874
4875     """
4876     env = {
4877       "MODE": self.op.mode,
4878       "NEW_SECONDARY": self.op.remote_node,
4879       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4880       }
4881     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4882     nl = [
4883       self.cfg.GetMasterNode(),
4884       self.instance.primary_node,
4885       ]
4886     if self.op.remote_node is not None:
4887       nl.append(self.op.remote_node)
4888     return env, nl, nl
4889
4890   def CheckPrereq(self):
4891     """Check prerequisites.
4892
4893     This checks that the instance is in the cluster.
4894
4895     """
4896     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4897     assert instance is not None, \
4898       "Cannot retrieve locked instance %s" % self.op.instance_name
4899     self.instance = instance
4900
4901     if instance.disk_template != constants.DT_DRBD8:
4902       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4903                                  " instances")
4904
4905     if len(instance.secondary_nodes) != 1:
4906       raise errors.OpPrereqError("The instance has a strange layout,"
4907                                  " expected one secondary but found %d" %
4908                                  len(instance.secondary_nodes))
4909
4910     self.sec_node = instance.secondary_nodes[0]
4911
4912     if self.op.iallocator is not None:
4913       self._RunAllocator()
4914
4915     remote_node = self.op.remote_node
4916     if remote_node is not None:
4917       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4918       assert self.remote_node_info is not None, \
4919         "Cannot retrieve locked node %s" % remote_node
4920     else:
4921       self.remote_node_info = None
4922     if remote_node == instance.primary_node:
4923       raise errors.OpPrereqError("The specified node is the primary node of"
4924                                  " the instance.")
4925     elif remote_node == self.sec_node:
4926       raise errors.OpPrereqError("The specified node is already the"
4927                                  " secondary node of the instance.")
4928
4929     if self.op.mode == constants.REPLACE_DISK_PRI:
4930       n1 = self.tgt_node = instance.primary_node
4931       n2 = self.oth_node = self.sec_node
4932     elif self.op.mode == constants.REPLACE_DISK_SEC:
4933       n1 = self.tgt_node = self.sec_node
4934       n2 = self.oth_node = instance.primary_node
4935     elif self.op.mode == constants.REPLACE_DISK_CHG:
4936       n1 = self.new_node = remote_node
4937       n2 = self.oth_node = instance.primary_node
4938       self.tgt_node = self.sec_node
4939       _CheckNodeNotDrained(self, remote_node)
4940     else:
4941       raise errors.ProgrammerError("Unhandled disk replace mode")
4942
4943     _CheckNodeOnline(self, n1)
4944     _CheckNodeOnline(self, n2)
4945
4946     if not self.op.disks:
4947       self.op.disks = range(len(instance.disks))
4948
4949     for disk_idx in self.op.disks:
4950       instance.FindDisk(disk_idx)
4951
4952   def _ExecD8DiskOnly(self, feedback_fn):
4953     """Replace a disk on the primary or secondary for dbrd8.
4954
4955     The algorithm for replace is quite complicated:
4956
4957       1. for each disk to be replaced:
4958
4959         1. create new LVs on the target node with unique names
4960         1. detach old LVs from the drbd device
4961         1. rename old LVs to name_replaced.<time_t>
4962         1. rename new LVs to old LVs
4963         1. attach the new LVs (with the old names now) to the drbd device
4964
4965       1. wait for sync across all devices
4966
4967       1. for each modified disk:
4968
4969         1. remove old LVs (which have the name name_replaces.<time_t>)
4970
4971     Failures are not very well handled.
4972
4973     """
4974     steps_total = 6
4975     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4976     instance = self.instance
4977     iv_names = {}
4978     vgname = self.cfg.GetVGName()
4979     # start of work
4980     cfg = self.cfg
4981     tgt_node = self.tgt_node
4982     oth_node = self.oth_node
4983
4984     # Step: check device activation
4985     self.proc.LogStep(1, steps_total, "check device existence")
4986     info("checking volume groups")
4987     my_vg = cfg.GetVGName()
4988     results = self.rpc.call_vg_list([oth_node, tgt_node])
4989     if not results:
4990       raise errors.OpExecError("Can't list volume groups on the nodes")
4991     for node in oth_node, tgt_node:
4992       res = results[node]
4993       if res.failed or not res.data or my_vg not in res.data:
4994         raise errors.OpExecError("Volume group '%s' not found on %s" %
4995                                  (my_vg, node))
4996     for idx, dev in enumerate(instance.disks):
4997       if idx not in self.op.disks:
4998         continue
4999       for node in tgt_node, oth_node:
5000         info("checking disk/%d on %s" % (idx, node))
5001         cfg.SetDiskID(dev, node)
5002         result = self.rpc.call_blockdev_find(node, dev)
5003         msg = result.RemoteFailMsg()
5004         if not msg and not result.payload:
5005           msg = "disk not found"
5006         if msg:
5007           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5008                                    (idx, node, msg))
5009
5010     # Step: check other node consistency
5011     self.proc.LogStep(2, steps_total, "check peer consistency")
5012     for idx, dev in enumerate(instance.disks):
5013       if idx not in self.op.disks:
5014         continue
5015       info("checking disk/%d consistency on %s" % (idx, oth_node))
5016       if not _CheckDiskConsistency(self, dev, oth_node,
5017                                    oth_node==instance.primary_node):
5018         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5019                                  " to replace disks on this node (%s)" %
5020                                  (oth_node, tgt_node))
5021
5022     # Step: create new storage
5023     self.proc.LogStep(3, steps_total, "allocate new storage")
5024     for idx, dev in enumerate(instance.disks):
5025       if idx not in self.op.disks:
5026         continue
5027       size = dev.size
5028       cfg.SetDiskID(dev, tgt_node)
5029       lv_names = [".disk%d_%s" % (idx, suf)
5030                   for suf in ["data", "meta"]]
5031       names = _GenerateUniqueNames(self, lv_names)
5032       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5033                              logical_id=(vgname, names[0]))
5034       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5035                              logical_id=(vgname, names[1]))
5036       new_lvs = [lv_data, lv_meta]
5037       old_lvs = dev.children
5038       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5039       info("creating new local storage on %s for %s" %
5040            (tgt_node, dev.iv_name))
5041       # we pass force_create=True to force the LVM creation
5042       for new_lv in new_lvs:
5043         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5044                         _GetInstanceInfoText(instance), False)
5045
5046     # Step: for each lv, detach+rename*2+attach
5047     self.proc.LogStep(4, steps_total, "change drbd configuration")
5048     for dev, old_lvs, new_lvs in iv_names.itervalues():
5049       info("detaching %s drbd from local storage" % dev.iv_name)
5050       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5051       result.Raise()
5052       if not result.data:
5053         raise errors.OpExecError("Can't detach drbd from local storage on node"
5054                                  " %s for device %s" % (tgt_node, dev.iv_name))
5055       #dev.children = []
5056       #cfg.Update(instance)
5057
5058       # ok, we created the new LVs, so now we know we have the needed
5059       # storage; as such, we proceed on the target node to rename
5060       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5061       # using the assumption that logical_id == physical_id (which in
5062       # turn is the unique_id on that node)
5063
5064       # FIXME(iustin): use a better name for the replaced LVs
5065       temp_suffix = int(time.time())
5066       ren_fn = lambda d, suff: (d.physical_id[0],
5067                                 d.physical_id[1] + "_replaced-%s" % suff)
5068       # build the rename list based on what LVs exist on the node
5069       rlist = []
5070       for to_ren in old_lvs:
5071         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5072         if not result.RemoteFailMsg() and result.payload:
5073           # device exists
5074           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5075
5076       info("renaming the old LVs on the target node")
5077       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5078       result.Raise()
5079       if not result.data:
5080         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5081       # now we rename the new LVs to the old LVs
5082       info("renaming the new LVs on the target node")
5083       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5084       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5085       result.Raise()
5086       if not result.data:
5087         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5088
5089       for old, new in zip(old_lvs, new_lvs):
5090         new.logical_id = old.logical_id
5091         cfg.SetDiskID(new, tgt_node)
5092
5093       for disk in old_lvs:
5094         disk.logical_id = ren_fn(disk, temp_suffix)
5095         cfg.SetDiskID(disk, tgt_node)
5096
5097       # now that the new lvs have the old name, we can add them to the device
5098       info("adding new mirror component on %s" % tgt_node)
5099       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5100       if result.failed or not result.data:
5101         for new_lv in new_lvs:
5102           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5103           if msg:
5104             warning("Can't rollback device %s: %s", dev, msg,
5105                     hint="cleanup manually the unused logical volumes")
5106         raise errors.OpExecError("Can't add local storage to drbd")
5107
5108       dev.children = new_lvs
5109       cfg.Update(instance)
5110
5111     # Step: wait for sync
5112
5113     # this can fail as the old devices are degraded and _WaitForSync
5114     # does a combined result over all disks, so we don't check its
5115     # return value
5116     self.proc.LogStep(5, steps_total, "sync devices")
5117     _WaitForSync(self, instance, unlock=True)
5118
5119     # so check manually all the devices
5120     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5121       cfg.SetDiskID(dev, instance.primary_node)
5122       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5123       msg = result.RemoteFailMsg()
5124       if not msg and not result.payload:
5125         msg = "disk not found"
5126       if msg:
5127         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5128                                  (name, msg))
5129       if result.payload[5]:
5130         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5131
5132     # Step: remove old storage
5133     self.proc.LogStep(6, steps_total, "removing old storage")
5134     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5135       info("remove logical volumes for %s" % name)
5136       for lv in old_lvs:
5137         cfg.SetDiskID(lv, tgt_node)
5138         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5139         if msg:
5140           warning("Can't remove old LV: %s" % msg,
5141                   hint="manually remove unused LVs")
5142           continue
5143
5144   def _ExecD8Secondary(self, feedback_fn):
5145     """Replace the secondary node for drbd8.
5146
5147     The algorithm for replace is quite complicated:
5148       - for all disks of the instance:
5149         - create new LVs on the new node with same names
5150         - shutdown the drbd device on the old secondary
5151         - disconnect the drbd network on the primary
5152         - create the drbd device on the new secondary
5153         - network attach the drbd on the primary, using an artifice:
5154           the drbd code for Attach() will connect to the network if it
5155           finds a device which is connected to the good local disks but
5156           not network enabled
5157       - wait for sync across all devices
5158       - remove all disks from the old secondary
5159
5160     Failures are not very well handled.
5161
5162     """
5163     steps_total = 6
5164     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5165     instance = self.instance
5166     iv_names = {}
5167     # start of work
5168     cfg = self.cfg
5169     old_node = self.tgt_node
5170     new_node = self.new_node
5171     pri_node = instance.primary_node
5172     nodes_ip = {
5173       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5174       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5175       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5176       }
5177
5178     # Step: check device activation
5179     self.proc.LogStep(1, steps_total, "check device existence")
5180     info("checking volume groups")
5181     my_vg = cfg.GetVGName()
5182     results = self.rpc.call_vg_list([pri_node, new_node])
5183     for node in pri_node, new_node:
5184       res = results[node]
5185       if res.failed or not res.data or my_vg not in res.data:
5186         raise errors.OpExecError("Volume group '%s' not found on %s" %
5187                                  (my_vg, node))
5188     for idx, dev in enumerate(instance.disks):
5189       if idx not in self.op.disks:
5190         continue
5191       info("checking disk/%d on %s" % (idx, pri_node))
5192       cfg.SetDiskID(dev, pri_node)
5193       result = self.rpc.call_blockdev_find(pri_node, dev)
5194       msg = result.RemoteFailMsg()
5195       if not msg and not result.payload:
5196         msg = "disk not found"
5197       if msg:
5198         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5199                                  (idx, pri_node, msg))
5200
5201     # Step: check other node consistency
5202     self.proc.LogStep(2, steps_total, "check peer consistency")
5203     for idx, dev in enumerate(instance.disks):
5204       if idx not in self.op.disks:
5205         continue
5206       info("checking disk/%d consistency on %s" % (idx, pri_node))
5207       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5208         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5209                                  " unsafe to replace the secondary" %
5210                                  pri_node)
5211
5212     # Step: create new storage
5213     self.proc.LogStep(3, steps_total, "allocate new storage")
5214     for idx, dev in enumerate(instance.disks):
5215       info("adding new local storage on %s for disk/%d" %
5216            (new_node, idx))
5217       # we pass force_create=True to force LVM creation
5218       for new_lv in dev.children:
5219         _CreateBlockDev(self, new_node, instance, new_lv, True,
5220                         _GetInstanceInfoText(instance), False)
5221
5222     # Step 4: dbrd minors and drbd setups changes
5223     # after this, we must manually remove the drbd minors on both the
5224     # error and the success paths
5225     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5226                                    instance.name)
5227     logging.debug("Allocated minors %s" % (minors,))
5228     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5229     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5230       size = dev.size
5231       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5232       # create new devices on new_node; note that we create two IDs:
5233       # one without port, so the drbd will be activated without
5234       # networking information on the new node at this stage, and one
5235       # with network, for the latter activation in step 4
5236       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5237       if pri_node == o_node1:
5238         p_minor = o_minor1
5239       else:
5240         p_minor = o_minor2
5241
5242       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5243       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5244
5245       iv_names[idx] = (dev, dev.children, new_net_id)
5246       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5247                     new_net_id)
5248       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5249                               logical_id=new_alone_id,
5250                               children=dev.children)
5251       try:
5252         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5253                               _GetInstanceInfoText(instance), False)
5254       except errors.BlockDeviceError:
5255         self.cfg.ReleaseDRBDMinors(instance.name)
5256         raise
5257
5258     for idx, dev in enumerate(instance.disks):
5259       # we have new devices, shutdown the drbd on the old secondary
5260       info("shutting down drbd for disk/%d on old node" % idx)
5261       cfg.SetDiskID(dev, old_node)
5262       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5263       if msg:
5264         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5265                 (idx, msg),
5266                 hint="Please cleanup this device manually as soon as possible")
5267
5268     info("detaching primary drbds from the network (=> standalone)")
5269     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5270                                                instance.disks)[pri_node]
5271
5272     msg = result.RemoteFailMsg()
5273     if msg:
5274       # detaches didn't succeed (unlikely)
5275       self.cfg.ReleaseDRBDMinors(instance.name)
5276       raise errors.OpExecError("Can't detach the disks from the network on"
5277                                " old node: %s" % (msg,))
5278
5279     # if we managed to detach at least one, we update all the disks of
5280     # the instance to point to the new secondary
5281     info("updating instance configuration")
5282     for dev, _, new_logical_id in iv_names.itervalues():
5283       dev.logical_id = new_logical_id
5284       cfg.SetDiskID(dev, pri_node)
5285     cfg.Update(instance)
5286
5287     # and now perform the drbd attach
5288     info("attaching primary drbds to new secondary (standalone => connected)")
5289     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5290                                            instance.disks, instance.name,
5291                                            False)
5292     for to_node, to_result in result.items():
5293       msg = to_result.RemoteFailMsg()
5294       if msg:
5295         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5296                 hint="please do a gnt-instance info to see the"
5297                 " status of disks")
5298
5299     # this can fail as the old devices are degraded and _WaitForSync
5300     # does a combined result over all disks, so we don't check its
5301     # return value
5302     self.proc.LogStep(5, steps_total, "sync devices")
5303     _WaitForSync(self, instance, unlock=True)
5304
5305     # so check manually all the devices
5306     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5307       cfg.SetDiskID(dev, pri_node)
5308       result = self.rpc.call_blockdev_find(pri_node, dev)
5309       msg = result.RemoteFailMsg()
5310       if not msg and not result.payload:
5311         msg = "disk not found"
5312       if msg:
5313         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5314                                  (idx, msg))
5315       if result.payload[5]:
5316         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5317
5318     self.proc.LogStep(6, steps_total, "removing old storage")
5319     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5320       info("remove logical volumes for disk/%d" % idx)
5321       for lv in old_lvs:
5322         cfg.SetDiskID(lv, old_node)
5323         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5324         if msg:
5325           warning("Can't remove LV on old secondary: %s", msg,
5326                   hint="Cleanup stale volumes by hand")
5327
5328   def Exec(self, feedback_fn):
5329     """Execute disk replacement.
5330
5331     This dispatches the disk replacement to the appropriate handler.
5332
5333     """
5334     instance = self.instance
5335
5336     # Activate the instance disks if we're replacing them on a down instance
5337     if not instance.admin_up:
5338       _StartInstanceDisks(self, instance, True)
5339
5340     if self.op.mode == constants.REPLACE_DISK_CHG:
5341       fn = self._ExecD8Secondary
5342     else:
5343       fn = self._ExecD8DiskOnly
5344
5345     ret = fn(feedback_fn)
5346
5347     # Deactivate the instance disks if we're replacing them on a down instance
5348     if not instance.admin_up:
5349       _SafeShutdownInstanceDisks(self, instance)
5350
5351     return ret
5352
5353
5354 class LUGrowDisk(LogicalUnit):
5355   """Grow a disk of an instance.
5356
5357   """
5358   HPATH = "disk-grow"
5359   HTYPE = constants.HTYPE_INSTANCE
5360   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5361   REQ_BGL = False
5362
5363   def ExpandNames(self):
5364     self._ExpandAndLockInstance()
5365     self.needed_locks[locking.LEVEL_NODE] = []
5366     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5367
5368   def DeclareLocks(self, level):
5369     if level == locking.LEVEL_NODE:
5370       self._LockInstancesNodes()
5371
5372   def BuildHooksEnv(self):
5373     """Build hooks env.
5374
5375     This runs on the master, the primary and all the secondaries.
5376
5377     """
5378     env = {
5379       "DISK": self.op.disk,
5380       "AMOUNT": self.op.amount,
5381       }
5382     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5383     nl = [
5384       self.cfg.GetMasterNode(),
5385       self.instance.primary_node,
5386       ]
5387     return env, nl, nl
5388
5389   def CheckPrereq(self):
5390     """Check prerequisites.
5391
5392     This checks that the instance is in the cluster.
5393
5394     """
5395     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5396     assert instance is not None, \
5397       "Cannot retrieve locked instance %s" % self.op.instance_name
5398     nodenames = list(instance.all_nodes)
5399     for node in nodenames:
5400       _CheckNodeOnline(self, node)
5401
5402
5403     self.instance = instance
5404
5405     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5406       raise errors.OpPrereqError("Instance's disk layout does not support"
5407                                  " growing.")
5408
5409     self.disk = instance.FindDisk(self.op.disk)
5410
5411     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5412                                        instance.hypervisor)
5413     for node in nodenames:
5414       info = nodeinfo[node]
5415       if info.failed or not info.data:
5416         raise errors.OpPrereqError("Cannot get current information"
5417                                    " from node '%s'" % node)
5418       vg_free = info.data.get('vg_free', None)
5419       if not isinstance(vg_free, int):
5420         raise errors.OpPrereqError("Can't compute free disk space on"
5421                                    " node %s" % node)
5422       if self.op.amount > vg_free:
5423         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5424                                    " %d MiB available, %d MiB required" %
5425                                    (node, vg_free, self.op.amount))
5426
5427   def Exec(self, feedback_fn):
5428     """Execute disk grow.
5429
5430     """
5431     instance = self.instance
5432     disk = self.disk
5433     for node in instance.all_nodes:
5434       self.cfg.SetDiskID(disk, node)
5435       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5436       msg = result.RemoteFailMsg()
5437       if msg:
5438         raise errors.OpExecError("Grow request failed to node %s: %s" %
5439                                  (node, msg))
5440     disk.RecordGrow(self.op.amount)
5441     self.cfg.Update(instance)
5442     if self.op.wait_for_sync:
5443       disk_abort = not _WaitForSync(self, instance)
5444       if disk_abort:
5445         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5446                              " status.\nPlease check the instance.")
5447
5448
5449 class LUQueryInstanceData(NoHooksLU):
5450   """Query runtime instance data.
5451
5452   """
5453   _OP_REQP = ["instances", "static"]
5454   REQ_BGL = False
5455
5456   def ExpandNames(self):
5457     self.needed_locks = {}
5458     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5459
5460     if not isinstance(self.op.instances, list):
5461       raise errors.OpPrereqError("Invalid argument type 'instances'")
5462
5463     if self.op.instances:
5464       self.wanted_names = []
5465       for name in self.op.instances:
5466         full_name = self.cfg.ExpandInstanceName(name)
5467         if full_name is None:
5468           raise errors.OpPrereqError("Instance '%s' not known" % name)
5469         self.wanted_names.append(full_name)
5470       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5471     else:
5472       self.wanted_names = None
5473       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5474
5475     self.needed_locks[locking.LEVEL_NODE] = []
5476     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5477
5478   def DeclareLocks(self, level):
5479     if level == locking.LEVEL_NODE:
5480       self._LockInstancesNodes()
5481
5482   def CheckPrereq(self):
5483     """Check prerequisites.
5484
5485     This only checks the optional instance list against the existing names.
5486
5487     """
5488     if self.wanted_names is None:
5489       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5490
5491     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5492                              in self.wanted_names]
5493     return
5494
5495   def _ComputeDiskStatus(self, instance, snode, dev):
5496     """Compute block device status.
5497
5498     """
5499     static = self.op.static
5500     if not static:
5501       self.cfg.SetDiskID(dev, instance.primary_node)
5502       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5503       msg = dev_pstatus.RemoteFailMsg()
5504       if msg:
5505         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5506                                  (instance.name, msg))
5507       dev_pstatus = dev_pstatus.payload
5508     else:
5509       dev_pstatus = None
5510
5511     if dev.dev_type in constants.LDS_DRBD:
5512       # we change the snode then (otherwise we use the one passed in)
5513       if dev.logical_id[0] == instance.primary_node:
5514         snode = dev.logical_id[1]
5515       else:
5516         snode = dev.logical_id[0]
5517
5518     if snode and not static:
5519       self.cfg.SetDiskID(dev, snode)
5520       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5521       msg = dev_sstatus.RemoteFailMsg()
5522       if msg:
5523         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5524                                  (instance.name, msg))
5525       dev_sstatus = dev_sstatus.payload
5526     else:
5527       dev_sstatus = None
5528
5529     if dev.children:
5530       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5531                       for child in dev.children]
5532     else:
5533       dev_children = []
5534
5535     data = {
5536       "iv_name": dev.iv_name,
5537       "dev_type": dev.dev_type,
5538       "logical_id": dev.logical_id,
5539       "physical_id": dev.physical_id,
5540       "pstatus": dev_pstatus,
5541       "sstatus": dev_sstatus,
5542       "children": dev_children,
5543       "mode": dev.mode,
5544       }
5545
5546     return data
5547
5548   def Exec(self, feedback_fn):
5549     """Gather and return data"""
5550     result = {}
5551
5552     cluster = self.cfg.GetClusterInfo()
5553
5554     for instance in self.wanted_instances:
5555       if not self.op.static:
5556         remote_info = self.rpc.call_instance_info(instance.primary_node,
5557                                                   instance.name,
5558                                                   instance.hypervisor)
5559         remote_info.Raise()
5560         remote_info = remote_info.data
5561         if remote_info and "state" in remote_info:
5562           remote_state = "up"
5563         else:
5564           remote_state = "down"
5565       else:
5566         remote_state = None
5567       if instance.admin_up:
5568         config_state = "up"
5569       else:
5570         config_state = "down"
5571
5572       disks = [self._ComputeDiskStatus(instance, None, device)
5573                for device in instance.disks]
5574
5575       idict = {
5576         "name": instance.name,
5577         "config_state": config_state,
5578         "run_state": remote_state,
5579         "pnode": instance.primary_node,
5580         "snodes": instance.secondary_nodes,
5581         "os": instance.os,
5582         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5583         "disks": disks,
5584         "hypervisor": instance.hypervisor,
5585         "network_port": instance.network_port,
5586         "hv_instance": instance.hvparams,
5587         "hv_actual": cluster.FillHV(instance),
5588         "be_instance": instance.beparams,
5589         "be_actual": cluster.FillBE(instance),
5590         }
5591
5592       result[instance.name] = idict
5593
5594     return result
5595
5596
5597 class LUSetInstanceParams(LogicalUnit):
5598   """Modifies an instances's parameters.
5599
5600   """
5601   HPATH = "instance-modify"
5602   HTYPE = constants.HTYPE_INSTANCE
5603   _OP_REQP = ["instance_name"]
5604   REQ_BGL = False
5605
5606   def CheckArguments(self):
5607     if not hasattr(self.op, 'nics'):
5608       self.op.nics = []
5609     if not hasattr(self.op, 'disks'):
5610       self.op.disks = []
5611     if not hasattr(self.op, 'beparams'):
5612       self.op.beparams = {}
5613     if not hasattr(self.op, 'hvparams'):
5614       self.op.hvparams = {}
5615     self.op.force = getattr(self.op, "force", False)
5616     if not (self.op.nics or self.op.disks or
5617             self.op.hvparams or self.op.beparams):
5618       raise errors.OpPrereqError("No changes submitted")
5619
5620     # Disk validation
5621     disk_addremove = 0
5622     for disk_op, disk_dict in self.op.disks:
5623       if disk_op == constants.DDM_REMOVE:
5624         disk_addremove += 1
5625         continue
5626       elif disk_op == constants.DDM_ADD:
5627         disk_addremove += 1
5628       else:
5629         if not isinstance(disk_op, int):
5630           raise errors.OpPrereqError("Invalid disk index")
5631       if disk_op == constants.DDM_ADD:
5632         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5633         if mode not in constants.DISK_ACCESS_SET:
5634           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5635         size = disk_dict.get('size', None)
5636         if size is None:
5637           raise errors.OpPrereqError("Required disk parameter size missing")
5638         try:
5639           size = int(size)
5640         except ValueError, err:
5641           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5642                                      str(err))
5643         disk_dict['size'] = size
5644       else:
5645         # modification of disk
5646         if 'size' in disk_dict:
5647           raise errors.OpPrereqError("Disk size change not possible, use"
5648                                      " grow-disk")
5649
5650     if disk_addremove > 1:
5651       raise errors.OpPrereqError("Only one disk add or remove operation"
5652                                  " supported at a time")
5653
5654     # NIC validation
5655     nic_addremove = 0
5656     for nic_op, nic_dict in self.op.nics:
5657       if nic_op == constants.DDM_REMOVE:
5658         nic_addremove += 1
5659         continue
5660       elif nic_op == constants.DDM_ADD:
5661         nic_addremove += 1
5662       else:
5663         if not isinstance(nic_op, int):
5664           raise errors.OpPrereqError("Invalid nic index")
5665
5666       # nic_dict should be a dict
5667       nic_ip = nic_dict.get('ip', None)
5668       if nic_ip is not None:
5669         if nic_ip.lower() == "none":
5670           nic_dict['ip'] = None
5671         else:
5672           if not utils.IsValidIP(nic_ip):
5673             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5674       # we can only check None bridges and assign the default one
5675       nic_bridge = nic_dict.get('bridge', None)
5676       if nic_bridge is None:
5677         nic_dict['bridge'] = self.cfg.GetDefBridge()
5678       # but we can validate MACs
5679       nic_mac = nic_dict.get('mac', None)
5680       if nic_mac is not None:
5681         if self.cfg.IsMacInUse(nic_mac):
5682           raise errors.OpPrereqError("MAC address %s already in use"
5683                                      " in cluster" % nic_mac)
5684         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5685           if not utils.IsValidMac(nic_mac):
5686             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5687     if nic_addremove > 1:
5688       raise errors.OpPrereqError("Only one NIC add or remove operation"
5689                                  " supported at a time")
5690
5691   def ExpandNames(self):
5692     self._ExpandAndLockInstance()
5693     self.needed_locks[locking.LEVEL_NODE] = []
5694     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5695
5696   def DeclareLocks(self, level):
5697     if level == locking.LEVEL_NODE:
5698       self._LockInstancesNodes()
5699
5700   def BuildHooksEnv(self):
5701     """Build hooks env.
5702
5703     This runs on the master, primary and secondaries.
5704
5705     """
5706     args = dict()
5707     if constants.BE_MEMORY in self.be_new:
5708       args['memory'] = self.be_new[constants.BE_MEMORY]
5709     if constants.BE_VCPUS in self.be_new:
5710       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5711     # FIXME: readd disk/nic changes
5712     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5713     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5714     return env, nl, nl
5715
5716   def CheckPrereq(self):
5717     """Check prerequisites.
5718
5719     This only checks the instance list against the existing names.
5720
5721     """
5722     force = self.force = self.op.force
5723
5724     # checking the new params on the primary/secondary nodes
5725
5726     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5727     assert self.instance is not None, \
5728       "Cannot retrieve locked instance %s" % self.op.instance_name
5729     pnode = instance.primary_node
5730     nodelist = list(instance.all_nodes)
5731
5732     # hvparams processing
5733     if self.op.hvparams:
5734       i_hvdict = copy.deepcopy(instance.hvparams)
5735       for key, val in self.op.hvparams.iteritems():
5736         if val == constants.VALUE_DEFAULT:
5737           try:
5738             del i_hvdict[key]
5739           except KeyError:
5740             pass
5741         else:
5742           i_hvdict[key] = val
5743       cluster = self.cfg.GetClusterInfo()
5744       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5745       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5746                                 i_hvdict)
5747       # local check
5748       hypervisor.GetHypervisor(
5749         instance.hypervisor).CheckParameterSyntax(hv_new)
5750       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5751       self.hv_new = hv_new # the new actual values
5752       self.hv_inst = i_hvdict # the new dict (without defaults)
5753     else:
5754       self.hv_new = self.hv_inst = {}
5755
5756     # beparams processing
5757     if self.op.beparams:
5758       i_bedict = copy.deepcopy(instance.beparams)
5759       for key, val in self.op.beparams.iteritems():
5760         if val == constants.VALUE_DEFAULT:
5761           try:
5762             del i_bedict[key]
5763           except KeyError:
5764             pass
5765         else:
5766           i_bedict[key] = val
5767       cluster = self.cfg.GetClusterInfo()
5768       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5769       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5770                                 i_bedict)
5771       self.be_new = be_new # the new actual values
5772       self.be_inst = i_bedict # the new dict (without defaults)
5773     else:
5774       self.be_new = self.be_inst = {}
5775
5776     self.warn = []
5777
5778     if constants.BE_MEMORY in self.op.beparams and not self.force:
5779       mem_check_list = [pnode]
5780       if be_new[constants.BE_AUTO_BALANCE]:
5781         # either we changed auto_balance to yes or it was from before
5782         mem_check_list.extend(instance.secondary_nodes)
5783       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5784                                                   instance.hypervisor)
5785       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5786                                          instance.hypervisor)
5787       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5788         # Assume the primary node is unreachable and go ahead
5789         self.warn.append("Can't get info from primary node %s" % pnode)
5790       else:
5791         if not instance_info.failed and instance_info.data:
5792           current_mem = instance_info.data['memory']
5793         else:
5794           # Assume instance not running
5795           # (there is a slight race condition here, but it's not very probable,
5796           # and we have no other way to check)
5797           current_mem = 0
5798         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5799                     nodeinfo[pnode].data['memory_free'])
5800         if miss_mem > 0:
5801           raise errors.OpPrereqError("This change will prevent the instance"
5802                                      " from starting, due to %d MB of memory"
5803                                      " missing on its primary node" % miss_mem)
5804
5805       if be_new[constants.BE_AUTO_BALANCE]:
5806         for node, nres in nodeinfo.iteritems():
5807           if node not in instance.secondary_nodes:
5808             continue
5809           if nres.failed or not isinstance(nres.data, dict):
5810             self.warn.append("Can't get info from secondary node %s" % node)
5811           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5812             self.warn.append("Not enough memory to failover instance to"
5813                              " secondary node %s" % node)
5814
5815     # NIC processing
5816     for nic_op, nic_dict in self.op.nics:
5817       if nic_op == constants.DDM_REMOVE:
5818         if not instance.nics:
5819           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5820         continue
5821       if nic_op != constants.DDM_ADD:
5822         # an existing nic
5823         if nic_op < 0 or nic_op >= len(instance.nics):
5824           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5825                                      " are 0 to %d" %
5826                                      (nic_op, len(instance.nics)))
5827       nic_bridge = nic_dict.get('bridge', None)
5828       if nic_bridge is not None:
5829         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5830           msg = ("Bridge '%s' doesn't exist on one of"
5831                  " the instance nodes" % nic_bridge)
5832           if self.force:
5833             self.warn.append(msg)
5834           else:
5835             raise errors.OpPrereqError(msg)
5836
5837     # DISK processing
5838     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5839       raise errors.OpPrereqError("Disk operations not supported for"
5840                                  " diskless instances")
5841     for disk_op, disk_dict in self.op.disks:
5842       if disk_op == constants.DDM_REMOVE:
5843         if len(instance.disks) == 1:
5844           raise errors.OpPrereqError("Cannot remove the last disk of"
5845                                      " an instance")
5846         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5847         ins_l = ins_l[pnode]
5848         if ins_l.failed or not isinstance(ins_l.data, list):
5849           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5850         if instance.name in ins_l.data:
5851           raise errors.OpPrereqError("Instance is running, can't remove"
5852                                      " disks.")
5853
5854       if (disk_op == constants.DDM_ADD and
5855           len(instance.nics) >= constants.MAX_DISKS):
5856         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5857                                    " add more" % constants.MAX_DISKS)
5858       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5859         # an existing disk
5860         if disk_op < 0 or disk_op >= len(instance.disks):
5861           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5862                                      " are 0 to %d" %
5863                                      (disk_op, len(instance.disks)))
5864
5865     return
5866
5867   def Exec(self, feedback_fn):
5868     """Modifies an instance.
5869
5870     All parameters take effect only at the next restart of the instance.
5871
5872     """
5873     # Process here the warnings from CheckPrereq, as we don't have a
5874     # feedback_fn there.
5875     for warn in self.warn:
5876       feedback_fn("WARNING: %s" % warn)
5877
5878     result = []
5879     instance = self.instance
5880     # disk changes
5881     for disk_op, disk_dict in self.op.disks:
5882       if disk_op == constants.DDM_REMOVE:
5883         # remove the last disk
5884         device = instance.disks.pop()
5885         device_idx = len(instance.disks)
5886         for node, disk in device.ComputeNodeTree(instance.primary_node):
5887           self.cfg.SetDiskID(disk, node)
5888           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5889           if msg:
5890             self.LogWarning("Could not remove disk/%d on node %s: %s,"
5891                             " continuing anyway", device_idx, node, msg)
5892         result.append(("disk/%d" % device_idx, "remove"))
5893       elif disk_op == constants.DDM_ADD:
5894         # add a new disk
5895         if instance.disk_template == constants.DT_FILE:
5896           file_driver, file_path = instance.disks[0].logical_id
5897           file_path = os.path.dirname(file_path)
5898         else:
5899           file_driver = file_path = None
5900         disk_idx_base = len(instance.disks)
5901         new_disk = _GenerateDiskTemplate(self,
5902                                          instance.disk_template,
5903                                          instance.name, instance.primary_node,
5904                                          instance.secondary_nodes,
5905                                          [disk_dict],
5906                                          file_path,
5907                                          file_driver,
5908                                          disk_idx_base)[0]
5909         instance.disks.append(new_disk)
5910         info = _GetInstanceInfoText(instance)
5911
5912         logging.info("Creating volume %s for instance %s",
5913                      new_disk.iv_name, instance.name)
5914         # Note: this needs to be kept in sync with _CreateDisks
5915         #HARDCODE
5916         for node in instance.all_nodes:
5917           f_create = node == instance.primary_node
5918           try:
5919             _CreateBlockDev(self, node, instance, new_disk,
5920                             f_create, info, f_create)
5921           except errors.OpExecError, err:
5922             self.LogWarning("Failed to create volume %s (%s) on"
5923                             " node %s: %s",
5924                             new_disk.iv_name, new_disk, node, err)
5925         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5926                        (new_disk.size, new_disk.mode)))
5927       else:
5928         # change a given disk
5929         instance.disks[disk_op].mode = disk_dict['mode']
5930         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5931     # NIC changes
5932     for nic_op, nic_dict in self.op.nics:
5933       if nic_op == constants.DDM_REMOVE:
5934         # remove the last nic
5935         del instance.nics[-1]
5936         result.append(("nic.%d" % len(instance.nics), "remove"))
5937       elif nic_op == constants.DDM_ADD:
5938         # add a new nic
5939         if 'mac' not in nic_dict:
5940           mac = constants.VALUE_GENERATE
5941         else:
5942           mac = nic_dict['mac']
5943         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5944           mac = self.cfg.GenerateMAC()
5945         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5946                               bridge=nic_dict.get('bridge', None))
5947         instance.nics.append(new_nic)
5948         result.append(("nic.%d" % (len(instance.nics) - 1),
5949                        "add:mac=%s,ip=%s,bridge=%s" %
5950                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5951       else:
5952         # change a given nic
5953         for key in 'mac', 'ip', 'bridge':
5954           if key in nic_dict:
5955             setattr(instance.nics[nic_op], key, nic_dict[key])
5956             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5957
5958     # hvparams changes
5959     if self.op.hvparams:
5960       instance.hvparams = self.hv_inst
5961       for key, val in self.op.hvparams.iteritems():
5962         result.append(("hv/%s" % key, val))
5963
5964     # beparams changes
5965     if self.op.beparams:
5966       instance.beparams = self.be_inst
5967       for key, val in self.op.beparams.iteritems():
5968         result.append(("be/%s" % key, val))
5969
5970     self.cfg.Update(instance)
5971
5972     return result
5973
5974
5975 class LUQueryExports(NoHooksLU):
5976   """Query the exports list
5977
5978   """
5979   _OP_REQP = ['nodes']
5980   REQ_BGL = False
5981
5982   def ExpandNames(self):
5983     self.needed_locks = {}
5984     self.share_locks[locking.LEVEL_NODE] = 1
5985     if not self.op.nodes:
5986       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5987     else:
5988       self.needed_locks[locking.LEVEL_NODE] = \
5989         _GetWantedNodes(self, self.op.nodes)
5990
5991   def CheckPrereq(self):
5992     """Check prerequisites.
5993
5994     """
5995     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5996
5997   def Exec(self, feedback_fn):
5998     """Compute the list of all the exported system images.
5999
6000     @rtype: dict
6001     @return: a dictionary with the structure node->(export-list)
6002         where export-list is a list of the instances exported on
6003         that node.
6004
6005     """
6006     rpcresult = self.rpc.call_export_list(self.nodes)
6007     result = {}
6008     for node in rpcresult:
6009       if rpcresult[node].failed:
6010         result[node] = False
6011       else:
6012         result[node] = rpcresult[node].data
6013
6014     return result
6015
6016
6017 class LUExportInstance(LogicalUnit):
6018   """Export an instance to an image in the cluster.
6019
6020   """
6021   HPATH = "instance-export"
6022   HTYPE = constants.HTYPE_INSTANCE
6023   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6024   REQ_BGL = False
6025
6026   def ExpandNames(self):
6027     self._ExpandAndLockInstance()
6028     # FIXME: lock only instance primary and destination node
6029     #
6030     # Sad but true, for now we have do lock all nodes, as we don't know where
6031     # the previous export might be, and and in this LU we search for it and
6032     # remove it from its current node. In the future we could fix this by:
6033     #  - making a tasklet to search (share-lock all), then create the new one,
6034     #    then one to remove, after
6035     #  - removing the removal operation altoghether
6036     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6037
6038   def DeclareLocks(self, level):
6039     """Last minute lock declaration."""
6040     # All nodes are locked anyway, so nothing to do here.
6041
6042   def BuildHooksEnv(self):
6043     """Build hooks env.
6044
6045     This will run on the master, primary node and target node.
6046
6047     """
6048     env = {
6049       "EXPORT_NODE": self.op.target_node,
6050       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6051       }
6052     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6053     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6054           self.op.target_node]
6055     return env, nl, nl
6056
6057   def CheckPrereq(self):
6058     """Check prerequisites.
6059
6060     This checks that the instance and node names are valid.
6061
6062     """
6063     instance_name = self.op.instance_name
6064     self.instance = self.cfg.GetInstanceInfo(instance_name)
6065     assert self.instance is not None, \
6066           "Cannot retrieve locked instance %s" % self.op.instance_name
6067     _CheckNodeOnline(self, self.instance.primary_node)
6068
6069     self.dst_node = self.cfg.GetNodeInfo(
6070       self.cfg.ExpandNodeName(self.op.target_node))
6071
6072     if self.dst_node is None:
6073       # This is wrong node name, not a non-locked node
6074       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6075     _CheckNodeOnline(self, self.dst_node.name)
6076     _CheckNodeNotDrained(self, self.dst_node.name)
6077
6078     # instance disk type verification
6079     for disk in self.instance.disks:
6080       if disk.dev_type == constants.LD_FILE:
6081         raise errors.OpPrereqError("Export not supported for instances with"
6082                                    " file-based disks")
6083
6084   def Exec(self, feedback_fn):
6085     """Export an instance to an image in the cluster.
6086
6087     """
6088     instance = self.instance
6089     dst_node = self.dst_node
6090     src_node = instance.primary_node
6091     if self.op.shutdown:
6092       # shutdown the instance, but not the disks
6093       result = self.rpc.call_instance_shutdown(src_node, instance)
6094       msg = result.RemoteFailMsg()
6095       if msg:
6096         raise errors.OpExecError("Could not shutdown instance %s on"
6097                                  " node %s: %s" %
6098                                  (instance.name, src_node, msg))
6099
6100     vgname = self.cfg.GetVGName()
6101
6102     snap_disks = []
6103
6104     # set the disks ID correctly since call_instance_start needs the
6105     # correct drbd minor to create the symlinks
6106     for disk in instance.disks:
6107       self.cfg.SetDiskID(disk, src_node)
6108
6109     try:
6110       for disk in instance.disks:
6111         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6112         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6113         if new_dev_name.failed or not new_dev_name.data:
6114           self.LogWarning("Could not snapshot block device %s on node %s",
6115                           disk.logical_id[1], src_node)
6116           snap_disks.append(False)
6117         else:
6118           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6119                                  logical_id=(vgname, new_dev_name.data),
6120                                  physical_id=(vgname, new_dev_name.data),
6121                                  iv_name=disk.iv_name)
6122           snap_disks.append(new_dev)
6123
6124     finally:
6125       if self.op.shutdown and instance.admin_up:
6126         result = self.rpc.call_instance_start(src_node, instance, None)
6127         msg = result.RemoteFailMsg()
6128         if msg:
6129           _ShutdownInstanceDisks(self, instance)
6130           raise errors.OpExecError("Could not start instance: %s" % msg)
6131
6132     # TODO: check for size
6133
6134     cluster_name = self.cfg.GetClusterName()
6135     for idx, dev in enumerate(snap_disks):
6136       if dev:
6137         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6138                                                instance, cluster_name, idx)
6139         if result.failed or not result.data:
6140           self.LogWarning("Could not export block device %s from node %s to"
6141                           " node %s", dev.logical_id[1], src_node,
6142                           dst_node.name)
6143         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6144         if msg:
6145           self.LogWarning("Could not remove snapshot block device %s from node"
6146                           " %s: %s", dev.logical_id[1], src_node, msg)
6147
6148     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6149     if result.failed or not result.data:
6150       self.LogWarning("Could not finalize export for instance %s on node %s",
6151                       instance.name, dst_node.name)
6152
6153     nodelist = self.cfg.GetNodeList()
6154     nodelist.remove(dst_node.name)
6155
6156     # on one-node clusters nodelist will be empty after the removal
6157     # if we proceed the backup would be removed because OpQueryExports
6158     # substitutes an empty list with the full cluster node list.
6159     if nodelist:
6160       exportlist = self.rpc.call_export_list(nodelist)
6161       for node in exportlist:
6162         if exportlist[node].failed:
6163           continue
6164         if instance.name in exportlist[node].data:
6165           if not self.rpc.call_export_remove(node, instance.name):
6166             self.LogWarning("Could not remove older export for instance %s"
6167                             " on node %s", instance.name, node)
6168
6169
6170 class LURemoveExport(NoHooksLU):
6171   """Remove exports related to the named instance.
6172
6173   """
6174   _OP_REQP = ["instance_name"]
6175   REQ_BGL = False
6176
6177   def ExpandNames(self):
6178     self.needed_locks = {}
6179     # We need all nodes to be locked in order for RemoveExport to work, but we
6180     # don't need to lock the instance itself, as nothing will happen to it (and
6181     # we can remove exports also for a removed instance)
6182     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6183
6184   def CheckPrereq(self):
6185     """Check prerequisites.
6186     """
6187     pass
6188
6189   def Exec(self, feedback_fn):
6190     """Remove any export.
6191
6192     """
6193     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6194     # If the instance was not found we'll try with the name that was passed in.
6195     # This will only work if it was an FQDN, though.
6196     fqdn_warn = False
6197     if not instance_name:
6198       fqdn_warn = True
6199       instance_name = self.op.instance_name
6200
6201     exportlist = self.rpc.call_export_list(self.acquired_locks[
6202       locking.LEVEL_NODE])
6203     found = False
6204     for node in exportlist:
6205       if exportlist[node].failed:
6206         self.LogWarning("Failed to query node %s, continuing" % node)
6207         continue
6208       if instance_name in exportlist[node].data:
6209         found = True
6210         result = self.rpc.call_export_remove(node, instance_name)
6211         if result.failed or not result.data:
6212           logging.error("Could not remove export for instance %s"
6213                         " on node %s", instance_name, node)
6214
6215     if fqdn_warn and not found:
6216       feedback_fn("Export not found. If trying to remove an export belonging"
6217                   " to a deleted instance please use its Fully Qualified"
6218                   " Domain Name.")
6219
6220
6221 class TagsLU(NoHooksLU):
6222   """Generic tags LU.
6223
6224   This is an abstract class which is the parent of all the other tags LUs.
6225
6226   """
6227
6228   def ExpandNames(self):
6229     self.needed_locks = {}
6230     if self.op.kind == constants.TAG_NODE:
6231       name = self.cfg.ExpandNodeName(self.op.name)
6232       if name is None:
6233         raise errors.OpPrereqError("Invalid node name (%s)" %
6234                                    (self.op.name,))
6235       self.op.name = name
6236       self.needed_locks[locking.LEVEL_NODE] = name
6237     elif self.op.kind == constants.TAG_INSTANCE:
6238       name = self.cfg.ExpandInstanceName(self.op.name)
6239       if name is None:
6240         raise errors.OpPrereqError("Invalid instance name (%s)" %
6241                                    (self.op.name,))
6242       self.op.name = name
6243       self.needed_locks[locking.LEVEL_INSTANCE] = name
6244
6245   def CheckPrereq(self):
6246     """Check prerequisites.
6247
6248     """
6249     if self.op.kind == constants.TAG_CLUSTER:
6250       self.target = self.cfg.GetClusterInfo()
6251     elif self.op.kind == constants.TAG_NODE:
6252       self.target = self.cfg.GetNodeInfo(self.op.name)
6253     elif self.op.kind == constants.TAG_INSTANCE:
6254       self.target = self.cfg.GetInstanceInfo(self.op.name)
6255     else:
6256       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6257                                  str(self.op.kind))
6258
6259
6260 class LUGetTags(TagsLU):
6261   """Returns the tags of a given object.
6262
6263   """
6264   _OP_REQP = ["kind", "name"]
6265   REQ_BGL = False
6266
6267   def Exec(self, feedback_fn):
6268     """Returns the tag list.
6269
6270     """
6271     return list(self.target.GetTags())
6272
6273
6274 class LUSearchTags(NoHooksLU):
6275   """Searches the tags for a given pattern.
6276
6277   """
6278   _OP_REQP = ["pattern"]
6279   REQ_BGL = False
6280
6281   def ExpandNames(self):
6282     self.needed_locks = {}
6283
6284   def CheckPrereq(self):
6285     """Check prerequisites.
6286
6287     This checks the pattern passed for validity by compiling it.
6288
6289     """
6290     try:
6291       self.re = re.compile(self.op.pattern)
6292     except re.error, err:
6293       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6294                                  (self.op.pattern, err))
6295
6296   def Exec(self, feedback_fn):
6297     """Returns the tag list.
6298
6299     """
6300     cfg = self.cfg
6301     tgts = [("/cluster", cfg.GetClusterInfo())]
6302     ilist = cfg.GetAllInstancesInfo().values()
6303     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6304     nlist = cfg.GetAllNodesInfo().values()
6305     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6306     results = []
6307     for path, target in tgts:
6308       for tag in target.GetTags():
6309         if self.re.search(tag):
6310           results.append((path, tag))
6311     return results
6312
6313
6314 class LUAddTags(TagsLU):
6315   """Sets a tag on a given object.
6316
6317   """
6318   _OP_REQP = ["kind", "name", "tags"]
6319   REQ_BGL = False
6320
6321   def CheckPrereq(self):
6322     """Check prerequisites.
6323
6324     This checks the type and length of the tag name and value.
6325
6326     """
6327     TagsLU.CheckPrereq(self)
6328     for tag in self.op.tags:
6329       objects.TaggableObject.ValidateTag(tag)
6330
6331   def Exec(self, feedback_fn):
6332     """Sets the tag.
6333
6334     """
6335     try:
6336       for tag in self.op.tags:
6337         self.target.AddTag(tag)
6338     except errors.TagError, err:
6339       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6340     try:
6341       self.cfg.Update(self.target)
6342     except errors.ConfigurationError:
6343       raise errors.OpRetryError("There has been a modification to the"
6344                                 " config file and the operation has been"
6345                                 " aborted. Please retry.")
6346
6347
6348 class LUDelTags(TagsLU):
6349   """Delete a list of tags from a given object.
6350
6351   """
6352   _OP_REQP = ["kind", "name", "tags"]
6353   REQ_BGL = False
6354
6355   def CheckPrereq(self):
6356     """Check prerequisites.
6357
6358     This checks that we have the given tag.
6359
6360     """
6361     TagsLU.CheckPrereq(self)
6362     for tag in self.op.tags:
6363       objects.TaggableObject.ValidateTag(tag)
6364     del_tags = frozenset(self.op.tags)
6365     cur_tags = self.target.GetTags()
6366     if not del_tags <= cur_tags:
6367       diff_tags = del_tags - cur_tags
6368       diff_names = ["'%s'" % tag for tag in diff_tags]
6369       diff_names.sort()
6370       raise errors.OpPrereqError("Tag(s) %s not found" %
6371                                  (",".join(diff_names)))
6372
6373   def Exec(self, feedback_fn):
6374     """Remove the tag from the object.
6375
6376     """
6377     for tag in self.op.tags:
6378       self.target.RemoveTag(tag)
6379     try:
6380       self.cfg.Update(self.target)
6381     except errors.ConfigurationError:
6382       raise errors.OpRetryError("There has been a modification to the"
6383                                 " config file and the operation has been"
6384                                 " aborted. Please retry.")
6385
6386
6387 class LUTestDelay(NoHooksLU):
6388   """Sleep for a specified amount of time.
6389
6390   This LU sleeps on the master and/or nodes for a specified amount of
6391   time.
6392
6393   """
6394   _OP_REQP = ["duration", "on_master", "on_nodes"]
6395   REQ_BGL = False
6396
6397   def ExpandNames(self):
6398     """Expand names and set required locks.
6399
6400     This expands the node list, if any.
6401
6402     """
6403     self.needed_locks = {}
6404     if self.op.on_nodes:
6405       # _GetWantedNodes can be used here, but is not always appropriate to use
6406       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6407       # more information.
6408       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6409       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6410
6411   def CheckPrereq(self):
6412     """Check prerequisites.
6413
6414     """
6415
6416   def Exec(self, feedback_fn):
6417     """Do the actual sleep.
6418
6419     """
6420     if self.op.on_master:
6421       if not utils.TestDelay(self.op.duration):
6422         raise errors.OpExecError("Error during master delay test")
6423     if self.op.on_nodes:
6424       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6425       if not result:
6426         raise errors.OpExecError("Complete failure from rpc call")
6427       for node, node_result in result.items():
6428         node_result.Raise()
6429         if not node_result.data:
6430           raise errors.OpExecError("Failure during rpc call to node %s,"
6431                                    " result: %s" % (node, node_result.data))
6432
6433
6434 class IAllocator(object):
6435   """IAllocator framework.
6436
6437   An IAllocator instance has three sets of attributes:
6438     - cfg that is needed to query the cluster
6439     - input data (all members of the _KEYS class attribute are required)
6440     - four buffer attributes (in|out_data|text), that represent the
6441       input (to the external script) in text and data structure format,
6442       and the output from it, again in two formats
6443     - the result variables from the script (success, info, nodes) for
6444       easy usage
6445
6446   """
6447   _ALLO_KEYS = [
6448     "mem_size", "disks", "disk_template",
6449     "os", "tags", "nics", "vcpus", "hypervisor",
6450     ]
6451   _RELO_KEYS = [
6452     "relocate_from",
6453     ]
6454
6455   def __init__(self, lu, mode, name, **kwargs):
6456     self.lu = lu
6457     # init buffer variables
6458     self.in_text = self.out_text = self.in_data = self.out_data = None
6459     # init all input fields so that pylint is happy
6460     self.mode = mode
6461     self.name = name
6462     self.mem_size = self.disks = self.disk_template = None
6463     self.os = self.tags = self.nics = self.vcpus = None
6464     self.hypervisor = None
6465     self.relocate_from = None
6466     # computed fields
6467     self.required_nodes = None
6468     # init result fields
6469     self.success = self.info = self.nodes = None
6470     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6471       keyset = self._ALLO_KEYS
6472     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6473       keyset = self._RELO_KEYS
6474     else:
6475       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6476                                    " IAllocator" % self.mode)
6477     for key in kwargs:
6478       if key not in keyset:
6479         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6480                                      " IAllocator" % key)
6481       setattr(self, key, kwargs[key])
6482     for key in keyset:
6483       if key not in kwargs:
6484         raise errors.ProgrammerError("Missing input parameter '%s' to"
6485                                      " IAllocator" % key)
6486     self._BuildInputData()
6487
6488   def _ComputeClusterData(self):
6489     """Compute the generic allocator input data.
6490
6491     This is the data that is independent of the actual operation.
6492
6493     """
6494     cfg = self.lu.cfg
6495     cluster_info = cfg.GetClusterInfo()
6496     # cluster data
6497     data = {
6498       "version": 1,
6499       "cluster_name": cfg.GetClusterName(),
6500       "cluster_tags": list(cluster_info.GetTags()),
6501       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6502       # we don't have job IDs
6503       }
6504     iinfo = cfg.GetAllInstancesInfo().values()
6505     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6506
6507     # node data
6508     node_results = {}
6509     node_list = cfg.GetNodeList()
6510
6511     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6512       hypervisor_name = self.hypervisor
6513     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6514       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6515
6516     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6517                                            hypervisor_name)
6518     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6519                        cluster_info.enabled_hypervisors)
6520     for nname, nresult in node_data.items():
6521       # first fill in static (config-based) values
6522       ninfo = cfg.GetNodeInfo(nname)
6523       pnr = {
6524         "tags": list(ninfo.GetTags()),
6525         "primary_ip": ninfo.primary_ip,
6526         "secondary_ip": ninfo.secondary_ip,
6527         "offline": ninfo.offline,
6528         "drained": ninfo.drained,
6529         "master_candidate": ninfo.master_candidate,
6530         }
6531
6532       if not ninfo.offline:
6533         nresult.Raise()
6534         if not isinstance(nresult.data, dict):
6535           raise errors.OpExecError("Can't get data for node %s" % nname)
6536         remote_info = nresult.data
6537         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6538                      'vg_size', 'vg_free', 'cpu_total']:
6539           if attr not in remote_info:
6540             raise errors.OpExecError("Node '%s' didn't return attribute"
6541                                      " '%s'" % (nname, attr))
6542           try:
6543             remote_info[attr] = int(remote_info[attr])
6544           except ValueError, err:
6545             raise errors.OpExecError("Node '%s' returned invalid value"
6546                                      " for '%s': %s" % (nname, attr, err))
6547         # compute memory used by primary instances
6548         i_p_mem = i_p_up_mem = 0
6549         for iinfo, beinfo in i_list:
6550           if iinfo.primary_node == nname:
6551             i_p_mem += beinfo[constants.BE_MEMORY]
6552             if iinfo.name not in node_iinfo[nname].data:
6553               i_used_mem = 0
6554             else:
6555               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6556             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6557             remote_info['memory_free'] -= max(0, i_mem_diff)
6558
6559             if iinfo.admin_up:
6560               i_p_up_mem += beinfo[constants.BE_MEMORY]
6561
6562         # compute memory used by instances
6563         pnr_dyn = {
6564           "total_memory": remote_info['memory_total'],
6565           "reserved_memory": remote_info['memory_dom0'],
6566           "free_memory": remote_info['memory_free'],
6567           "total_disk": remote_info['vg_size'],
6568           "free_disk": remote_info['vg_free'],
6569           "total_cpus": remote_info['cpu_total'],
6570           "i_pri_memory": i_p_mem,
6571           "i_pri_up_memory": i_p_up_mem,
6572           }
6573         pnr.update(pnr_dyn)
6574
6575       node_results[nname] = pnr
6576     data["nodes"] = node_results
6577
6578     # instance data
6579     instance_data = {}
6580     for iinfo, beinfo in i_list:
6581       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6582                   for n in iinfo.nics]
6583       pir = {
6584         "tags": list(iinfo.GetTags()),
6585         "admin_up": iinfo.admin_up,
6586         "vcpus": beinfo[constants.BE_VCPUS],
6587         "memory": beinfo[constants.BE_MEMORY],
6588         "os": iinfo.os,
6589         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6590         "nics": nic_data,
6591         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6592         "disk_template": iinfo.disk_template,
6593         "hypervisor": iinfo.hypervisor,
6594         }
6595       instance_data[iinfo.name] = pir
6596
6597     data["instances"] = instance_data
6598
6599     self.in_data = data
6600
6601   def _AddNewInstance(self):
6602     """Add new instance data to allocator structure.
6603
6604     This in combination with _AllocatorGetClusterData will create the
6605     correct structure needed as input for the allocator.
6606
6607     The checks for the completeness of the opcode must have already been
6608     done.
6609
6610     """
6611     data = self.in_data
6612
6613     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6614
6615     if self.disk_template in constants.DTS_NET_MIRROR:
6616       self.required_nodes = 2
6617     else:
6618       self.required_nodes = 1
6619     request = {
6620       "type": "allocate",
6621       "name": self.name,
6622       "disk_template": self.disk_template,
6623       "tags": self.tags,
6624       "os": self.os,
6625       "vcpus": self.vcpus,
6626       "memory": self.mem_size,
6627       "disks": self.disks,
6628       "disk_space_total": disk_space,
6629       "nics": self.nics,
6630       "required_nodes": self.required_nodes,
6631       }
6632     data["request"] = request
6633
6634   def _AddRelocateInstance(self):
6635     """Add relocate instance data to allocator structure.
6636
6637     This in combination with _IAllocatorGetClusterData will create the
6638     correct structure needed as input for the allocator.
6639
6640     The checks for the completeness of the opcode must have already been
6641     done.
6642
6643     """
6644     instance = self.lu.cfg.GetInstanceInfo(self.name)
6645     if instance is None:
6646       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6647                                    " IAllocator" % self.name)
6648
6649     if instance.disk_template not in constants.DTS_NET_MIRROR:
6650       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6651
6652     if len(instance.secondary_nodes) != 1:
6653       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6654
6655     self.required_nodes = 1
6656     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6657     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6658
6659     request = {
6660       "type": "relocate",
6661       "name": self.name,
6662       "disk_space_total": disk_space,
6663       "required_nodes": self.required_nodes,
6664       "relocate_from": self.relocate_from,
6665       }
6666     self.in_data["request"] = request
6667
6668   def _BuildInputData(self):
6669     """Build input data structures.
6670
6671     """
6672     self._ComputeClusterData()
6673
6674     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6675       self._AddNewInstance()
6676     else:
6677       self._AddRelocateInstance()
6678
6679     self.in_text = serializer.Dump(self.in_data)
6680
6681   def Run(self, name, validate=True, call_fn=None):
6682     """Run an instance allocator and return the results.
6683
6684     """
6685     if call_fn is None:
6686       call_fn = self.lu.rpc.call_iallocator_runner
6687     data = self.in_text
6688
6689     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6690     result.Raise()
6691
6692     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6693       raise errors.OpExecError("Invalid result from master iallocator runner")
6694
6695     rcode, stdout, stderr, fail = result.data
6696
6697     if rcode == constants.IARUN_NOTFOUND:
6698       raise errors.OpExecError("Can't find allocator '%s'" % name)
6699     elif rcode == constants.IARUN_FAILURE:
6700       raise errors.OpExecError("Instance allocator call failed: %s,"
6701                                " output: %s" % (fail, stdout+stderr))
6702     self.out_text = stdout
6703     if validate:
6704       self._ValidateResult()
6705
6706   def _ValidateResult(self):
6707     """Process the allocator results.
6708
6709     This will process and if successful save the result in
6710     self.out_data and the other parameters.
6711
6712     """
6713     try:
6714       rdict = serializer.Load(self.out_text)
6715     except Exception, err:
6716       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6717
6718     if not isinstance(rdict, dict):
6719       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6720
6721     for key in "success", "info", "nodes":
6722       if key not in rdict:
6723         raise errors.OpExecError("Can't parse iallocator results:"
6724                                  " missing key '%s'" % key)
6725       setattr(self, key, rdict[key])
6726
6727     if not isinstance(rdict["nodes"], list):
6728       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6729                                " is not a list")
6730     self.out_data = rdict
6731
6732
6733 class LUTestAllocator(NoHooksLU):
6734   """Run allocator tests.
6735
6736   This LU runs the allocator tests
6737
6738   """
6739   _OP_REQP = ["direction", "mode", "name"]
6740
6741   def CheckPrereq(self):
6742     """Check prerequisites.
6743
6744     This checks the opcode parameters depending on the director and mode test.
6745
6746     """
6747     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6748       for attr in ["name", "mem_size", "disks", "disk_template",
6749                    "os", "tags", "nics", "vcpus"]:
6750         if not hasattr(self.op, attr):
6751           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6752                                      attr)
6753       iname = self.cfg.ExpandInstanceName(self.op.name)
6754       if iname is not None:
6755         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6756                                    iname)
6757       if not isinstance(self.op.nics, list):
6758         raise errors.OpPrereqError("Invalid parameter 'nics'")
6759       for row in self.op.nics:
6760         if (not isinstance(row, dict) or
6761             "mac" not in row or
6762             "ip" not in row or
6763             "bridge" not in row):
6764           raise errors.OpPrereqError("Invalid contents of the"
6765                                      " 'nics' parameter")
6766       if not isinstance(self.op.disks, list):
6767         raise errors.OpPrereqError("Invalid parameter 'disks'")
6768       for row in self.op.disks:
6769         if (not isinstance(row, dict) or
6770             "size" not in row or
6771             not isinstance(row["size"], int) or
6772             "mode" not in row or
6773             row["mode"] not in ['r', 'w']):
6774           raise errors.OpPrereqError("Invalid contents of the"
6775                                      " 'disks' parameter")
6776       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6777         self.op.hypervisor = self.cfg.GetHypervisorType()
6778     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6779       if not hasattr(self.op, "name"):
6780         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6781       fname = self.cfg.ExpandInstanceName(self.op.name)
6782       if fname is None:
6783         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6784                                    self.op.name)
6785       self.op.name = fname
6786       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6787     else:
6788       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6789                                  self.op.mode)
6790
6791     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6792       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6793         raise errors.OpPrereqError("Missing allocator name")
6794     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6795       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6796                                  self.op.direction)
6797
6798   def Exec(self, feedback_fn):
6799     """Run the allocator test.
6800
6801     """
6802     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6803       ial = IAllocator(self,
6804                        mode=self.op.mode,
6805                        name=self.op.name,
6806                        mem_size=self.op.mem_size,
6807                        disks=self.op.disks,
6808                        disk_template=self.op.disk_template,
6809                        os=self.op.os,
6810                        tags=self.op.tags,
6811                        nics=self.op.nics,
6812                        vcpus=self.op.vcpus,
6813                        hypervisor=self.op.hypervisor,
6814                        )
6815     else:
6816       ial = IAllocator(self,
6817                        mode=self.op.mode,
6818                        name=self.op.name,
6819                        relocate_from=list(self.relocate_from),
6820                        )
6821
6822     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6823       result = ial.in_text
6824     else:
6825       ial.Run(self.op.allocator, validate=False)
6826       result = ial.out_text
6827     return result