b709455be0fddbdb5a7ba3c7728e4ca8ace8cb4a
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @rtype: dict
480   @return: the hook environment for this instance
481
482   """
483   if status:
484     str_status = "up"
485   else:
486     str_status = "down"
487   env = {
488     "OP_TARGET": name,
489     "INSTANCE_NAME": name,
490     "INSTANCE_PRIMARY": primary_node,
491     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
492     "INSTANCE_OS_TYPE": os_type,
493     "INSTANCE_STATUS": str_status,
494     "INSTANCE_MEMORY": memory,
495     "INSTANCE_VCPUS": vcpus,
496   }
497
498   if nics:
499     nic_count = len(nics)
500     for idx, (ip, bridge, mac) in enumerate(nics):
501       if ip is None:
502         ip = ""
503       env["INSTANCE_NIC%d_IP" % idx] = ip
504       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
505       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
506   else:
507     nic_count = 0
508
509   env["INSTANCE_NIC_COUNT"] = nic_count
510
511   return env
512
513
514 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
515   """Builds instance related env variables for hooks from an object.
516
517   @type lu: L{LogicalUnit}
518   @param lu: the logical unit on whose behalf we execute
519   @type instance: L{objects.Instance}
520   @param instance: the instance for which we should build the
521       environment
522   @type override: dict
523   @param override: dictionary with key/values that will override
524       our values
525   @rtype: dict
526   @return: the hook environment dictionary
527
528   """
529   bep = lu.cfg.GetClusterInfo().FillBE(instance)
530   args = {
531     'name': instance.name,
532     'primary_node': instance.primary_node,
533     'secondary_nodes': instance.secondary_nodes,
534     'os_type': instance.os,
535     'status': instance.admin_up,
536     'memory': bep[constants.BE_MEMORY],
537     'vcpus': bep[constants.BE_VCPUS],
538     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
539   }
540   if override:
541     args.update(override)
542   return _BuildInstanceHookEnv(**args)
543
544
545 def _AdjustCandidatePool(lu):
546   """Adjust the candidate pool after node operations.
547
548   """
549   mod_list = lu.cfg.MaintainCandidatePool()
550   if mod_list:
551     lu.LogInfo("Promoted nodes to master candidate role: %s",
552                ", ".join(node.name for node in mod_list))
553     for name in mod_list:
554       lu.context.ReaddNode(name)
555   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
556   if mc_now > mc_max:
557     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
558                (mc_now, mc_max))
559
560
561 def _CheckInstanceBridgesExist(lu, instance):
562   """Check that the brigdes needed by an instance exist.
563
564   """
565   # check bridges existance
566   brlist = [nic.bridge for nic in instance.nics]
567   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
568   result.Raise()
569   if not result.data:
570     raise errors.OpPrereqError("One or more target bridges %s does not"
571                                " exist on destination node '%s'" %
572                                (brlist, instance.primary_node))
573
574
575 class LUDestroyCluster(NoHooksLU):
576   """Logical unit for destroying the cluster.
577
578   """
579   _OP_REQP = []
580
581   def CheckPrereq(self):
582     """Check prerequisites.
583
584     This checks whether the cluster is empty.
585
586     Any errors are signalled by raising errors.OpPrereqError.
587
588     """
589     master = self.cfg.GetMasterNode()
590
591     nodelist = self.cfg.GetNodeList()
592     if len(nodelist) != 1 or nodelist[0] != master:
593       raise errors.OpPrereqError("There are still %d node(s) in"
594                                  " this cluster." % (len(nodelist) - 1))
595     instancelist = self.cfg.GetInstanceList()
596     if instancelist:
597       raise errors.OpPrereqError("There are still %d instance(s) in"
598                                  " this cluster." % len(instancelist))
599
600   def Exec(self, feedback_fn):
601     """Destroys the cluster.
602
603     """
604     master = self.cfg.GetMasterNode()
605     result = self.rpc.call_node_stop_master(master, False)
606     result.Raise()
607     if not result.data:
608       raise errors.OpExecError("Could not disable the master role")
609     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
610     utils.CreateBackup(priv_key)
611     utils.CreateBackup(pub_key)
612     return master
613
614
615 class LUVerifyCluster(LogicalUnit):
616   """Verifies the cluster status.
617
618   """
619   HPATH = "cluster-verify"
620   HTYPE = constants.HTYPE_CLUSTER
621   _OP_REQP = ["skip_checks"]
622   REQ_BGL = False
623
624   def ExpandNames(self):
625     self.needed_locks = {
626       locking.LEVEL_NODE: locking.ALL_SET,
627       locking.LEVEL_INSTANCE: locking.ALL_SET,
628     }
629     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
630
631   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
632                   node_result, feedback_fn, master_files,
633                   drbd_map):
634     """Run multiple tests against a node.
635
636     Test list:
637
638       - compares ganeti version
639       - checks vg existance and size > 20G
640       - checks config file checksum
641       - checks ssh to other nodes
642
643     @type nodeinfo: L{objects.Node}
644     @param nodeinfo: the node to check
645     @param file_list: required list of files
646     @param local_cksum: dictionary of local files and their checksums
647     @param node_result: the results from the node
648     @param feedback_fn: function used to accumulate results
649     @param master_files: list of files that only masters should have
650     @param drbd_map: the useddrbd minors for this node, in
651         form of minor: (instance, must_exist) which correspond to instances
652         and their running status
653
654     """
655     node = nodeinfo.name
656
657     # main result, node_result should be a non-empty dict
658     if not node_result or not isinstance(node_result, dict):
659       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
660       return True
661
662     # compares ganeti version
663     local_version = constants.PROTOCOL_VERSION
664     remote_version = node_result.get('version', None)
665     if not (remote_version and isinstance(remote_version, (list, tuple)) and
666             len(remote_version) == 2):
667       feedback_fn("  - ERROR: connection to %s failed" % (node))
668       return True
669
670     if local_version != remote_version[0]:
671       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
672                   " node %s %s" % (local_version, node, remote_version[0]))
673       return True
674
675     # node seems compatible, we can actually try to look into its results
676
677     bad = False
678
679     # full package version
680     if constants.RELEASE_VERSION != remote_version[1]:
681       feedback_fn("  - WARNING: software version mismatch: master %s,"
682                   " node %s %s" %
683                   (constants.RELEASE_VERSION, node, remote_version[1]))
684
685     # checks vg existence and size > 20G
686
687     vglist = node_result.get(constants.NV_VGLIST, None)
688     if not vglist:
689       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
690                       (node,))
691       bad = True
692     else:
693       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
694                                             constants.MIN_VG_SIZE)
695       if vgstatus:
696         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
697         bad = True
698
699     # checks config file checksum
700
701     remote_cksum = node_result.get(constants.NV_FILELIST, None)
702     if not isinstance(remote_cksum, dict):
703       bad = True
704       feedback_fn("  - ERROR: node hasn't returned file checksum data")
705     else:
706       for file_name in file_list:
707         node_is_mc = nodeinfo.master_candidate
708         must_have_file = file_name not in master_files
709         if file_name not in remote_cksum:
710           if node_is_mc or must_have_file:
711             bad = True
712             feedback_fn("  - ERROR: file '%s' missing" % file_name)
713         elif remote_cksum[file_name] != local_cksum[file_name]:
714           if node_is_mc or must_have_file:
715             bad = True
716             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
717           else:
718             # not candidate and this is not a must-have file
719             bad = True
720             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
721                         " '%s'" % file_name)
722         else:
723           # all good, except non-master/non-must have combination
724           if not node_is_mc and not must_have_file:
725             feedback_fn("  - ERROR: file '%s' should not exist on non master"
726                         " candidates" % file_name)
727
728     # checks ssh to any
729
730     if constants.NV_NODELIST not in node_result:
731       bad = True
732       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
733     else:
734       if node_result[constants.NV_NODELIST]:
735         bad = True
736         for node in node_result[constants.NV_NODELIST]:
737           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
738                           (node, node_result[constants.NV_NODELIST][node]))
739
740     if constants.NV_NODENETTEST not in node_result:
741       bad = True
742       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
743     else:
744       if node_result[constants.NV_NODENETTEST]:
745         bad = True
746         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
747         for node in nlist:
748           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
749                           (node, node_result[constants.NV_NODENETTEST][node]))
750
751     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
752     if isinstance(hyp_result, dict):
753       for hv_name, hv_result in hyp_result.iteritems():
754         if hv_result is not None:
755           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
756                       (hv_name, hv_result))
757
758     # check used drbd list
759     used_minors = node_result.get(constants.NV_DRBDLIST, [])
760     for minor, (iname, must_exist) in drbd_map.items():
761       if minor not in used_minors and must_exist:
762         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
763                     (minor, iname))
764         bad = True
765     for minor in used_minors:
766       if minor not in drbd_map:
767         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
768         bad = True
769
770     return bad
771
772   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
773                       node_instance, feedback_fn, n_offline):
774     """Verify an instance.
775
776     This function checks to see if the required block devices are
777     available on the instance's node.
778
779     """
780     bad = False
781
782     node_current = instanceconfig.primary_node
783
784     node_vol_should = {}
785     instanceconfig.MapLVsByNode(node_vol_should)
786
787     for node in node_vol_should:
788       if node in n_offline:
789         # ignore missing volumes on offline nodes
790         continue
791       for volume in node_vol_should[node]:
792         if node not in node_vol_is or volume not in node_vol_is[node]:
793           feedback_fn("  - ERROR: volume %s missing on node %s" %
794                           (volume, node))
795           bad = True
796
797     if instanceconfig.admin_up:
798       if ((node_current not in node_instance or
799           not instance in node_instance[node_current]) and
800           node_current not in n_offline):
801         feedback_fn("  - ERROR: instance %s not running on node %s" %
802                         (instance, node_current))
803         bad = True
804
805     for node in node_instance:
806       if (not node == node_current):
807         if instance in node_instance[node]:
808           feedback_fn("  - ERROR: instance %s should not run on node %s" %
809                           (instance, node))
810           bad = True
811
812     return bad
813
814   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
815     """Verify if there are any unknown volumes in the cluster.
816
817     The .os, .swap and backup volumes are ignored. All other volumes are
818     reported as unknown.
819
820     """
821     bad = False
822
823     for node in node_vol_is:
824       for volume in node_vol_is[node]:
825         if node not in node_vol_should or volume not in node_vol_should[node]:
826           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
827                       (volume, node))
828           bad = True
829     return bad
830
831   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
832     """Verify the list of running instances.
833
834     This checks what instances are running but unknown to the cluster.
835
836     """
837     bad = False
838     for node in node_instance:
839       for runninginstance in node_instance[node]:
840         if runninginstance not in instancelist:
841           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
842                           (runninginstance, node))
843           bad = True
844     return bad
845
846   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
847     """Verify N+1 Memory Resilience.
848
849     Check that if one single node dies we can still start all the instances it
850     was primary for.
851
852     """
853     bad = False
854
855     for node, nodeinfo in node_info.iteritems():
856       # This code checks that every node which is now listed as secondary has
857       # enough memory to host all instances it is supposed to should a single
858       # other node in the cluster fail.
859       # FIXME: not ready for failover to an arbitrary node
860       # FIXME: does not support file-backed instances
861       # WARNING: we currently take into account down instances as well as up
862       # ones, considering that even if they're down someone might want to start
863       # them even in the event of a node failure.
864       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
865         needed_mem = 0
866         for instance in instances:
867           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
868           if bep[constants.BE_AUTO_BALANCE]:
869             needed_mem += bep[constants.BE_MEMORY]
870         if nodeinfo['mfree'] < needed_mem:
871           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
872                       " failovers should node %s fail" % (node, prinode))
873           bad = True
874     return bad
875
876   def CheckPrereq(self):
877     """Check prerequisites.
878
879     Transform the list of checks we're going to skip into a set and check that
880     all its members are valid.
881
882     """
883     self.skip_set = frozenset(self.op.skip_checks)
884     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
885       raise errors.OpPrereqError("Invalid checks to be skipped specified")
886
887   def BuildHooksEnv(self):
888     """Build hooks env.
889
890     Cluster-Verify hooks just rone in the post phase and their failure makes
891     the output be logged in the verify output and the verification to fail.
892
893     """
894     all_nodes = self.cfg.GetNodeList()
895     # TODO: populate the environment with useful information for verify hooks
896     env = {}
897     return env, [], all_nodes
898
899   def Exec(self, feedback_fn):
900     """Verify integrity of cluster, performing various test on nodes.
901
902     """
903     bad = False
904     feedback_fn("* Verifying global settings")
905     for msg in self.cfg.VerifyConfig():
906       feedback_fn("  - ERROR: %s" % msg)
907
908     vg_name = self.cfg.GetVGName()
909     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
910     nodelist = utils.NiceSort(self.cfg.GetNodeList())
911     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
912     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
913     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
914                         for iname in instancelist)
915     i_non_redundant = [] # Non redundant instances
916     i_non_a_balanced = [] # Non auto-balanced instances
917     n_offline = [] # List of offline nodes
918     n_drained = [] # List of nodes being drained
919     node_volume = {}
920     node_instance = {}
921     node_info = {}
922     instance_cfg = {}
923
924     # FIXME: verify OS list
925     # do local checksums
926     master_files = [constants.CLUSTER_CONF_FILE]
927
928     file_names = ssconf.SimpleStore().GetFileList()
929     file_names.append(constants.SSL_CERT_FILE)
930     file_names.append(constants.RAPI_CERT_FILE)
931     file_names.extend(master_files)
932
933     local_checksums = utils.FingerprintFiles(file_names)
934
935     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
936     node_verify_param = {
937       constants.NV_FILELIST: file_names,
938       constants.NV_NODELIST: [node.name for node in nodeinfo
939                               if not node.offline],
940       constants.NV_HYPERVISOR: hypervisors,
941       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
942                                   node.secondary_ip) for node in nodeinfo
943                                  if not node.offline],
944       constants.NV_LVLIST: vg_name,
945       constants.NV_INSTANCELIST: hypervisors,
946       constants.NV_VGLIST: None,
947       constants.NV_VERSION: None,
948       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
949       constants.NV_DRBDLIST: None,
950       }
951     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
952                                            self.cfg.GetClusterName())
953
954     cluster = self.cfg.GetClusterInfo()
955     master_node = self.cfg.GetMasterNode()
956     all_drbd_map = self.cfg.ComputeDRBDMap()
957
958     for node_i in nodeinfo:
959       node = node_i.name
960       nresult = all_nvinfo[node].data
961
962       if node_i.offline:
963         feedback_fn("* Skipping offline node %s" % (node,))
964         n_offline.append(node)
965         continue
966
967       if node == master_node:
968         ntype = "master"
969       elif node_i.master_candidate:
970         ntype = "master candidate"
971       elif node_i.drained:
972         ntype = "drained"
973         n_drained.append(node)
974       else:
975         ntype = "regular"
976       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
977
978       if all_nvinfo[node].failed or not isinstance(nresult, dict):
979         feedback_fn("  - ERROR: connection to %s failed" % (node,))
980         bad = True
981         continue
982
983       node_drbd = {}
984       for minor, instance in all_drbd_map[node].items():
985         instance = instanceinfo[instance]
986         node_drbd[minor] = (instance.name, instance.admin_up)
987       result = self._VerifyNode(node_i, file_names, local_checksums,
988                                 nresult, feedback_fn, master_files,
989                                 node_drbd)
990       bad = bad or result
991
992       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
993       if isinstance(lvdata, basestring):
994         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
995                     (node, utils.SafeEncode(lvdata)))
996         bad = True
997         node_volume[node] = {}
998       elif not isinstance(lvdata, dict):
999         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1000         bad = True
1001         continue
1002       else:
1003         node_volume[node] = lvdata
1004
1005       # node_instance
1006       idata = nresult.get(constants.NV_INSTANCELIST, None)
1007       if not isinstance(idata, list):
1008         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1009                     (node,))
1010         bad = True
1011         continue
1012
1013       node_instance[node] = idata
1014
1015       # node_info
1016       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1017       if not isinstance(nodeinfo, dict):
1018         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1019         bad = True
1020         continue
1021
1022       try:
1023         node_info[node] = {
1024           "mfree": int(nodeinfo['memory_free']),
1025           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1026           "pinst": [],
1027           "sinst": [],
1028           # dictionary holding all instances this node is secondary for,
1029           # grouped by their primary node. Each key is a cluster node, and each
1030           # value is a list of instances which have the key as primary and the
1031           # current node as secondary.  this is handy to calculate N+1 memory
1032           # availability if you can only failover from a primary to its
1033           # secondary.
1034           "sinst-by-pnode": {},
1035         }
1036       except ValueError:
1037         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1038         bad = True
1039         continue
1040
1041     node_vol_should = {}
1042
1043     for instance in instancelist:
1044       feedback_fn("* Verifying instance %s" % instance)
1045       inst_config = instanceinfo[instance]
1046       result =  self._VerifyInstance(instance, inst_config, node_volume,
1047                                      node_instance, feedback_fn, n_offline)
1048       bad = bad or result
1049       inst_nodes_offline = []
1050
1051       inst_config.MapLVsByNode(node_vol_should)
1052
1053       instance_cfg[instance] = inst_config
1054
1055       pnode = inst_config.primary_node
1056       if pnode in node_info:
1057         node_info[pnode]['pinst'].append(instance)
1058       elif pnode not in n_offline:
1059         feedback_fn("  - ERROR: instance %s, connection to primary node"
1060                     " %s failed" % (instance, pnode))
1061         bad = True
1062
1063       if pnode in n_offline:
1064         inst_nodes_offline.append(pnode)
1065
1066       # If the instance is non-redundant we cannot survive losing its primary
1067       # node, so we are not N+1 compliant. On the other hand we have no disk
1068       # templates with more than one secondary so that situation is not well
1069       # supported either.
1070       # FIXME: does not support file-backed instances
1071       if len(inst_config.secondary_nodes) == 0:
1072         i_non_redundant.append(instance)
1073       elif len(inst_config.secondary_nodes) > 1:
1074         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1075                     % instance)
1076
1077       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1078         i_non_a_balanced.append(instance)
1079
1080       for snode in inst_config.secondary_nodes:
1081         if snode in node_info:
1082           node_info[snode]['sinst'].append(instance)
1083           if pnode not in node_info[snode]['sinst-by-pnode']:
1084             node_info[snode]['sinst-by-pnode'][pnode] = []
1085           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1086         elif snode not in n_offline:
1087           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1088                       " %s failed" % (instance, snode))
1089           bad = True
1090         if snode in n_offline:
1091           inst_nodes_offline.append(snode)
1092
1093       if inst_nodes_offline:
1094         # warn that the instance lives on offline nodes, and set bad=True
1095         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1096                     ", ".join(inst_nodes_offline))
1097         bad = True
1098
1099     feedback_fn("* Verifying orphan volumes")
1100     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1101                                        feedback_fn)
1102     bad = bad or result
1103
1104     feedback_fn("* Verifying remaining instances")
1105     result = self._VerifyOrphanInstances(instancelist, node_instance,
1106                                          feedback_fn)
1107     bad = bad or result
1108
1109     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1110       feedback_fn("* Verifying N+1 Memory redundancy")
1111       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1112       bad = bad or result
1113
1114     feedback_fn("* Other Notes")
1115     if i_non_redundant:
1116       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1117                   % len(i_non_redundant))
1118
1119     if i_non_a_balanced:
1120       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1121                   % len(i_non_a_balanced))
1122
1123     if n_offline:
1124       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1125
1126     if n_drained:
1127       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1128
1129     return not bad
1130
1131   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1132     """Analize the post-hooks' result
1133
1134     This method analyses the hook result, handles it, and sends some
1135     nicely-formatted feedback back to the user.
1136
1137     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1138         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1139     @param hooks_results: the results of the multi-node hooks rpc call
1140     @param feedback_fn: function used send feedback back to the caller
1141     @param lu_result: previous Exec result
1142     @return: the new Exec result, based on the previous result
1143         and hook results
1144
1145     """
1146     # We only really run POST phase hooks, and are only interested in
1147     # their results
1148     if phase == constants.HOOKS_PHASE_POST:
1149       # Used to change hooks' output to proper indentation
1150       indent_re = re.compile('^', re.M)
1151       feedback_fn("* Hooks Results")
1152       if not hooks_results:
1153         feedback_fn("  - ERROR: general communication failure")
1154         lu_result = 1
1155       else:
1156         for node_name in hooks_results:
1157           show_node_header = True
1158           res = hooks_results[node_name]
1159           if res.failed or res.data is False or not isinstance(res.data, list):
1160             if res.offline:
1161               # no need to warn or set fail return value
1162               continue
1163             feedback_fn("    Communication failure in hooks execution")
1164             lu_result = 1
1165             continue
1166           for script, hkr, output in res.data:
1167             if hkr == constants.HKR_FAIL:
1168               # The node header is only shown once, if there are
1169               # failing hooks on that node
1170               if show_node_header:
1171                 feedback_fn("  Node %s:" % node_name)
1172                 show_node_header = False
1173               feedback_fn("    ERROR: Script %s failed, output:" % script)
1174               output = indent_re.sub('      ', output)
1175               feedback_fn("%s" % output)
1176               lu_result = 1
1177
1178       return lu_result
1179
1180
1181 class LUVerifyDisks(NoHooksLU):
1182   """Verifies the cluster disks status.
1183
1184   """
1185   _OP_REQP = []
1186   REQ_BGL = False
1187
1188   def ExpandNames(self):
1189     self.needed_locks = {
1190       locking.LEVEL_NODE: locking.ALL_SET,
1191       locking.LEVEL_INSTANCE: locking.ALL_SET,
1192     }
1193     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1194
1195   def CheckPrereq(self):
1196     """Check prerequisites.
1197
1198     This has no prerequisites.
1199
1200     """
1201     pass
1202
1203   def Exec(self, feedback_fn):
1204     """Verify integrity of cluster disks.
1205
1206     """
1207     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1208
1209     vg_name = self.cfg.GetVGName()
1210     nodes = utils.NiceSort(self.cfg.GetNodeList())
1211     instances = [self.cfg.GetInstanceInfo(name)
1212                  for name in self.cfg.GetInstanceList()]
1213
1214     nv_dict = {}
1215     for inst in instances:
1216       inst_lvs = {}
1217       if (not inst.admin_up or
1218           inst.disk_template not in constants.DTS_NET_MIRROR):
1219         continue
1220       inst.MapLVsByNode(inst_lvs)
1221       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1222       for node, vol_list in inst_lvs.iteritems():
1223         for vol in vol_list:
1224           nv_dict[(node, vol)] = inst
1225
1226     if not nv_dict:
1227       return result
1228
1229     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1230
1231     to_act = set()
1232     for node in nodes:
1233       # node_volume
1234       lvs = node_lvs[node]
1235       if lvs.failed:
1236         if not lvs.offline:
1237           self.LogWarning("Connection to node %s failed: %s" %
1238                           (node, lvs.data))
1239         continue
1240       lvs = lvs.data
1241       if isinstance(lvs, basestring):
1242         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1243         res_nlvm[node] = lvs
1244       elif not isinstance(lvs, dict):
1245         logging.warning("Connection to node %s failed or invalid data"
1246                         " returned", node)
1247         res_nodes.append(node)
1248         continue
1249
1250       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1251         inst = nv_dict.pop((node, lv_name), None)
1252         if (not lv_online and inst is not None
1253             and inst.name not in res_instances):
1254           res_instances.append(inst.name)
1255
1256     # any leftover items in nv_dict are missing LVs, let's arrange the
1257     # data better
1258     for key, inst in nv_dict.iteritems():
1259       if inst.name not in res_missing:
1260         res_missing[inst.name] = []
1261       res_missing[inst.name].append(key)
1262
1263     return result
1264
1265
1266 class LURenameCluster(LogicalUnit):
1267   """Rename the cluster.
1268
1269   """
1270   HPATH = "cluster-rename"
1271   HTYPE = constants.HTYPE_CLUSTER
1272   _OP_REQP = ["name"]
1273
1274   def BuildHooksEnv(self):
1275     """Build hooks env.
1276
1277     """
1278     env = {
1279       "OP_TARGET": self.cfg.GetClusterName(),
1280       "NEW_NAME": self.op.name,
1281       }
1282     mn = self.cfg.GetMasterNode()
1283     return env, [mn], [mn]
1284
1285   def CheckPrereq(self):
1286     """Verify that the passed name is a valid one.
1287
1288     """
1289     hostname = utils.HostInfo(self.op.name)
1290
1291     new_name = hostname.name
1292     self.ip = new_ip = hostname.ip
1293     old_name = self.cfg.GetClusterName()
1294     old_ip = self.cfg.GetMasterIP()
1295     if new_name == old_name and new_ip == old_ip:
1296       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1297                                  " cluster has changed")
1298     if new_ip != old_ip:
1299       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1300         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1301                                    " reachable on the network. Aborting." %
1302                                    new_ip)
1303
1304     self.op.name = new_name
1305
1306   def Exec(self, feedback_fn):
1307     """Rename the cluster.
1308
1309     """
1310     clustername = self.op.name
1311     ip = self.ip
1312
1313     # shutdown the master IP
1314     master = self.cfg.GetMasterNode()
1315     result = self.rpc.call_node_stop_master(master, False)
1316     if result.failed or not result.data:
1317       raise errors.OpExecError("Could not disable the master role")
1318
1319     try:
1320       cluster = self.cfg.GetClusterInfo()
1321       cluster.cluster_name = clustername
1322       cluster.master_ip = ip
1323       self.cfg.Update(cluster)
1324
1325       # update the known hosts file
1326       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1327       node_list = self.cfg.GetNodeList()
1328       try:
1329         node_list.remove(master)
1330       except ValueError:
1331         pass
1332       result = self.rpc.call_upload_file(node_list,
1333                                          constants.SSH_KNOWN_HOSTS_FILE)
1334       for to_node, to_result in result.iteritems():
1335         if to_result.failed or not to_result.data:
1336           logging.error("Copy of file %s to node %s failed",
1337                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1338
1339     finally:
1340       result = self.rpc.call_node_start_master(master, False)
1341       if result.failed or not result.data:
1342         self.LogWarning("Could not re-enable the master role on"
1343                         " the master, please restart manually.")
1344
1345
1346 def _RecursiveCheckIfLVMBased(disk):
1347   """Check if the given disk or its children are lvm-based.
1348
1349   @type disk: L{objects.Disk}
1350   @param disk: the disk to check
1351   @rtype: booleean
1352   @return: boolean indicating whether a LD_LV dev_type was found or not
1353
1354   """
1355   if disk.children:
1356     for chdisk in disk.children:
1357       if _RecursiveCheckIfLVMBased(chdisk):
1358         return True
1359   return disk.dev_type == constants.LD_LV
1360
1361
1362 class LUSetClusterParams(LogicalUnit):
1363   """Change the parameters of the cluster.
1364
1365   """
1366   HPATH = "cluster-modify"
1367   HTYPE = constants.HTYPE_CLUSTER
1368   _OP_REQP = []
1369   REQ_BGL = False
1370
1371   def CheckParameters(self):
1372     """Check parameters
1373
1374     """
1375     if not hasattr(self.op, "candidate_pool_size"):
1376       self.op.candidate_pool_size = None
1377     if self.op.candidate_pool_size is not None:
1378       try:
1379         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1380       except ValueError, err:
1381         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1382                                    str(err))
1383       if self.op.candidate_pool_size < 1:
1384         raise errors.OpPrereqError("At least one master candidate needed")
1385
1386   def ExpandNames(self):
1387     # FIXME: in the future maybe other cluster params won't require checking on
1388     # all nodes to be modified.
1389     self.needed_locks = {
1390       locking.LEVEL_NODE: locking.ALL_SET,
1391     }
1392     self.share_locks[locking.LEVEL_NODE] = 1
1393
1394   def BuildHooksEnv(self):
1395     """Build hooks env.
1396
1397     """
1398     env = {
1399       "OP_TARGET": self.cfg.GetClusterName(),
1400       "NEW_VG_NAME": self.op.vg_name,
1401       }
1402     mn = self.cfg.GetMasterNode()
1403     return env, [mn], [mn]
1404
1405   def CheckPrereq(self):
1406     """Check prerequisites.
1407
1408     This checks whether the given params don't conflict and
1409     if the given volume group is valid.
1410
1411     """
1412     if self.op.vg_name is not None and not self.op.vg_name:
1413       instances = self.cfg.GetAllInstancesInfo().values()
1414       for inst in instances:
1415         for disk in inst.disks:
1416           if _RecursiveCheckIfLVMBased(disk):
1417             raise errors.OpPrereqError("Cannot disable lvm storage while"
1418                                        " lvm-based instances exist")
1419
1420     node_list = self.acquired_locks[locking.LEVEL_NODE]
1421
1422     # if vg_name not None, checks given volume group on all nodes
1423     if self.op.vg_name:
1424       vglist = self.rpc.call_vg_list(node_list)
1425       for node in node_list:
1426         if vglist[node].failed:
1427           # ignoring down node
1428           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1429           continue
1430         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1431                                               self.op.vg_name,
1432                                               constants.MIN_VG_SIZE)
1433         if vgstatus:
1434           raise errors.OpPrereqError("Error on node '%s': %s" %
1435                                      (node, vgstatus))
1436
1437     self.cluster = cluster = self.cfg.GetClusterInfo()
1438     # validate beparams changes
1439     if self.op.beparams:
1440       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1441       self.new_beparams = cluster.FillDict(
1442         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1443
1444     # hypervisor list/parameters
1445     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1446     if self.op.hvparams:
1447       if not isinstance(self.op.hvparams, dict):
1448         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1449       for hv_name, hv_dict in self.op.hvparams.items():
1450         if hv_name not in self.new_hvparams:
1451           self.new_hvparams[hv_name] = hv_dict
1452         else:
1453           self.new_hvparams[hv_name].update(hv_dict)
1454
1455     if self.op.enabled_hypervisors is not None:
1456       self.hv_list = self.op.enabled_hypervisors
1457     else:
1458       self.hv_list = cluster.enabled_hypervisors
1459
1460     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1461       # either the enabled list has changed, or the parameters have, validate
1462       for hv_name, hv_params in self.new_hvparams.items():
1463         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1464             (self.op.enabled_hypervisors and
1465              hv_name in self.op.enabled_hypervisors)):
1466           # either this is a new hypervisor, or its parameters have changed
1467           hv_class = hypervisor.GetHypervisor(hv_name)
1468           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1469           hv_class.CheckParameterSyntax(hv_params)
1470           _CheckHVParams(self, node_list, hv_name, hv_params)
1471
1472   def Exec(self, feedback_fn):
1473     """Change the parameters of the cluster.
1474
1475     """
1476     if self.op.vg_name is not None:
1477       if self.op.vg_name != self.cfg.GetVGName():
1478         self.cfg.SetVGName(self.op.vg_name)
1479       else:
1480         feedback_fn("Cluster LVM configuration already in desired"
1481                     " state, not changing")
1482     if self.op.hvparams:
1483       self.cluster.hvparams = self.new_hvparams
1484     if self.op.enabled_hypervisors is not None:
1485       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1486     if self.op.beparams:
1487       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1488     if self.op.candidate_pool_size is not None:
1489       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1490
1491     self.cfg.Update(self.cluster)
1492
1493     # we want to update nodes after the cluster so that if any errors
1494     # happen, we have recorded and saved the cluster info
1495     if self.op.candidate_pool_size is not None:
1496       _AdjustCandidatePool(self)
1497
1498
1499 class LURedistributeConfig(NoHooksLU):
1500   """Force the redistribution of cluster configuration.
1501
1502   This is a very simple LU.
1503
1504   """
1505   _OP_REQP = []
1506   REQ_BGL = False
1507
1508   def ExpandNames(self):
1509     self.needed_locks = {
1510       locking.LEVEL_NODE: locking.ALL_SET,
1511     }
1512     self.share_locks[locking.LEVEL_NODE] = 1
1513
1514   def CheckPrereq(self):
1515     """Check prerequisites.
1516
1517     """
1518
1519   def Exec(self, feedback_fn):
1520     """Redistribute the configuration.
1521
1522     """
1523     self.cfg.Update(self.cfg.GetClusterInfo())
1524
1525
1526 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1527   """Sleep and poll for an instance's disk to sync.
1528
1529   """
1530   if not instance.disks:
1531     return True
1532
1533   if not oneshot:
1534     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1535
1536   node = instance.primary_node
1537
1538   for dev in instance.disks:
1539     lu.cfg.SetDiskID(dev, node)
1540
1541   retries = 0
1542   while True:
1543     max_time = 0
1544     done = True
1545     cumul_degraded = False
1546     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1547     if rstats.failed or not rstats.data:
1548       lu.LogWarning("Can't get any data from node %s", node)
1549       retries += 1
1550       if retries >= 10:
1551         raise errors.RemoteError("Can't contact node %s for mirror data,"
1552                                  " aborting." % node)
1553       time.sleep(6)
1554       continue
1555     rstats = rstats.data
1556     retries = 0
1557     for i, mstat in enumerate(rstats):
1558       if mstat is None:
1559         lu.LogWarning("Can't compute data for node %s/%s",
1560                            node, instance.disks[i].iv_name)
1561         continue
1562       # we ignore the ldisk parameter
1563       perc_done, est_time, is_degraded, _ = mstat
1564       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1565       if perc_done is not None:
1566         done = False
1567         if est_time is not None:
1568           rem_time = "%d estimated seconds remaining" % est_time
1569           max_time = est_time
1570         else:
1571           rem_time = "no time estimate"
1572         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1573                         (instance.disks[i].iv_name, perc_done, rem_time))
1574     if done or oneshot:
1575       break
1576
1577     time.sleep(min(60, max_time))
1578
1579   if done:
1580     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1581   return not cumul_degraded
1582
1583
1584 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1585   """Check that mirrors are not degraded.
1586
1587   The ldisk parameter, if True, will change the test from the
1588   is_degraded attribute (which represents overall non-ok status for
1589   the device(s)) to the ldisk (representing the local storage status).
1590
1591   """
1592   lu.cfg.SetDiskID(dev, node)
1593   if ldisk:
1594     idx = 6
1595   else:
1596     idx = 5
1597
1598   result = True
1599   if on_primary or dev.AssembleOnSecondary():
1600     rstats = lu.rpc.call_blockdev_find(node, dev)
1601     msg = rstats.RemoteFailMsg()
1602     if msg:
1603       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1604       result = False
1605     elif not rstats.payload:
1606       lu.LogWarning("Can't find disk on node %s", node)
1607       result = False
1608     else:
1609       result = result and (not rstats.payload[idx])
1610   if dev.children:
1611     for child in dev.children:
1612       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1613
1614   return result
1615
1616
1617 class LUDiagnoseOS(NoHooksLU):
1618   """Logical unit for OS diagnose/query.
1619
1620   """
1621   _OP_REQP = ["output_fields", "names"]
1622   REQ_BGL = False
1623   _FIELDS_STATIC = utils.FieldSet()
1624   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1625
1626   def ExpandNames(self):
1627     if self.op.names:
1628       raise errors.OpPrereqError("Selective OS query not supported")
1629
1630     _CheckOutputFields(static=self._FIELDS_STATIC,
1631                        dynamic=self._FIELDS_DYNAMIC,
1632                        selected=self.op.output_fields)
1633
1634     # Lock all nodes, in shared mode
1635     self.needed_locks = {}
1636     self.share_locks[locking.LEVEL_NODE] = 1
1637     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1638
1639   def CheckPrereq(self):
1640     """Check prerequisites.
1641
1642     """
1643
1644   @staticmethod
1645   def _DiagnoseByOS(node_list, rlist):
1646     """Remaps a per-node return list into an a per-os per-node dictionary
1647
1648     @param node_list: a list with the names of all nodes
1649     @param rlist: a map with node names as keys and OS objects as values
1650
1651     @rtype: dict
1652     @returns: a dictionary with osnames as keys and as value another map, with
1653         nodes as keys and list of OS objects as values, eg::
1654
1655           {"debian-etch": {"node1": [<object>,...],
1656                            "node2": [<object>,]}
1657           }
1658
1659     """
1660     all_os = {}
1661     for node_name, nr in rlist.iteritems():
1662       if nr.failed or not nr.data:
1663         continue
1664       for os_obj in nr.data:
1665         if os_obj.name not in all_os:
1666           # build a list of nodes for this os containing empty lists
1667           # for each node in node_list
1668           all_os[os_obj.name] = {}
1669           for nname in node_list:
1670             all_os[os_obj.name][nname] = []
1671         all_os[os_obj.name][node_name].append(os_obj)
1672     return all_os
1673
1674   def Exec(self, feedback_fn):
1675     """Compute the list of OSes.
1676
1677     """
1678     node_list = self.acquired_locks[locking.LEVEL_NODE]
1679     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1680                    if node in node_list]
1681     node_data = self.rpc.call_os_diagnose(valid_nodes)
1682     if node_data == False:
1683       raise errors.OpExecError("Can't gather the list of OSes")
1684     pol = self._DiagnoseByOS(valid_nodes, node_data)
1685     output = []
1686     for os_name, os_data in pol.iteritems():
1687       row = []
1688       for field in self.op.output_fields:
1689         if field == "name":
1690           val = os_name
1691         elif field == "valid":
1692           val = utils.all([osl and osl[0] for osl in os_data.values()])
1693         elif field == "node_status":
1694           val = {}
1695           for node_name, nos_list in os_data.iteritems():
1696             val[node_name] = [(v.status, v.path) for v in nos_list]
1697         else:
1698           raise errors.ParameterError(field)
1699         row.append(val)
1700       output.append(row)
1701
1702     return output
1703
1704
1705 class LURemoveNode(LogicalUnit):
1706   """Logical unit for removing a node.
1707
1708   """
1709   HPATH = "node-remove"
1710   HTYPE = constants.HTYPE_NODE
1711   _OP_REQP = ["node_name"]
1712
1713   def BuildHooksEnv(self):
1714     """Build hooks env.
1715
1716     This doesn't run on the target node in the pre phase as a failed
1717     node would then be impossible to remove.
1718
1719     """
1720     env = {
1721       "OP_TARGET": self.op.node_name,
1722       "NODE_NAME": self.op.node_name,
1723       }
1724     all_nodes = self.cfg.GetNodeList()
1725     all_nodes.remove(self.op.node_name)
1726     return env, all_nodes, all_nodes
1727
1728   def CheckPrereq(self):
1729     """Check prerequisites.
1730
1731     This checks:
1732      - the node exists in the configuration
1733      - it does not have primary or secondary instances
1734      - it's not the master
1735
1736     Any errors are signalled by raising errors.OpPrereqError.
1737
1738     """
1739     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1740     if node is None:
1741       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1742
1743     instance_list = self.cfg.GetInstanceList()
1744
1745     masternode = self.cfg.GetMasterNode()
1746     if node.name == masternode:
1747       raise errors.OpPrereqError("Node is the master node,"
1748                                  " you need to failover first.")
1749
1750     for instance_name in instance_list:
1751       instance = self.cfg.GetInstanceInfo(instance_name)
1752       if node.name in instance.all_nodes:
1753         raise errors.OpPrereqError("Instance %s is still running on the node,"
1754                                    " please remove first." % instance_name)
1755     self.op.node_name = node.name
1756     self.node = node
1757
1758   def Exec(self, feedback_fn):
1759     """Removes the node from the cluster.
1760
1761     """
1762     node = self.node
1763     logging.info("Stopping the node daemon and removing configs from node %s",
1764                  node.name)
1765
1766     self.context.RemoveNode(node.name)
1767
1768     self.rpc.call_node_leave_cluster(node.name)
1769
1770     # Promote nodes to master candidate as needed
1771     _AdjustCandidatePool(self)
1772
1773
1774 class LUQueryNodes(NoHooksLU):
1775   """Logical unit for querying nodes.
1776
1777   """
1778   _OP_REQP = ["output_fields", "names", "use_locking"]
1779   REQ_BGL = False
1780   _FIELDS_DYNAMIC = utils.FieldSet(
1781     "dtotal", "dfree",
1782     "mtotal", "mnode", "mfree",
1783     "bootid",
1784     "ctotal", "cnodes", "csockets",
1785     )
1786
1787   _FIELDS_STATIC = utils.FieldSet(
1788     "name", "pinst_cnt", "sinst_cnt",
1789     "pinst_list", "sinst_list",
1790     "pip", "sip", "tags",
1791     "serial_no",
1792     "master_candidate",
1793     "master",
1794     "offline",
1795     "drained",
1796     )
1797
1798   def ExpandNames(self):
1799     _CheckOutputFields(static=self._FIELDS_STATIC,
1800                        dynamic=self._FIELDS_DYNAMIC,
1801                        selected=self.op.output_fields)
1802
1803     self.needed_locks = {}
1804     self.share_locks[locking.LEVEL_NODE] = 1
1805
1806     if self.op.names:
1807       self.wanted = _GetWantedNodes(self, self.op.names)
1808     else:
1809       self.wanted = locking.ALL_SET
1810
1811     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1812     self.do_locking = self.do_node_query and self.op.use_locking
1813     if self.do_locking:
1814       # if we don't request only static fields, we need to lock the nodes
1815       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1816
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     """
1822     # The validation of the node list is done in the _GetWantedNodes,
1823     # if non empty, and if empty, there's no validation to do
1824     pass
1825
1826   def Exec(self, feedback_fn):
1827     """Computes the list of nodes and their attributes.
1828
1829     """
1830     all_info = self.cfg.GetAllNodesInfo()
1831     if self.do_locking:
1832       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1833     elif self.wanted != locking.ALL_SET:
1834       nodenames = self.wanted
1835       missing = set(nodenames).difference(all_info.keys())
1836       if missing:
1837         raise errors.OpExecError(
1838           "Some nodes were removed before retrieving their data: %s" % missing)
1839     else:
1840       nodenames = all_info.keys()
1841
1842     nodenames = utils.NiceSort(nodenames)
1843     nodelist = [all_info[name] for name in nodenames]
1844
1845     # begin data gathering
1846
1847     if self.do_node_query:
1848       live_data = {}
1849       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1850                                           self.cfg.GetHypervisorType())
1851       for name in nodenames:
1852         nodeinfo = node_data[name]
1853         if not nodeinfo.failed and nodeinfo.data:
1854           nodeinfo = nodeinfo.data
1855           fn = utils.TryConvert
1856           live_data[name] = {
1857             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1858             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1859             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1860             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1861             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1862             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1863             "bootid": nodeinfo.get('bootid', None),
1864             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1865             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1866             }
1867         else:
1868           live_data[name] = {}
1869     else:
1870       live_data = dict.fromkeys(nodenames, {})
1871
1872     node_to_primary = dict([(name, set()) for name in nodenames])
1873     node_to_secondary = dict([(name, set()) for name in nodenames])
1874
1875     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1876                              "sinst_cnt", "sinst_list"))
1877     if inst_fields & frozenset(self.op.output_fields):
1878       instancelist = self.cfg.GetInstanceList()
1879
1880       for instance_name in instancelist:
1881         inst = self.cfg.GetInstanceInfo(instance_name)
1882         if inst.primary_node in node_to_primary:
1883           node_to_primary[inst.primary_node].add(inst.name)
1884         for secnode in inst.secondary_nodes:
1885           if secnode in node_to_secondary:
1886             node_to_secondary[secnode].add(inst.name)
1887
1888     master_node = self.cfg.GetMasterNode()
1889
1890     # end data gathering
1891
1892     output = []
1893     for node in nodelist:
1894       node_output = []
1895       for field in self.op.output_fields:
1896         if field == "name":
1897           val = node.name
1898         elif field == "pinst_list":
1899           val = list(node_to_primary[node.name])
1900         elif field == "sinst_list":
1901           val = list(node_to_secondary[node.name])
1902         elif field == "pinst_cnt":
1903           val = len(node_to_primary[node.name])
1904         elif field == "sinst_cnt":
1905           val = len(node_to_secondary[node.name])
1906         elif field == "pip":
1907           val = node.primary_ip
1908         elif field == "sip":
1909           val = node.secondary_ip
1910         elif field == "tags":
1911           val = list(node.GetTags())
1912         elif field == "serial_no":
1913           val = node.serial_no
1914         elif field == "master_candidate":
1915           val = node.master_candidate
1916         elif field == "master":
1917           val = node.name == master_node
1918         elif field == "offline":
1919           val = node.offline
1920         elif field == "drained":
1921           val = node.drained
1922         elif self._FIELDS_DYNAMIC.Matches(field):
1923           val = live_data[node.name].get(field, None)
1924         else:
1925           raise errors.ParameterError(field)
1926         node_output.append(val)
1927       output.append(node_output)
1928
1929     return output
1930
1931
1932 class LUQueryNodeVolumes(NoHooksLU):
1933   """Logical unit for getting volumes on node(s).
1934
1935   """
1936   _OP_REQP = ["nodes", "output_fields"]
1937   REQ_BGL = False
1938   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1939   _FIELDS_STATIC = utils.FieldSet("node")
1940
1941   def ExpandNames(self):
1942     _CheckOutputFields(static=self._FIELDS_STATIC,
1943                        dynamic=self._FIELDS_DYNAMIC,
1944                        selected=self.op.output_fields)
1945
1946     self.needed_locks = {}
1947     self.share_locks[locking.LEVEL_NODE] = 1
1948     if not self.op.nodes:
1949       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1950     else:
1951       self.needed_locks[locking.LEVEL_NODE] = \
1952         _GetWantedNodes(self, self.op.nodes)
1953
1954   def CheckPrereq(self):
1955     """Check prerequisites.
1956
1957     This checks that the fields required are valid output fields.
1958
1959     """
1960     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1961
1962   def Exec(self, feedback_fn):
1963     """Computes the list of nodes and their attributes.
1964
1965     """
1966     nodenames = self.nodes
1967     volumes = self.rpc.call_node_volumes(nodenames)
1968
1969     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1970              in self.cfg.GetInstanceList()]
1971
1972     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1973
1974     output = []
1975     for node in nodenames:
1976       if node not in volumes or volumes[node].failed or not volumes[node].data:
1977         continue
1978
1979       node_vols = volumes[node].data[:]
1980       node_vols.sort(key=lambda vol: vol['dev'])
1981
1982       for vol in node_vols:
1983         node_output = []
1984         for field in self.op.output_fields:
1985           if field == "node":
1986             val = node
1987           elif field == "phys":
1988             val = vol['dev']
1989           elif field == "vg":
1990             val = vol['vg']
1991           elif field == "name":
1992             val = vol['name']
1993           elif field == "size":
1994             val = int(float(vol['size']))
1995           elif field == "instance":
1996             for inst in ilist:
1997               if node not in lv_by_node[inst]:
1998                 continue
1999               if vol['name'] in lv_by_node[inst][node]:
2000                 val = inst.name
2001                 break
2002             else:
2003               val = '-'
2004           else:
2005             raise errors.ParameterError(field)
2006           node_output.append(str(val))
2007
2008         output.append(node_output)
2009
2010     return output
2011
2012
2013 class LUAddNode(LogicalUnit):
2014   """Logical unit for adding node to the cluster.
2015
2016   """
2017   HPATH = "node-add"
2018   HTYPE = constants.HTYPE_NODE
2019   _OP_REQP = ["node_name"]
2020
2021   def BuildHooksEnv(self):
2022     """Build hooks env.
2023
2024     This will run on all nodes before, and on all nodes + the new node after.
2025
2026     """
2027     env = {
2028       "OP_TARGET": self.op.node_name,
2029       "NODE_NAME": self.op.node_name,
2030       "NODE_PIP": self.op.primary_ip,
2031       "NODE_SIP": self.op.secondary_ip,
2032       }
2033     nodes_0 = self.cfg.GetNodeList()
2034     nodes_1 = nodes_0 + [self.op.node_name, ]
2035     return env, nodes_0, nodes_1
2036
2037   def CheckPrereq(self):
2038     """Check prerequisites.
2039
2040     This checks:
2041      - the new node is not already in the config
2042      - it is resolvable
2043      - its parameters (single/dual homed) matches the cluster
2044
2045     Any errors are signalled by raising errors.OpPrereqError.
2046
2047     """
2048     node_name = self.op.node_name
2049     cfg = self.cfg
2050
2051     dns_data = utils.HostInfo(node_name)
2052
2053     node = dns_data.name
2054     primary_ip = self.op.primary_ip = dns_data.ip
2055     secondary_ip = getattr(self.op, "secondary_ip", None)
2056     if secondary_ip is None:
2057       secondary_ip = primary_ip
2058     if not utils.IsValidIP(secondary_ip):
2059       raise errors.OpPrereqError("Invalid secondary IP given")
2060     self.op.secondary_ip = secondary_ip
2061
2062     node_list = cfg.GetNodeList()
2063     if not self.op.readd and node in node_list:
2064       raise errors.OpPrereqError("Node %s is already in the configuration" %
2065                                  node)
2066     elif self.op.readd and node not in node_list:
2067       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2068
2069     for existing_node_name in node_list:
2070       existing_node = cfg.GetNodeInfo(existing_node_name)
2071
2072       if self.op.readd and node == existing_node_name:
2073         if (existing_node.primary_ip != primary_ip or
2074             existing_node.secondary_ip != secondary_ip):
2075           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2076                                      " address configuration as before")
2077         continue
2078
2079       if (existing_node.primary_ip == primary_ip or
2080           existing_node.secondary_ip == primary_ip or
2081           existing_node.primary_ip == secondary_ip or
2082           existing_node.secondary_ip == secondary_ip):
2083         raise errors.OpPrereqError("New node ip address(es) conflict with"
2084                                    " existing node %s" % existing_node.name)
2085
2086     # check that the type of the node (single versus dual homed) is the
2087     # same as for the master
2088     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2089     master_singlehomed = myself.secondary_ip == myself.primary_ip
2090     newbie_singlehomed = secondary_ip == primary_ip
2091     if master_singlehomed != newbie_singlehomed:
2092       if master_singlehomed:
2093         raise errors.OpPrereqError("The master has no private ip but the"
2094                                    " new node has one")
2095       else:
2096         raise errors.OpPrereqError("The master has a private ip but the"
2097                                    " new node doesn't have one")
2098
2099     # checks reachablity
2100     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2101       raise errors.OpPrereqError("Node not reachable by ping")
2102
2103     if not newbie_singlehomed:
2104       # check reachability from my secondary ip to newbie's secondary ip
2105       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2106                            source=myself.secondary_ip):
2107         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2108                                    " based ping to noded port")
2109
2110     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2111     mc_now, _ = self.cfg.GetMasterCandidateStats()
2112     master_candidate = mc_now < cp_size
2113
2114     self.new_node = objects.Node(name=node,
2115                                  primary_ip=primary_ip,
2116                                  secondary_ip=secondary_ip,
2117                                  master_candidate=master_candidate,
2118                                  offline=False, drained=False)
2119
2120   def Exec(self, feedback_fn):
2121     """Adds the new node to the cluster.
2122
2123     """
2124     new_node = self.new_node
2125     node = new_node.name
2126
2127     # check connectivity
2128     result = self.rpc.call_version([node])[node]
2129     result.Raise()
2130     if result.data:
2131       if constants.PROTOCOL_VERSION == result.data:
2132         logging.info("Communication to node %s fine, sw version %s match",
2133                      node, result.data)
2134       else:
2135         raise errors.OpExecError("Version mismatch master version %s,"
2136                                  " node version %s" %
2137                                  (constants.PROTOCOL_VERSION, result.data))
2138     else:
2139       raise errors.OpExecError("Cannot get version from the new node")
2140
2141     # setup ssh on node
2142     logging.info("Copy ssh key to node %s", node)
2143     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2144     keyarray = []
2145     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2146                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2147                 priv_key, pub_key]
2148
2149     for i in keyfiles:
2150       f = open(i, 'r')
2151       try:
2152         keyarray.append(f.read())
2153       finally:
2154         f.close()
2155
2156     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2157                                     keyarray[2],
2158                                     keyarray[3], keyarray[4], keyarray[5])
2159
2160     msg = result.RemoteFailMsg()
2161     if msg:
2162       raise errors.OpExecError("Cannot transfer ssh keys to the"
2163                                " new node: %s" % msg)
2164
2165     # Add node to our /etc/hosts, and add key to known_hosts
2166     utils.AddHostToEtcHosts(new_node.name)
2167
2168     if new_node.secondary_ip != new_node.primary_ip:
2169       result = self.rpc.call_node_has_ip_address(new_node.name,
2170                                                  new_node.secondary_ip)
2171       if result.failed or not result.data:
2172         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2173                                  " you gave (%s). Please fix and re-run this"
2174                                  " command." % new_node.secondary_ip)
2175
2176     node_verify_list = [self.cfg.GetMasterNode()]
2177     node_verify_param = {
2178       'nodelist': [node],
2179       # TODO: do a node-net-test as well?
2180     }
2181
2182     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2183                                        self.cfg.GetClusterName())
2184     for verifier in node_verify_list:
2185       if result[verifier].failed or not result[verifier].data:
2186         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2187                                  " for remote verification" % verifier)
2188       if result[verifier].data['nodelist']:
2189         for failed in result[verifier].data['nodelist']:
2190           feedback_fn("ssh/hostname verification failed %s -> %s" %
2191                       (verifier, result[verifier].data['nodelist'][failed]))
2192         raise errors.OpExecError("ssh/hostname verification failed.")
2193
2194     # Distribute updated /etc/hosts and known_hosts to all nodes,
2195     # including the node just added
2196     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2197     dist_nodes = self.cfg.GetNodeList()
2198     if not self.op.readd:
2199       dist_nodes.append(node)
2200     if myself.name in dist_nodes:
2201       dist_nodes.remove(myself.name)
2202
2203     logging.debug("Copying hosts and known_hosts to all nodes")
2204     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2205       result = self.rpc.call_upload_file(dist_nodes, fname)
2206       for to_node, to_result in result.iteritems():
2207         if to_result.failed or not to_result.data:
2208           logging.error("Copy of file %s to node %s failed", fname, to_node)
2209
2210     to_copy = []
2211     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2212     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2213       to_copy.append(constants.VNC_PASSWORD_FILE)
2214
2215     for fname in to_copy:
2216       result = self.rpc.call_upload_file([node], fname)
2217       if result[node].failed or not result[node]:
2218         logging.error("Could not copy file %s to node %s", fname, node)
2219
2220     if self.op.readd:
2221       self.context.ReaddNode(new_node)
2222     else:
2223       self.context.AddNode(new_node)
2224
2225
2226 class LUSetNodeParams(LogicalUnit):
2227   """Modifies the parameters of a node.
2228
2229   """
2230   HPATH = "node-modify"
2231   HTYPE = constants.HTYPE_NODE
2232   _OP_REQP = ["node_name"]
2233   REQ_BGL = False
2234
2235   def CheckArguments(self):
2236     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2237     if node_name is None:
2238       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2239     self.op.node_name = node_name
2240     _CheckBooleanOpField(self.op, 'master_candidate')
2241     _CheckBooleanOpField(self.op, 'offline')
2242     _CheckBooleanOpField(self.op, 'drained')
2243     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2244     if all_mods.count(None) == 3:
2245       raise errors.OpPrereqError("Please pass at least one modification")
2246     if all_mods.count(True) > 1:
2247       raise errors.OpPrereqError("Can't set the node into more than one"
2248                                  " state at the same time")
2249
2250   def ExpandNames(self):
2251     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2252
2253   def BuildHooksEnv(self):
2254     """Build hooks env.
2255
2256     This runs on the master node.
2257
2258     """
2259     env = {
2260       "OP_TARGET": self.op.node_name,
2261       "MASTER_CANDIDATE": str(self.op.master_candidate),
2262       "OFFLINE": str(self.op.offline),
2263       "DRAINED": str(self.op.drained),
2264       }
2265     nl = [self.cfg.GetMasterNode(),
2266           self.op.node_name]
2267     return env, nl, nl
2268
2269   def CheckPrereq(self):
2270     """Check prerequisites.
2271
2272     This only checks the instance list against the existing names.
2273
2274     """
2275     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2276
2277     if ((self.op.master_candidate == False or self.op.offline == True or
2278          self.op.drained == True) and node.master_candidate):
2279       # we will demote the node from master_candidate
2280       if self.op.node_name == self.cfg.GetMasterNode():
2281         raise errors.OpPrereqError("The master node has to be a"
2282                                    " master candidate, online and not drained")
2283       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2284       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2285       if num_candidates <= cp_size:
2286         msg = ("Not enough master candidates (desired"
2287                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2288         if self.op.force:
2289           self.LogWarning(msg)
2290         else:
2291           raise errors.OpPrereqError(msg)
2292
2293     if (self.op.master_candidate == True and
2294         ((node.offline and not self.op.offline == False) or
2295          (node.drained and not self.op.drained == False))):
2296       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2297                                  " to master_candidate")
2298
2299     return
2300
2301   def Exec(self, feedback_fn):
2302     """Modifies a node.
2303
2304     """
2305     node = self.node
2306
2307     result = []
2308     changed_mc = False
2309
2310     if self.op.offline is not None:
2311       node.offline = self.op.offline
2312       result.append(("offline", str(self.op.offline)))
2313       if self.op.offline == True:
2314         if node.master_candidate:
2315           node.master_candidate = False
2316           changed_mc = True
2317           result.append(("master_candidate", "auto-demotion due to offline"))
2318         if node.drained:
2319           node.drained = False
2320           result.append(("drained", "clear drained status due to offline"))
2321
2322     if self.op.master_candidate is not None:
2323       node.master_candidate = self.op.master_candidate
2324       changed_mc = True
2325       result.append(("master_candidate", str(self.op.master_candidate)))
2326       if self.op.master_candidate == False:
2327         rrc = self.rpc.call_node_demote_from_mc(node.name)
2328         msg = rrc.RemoteFailMsg()
2329         if msg:
2330           self.LogWarning("Node failed to demote itself: %s" % msg)
2331
2332     if self.op.drained is not None:
2333       node.drained = self.op.drained
2334       result.append(("drained", str(self.op.drained)))
2335       if self.op.drained == True:
2336         if node.master_candidate:
2337           node.master_candidate = False
2338           changed_mc = True
2339           result.append(("master_candidate", "auto-demotion due to drain"))
2340         if node.offline:
2341           node.offline = False
2342           result.append(("offline", "clear offline status due to drain"))
2343
2344     # this will trigger configuration file update, if needed
2345     self.cfg.Update(node)
2346     # this will trigger job queue propagation or cleanup
2347     if changed_mc:
2348       self.context.ReaddNode(node)
2349
2350     return result
2351
2352
2353 class LUQueryClusterInfo(NoHooksLU):
2354   """Query cluster configuration.
2355
2356   """
2357   _OP_REQP = []
2358   REQ_BGL = False
2359
2360   def ExpandNames(self):
2361     self.needed_locks = {}
2362
2363   def CheckPrereq(self):
2364     """No prerequsites needed for this LU.
2365
2366     """
2367     pass
2368
2369   def Exec(self, feedback_fn):
2370     """Return cluster config.
2371
2372     """
2373     cluster = self.cfg.GetClusterInfo()
2374     result = {
2375       "software_version": constants.RELEASE_VERSION,
2376       "protocol_version": constants.PROTOCOL_VERSION,
2377       "config_version": constants.CONFIG_VERSION,
2378       "os_api_version": constants.OS_API_VERSION,
2379       "export_version": constants.EXPORT_VERSION,
2380       "architecture": (platform.architecture()[0], platform.machine()),
2381       "name": cluster.cluster_name,
2382       "master": cluster.master_node,
2383       "default_hypervisor": cluster.default_hypervisor,
2384       "enabled_hypervisors": cluster.enabled_hypervisors,
2385       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2386                         for hypervisor in cluster.enabled_hypervisors]),
2387       "beparams": cluster.beparams,
2388       "candidate_pool_size": cluster.candidate_pool_size,
2389       }
2390
2391     return result
2392
2393
2394 class LUQueryConfigValues(NoHooksLU):
2395   """Return configuration values.
2396
2397   """
2398   _OP_REQP = []
2399   REQ_BGL = False
2400   _FIELDS_DYNAMIC = utils.FieldSet()
2401   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2402
2403   def ExpandNames(self):
2404     self.needed_locks = {}
2405
2406     _CheckOutputFields(static=self._FIELDS_STATIC,
2407                        dynamic=self._FIELDS_DYNAMIC,
2408                        selected=self.op.output_fields)
2409
2410   def CheckPrereq(self):
2411     """No prerequisites.
2412
2413     """
2414     pass
2415
2416   def Exec(self, feedback_fn):
2417     """Dump a representation of the cluster config to the standard output.
2418
2419     """
2420     values = []
2421     for field in self.op.output_fields:
2422       if field == "cluster_name":
2423         entry = self.cfg.GetClusterName()
2424       elif field == "master_node":
2425         entry = self.cfg.GetMasterNode()
2426       elif field == "drain_flag":
2427         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2428       else:
2429         raise errors.ParameterError(field)
2430       values.append(entry)
2431     return values
2432
2433
2434 class LUActivateInstanceDisks(NoHooksLU):
2435   """Bring up an instance's disks.
2436
2437   """
2438   _OP_REQP = ["instance_name"]
2439   REQ_BGL = False
2440
2441   def ExpandNames(self):
2442     self._ExpandAndLockInstance()
2443     self.needed_locks[locking.LEVEL_NODE] = []
2444     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2445
2446   def DeclareLocks(self, level):
2447     if level == locking.LEVEL_NODE:
2448       self._LockInstancesNodes()
2449
2450   def CheckPrereq(self):
2451     """Check prerequisites.
2452
2453     This checks that the instance is in the cluster.
2454
2455     """
2456     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2457     assert self.instance is not None, \
2458       "Cannot retrieve locked instance %s" % self.op.instance_name
2459     _CheckNodeOnline(self, self.instance.primary_node)
2460
2461   def Exec(self, feedback_fn):
2462     """Activate the disks.
2463
2464     """
2465     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2466     if not disks_ok:
2467       raise errors.OpExecError("Cannot activate block devices")
2468
2469     return disks_info
2470
2471
2472 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2473   """Prepare the block devices for an instance.
2474
2475   This sets up the block devices on all nodes.
2476
2477   @type lu: L{LogicalUnit}
2478   @param lu: the logical unit on whose behalf we execute
2479   @type instance: L{objects.Instance}
2480   @param instance: the instance for whose disks we assemble
2481   @type ignore_secondaries: boolean
2482   @param ignore_secondaries: if true, errors on secondary nodes
2483       won't result in an error return from the function
2484   @return: False if the operation failed, otherwise a list of
2485       (host, instance_visible_name, node_visible_name)
2486       with the mapping from node devices to instance devices
2487
2488   """
2489   device_info = []
2490   disks_ok = True
2491   iname = instance.name
2492   # With the two passes mechanism we try to reduce the window of
2493   # opportunity for the race condition of switching DRBD to primary
2494   # before handshaking occured, but we do not eliminate it
2495
2496   # The proper fix would be to wait (with some limits) until the
2497   # connection has been made and drbd transitions from WFConnection
2498   # into any other network-connected state (Connected, SyncTarget,
2499   # SyncSource, etc.)
2500
2501   # 1st pass, assemble on all nodes in secondary mode
2502   for inst_disk in instance.disks:
2503     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2504       lu.cfg.SetDiskID(node_disk, node)
2505       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2506       msg = result.RemoteFailMsg()
2507       if msg:
2508         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2509                            " (is_primary=False, pass=1): %s",
2510                            inst_disk.iv_name, node, msg)
2511         if not ignore_secondaries:
2512           disks_ok = False
2513
2514   # FIXME: race condition on drbd migration to primary
2515
2516   # 2nd pass, do only the primary node
2517   for inst_disk in instance.disks:
2518     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2519       if node != instance.primary_node:
2520         continue
2521       lu.cfg.SetDiskID(node_disk, node)
2522       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2523       msg = result.RemoteFailMsg()
2524       if msg:
2525         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2526                            " (is_primary=True, pass=2): %s",
2527                            inst_disk.iv_name, node, msg)
2528         disks_ok = False
2529     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2530
2531   # leave the disks configured for the primary node
2532   # this is a workaround that would be fixed better by
2533   # improving the logical/physical id handling
2534   for disk in instance.disks:
2535     lu.cfg.SetDiskID(disk, instance.primary_node)
2536
2537   return disks_ok, device_info
2538
2539
2540 def _StartInstanceDisks(lu, instance, force):
2541   """Start the disks of an instance.
2542
2543   """
2544   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2545                                            ignore_secondaries=force)
2546   if not disks_ok:
2547     _ShutdownInstanceDisks(lu, instance)
2548     if force is not None and not force:
2549       lu.proc.LogWarning("", hint="If the message above refers to a"
2550                          " secondary node,"
2551                          " you can retry the operation using '--force'.")
2552     raise errors.OpExecError("Disk consistency error")
2553
2554
2555 class LUDeactivateInstanceDisks(NoHooksLU):
2556   """Shutdown an instance's disks.
2557
2558   """
2559   _OP_REQP = ["instance_name"]
2560   REQ_BGL = False
2561
2562   def ExpandNames(self):
2563     self._ExpandAndLockInstance()
2564     self.needed_locks[locking.LEVEL_NODE] = []
2565     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2566
2567   def DeclareLocks(self, level):
2568     if level == locking.LEVEL_NODE:
2569       self._LockInstancesNodes()
2570
2571   def CheckPrereq(self):
2572     """Check prerequisites.
2573
2574     This checks that the instance is in the cluster.
2575
2576     """
2577     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2578     assert self.instance is not None, \
2579       "Cannot retrieve locked instance %s" % self.op.instance_name
2580
2581   def Exec(self, feedback_fn):
2582     """Deactivate the disks
2583
2584     """
2585     instance = self.instance
2586     _SafeShutdownInstanceDisks(self, instance)
2587
2588
2589 def _SafeShutdownInstanceDisks(lu, instance):
2590   """Shutdown block devices of an instance.
2591
2592   This function checks if an instance is running, before calling
2593   _ShutdownInstanceDisks.
2594
2595   """
2596   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2597                                       [instance.hypervisor])
2598   ins_l = ins_l[instance.primary_node]
2599   if ins_l.failed or not isinstance(ins_l.data, list):
2600     raise errors.OpExecError("Can't contact node '%s'" %
2601                              instance.primary_node)
2602
2603   if instance.name in ins_l.data:
2604     raise errors.OpExecError("Instance is running, can't shutdown"
2605                              " block devices.")
2606
2607   _ShutdownInstanceDisks(lu, instance)
2608
2609
2610 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2611   """Shutdown block devices of an instance.
2612
2613   This does the shutdown on all nodes of the instance.
2614
2615   If the ignore_primary is false, errors on the primary node are
2616   ignored.
2617
2618   """
2619   all_result = True
2620   for disk in instance.disks:
2621     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2622       lu.cfg.SetDiskID(top_disk, node)
2623       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2624       msg = result.RemoteFailMsg()
2625       if msg:
2626         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2627                       disk.iv_name, node, msg)
2628         if not ignore_primary or node != instance.primary_node:
2629           all_result = False
2630   return all_result
2631
2632
2633 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2634   """Checks if a node has enough free memory.
2635
2636   This function check if a given node has the needed amount of free
2637   memory. In case the node has less memory or we cannot get the
2638   information from the node, this function raise an OpPrereqError
2639   exception.
2640
2641   @type lu: C{LogicalUnit}
2642   @param lu: a logical unit from which we get configuration data
2643   @type node: C{str}
2644   @param node: the node to check
2645   @type reason: C{str}
2646   @param reason: string to use in the error message
2647   @type requested: C{int}
2648   @param requested: the amount of memory in MiB to check for
2649   @type hypervisor_name: C{str}
2650   @param hypervisor_name: the hypervisor to ask for memory stats
2651   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2652       we cannot check the node
2653
2654   """
2655   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2656   nodeinfo[node].Raise()
2657   free_mem = nodeinfo[node].data.get('memory_free')
2658   if not isinstance(free_mem, int):
2659     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2660                              " was '%s'" % (node, free_mem))
2661   if requested > free_mem:
2662     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2663                              " needed %s MiB, available %s MiB" %
2664                              (node, reason, requested, free_mem))
2665
2666
2667 class LUStartupInstance(LogicalUnit):
2668   """Starts an instance.
2669
2670   """
2671   HPATH = "instance-start"
2672   HTYPE = constants.HTYPE_INSTANCE
2673   _OP_REQP = ["instance_name", "force"]
2674   REQ_BGL = False
2675
2676   def ExpandNames(self):
2677     self._ExpandAndLockInstance()
2678
2679   def BuildHooksEnv(self):
2680     """Build hooks env.
2681
2682     This runs on master, primary and secondary nodes of the instance.
2683
2684     """
2685     env = {
2686       "FORCE": self.op.force,
2687       }
2688     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2689     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2690     return env, nl, nl
2691
2692   def CheckPrereq(self):
2693     """Check prerequisites.
2694
2695     This checks that the instance is in the cluster.
2696
2697     """
2698     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2699     assert self.instance is not None, \
2700       "Cannot retrieve locked instance %s" % self.op.instance_name
2701
2702     _CheckNodeOnline(self, instance.primary_node)
2703
2704     bep = self.cfg.GetClusterInfo().FillBE(instance)
2705     # check bridges existance
2706     _CheckInstanceBridgesExist(self, instance)
2707
2708     _CheckNodeFreeMemory(self, instance.primary_node,
2709                          "starting instance %s" % instance.name,
2710                          bep[constants.BE_MEMORY], instance.hypervisor)
2711
2712   def Exec(self, feedback_fn):
2713     """Start the instance.
2714
2715     """
2716     instance = self.instance
2717     force = self.op.force
2718     extra_args = getattr(self.op, "extra_args", "")
2719
2720     self.cfg.MarkInstanceUp(instance.name)
2721
2722     node_current = instance.primary_node
2723
2724     _StartInstanceDisks(self, instance, force)
2725
2726     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2727     msg = result.RemoteFailMsg()
2728     if msg:
2729       _ShutdownInstanceDisks(self, instance)
2730       raise errors.OpExecError("Could not start instance: %s" % msg)
2731
2732
2733 class LURebootInstance(LogicalUnit):
2734   """Reboot an instance.
2735
2736   """
2737   HPATH = "instance-reboot"
2738   HTYPE = constants.HTYPE_INSTANCE
2739   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2740   REQ_BGL = False
2741
2742   def ExpandNames(self):
2743     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2744                                    constants.INSTANCE_REBOOT_HARD,
2745                                    constants.INSTANCE_REBOOT_FULL]:
2746       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2747                                   (constants.INSTANCE_REBOOT_SOFT,
2748                                    constants.INSTANCE_REBOOT_HARD,
2749                                    constants.INSTANCE_REBOOT_FULL))
2750     self._ExpandAndLockInstance()
2751
2752   def BuildHooksEnv(self):
2753     """Build hooks env.
2754
2755     This runs on master, primary and secondary nodes of the instance.
2756
2757     """
2758     env = {
2759       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2760       }
2761     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2762     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2763     return env, nl, nl
2764
2765   def CheckPrereq(self):
2766     """Check prerequisites.
2767
2768     This checks that the instance is in the cluster.
2769
2770     """
2771     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2772     assert self.instance is not None, \
2773       "Cannot retrieve locked instance %s" % self.op.instance_name
2774
2775     _CheckNodeOnline(self, instance.primary_node)
2776
2777     # check bridges existance
2778     _CheckInstanceBridgesExist(self, instance)
2779
2780   def Exec(self, feedback_fn):
2781     """Reboot the instance.
2782
2783     """
2784     instance = self.instance
2785     ignore_secondaries = self.op.ignore_secondaries
2786     reboot_type = self.op.reboot_type
2787     extra_args = getattr(self.op, "extra_args", "")
2788
2789     node_current = instance.primary_node
2790
2791     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2792                        constants.INSTANCE_REBOOT_HARD]:
2793       result = self.rpc.call_instance_reboot(node_current, instance,
2794                                              reboot_type, extra_args)
2795       if result.failed or not result.data:
2796         raise errors.OpExecError("Could not reboot instance")
2797     else:
2798       if not self.rpc.call_instance_shutdown(node_current, instance):
2799         raise errors.OpExecError("could not shutdown instance for full reboot")
2800       _ShutdownInstanceDisks(self, instance)
2801       _StartInstanceDisks(self, instance, ignore_secondaries)
2802       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2803       msg = result.RemoteFailMsg()
2804       if msg:
2805         _ShutdownInstanceDisks(self, instance)
2806         raise errors.OpExecError("Could not start instance for"
2807                                  " full reboot: %s" % msg)
2808
2809     self.cfg.MarkInstanceUp(instance.name)
2810
2811
2812 class LUShutdownInstance(LogicalUnit):
2813   """Shutdown an instance.
2814
2815   """
2816   HPATH = "instance-stop"
2817   HTYPE = constants.HTYPE_INSTANCE
2818   _OP_REQP = ["instance_name"]
2819   REQ_BGL = False
2820
2821   def ExpandNames(self):
2822     self._ExpandAndLockInstance()
2823
2824   def BuildHooksEnv(self):
2825     """Build hooks env.
2826
2827     This runs on master, primary and secondary nodes of the instance.
2828
2829     """
2830     env = _BuildInstanceHookEnvByObject(self, self.instance)
2831     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2832     return env, nl, nl
2833
2834   def CheckPrereq(self):
2835     """Check prerequisites.
2836
2837     This checks that the instance is in the cluster.
2838
2839     """
2840     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2841     assert self.instance is not None, \
2842       "Cannot retrieve locked instance %s" % self.op.instance_name
2843     _CheckNodeOnline(self, self.instance.primary_node)
2844
2845   def Exec(self, feedback_fn):
2846     """Shutdown the instance.
2847
2848     """
2849     instance = self.instance
2850     node_current = instance.primary_node
2851     self.cfg.MarkInstanceDown(instance.name)
2852     result = self.rpc.call_instance_shutdown(node_current, instance)
2853     if result.failed or not result.data:
2854       self.proc.LogWarning("Could not shutdown instance")
2855
2856     _ShutdownInstanceDisks(self, instance)
2857
2858
2859 class LUReinstallInstance(LogicalUnit):
2860   """Reinstall an instance.
2861
2862   """
2863   HPATH = "instance-reinstall"
2864   HTYPE = constants.HTYPE_INSTANCE
2865   _OP_REQP = ["instance_name"]
2866   REQ_BGL = False
2867
2868   def ExpandNames(self):
2869     self._ExpandAndLockInstance()
2870
2871   def BuildHooksEnv(self):
2872     """Build hooks env.
2873
2874     This runs on master, primary and secondary nodes of the instance.
2875
2876     """
2877     env = _BuildInstanceHookEnvByObject(self, self.instance)
2878     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2879     return env, nl, nl
2880
2881   def CheckPrereq(self):
2882     """Check prerequisites.
2883
2884     This checks that the instance is in the cluster and is not running.
2885
2886     """
2887     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2888     assert instance is not None, \
2889       "Cannot retrieve locked instance %s" % self.op.instance_name
2890     _CheckNodeOnline(self, instance.primary_node)
2891
2892     if instance.disk_template == constants.DT_DISKLESS:
2893       raise errors.OpPrereqError("Instance '%s' has no disks" %
2894                                  self.op.instance_name)
2895     if instance.admin_up:
2896       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2897                                  self.op.instance_name)
2898     remote_info = self.rpc.call_instance_info(instance.primary_node,
2899                                               instance.name,
2900                                               instance.hypervisor)
2901     if remote_info.failed or remote_info.data:
2902       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2903                                  (self.op.instance_name,
2904                                   instance.primary_node))
2905
2906     self.op.os_type = getattr(self.op, "os_type", None)
2907     if self.op.os_type is not None:
2908       # OS verification
2909       pnode = self.cfg.GetNodeInfo(
2910         self.cfg.ExpandNodeName(instance.primary_node))
2911       if pnode is None:
2912         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2913                                    self.op.pnode)
2914       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2915       result.Raise()
2916       if not isinstance(result.data, objects.OS):
2917         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2918                                    " primary node"  % self.op.os_type)
2919
2920     self.instance = instance
2921
2922   def Exec(self, feedback_fn):
2923     """Reinstall the instance.
2924
2925     """
2926     inst = self.instance
2927
2928     if self.op.os_type is not None:
2929       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2930       inst.os = self.op.os_type
2931       self.cfg.Update(inst)
2932
2933     _StartInstanceDisks(self, inst, None)
2934     try:
2935       feedback_fn("Running the instance OS create scripts...")
2936       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2937       msg = result.RemoteFailMsg()
2938       if msg:
2939         raise errors.OpExecError("Could not install OS for instance %s"
2940                                  " on node %s: %s" %
2941                                  (inst.name, inst.primary_node, msg))
2942     finally:
2943       _ShutdownInstanceDisks(self, inst)
2944
2945
2946 class LURenameInstance(LogicalUnit):
2947   """Rename an instance.
2948
2949   """
2950   HPATH = "instance-rename"
2951   HTYPE = constants.HTYPE_INSTANCE
2952   _OP_REQP = ["instance_name", "new_name"]
2953
2954   def BuildHooksEnv(self):
2955     """Build hooks env.
2956
2957     This runs on master, primary and secondary nodes of the instance.
2958
2959     """
2960     env = _BuildInstanceHookEnvByObject(self, self.instance)
2961     env["INSTANCE_NEW_NAME"] = self.op.new_name
2962     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2963     return env, nl, nl
2964
2965   def CheckPrereq(self):
2966     """Check prerequisites.
2967
2968     This checks that the instance is in the cluster and is not running.
2969
2970     """
2971     instance = self.cfg.GetInstanceInfo(
2972       self.cfg.ExpandInstanceName(self.op.instance_name))
2973     if instance is None:
2974       raise errors.OpPrereqError("Instance '%s' not known" %
2975                                  self.op.instance_name)
2976     _CheckNodeOnline(self, instance.primary_node)
2977
2978     if instance.admin_up:
2979       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2980                                  self.op.instance_name)
2981     remote_info = self.rpc.call_instance_info(instance.primary_node,
2982                                               instance.name,
2983                                               instance.hypervisor)
2984     remote_info.Raise()
2985     if remote_info.data:
2986       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2987                                  (self.op.instance_name,
2988                                   instance.primary_node))
2989     self.instance = instance
2990
2991     # new name verification
2992     name_info = utils.HostInfo(self.op.new_name)
2993
2994     self.op.new_name = new_name = name_info.name
2995     instance_list = self.cfg.GetInstanceList()
2996     if new_name in instance_list:
2997       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2998                                  new_name)
2999
3000     if not getattr(self.op, "ignore_ip", False):
3001       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3002         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3003                                    (name_info.ip, new_name))
3004
3005
3006   def Exec(self, feedback_fn):
3007     """Reinstall the instance.
3008
3009     """
3010     inst = self.instance
3011     old_name = inst.name
3012
3013     if inst.disk_template == constants.DT_FILE:
3014       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3015
3016     self.cfg.RenameInstance(inst.name, self.op.new_name)
3017     # Change the instance lock. This is definitely safe while we hold the BGL
3018     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3019     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3020
3021     # re-read the instance from the configuration after rename
3022     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3023
3024     if inst.disk_template == constants.DT_FILE:
3025       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3026       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3027                                                      old_file_storage_dir,
3028                                                      new_file_storage_dir)
3029       result.Raise()
3030       if not result.data:
3031         raise errors.OpExecError("Could not connect to node '%s' to rename"
3032                                  " directory '%s' to '%s' (but the instance"
3033                                  " has been renamed in Ganeti)" % (
3034                                  inst.primary_node, old_file_storage_dir,
3035                                  new_file_storage_dir))
3036
3037       if not result.data[0]:
3038         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3039                                  " (but the instance has been renamed in"
3040                                  " Ganeti)" % (old_file_storage_dir,
3041                                                new_file_storage_dir))
3042
3043     _StartInstanceDisks(self, inst, None)
3044     try:
3045       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3046                                                  old_name)
3047       msg = result.RemoteFailMsg()
3048       if msg:
3049         msg = ("Could not run OS rename script for instance %s on node %s"
3050                " (but the instance has been renamed in Ganeti): %s" %
3051                (inst.name, inst.primary_node, msg))
3052         self.proc.LogWarning(msg)
3053     finally:
3054       _ShutdownInstanceDisks(self, inst)
3055
3056
3057 class LURemoveInstance(LogicalUnit):
3058   """Remove an instance.
3059
3060   """
3061   HPATH = "instance-remove"
3062   HTYPE = constants.HTYPE_INSTANCE
3063   _OP_REQP = ["instance_name", "ignore_failures"]
3064   REQ_BGL = False
3065
3066   def ExpandNames(self):
3067     self._ExpandAndLockInstance()
3068     self.needed_locks[locking.LEVEL_NODE] = []
3069     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3070
3071   def DeclareLocks(self, level):
3072     if level == locking.LEVEL_NODE:
3073       self._LockInstancesNodes()
3074
3075   def BuildHooksEnv(self):
3076     """Build hooks env.
3077
3078     This runs on master, primary and secondary nodes of the instance.
3079
3080     """
3081     env = _BuildInstanceHookEnvByObject(self, self.instance)
3082     nl = [self.cfg.GetMasterNode()]
3083     return env, nl, nl
3084
3085   def CheckPrereq(self):
3086     """Check prerequisites.
3087
3088     This checks that the instance is in the cluster.
3089
3090     """
3091     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3092     assert self.instance is not None, \
3093       "Cannot retrieve locked instance %s" % self.op.instance_name
3094
3095   def Exec(self, feedback_fn):
3096     """Remove the instance.
3097
3098     """
3099     instance = self.instance
3100     logging.info("Shutting down instance %s on node %s",
3101                  instance.name, instance.primary_node)
3102
3103     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3104     if result.failed or not result.data:
3105       if self.op.ignore_failures:
3106         feedback_fn("Warning: can't shutdown instance")
3107       else:
3108         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3109                                  (instance.name, instance.primary_node))
3110
3111     logging.info("Removing block devices for instance %s", instance.name)
3112
3113     if not _RemoveDisks(self, instance):
3114       if self.op.ignore_failures:
3115         feedback_fn("Warning: can't remove instance's disks")
3116       else:
3117         raise errors.OpExecError("Can't remove instance's disks")
3118
3119     logging.info("Removing instance %s out of cluster config", instance.name)
3120
3121     self.cfg.RemoveInstance(instance.name)
3122     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3123
3124
3125 class LUQueryInstances(NoHooksLU):
3126   """Logical unit for querying instances.
3127
3128   """
3129   _OP_REQP = ["output_fields", "names", "use_locking"]
3130   REQ_BGL = False
3131   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3132                                     "admin_state", "admin_ram",
3133                                     "disk_template", "ip", "mac", "bridge",
3134                                     "sda_size", "sdb_size", "vcpus", "tags",
3135                                     "network_port", "beparams",
3136                                     "(disk).(size)/([0-9]+)",
3137                                     "(disk).(sizes)", "disk_usage",
3138                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3139                                     "(nic).(macs|ips|bridges)",
3140                                     "(disk|nic).(count)",
3141                                     "serial_no", "hypervisor", "hvparams",] +
3142                                   ["hv/%s" % name
3143                                    for name in constants.HVS_PARAMETERS] +
3144                                   ["be/%s" % name
3145                                    for name in constants.BES_PARAMETERS])
3146   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3147
3148
3149   def ExpandNames(self):
3150     _CheckOutputFields(static=self._FIELDS_STATIC,
3151                        dynamic=self._FIELDS_DYNAMIC,
3152                        selected=self.op.output_fields)
3153
3154     self.needed_locks = {}
3155     self.share_locks[locking.LEVEL_INSTANCE] = 1
3156     self.share_locks[locking.LEVEL_NODE] = 1
3157
3158     if self.op.names:
3159       self.wanted = _GetWantedInstances(self, self.op.names)
3160     else:
3161       self.wanted = locking.ALL_SET
3162
3163     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3164     self.do_locking = self.do_node_query and self.op.use_locking
3165     if self.do_locking:
3166       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3167       self.needed_locks[locking.LEVEL_NODE] = []
3168       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3169
3170   def DeclareLocks(self, level):
3171     if level == locking.LEVEL_NODE and self.do_locking:
3172       self._LockInstancesNodes()
3173
3174   def CheckPrereq(self):
3175     """Check prerequisites.
3176
3177     """
3178     pass
3179
3180   def Exec(self, feedback_fn):
3181     """Computes the list of nodes and their attributes.
3182
3183     """
3184     all_info = self.cfg.GetAllInstancesInfo()
3185     if self.wanted == locking.ALL_SET:
3186       # caller didn't specify instance names, so ordering is not important
3187       if self.do_locking:
3188         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3189       else:
3190         instance_names = all_info.keys()
3191       instance_names = utils.NiceSort(instance_names)
3192     else:
3193       # caller did specify names, so we must keep the ordering
3194       if self.do_locking:
3195         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3196       else:
3197         tgt_set = all_info.keys()
3198       missing = set(self.wanted).difference(tgt_set)
3199       if missing:
3200         raise errors.OpExecError("Some instances were removed before"
3201                                  " retrieving their data: %s" % missing)
3202       instance_names = self.wanted
3203
3204     instance_list = [all_info[iname] for iname in instance_names]
3205
3206     # begin data gathering
3207
3208     nodes = frozenset([inst.primary_node for inst in instance_list])
3209     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3210
3211     bad_nodes = []
3212     off_nodes = []
3213     if self.do_node_query:
3214       live_data = {}
3215       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3216       for name in nodes:
3217         result = node_data[name]
3218         if result.offline:
3219           # offline nodes will be in both lists
3220           off_nodes.append(name)
3221         if result.failed:
3222           bad_nodes.append(name)
3223         else:
3224           if result.data:
3225             live_data.update(result.data)
3226             # else no instance is alive
3227     else:
3228       live_data = dict([(name, {}) for name in instance_names])
3229
3230     # end data gathering
3231
3232     HVPREFIX = "hv/"
3233     BEPREFIX = "be/"
3234     output = []
3235     for instance in instance_list:
3236       iout = []
3237       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3238       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3239       for field in self.op.output_fields:
3240         st_match = self._FIELDS_STATIC.Matches(field)
3241         if field == "name":
3242           val = instance.name
3243         elif field == "os":
3244           val = instance.os
3245         elif field == "pnode":
3246           val = instance.primary_node
3247         elif field == "snodes":
3248           val = list(instance.secondary_nodes)
3249         elif field == "admin_state":
3250           val = instance.admin_up
3251         elif field == "oper_state":
3252           if instance.primary_node in bad_nodes:
3253             val = None
3254           else:
3255             val = bool(live_data.get(instance.name))
3256         elif field == "status":
3257           if instance.primary_node in off_nodes:
3258             val = "ERROR_nodeoffline"
3259           elif instance.primary_node in bad_nodes:
3260             val = "ERROR_nodedown"
3261           else:
3262             running = bool(live_data.get(instance.name))
3263             if running:
3264               if instance.admin_up:
3265                 val = "running"
3266               else:
3267                 val = "ERROR_up"
3268             else:
3269               if instance.admin_up:
3270                 val = "ERROR_down"
3271               else:
3272                 val = "ADMIN_down"
3273         elif field == "oper_ram":
3274           if instance.primary_node in bad_nodes:
3275             val = None
3276           elif instance.name in live_data:
3277             val = live_data[instance.name].get("memory", "?")
3278           else:
3279             val = "-"
3280         elif field == "disk_template":
3281           val = instance.disk_template
3282         elif field == "ip":
3283           val = instance.nics[0].ip
3284         elif field == "bridge":
3285           val = instance.nics[0].bridge
3286         elif field == "mac":
3287           val = instance.nics[0].mac
3288         elif field == "sda_size" or field == "sdb_size":
3289           idx = ord(field[2]) - ord('a')
3290           try:
3291             val = instance.FindDisk(idx).size
3292           except errors.OpPrereqError:
3293             val = None
3294         elif field == "disk_usage": # total disk usage per node
3295           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3296           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3297         elif field == "tags":
3298           val = list(instance.GetTags())
3299         elif field == "serial_no":
3300           val = instance.serial_no
3301         elif field == "network_port":
3302           val = instance.network_port
3303         elif field == "hypervisor":
3304           val = instance.hypervisor
3305         elif field == "hvparams":
3306           val = i_hv
3307         elif (field.startswith(HVPREFIX) and
3308               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3309           val = i_hv.get(field[len(HVPREFIX):], None)
3310         elif field == "beparams":
3311           val = i_be
3312         elif (field.startswith(BEPREFIX) and
3313               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3314           val = i_be.get(field[len(BEPREFIX):], None)
3315         elif st_match and st_match.groups():
3316           # matches a variable list
3317           st_groups = st_match.groups()
3318           if st_groups and st_groups[0] == "disk":
3319             if st_groups[1] == "count":
3320               val = len(instance.disks)
3321             elif st_groups[1] == "sizes":
3322               val = [disk.size for disk in instance.disks]
3323             elif st_groups[1] == "size":
3324               try:
3325                 val = instance.FindDisk(st_groups[2]).size
3326               except errors.OpPrereqError:
3327                 val = None
3328             else:
3329               assert False, "Unhandled disk parameter"
3330           elif st_groups[0] == "nic":
3331             if st_groups[1] == "count":
3332               val = len(instance.nics)
3333             elif st_groups[1] == "macs":
3334               val = [nic.mac for nic in instance.nics]
3335             elif st_groups[1] == "ips":
3336               val = [nic.ip for nic in instance.nics]
3337             elif st_groups[1] == "bridges":
3338               val = [nic.bridge for nic in instance.nics]
3339             else:
3340               # index-based item
3341               nic_idx = int(st_groups[2])
3342               if nic_idx >= len(instance.nics):
3343                 val = None
3344               else:
3345                 if st_groups[1] == "mac":
3346                   val = instance.nics[nic_idx].mac
3347                 elif st_groups[1] == "ip":
3348                   val = instance.nics[nic_idx].ip
3349                 elif st_groups[1] == "bridge":
3350                   val = instance.nics[nic_idx].bridge
3351                 else:
3352                   assert False, "Unhandled NIC parameter"
3353           else:
3354             assert False, "Unhandled variable parameter"
3355         else:
3356           raise errors.ParameterError(field)
3357         iout.append(val)
3358       output.append(iout)
3359
3360     return output
3361
3362
3363 class LUFailoverInstance(LogicalUnit):
3364   """Failover an instance.
3365
3366   """
3367   HPATH = "instance-failover"
3368   HTYPE = constants.HTYPE_INSTANCE
3369   _OP_REQP = ["instance_name", "ignore_consistency"]
3370   REQ_BGL = False
3371
3372   def ExpandNames(self):
3373     self._ExpandAndLockInstance()
3374     self.needed_locks[locking.LEVEL_NODE] = []
3375     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3376
3377   def DeclareLocks(self, level):
3378     if level == locking.LEVEL_NODE:
3379       self._LockInstancesNodes()
3380
3381   def BuildHooksEnv(self):
3382     """Build hooks env.
3383
3384     This runs on master, primary and secondary nodes of the instance.
3385
3386     """
3387     env = {
3388       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3389       }
3390     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3391     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3392     return env, nl, nl
3393
3394   def CheckPrereq(self):
3395     """Check prerequisites.
3396
3397     This checks that the instance is in the cluster.
3398
3399     """
3400     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3401     assert self.instance is not None, \
3402       "Cannot retrieve locked instance %s" % self.op.instance_name
3403
3404     bep = self.cfg.GetClusterInfo().FillBE(instance)
3405     if instance.disk_template not in constants.DTS_NET_MIRROR:
3406       raise errors.OpPrereqError("Instance's disk layout is not"
3407                                  " network mirrored, cannot failover.")
3408
3409     secondary_nodes = instance.secondary_nodes
3410     if not secondary_nodes:
3411       raise errors.ProgrammerError("no secondary node but using "
3412                                    "a mirrored disk template")
3413
3414     target_node = secondary_nodes[0]
3415     _CheckNodeOnline(self, target_node)
3416     _CheckNodeNotDrained(self, target_node)
3417     # check memory requirements on the secondary node
3418     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3419                          instance.name, bep[constants.BE_MEMORY],
3420                          instance.hypervisor)
3421
3422     # check bridge existance
3423     brlist = [nic.bridge for nic in instance.nics]
3424     result = self.rpc.call_bridges_exist(target_node, brlist)
3425     result.Raise()
3426     if not result.data:
3427       raise errors.OpPrereqError("One or more target bridges %s does not"
3428                                  " exist on destination node '%s'" %
3429                                  (brlist, target_node))
3430
3431   def Exec(self, feedback_fn):
3432     """Failover an instance.
3433
3434     The failover is done by shutting it down on its present node and
3435     starting it on the secondary.
3436
3437     """
3438     instance = self.instance
3439
3440     source_node = instance.primary_node
3441     target_node = instance.secondary_nodes[0]
3442
3443     feedback_fn("* checking disk consistency between source and target")
3444     for dev in instance.disks:
3445       # for drbd, these are drbd over lvm
3446       if not _CheckDiskConsistency(self, dev, target_node, False):
3447         if instance.admin_up and not self.op.ignore_consistency:
3448           raise errors.OpExecError("Disk %s is degraded on target node,"
3449                                    " aborting failover." % dev.iv_name)
3450
3451     feedback_fn("* shutting down instance on source node")
3452     logging.info("Shutting down instance %s on node %s",
3453                  instance.name, source_node)
3454
3455     result = self.rpc.call_instance_shutdown(source_node, instance)
3456     if result.failed or not result.data:
3457       if self.op.ignore_consistency:
3458         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3459                              " Proceeding"
3460                              " anyway. Please make sure node %s is down",
3461                              instance.name, source_node, source_node)
3462       else:
3463         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3464                                  (instance.name, source_node))
3465
3466     feedback_fn("* deactivating the instance's disks on source node")
3467     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3468       raise errors.OpExecError("Can't shut down the instance's disks.")
3469
3470     instance.primary_node = target_node
3471     # distribute new instance config to the other nodes
3472     self.cfg.Update(instance)
3473
3474     # Only start the instance if it's marked as up
3475     if instance.admin_up:
3476       feedback_fn("* activating the instance's disks on target node")
3477       logging.info("Starting instance %s on node %s",
3478                    instance.name, target_node)
3479
3480       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3481                                                ignore_secondaries=True)
3482       if not disks_ok:
3483         _ShutdownInstanceDisks(self, instance)
3484         raise errors.OpExecError("Can't activate the instance's disks")
3485
3486       feedback_fn("* starting the instance on the target node")
3487       result = self.rpc.call_instance_start(target_node, instance, None)
3488       msg = result.RemoteFailMsg()
3489       if msg:
3490         _ShutdownInstanceDisks(self, instance)
3491         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3492                                  (instance.name, target_node, msg))
3493
3494
3495 class LUMigrateInstance(LogicalUnit):
3496   """Migrate an instance.
3497
3498   This is migration without shutting down, compared to the failover,
3499   which is done with shutdown.
3500
3501   """
3502   HPATH = "instance-migrate"
3503   HTYPE = constants.HTYPE_INSTANCE
3504   _OP_REQP = ["instance_name", "live", "cleanup"]
3505
3506   REQ_BGL = False
3507
3508   def ExpandNames(self):
3509     self._ExpandAndLockInstance()
3510     self.needed_locks[locking.LEVEL_NODE] = []
3511     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3512
3513   def DeclareLocks(self, level):
3514     if level == locking.LEVEL_NODE:
3515       self._LockInstancesNodes()
3516
3517   def BuildHooksEnv(self):
3518     """Build hooks env.
3519
3520     This runs on master, primary and secondary nodes of the instance.
3521
3522     """
3523     env = _BuildInstanceHookEnvByObject(self, self.instance)
3524     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3525     return env, nl, nl
3526
3527   def CheckPrereq(self):
3528     """Check prerequisites.
3529
3530     This checks that the instance is in the cluster.
3531
3532     """
3533     instance = self.cfg.GetInstanceInfo(
3534       self.cfg.ExpandInstanceName(self.op.instance_name))
3535     if instance is None:
3536       raise errors.OpPrereqError("Instance '%s' not known" %
3537                                  self.op.instance_name)
3538
3539     if instance.disk_template != constants.DT_DRBD8:
3540       raise errors.OpPrereqError("Instance's disk layout is not"
3541                                  " drbd8, cannot migrate.")
3542
3543     secondary_nodes = instance.secondary_nodes
3544     if not secondary_nodes:
3545       raise errors.ConfigurationError("No secondary node but using"
3546                                       " drbd8 disk template")
3547
3548     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3549
3550     target_node = secondary_nodes[0]
3551     # check memory requirements on the secondary node
3552     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3553                          instance.name, i_be[constants.BE_MEMORY],
3554                          instance.hypervisor)
3555
3556     # check bridge existance
3557     brlist = [nic.bridge for nic in instance.nics]
3558     result = self.rpc.call_bridges_exist(target_node, brlist)
3559     if result.failed or not result.data:
3560       raise errors.OpPrereqError("One or more target bridges %s does not"
3561                                  " exist on destination node '%s'" %
3562                                  (brlist, target_node))
3563
3564     if not self.op.cleanup:
3565       _CheckNodeNotDrained(self, target_node)
3566       result = self.rpc.call_instance_migratable(instance.primary_node,
3567                                                  instance)
3568       msg = result.RemoteFailMsg()
3569       if msg:
3570         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3571                                    msg)
3572
3573     self.instance = instance
3574
3575   def _WaitUntilSync(self):
3576     """Poll with custom rpc for disk sync.
3577
3578     This uses our own step-based rpc call.
3579
3580     """
3581     self.feedback_fn("* wait until resync is done")
3582     all_done = False
3583     while not all_done:
3584       all_done = True
3585       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3586                                             self.nodes_ip,
3587                                             self.instance.disks)
3588       min_percent = 100
3589       for node, nres in result.items():
3590         msg = nres.RemoteFailMsg()
3591         if msg:
3592           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3593                                    (node, msg))
3594         node_done, node_percent = nres.payload
3595         all_done = all_done and node_done
3596         if node_percent is not None:
3597           min_percent = min(min_percent, node_percent)
3598       if not all_done:
3599         if min_percent < 100:
3600           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3601         time.sleep(2)
3602
3603   def _EnsureSecondary(self, node):
3604     """Demote a node to secondary.
3605
3606     """
3607     self.feedback_fn("* switching node %s to secondary mode" % node)
3608
3609     for dev in self.instance.disks:
3610       self.cfg.SetDiskID(dev, node)
3611
3612     result = self.rpc.call_blockdev_close(node, self.instance.name,
3613                                           self.instance.disks)
3614     msg = result.RemoteFailMsg()
3615     if msg:
3616       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3617                                " error %s" % (node, msg))
3618
3619   def _GoStandalone(self):
3620     """Disconnect from the network.
3621
3622     """
3623     self.feedback_fn("* changing into standalone mode")
3624     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3625                                                self.instance.disks)
3626     for node, nres in result.items():
3627       msg = nres.RemoteFailMsg()
3628       if msg:
3629         raise errors.OpExecError("Cannot disconnect disks node %s,"
3630                                  " error %s" % (node, msg))
3631
3632   def _GoReconnect(self, multimaster):
3633     """Reconnect to the network.
3634
3635     """
3636     if multimaster:
3637       msg = "dual-master"
3638     else:
3639       msg = "single-master"
3640     self.feedback_fn("* changing disks into %s mode" % msg)
3641     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3642                                            self.instance.disks,
3643                                            self.instance.name, multimaster)
3644     for node, nres in result.items():
3645       msg = nres.RemoteFailMsg()
3646       if msg:
3647         raise errors.OpExecError("Cannot change disks config on node %s,"
3648                                  " error: %s" % (node, msg))
3649
3650   def _ExecCleanup(self):
3651     """Try to cleanup after a failed migration.
3652
3653     The cleanup is done by:
3654       - check that the instance is running only on one node
3655         (and update the config if needed)
3656       - change disks on its secondary node to secondary
3657       - wait until disks are fully synchronized
3658       - disconnect from the network
3659       - change disks into single-master mode
3660       - wait again until disks are fully synchronized
3661
3662     """
3663     instance = self.instance
3664     target_node = self.target_node
3665     source_node = self.source_node
3666
3667     # check running on only one node
3668     self.feedback_fn("* checking where the instance actually runs"
3669                      " (if this hangs, the hypervisor might be in"
3670                      " a bad state)")
3671     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3672     for node, result in ins_l.items():
3673       result.Raise()
3674       if not isinstance(result.data, list):
3675         raise errors.OpExecError("Can't contact node '%s'" % node)
3676
3677     runningon_source = instance.name in ins_l[source_node].data
3678     runningon_target = instance.name in ins_l[target_node].data
3679
3680     if runningon_source and runningon_target:
3681       raise errors.OpExecError("Instance seems to be running on two nodes,"
3682                                " or the hypervisor is confused. You will have"
3683                                " to ensure manually that it runs only on one"
3684                                " and restart this operation.")
3685
3686     if not (runningon_source or runningon_target):
3687       raise errors.OpExecError("Instance does not seem to be running at all."
3688                                " In this case, it's safer to repair by"
3689                                " running 'gnt-instance stop' to ensure disk"
3690                                " shutdown, and then restarting it.")
3691
3692     if runningon_target:
3693       # the migration has actually succeeded, we need to update the config
3694       self.feedback_fn("* instance running on secondary node (%s),"
3695                        " updating config" % target_node)
3696       instance.primary_node = target_node
3697       self.cfg.Update(instance)
3698       demoted_node = source_node
3699     else:
3700       self.feedback_fn("* instance confirmed to be running on its"
3701                        " primary node (%s)" % source_node)
3702       demoted_node = target_node
3703
3704     self._EnsureSecondary(demoted_node)
3705     try:
3706       self._WaitUntilSync()
3707     except errors.OpExecError:
3708       # we ignore here errors, since if the device is standalone, it
3709       # won't be able to sync
3710       pass
3711     self._GoStandalone()
3712     self._GoReconnect(False)
3713     self._WaitUntilSync()
3714
3715     self.feedback_fn("* done")
3716
3717   def _RevertDiskStatus(self):
3718     """Try to revert the disk status after a failed migration.
3719
3720     """
3721     target_node = self.target_node
3722     try:
3723       self._EnsureSecondary(target_node)
3724       self._GoStandalone()
3725       self._GoReconnect(False)
3726       self._WaitUntilSync()
3727     except errors.OpExecError, err:
3728       self.LogWarning("Migration failed and I can't reconnect the"
3729                       " drives: error '%s'\n"
3730                       "Please look and recover the instance status" %
3731                       str(err))
3732
3733   def _AbortMigration(self):
3734     """Call the hypervisor code to abort a started migration.
3735
3736     """
3737     instance = self.instance
3738     target_node = self.target_node
3739     migration_info = self.migration_info
3740
3741     abort_result = self.rpc.call_finalize_migration(target_node,
3742                                                     instance,
3743                                                     migration_info,
3744                                                     False)
3745     abort_msg = abort_result.RemoteFailMsg()
3746     if abort_msg:
3747       logging.error("Aborting migration failed on target node %s: %s" %
3748                     (target_node, abort_msg))
3749       # Don't raise an exception here, as we stil have to try to revert the
3750       # disk status, even if this step failed.
3751
3752   def _ExecMigration(self):
3753     """Migrate an instance.
3754
3755     The migrate is done by:
3756       - change the disks into dual-master mode
3757       - wait until disks are fully synchronized again
3758       - migrate the instance
3759       - change disks on the new secondary node (the old primary) to secondary
3760       - wait until disks are fully synchronized
3761       - change disks into single-master mode
3762
3763     """
3764     instance = self.instance
3765     target_node = self.target_node
3766     source_node = self.source_node
3767
3768     self.feedback_fn("* checking disk consistency between source and target")
3769     for dev in instance.disks:
3770       if not _CheckDiskConsistency(self, dev, target_node, False):
3771         raise errors.OpExecError("Disk %s is degraded or not fully"
3772                                  " synchronized on target node,"
3773                                  " aborting migrate." % dev.iv_name)
3774
3775     # First get the migration information from the remote node
3776     result = self.rpc.call_migration_info(source_node, instance)
3777     msg = result.RemoteFailMsg()
3778     if msg:
3779       log_err = ("Failed fetching source migration information from %s: %s" %
3780                  (source_node, msg))
3781       logging.error(log_err)
3782       raise errors.OpExecError(log_err)
3783
3784     self.migration_info = migration_info = result.payload
3785
3786     # Then switch the disks to master/master mode
3787     self._EnsureSecondary(target_node)
3788     self._GoStandalone()
3789     self._GoReconnect(True)
3790     self._WaitUntilSync()
3791
3792     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3793     result = self.rpc.call_accept_instance(target_node,
3794                                            instance,
3795                                            migration_info,
3796                                            self.nodes_ip[target_node])
3797
3798     msg = result.RemoteFailMsg()
3799     if msg:
3800       logging.error("Instance pre-migration failed, trying to revert"
3801                     " disk status: %s", msg)
3802       self._AbortMigration()
3803       self._RevertDiskStatus()
3804       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3805                                (instance.name, msg))
3806
3807     self.feedback_fn("* migrating instance to %s" % target_node)
3808     time.sleep(10)
3809     result = self.rpc.call_instance_migrate(source_node, instance,
3810                                             self.nodes_ip[target_node],
3811                                             self.op.live)
3812     msg = result.RemoteFailMsg()
3813     if msg:
3814       logging.error("Instance migration failed, trying to revert"
3815                     " disk status: %s", msg)
3816       self._AbortMigration()
3817       self._RevertDiskStatus()
3818       raise errors.OpExecError("Could not migrate instance %s: %s" %
3819                                (instance.name, msg))
3820     time.sleep(10)
3821
3822     instance.primary_node = target_node
3823     # distribute new instance config to the other nodes
3824     self.cfg.Update(instance)
3825
3826     result = self.rpc.call_finalize_migration(target_node,
3827                                               instance,
3828                                               migration_info,
3829                                               True)
3830     msg = result.RemoteFailMsg()
3831     if msg:
3832       logging.error("Instance migration succeeded, but finalization failed:"
3833                     " %s" % msg)
3834       raise errors.OpExecError("Could not finalize instance migration: %s" %
3835                                msg)
3836
3837     self._EnsureSecondary(source_node)
3838     self._WaitUntilSync()
3839     self._GoStandalone()
3840     self._GoReconnect(False)
3841     self._WaitUntilSync()
3842
3843     self.feedback_fn("* done")
3844
3845   def Exec(self, feedback_fn):
3846     """Perform the migration.
3847
3848     """
3849     self.feedback_fn = feedback_fn
3850
3851     self.source_node = self.instance.primary_node
3852     self.target_node = self.instance.secondary_nodes[0]
3853     self.all_nodes = [self.source_node, self.target_node]
3854     self.nodes_ip = {
3855       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3856       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3857       }
3858     if self.op.cleanup:
3859       return self._ExecCleanup()
3860     else:
3861       return self._ExecMigration()
3862
3863
3864 def _CreateBlockDev(lu, node, instance, device, force_create,
3865                     info, force_open):
3866   """Create a tree of block devices on a given node.
3867
3868   If this device type has to be created on secondaries, create it and
3869   all its children.
3870
3871   If not, just recurse to children keeping the same 'force' value.
3872
3873   @param lu: the lu on whose behalf we execute
3874   @param node: the node on which to create the device
3875   @type instance: L{objects.Instance}
3876   @param instance: the instance which owns the device
3877   @type device: L{objects.Disk}
3878   @param device: the device to create
3879   @type force_create: boolean
3880   @param force_create: whether to force creation of this device; this
3881       will be change to True whenever we find a device which has
3882       CreateOnSecondary() attribute
3883   @param info: the extra 'metadata' we should attach to the device
3884       (this will be represented as a LVM tag)
3885   @type force_open: boolean
3886   @param force_open: this parameter will be passes to the
3887       L{backend.BlockdevCreate} function where it specifies
3888       whether we run on primary or not, and it affects both
3889       the child assembly and the device own Open() execution
3890
3891   """
3892   if device.CreateOnSecondary():
3893     force_create = True
3894
3895   if device.children:
3896     for child in device.children:
3897       _CreateBlockDev(lu, node, instance, child, force_create,
3898                       info, force_open)
3899
3900   if not force_create:
3901     return
3902
3903   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3904
3905
3906 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3907   """Create a single block device on a given node.
3908
3909   This will not recurse over children of the device, so they must be
3910   created in advance.
3911
3912   @param lu: the lu on whose behalf we execute
3913   @param node: the node on which to create the device
3914   @type instance: L{objects.Instance}
3915   @param instance: the instance which owns the device
3916   @type device: L{objects.Disk}
3917   @param device: the device to create
3918   @param info: the extra 'metadata' we should attach to the device
3919       (this will be represented as a LVM tag)
3920   @type force_open: boolean
3921   @param force_open: this parameter will be passes to the
3922       L{backend.BlockdevCreate} function where it specifies
3923       whether we run on primary or not, and it affects both
3924       the child assembly and the device own Open() execution
3925
3926   """
3927   lu.cfg.SetDiskID(device, node)
3928   result = lu.rpc.call_blockdev_create(node, device, device.size,
3929                                        instance.name, force_open, info)
3930   msg = result.RemoteFailMsg()
3931   if msg:
3932     raise errors.OpExecError("Can't create block device %s on"
3933                              " node %s for instance %s: %s" %
3934                              (device, node, instance.name, msg))
3935   if device.physical_id is None:
3936     device.physical_id = result.payload
3937
3938
3939 def _GenerateUniqueNames(lu, exts):
3940   """Generate a suitable LV name.
3941
3942   This will generate a logical volume name for the given instance.
3943
3944   """
3945   results = []
3946   for val in exts:
3947     new_id = lu.cfg.GenerateUniqueID()
3948     results.append("%s%s" % (new_id, val))
3949   return results
3950
3951
3952 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3953                          p_minor, s_minor):
3954   """Generate a drbd8 device complete with its children.
3955
3956   """
3957   port = lu.cfg.AllocatePort()
3958   vgname = lu.cfg.GetVGName()
3959   shared_secret = lu.cfg.GenerateDRBDSecret()
3960   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3961                           logical_id=(vgname, names[0]))
3962   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3963                           logical_id=(vgname, names[1]))
3964   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3965                           logical_id=(primary, secondary, port,
3966                                       p_minor, s_minor,
3967                                       shared_secret),
3968                           children=[dev_data, dev_meta],
3969                           iv_name=iv_name)
3970   return drbd_dev
3971
3972
3973 def _GenerateDiskTemplate(lu, template_name,
3974                           instance_name, primary_node,
3975                           secondary_nodes, disk_info,
3976                           file_storage_dir, file_driver,
3977                           base_index):
3978   """Generate the entire disk layout for a given template type.
3979
3980   """
3981   #TODO: compute space requirements
3982
3983   vgname = lu.cfg.GetVGName()
3984   disk_count = len(disk_info)
3985   disks = []
3986   if template_name == constants.DT_DISKLESS:
3987     pass
3988   elif template_name == constants.DT_PLAIN:
3989     if len(secondary_nodes) != 0:
3990       raise errors.ProgrammerError("Wrong template configuration")
3991
3992     names = _GenerateUniqueNames(lu, [".disk%d" % i
3993                                       for i in range(disk_count)])
3994     for idx, disk in enumerate(disk_info):
3995       disk_index = idx + base_index
3996       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3997                               logical_id=(vgname, names[idx]),
3998                               iv_name="disk/%d" % disk_index,
3999                               mode=disk["mode"])
4000       disks.append(disk_dev)
4001   elif template_name == constants.DT_DRBD8:
4002     if len(secondary_nodes) != 1:
4003       raise errors.ProgrammerError("Wrong template configuration")
4004     remote_node = secondary_nodes[0]
4005     minors = lu.cfg.AllocateDRBDMinor(
4006       [primary_node, remote_node] * len(disk_info), instance_name)
4007
4008     names = []
4009     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4010                                                for i in range(disk_count)]):
4011       names.append(lv_prefix + "_data")
4012       names.append(lv_prefix + "_meta")
4013     for idx, disk in enumerate(disk_info):
4014       disk_index = idx + base_index
4015       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4016                                       disk["size"], names[idx*2:idx*2+2],
4017                                       "disk/%d" % disk_index,
4018                                       minors[idx*2], minors[idx*2+1])
4019       disk_dev.mode = disk["mode"]
4020       disks.append(disk_dev)
4021   elif template_name == constants.DT_FILE:
4022     if len(secondary_nodes) != 0:
4023       raise errors.ProgrammerError("Wrong template configuration")
4024
4025     for idx, disk in enumerate(disk_info):
4026       disk_index = idx + base_index
4027       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4028                               iv_name="disk/%d" % disk_index,
4029                               logical_id=(file_driver,
4030                                           "%s/disk%d" % (file_storage_dir,
4031                                                          disk_index)),
4032                               mode=disk["mode"])
4033       disks.append(disk_dev)
4034   else:
4035     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4036   return disks
4037
4038
4039 def _GetInstanceInfoText(instance):
4040   """Compute that text that should be added to the disk's metadata.
4041
4042   """
4043   return "originstname+%s" % instance.name
4044
4045
4046 def _CreateDisks(lu, instance):
4047   """Create all disks for an instance.
4048
4049   This abstracts away some work from AddInstance.
4050
4051   @type lu: L{LogicalUnit}
4052   @param lu: the logical unit on whose behalf we execute
4053   @type instance: L{objects.Instance}
4054   @param instance: the instance whose disks we should create
4055   @rtype: boolean
4056   @return: the success of the creation
4057
4058   """
4059   info = _GetInstanceInfoText(instance)
4060   pnode = instance.primary_node
4061
4062   if instance.disk_template == constants.DT_FILE:
4063     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4064     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4065
4066     if result.failed or not result.data:
4067       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4068
4069     if not result.data[0]:
4070       raise errors.OpExecError("Failed to create directory '%s'" %
4071                                file_storage_dir)
4072
4073   # Note: this needs to be kept in sync with adding of disks in
4074   # LUSetInstanceParams
4075   for device in instance.disks:
4076     logging.info("Creating volume %s for instance %s",
4077                  device.iv_name, instance.name)
4078     #HARDCODE
4079     for node in instance.all_nodes:
4080       f_create = node == pnode
4081       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4082
4083
4084 def _RemoveDisks(lu, instance):
4085   """Remove all disks for an instance.
4086
4087   This abstracts away some work from `AddInstance()` and
4088   `RemoveInstance()`. Note that in case some of the devices couldn't
4089   be removed, the removal will continue with the other ones (compare
4090   with `_CreateDisks()`).
4091
4092   @type lu: L{LogicalUnit}
4093   @param lu: the logical unit on whose behalf we execute
4094   @type instance: L{objects.Instance}
4095   @param instance: the instance whose disks we should remove
4096   @rtype: boolean
4097   @return: the success of the removal
4098
4099   """
4100   logging.info("Removing block devices for instance %s", instance.name)
4101
4102   all_result = True
4103   for device in instance.disks:
4104     for node, disk in device.ComputeNodeTree(instance.primary_node):
4105       lu.cfg.SetDiskID(disk, node)
4106       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4107       if msg:
4108         lu.LogWarning("Could not remove block device %s on node %s,"
4109                       " continuing anyway: %s", device.iv_name, node, msg)
4110         all_result = False
4111
4112   if instance.disk_template == constants.DT_FILE:
4113     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4114     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4115                                                  file_storage_dir)
4116     if result.failed or not result.data:
4117       logging.error("Could not remove directory '%s'", file_storage_dir)
4118       all_result = False
4119
4120   return all_result
4121
4122
4123 def _ComputeDiskSize(disk_template, disks):
4124   """Compute disk size requirements in the volume group
4125
4126   """
4127   # Required free disk space as a function of disk and swap space
4128   req_size_dict = {
4129     constants.DT_DISKLESS: None,
4130     constants.DT_PLAIN: sum(d["size"] for d in disks),
4131     # 128 MB are added for drbd metadata for each disk
4132     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4133     constants.DT_FILE: None,
4134   }
4135
4136   if disk_template not in req_size_dict:
4137     raise errors.ProgrammerError("Disk template '%s' size requirement"
4138                                  " is unknown" %  disk_template)
4139
4140   return req_size_dict[disk_template]
4141
4142
4143 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4144   """Hypervisor parameter validation.
4145
4146   This function abstract the hypervisor parameter validation to be
4147   used in both instance create and instance modify.
4148
4149   @type lu: L{LogicalUnit}
4150   @param lu: the logical unit for which we check
4151   @type nodenames: list
4152   @param nodenames: the list of nodes on which we should check
4153   @type hvname: string
4154   @param hvname: the name of the hypervisor we should use
4155   @type hvparams: dict
4156   @param hvparams: the parameters which we need to check
4157   @raise errors.OpPrereqError: if the parameters are not valid
4158
4159   """
4160   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4161                                                   hvname,
4162                                                   hvparams)
4163   for node in nodenames:
4164     info = hvinfo[node]
4165     if info.offline:
4166       continue
4167     msg = info.RemoteFailMsg()
4168     if msg:
4169       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4170                                  " %s" % msg)
4171
4172
4173 class LUCreateInstance(LogicalUnit):
4174   """Create an instance.
4175
4176   """
4177   HPATH = "instance-add"
4178   HTYPE = constants.HTYPE_INSTANCE
4179   _OP_REQP = ["instance_name", "disks", "disk_template",
4180               "mode", "start",
4181               "wait_for_sync", "ip_check", "nics",
4182               "hvparams", "beparams"]
4183   REQ_BGL = False
4184
4185   def _ExpandNode(self, node):
4186     """Expands and checks one node name.
4187
4188     """
4189     node_full = self.cfg.ExpandNodeName(node)
4190     if node_full is None:
4191       raise errors.OpPrereqError("Unknown node %s" % node)
4192     return node_full
4193
4194   def ExpandNames(self):
4195     """ExpandNames for CreateInstance.
4196
4197     Figure out the right locks for instance creation.
4198
4199     """
4200     self.needed_locks = {}
4201
4202     # set optional parameters to none if they don't exist
4203     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4204       if not hasattr(self.op, attr):
4205         setattr(self.op, attr, None)
4206
4207     # cheap checks, mostly valid constants given
4208
4209     # verify creation mode
4210     if self.op.mode not in (constants.INSTANCE_CREATE,
4211                             constants.INSTANCE_IMPORT):
4212       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4213                                  self.op.mode)
4214
4215     # disk template and mirror node verification
4216     if self.op.disk_template not in constants.DISK_TEMPLATES:
4217       raise errors.OpPrereqError("Invalid disk template name")
4218
4219     if self.op.hypervisor is None:
4220       self.op.hypervisor = self.cfg.GetHypervisorType()
4221
4222     cluster = self.cfg.GetClusterInfo()
4223     enabled_hvs = cluster.enabled_hypervisors
4224     if self.op.hypervisor not in enabled_hvs:
4225       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4226                                  " cluster (%s)" % (self.op.hypervisor,
4227                                   ",".join(enabled_hvs)))
4228
4229     # check hypervisor parameter syntax (locally)
4230     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4231     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4232                                   self.op.hvparams)
4233     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4234     hv_type.CheckParameterSyntax(filled_hvp)
4235
4236     # fill and remember the beparams dict
4237     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4238     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4239                                     self.op.beparams)
4240
4241     #### instance parameters check
4242
4243     # instance name verification
4244     hostname1 = utils.HostInfo(self.op.instance_name)
4245     self.op.instance_name = instance_name = hostname1.name
4246
4247     # this is just a preventive check, but someone might still add this
4248     # instance in the meantime, and creation will fail at lock-add time
4249     if instance_name in self.cfg.GetInstanceList():
4250       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4251                                  instance_name)
4252
4253     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4254
4255     # NIC buildup
4256     self.nics = []
4257     for nic in self.op.nics:
4258       # ip validity checks
4259       ip = nic.get("ip", None)
4260       if ip is None or ip.lower() == "none":
4261         nic_ip = None
4262       elif ip.lower() == constants.VALUE_AUTO:
4263         nic_ip = hostname1.ip
4264       else:
4265         if not utils.IsValidIP(ip):
4266           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4267                                      " like a valid IP" % ip)
4268         nic_ip = ip
4269
4270       # MAC address verification
4271       mac = nic.get("mac", constants.VALUE_AUTO)
4272       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4273         if not utils.IsValidMac(mac.lower()):
4274           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4275                                      mac)
4276       # bridge verification
4277       bridge = nic.get("bridge", None)
4278       if bridge is None:
4279         bridge = self.cfg.GetDefBridge()
4280       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4281
4282     # disk checks/pre-build
4283     self.disks = []
4284     for disk in self.op.disks:
4285       mode = disk.get("mode", constants.DISK_RDWR)
4286       if mode not in constants.DISK_ACCESS_SET:
4287         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4288                                    mode)
4289       size = disk.get("size", None)
4290       if size is None:
4291         raise errors.OpPrereqError("Missing disk size")
4292       try:
4293         size = int(size)
4294       except ValueError:
4295         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4296       self.disks.append({"size": size, "mode": mode})
4297
4298     # used in CheckPrereq for ip ping check
4299     self.check_ip = hostname1.ip
4300
4301     # file storage checks
4302     if (self.op.file_driver and
4303         not self.op.file_driver in constants.FILE_DRIVER):
4304       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4305                                  self.op.file_driver)
4306
4307     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4308       raise errors.OpPrereqError("File storage directory path not absolute")
4309
4310     ### Node/iallocator related checks
4311     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4312       raise errors.OpPrereqError("One and only one of iallocator and primary"
4313                                  " node must be given")
4314
4315     if self.op.iallocator:
4316       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4317     else:
4318       self.op.pnode = self._ExpandNode(self.op.pnode)
4319       nodelist = [self.op.pnode]
4320       if self.op.snode is not None:
4321         self.op.snode = self._ExpandNode(self.op.snode)
4322         nodelist.append(self.op.snode)
4323       self.needed_locks[locking.LEVEL_NODE] = nodelist
4324
4325     # in case of import lock the source node too
4326     if self.op.mode == constants.INSTANCE_IMPORT:
4327       src_node = getattr(self.op, "src_node", None)
4328       src_path = getattr(self.op, "src_path", None)
4329
4330       if src_path is None:
4331         self.op.src_path = src_path = self.op.instance_name
4332
4333       if src_node is None:
4334         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4335         self.op.src_node = None
4336         if os.path.isabs(src_path):
4337           raise errors.OpPrereqError("Importing an instance from an absolute"
4338                                      " path requires a source node option.")
4339       else:
4340         self.op.src_node = src_node = self._ExpandNode(src_node)
4341         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4342           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4343         if not os.path.isabs(src_path):
4344           self.op.src_path = src_path = \
4345             os.path.join(constants.EXPORT_DIR, src_path)
4346
4347     else: # INSTANCE_CREATE
4348       if getattr(self.op, "os_type", None) is None:
4349         raise errors.OpPrereqError("No guest OS specified")
4350
4351   def _RunAllocator(self):
4352     """Run the allocator based on input opcode.
4353
4354     """
4355     nics = [n.ToDict() for n in self.nics]
4356     ial = IAllocator(self,
4357                      mode=constants.IALLOCATOR_MODE_ALLOC,
4358                      name=self.op.instance_name,
4359                      disk_template=self.op.disk_template,
4360                      tags=[],
4361                      os=self.op.os_type,
4362                      vcpus=self.be_full[constants.BE_VCPUS],
4363                      mem_size=self.be_full[constants.BE_MEMORY],
4364                      disks=self.disks,
4365                      nics=nics,
4366                      hypervisor=self.op.hypervisor,
4367                      )
4368
4369     ial.Run(self.op.iallocator)
4370
4371     if not ial.success:
4372       raise errors.OpPrereqError("Can't compute nodes using"
4373                                  " iallocator '%s': %s" % (self.op.iallocator,
4374                                                            ial.info))
4375     if len(ial.nodes) != ial.required_nodes:
4376       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4377                                  " of nodes (%s), required %s" %
4378                                  (self.op.iallocator, len(ial.nodes),
4379                                   ial.required_nodes))
4380     self.op.pnode = ial.nodes[0]
4381     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4382                  self.op.instance_name, self.op.iallocator,
4383                  ", ".join(ial.nodes))
4384     if ial.required_nodes == 2:
4385       self.op.snode = ial.nodes[1]
4386
4387   def BuildHooksEnv(self):
4388     """Build hooks env.
4389
4390     This runs on master, primary and secondary nodes of the instance.
4391
4392     """
4393     env = {
4394       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4395       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4396       "INSTANCE_ADD_MODE": self.op.mode,
4397       }
4398     if self.op.mode == constants.INSTANCE_IMPORT:
4399       env["INSTANCE_SRC_NODE"] = self.op.src_node
4400       env["INSTANCE_SRC_PATH"] = self.op.src_path
4401       env["INSTANCE_SRC_IMAGES"] = self.src_images
4402
4403     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4404       primary_node=self.op.pnode,
4405       secondary_nodes=self.secondaries,
4406       status=self.op.start,
4407       os_type=self.op.os_type,
4408       memory=self.be_full[constants.BE_MEMORY],
4409       vcpus=self.be_full[constants.BE_VCPUS],
4410       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4411     ))
4412
4413     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4414           self.secondaries)
4415     return env, nl, nl
4416
4417
4418   def CheckPrereq(self):
4419     """Check prerequisites.
4420
4421     """
4422     if (not self.cfg.GetVGName() and
4423         self.op.disk_template not in constants.DTS_NOT_LVM):
4424       raise errors.OpPrereqError("Cluster does not support lvm-based"
4425                                  " instances")
4426
4427
4428     if self.op.mode == constants.INSTANCE_IMPORT:
4429       src_node = self.op.src_node
4430       src_path = self.op.src_path
4431
4432       if src_node is None:
4433         exp_list = self.rpc.call_export_list(
4434           self.acquired_locks[locking.LEVEL_NODE])
4435         found = False
4436         for node in exp_list:
4437           if not exp_list[node].failed and src_path in exp_list[node].data:
4438             found = True
4439             self.op.src_node = src_node = node
4440             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4441                                                        src_path)
4442             break
4443         if not found:
4444           raise errors.OpPrereqError("No export found for relative path %s" %
4445                                       src_path)
4446
4447       _CheckNodeOnline(self, src_node)
4448       result = self.rpc.call_export_info(src_node, src_path)
4449       result.Raise()
4450       if not result.data:
4451         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4452
4453       export_info = result.data
4454       if not export_info.has_section(constants.INISECT_EXP):
4455         raise errors.ProgrammerError("Corrupted export config")
4456
4457       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4458       if (int(ei_version) != constants.EXPORT_VERSION):
4459         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4460                                    (ei_version, constants.EXPORT_VERSION))
4461
4462       # Check that the new instance doesn't have less disks than the export
4463       instance_disks = len(self.disks)
4464       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4465       if instance_disks < export_disks:
4466         raise errors.OpPrereqError("Not enough disks to import."
4467                                    " (instance: %d, export: %d)" %
4468                                    (instance_disks, export_disks))
4469
4470       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4471       disk_images = []
4472       for idx in range(export_disks):
4473         option = 'disk%d_dump' % idx
4474         if export_info.has_option(constants.INISECT_INS, option):
4475           # FIXME: are the old os-es, disk sizes, etc. useful?
4476           export_name = export_info.get(constants.INISECT_INS, option)
4477           image = os.path.join(src_path, export_name)
4478           disk_images.append(image)
4479         else:
4480           disk_images.append(False)
4481
4482       self.src_images = disk_images
4483
4484       old_name = export_info.get(constants.INISECT_INS, 'name')
4485       # FIXME: int() here could throw a ValueError on broken exports
4486       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4487       if self.op.instance_name == old_name:
4488         for idx, nic in enumerate(self.nics):
4489           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4490             nic_mac_ini = 'nic%d_mac' % idx
4491             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4492
4493     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4494     if self.op.start and not self.op.ip_check:
4495       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4496                                  " adding an instance in start mode")
4497
4498     if self.op.ip_check:
4499       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4500         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4501                                    (self.check_ip, self.op.instance_name))
4502
4503     #### allocator run
4504
4505     if self.op.iallocator is not None:
4506       self._RunAllocator()
4507
4508     #### node related checks
4509
4510     # check primary node
4511     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4512     assert self.pnode is not None, \
4513       "Cannot retrieve locked node %s" % self.op.pnode
4514     if pnode.offline:
4515       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4516                                  pnode.name)
4517     if pnode.drained:
4518       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4519                                  pnode.name)
4520
4521     self.secondaries = []
4522
4523     # mirror node verification
4524     if self.op.disk_template in constants.DTS_NET_MIRROR:
4525       if self.op.snode is None:
4526         raise errors.OpPrereqError("The networked disk templates need"
4527                                    " a mirror node")
4528       if self.op.snode == pnode.name:
4529         raise errors.OpPrereqError("The secondary node cannot be"
4530                                    " the primary node.")
4531       _CheckNodeOnline(self, self.op.snode)
4532       _CheckNodeNotDrained(self, self.op.snode)
4533       self.secondaries.append(self.op.snode)
4534
4535     nodenames = [pnode.name] + self.secondaries
4536
4537     req_size = _ComputeDiskSize(self.op.disk_template,
4538                                 self.disks)
4539
4540     # Check lv size requirements
4541     if req_size is not None:
4542       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4543                                          self.op.hypervisor)
4544       for node in nodenames:
4545         info = nodeinfo[node]
4546         info.Raise()
4547         info = info.data
4548         if not info:
4549           raise errors.OpPrereqError("Cannot get current information"
4550                                      " from node '%s'" % node)
4551         vg_free = info.get('vg_free', None)
4552         if not isinstance(vg_free, int):
4553           raise errors.OpPrereqError("Can't compute free disk space on"
4554                                      " node %s" % node)
4555         if req_size > info['vg_free']:
4556           raise errors.OpPrereqError("Not enough disk space on target node %s."
4557                                      " %d MB available, %d MB required" %
4558                                      (node, info['vg_free'], req_size))
4559
4560     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4561
4562     # os verification
4563     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4564     result.Raise()
4565     if not isinstance(result.data, objects.OS):
4566       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4567                                  " primary node"  % self.op.os_type)
4568
4569     # bridge check on primary node
4570     bridges = [n.bridge for n in self.nics]
4571     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4572     result.Raise()
4573     if not result.data:
4574       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4575                                  " exist on destination node '%s'" %
4576                                  (",".join(bridges), pnode.name))
4577
4578     # memory check on primary node
4579     if self.op.start:
4580       _CheckNodeFreeMemory(self, self.pnode.name,
4581                            "creating instance %s" % self.op.instance_name,
4582                            self.be_full[constants.BE_MEMORY],
4583                            self.op.hypervisor)
4584
4585   def Exec(self, feedback_fn):
4586     """Create and add the instance to the cluster.
4587
4588     """
4589     instance = self.op.instance_name
4590     pnode_name = self.pnode.name
4591
4592     for nic in self.nics:
4593       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4594         nic.mac = self.cfg.GenerateMAC()
4595
4596     ht_kind = self.op.hypervisor
4597     if ht_kind in constants.HTS_REQ_PORT:
4598       network_port = self.cfg.AllocatePort()
4599     else:
4600       network_port = None
4601
4602     ##if self.op.vnc_bind_address is None:
4603     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4604
4605     # this is needed because os.path.join does not accept None arguments
4606     if self.op.file_storage_dir is None:
4607       string_file_storage_dir = ""
4608     else:
4609       string_file_storage_dir = self.op.file_storage_dir
4610
4611     # build the full file storage dir path
4612     file_storage_dir = os.path.normpath(os.path.join(
4613                                         self.cfg.GetFileStorageDir(),
4614                                         string_file_storage_dir, instance))
4615
4616
4617     disks = _GenerateDiskTemplate(self,
4618                                   self.op.disk_template,
4619                                   instance, pnode_name,
4620                                   self.secondaries,
4621                                   self.disks,
4622                                   file_storage_dir,
4623                                   self.op.file_driver,
4624                                   0)
4625
4626     iobj = objects.Instance(name=instance, os=self.op.os_type,
4627                             primary_node=pnode_name,
4628                             nics=self.nics, disks=disks,
4629                             disk_template=self.op.disk_template,
4630                             admin_up=False,
4631                             network_port=network_port,
4632                             beparams=self.op.beparams,
4633                             hvparams=self.op.hvparams,
4634                             hypervisor=self.op.hypervisor,
4635                             )
4636
4637     feedback_fn("* creating instance disks...")
4638     try:
4639       _CreateDisks(self, iobj)
4640     except errors.OpExecError:
4641       self.LogWarning("Device creation failed, reverting...")
4642       try:
4643         _RemoveDisks(self, iobj)
4644       finally:
4645         self.cfg.ReleaseDRBDMinors(instance)
4646         raise
4647
4648     feedback_fn("adding instance %s to cluster config" % instance)
4649
4650     self.cfg.AddInstance(iobj)
4651     # Declare that we don't want to remove the instance lock anymore, as we've
4652     # added the instance to the config
4653     del self.remove_locks[locking.LEVEL_INSTANCE]
4654     # Unlock all the nodes
4655     if self.op.mode == constants.INSTANCE_IMPORT:
4656       nodes_keep = [self.op.src_node]
4657       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4658                        if node != self.op.src_node]
4659       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4660       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4661     else:
4662       self.context.glm.release(locking.LEVEL_NODE)
4663       del self.acquired_locks[locking.LEVEL_NODE]
4664
4665     if self.op.wait_for_sync:
4666       disk_abort = not _WaitForSync(self, iobj)
4667     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4668       # make sure the disks are not degraded (still sync-ing is ok)
4669       time.sleep(15)
4670       feedback_fn("* checking mirrors status")
4671       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4672     else:
4673       disk_abort = False
4674
4675     if disk_abort:
4676       _RemoveDisks(self, iobj)
4677       self.cfg.RemoveInstance(iobj.name)
4678       # Make sure the instance lock gets removed
4679       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4680       raise errors.OpExecError("There are some degraded disks for"
4681                                " this instance")
4682
4683     feedback_fn("creating os for instance %s on node %s" %
4684                 (instance, pnode_name))
4685
4686     if iobj.disk_template != constants.DT_DISKLESS:
4687       if self.op.mode == constants.INSTANCE_CREATE:
4688         feedback_fn("* running the instance OS create scripts...")
4689         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4690         msg = result.RemoteFailMsg()
4691         if msg:
4692           raise errors.OpExecError("Could not add os for instance %s"
4693                                    " on node %s: %s" %
4694                                    (instance, pnode_name, msg))
4695
4696       elif self.op.mode == constants.INSTANCE_IMPORT:
4697         feedback_fn("* running the instance OS import scripts...")
4698         src_node = self.op.src_node
4699         src_images = self.src_images
4700         cluster_name = self.cfg.GetClusterName()
4701         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4702                                                          src_node, src_images,
4703                                                          cluster_name)
4704         import_result.Raise()
4705         for idx, result in enumerate(import_result.data):
4706           if not result:
4707             self.LogWarning("Could not import the image %s for instance"
4708                             " %s, disk %d, on node %s" %
4709                             (src_images[idx], instance, idx, pnode_name))
4710       else:
4711         # also checked in the prereq part
4712         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4713                                      % self.op.mode)
4714
4715     if self.op.start:
4716       iobj.admin_up = True
4717       self.cfg.Update(iobj)
4718       logging.info("Starting instance %s on node %s", instance, pnode_name)
4719       feedback_fn("* starting instance...")
4720       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4721       msg = result.RemoteFailMsg()
4722       if msg:
4723         raise errors.OpExecError("Could not start instance: %s" % msg)
4724
4725
4726 class LUConnectConsole(NoHooksLU):
4727   """Connect to an instance's console.
4728
4729   This is somewhat special in that it returns the command line that
4730   you need to run on the master node in order to connect to the
4731   console.
4732
4733   """
4734   _OP_REQP = ["instance_name"]
4735   REQ_BGL = False
4736
4737   def ExpandNames(self):
4738     self._ExpandAndLockInstance()
4739
4740   def CheckPrereq(self):
4741     """Check prerequisites.
4742
4743     This checks that the instance is in the cluster.
4744
4745     """
4746     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4747     assert self.instance is not None, \
4748       "Cannot retrieve locked instance %s" % self.op.instance_name
4749     _CheckNodeOnline(self, self.instance.primary_node)
4750
4751   def Exec(self, feedback_fn):
4752     """Connect to the console of an instance
4753
4754     """
4755     instance = self.instance
4756     node = instance.primary_node
4757
4758     node_insts = self.rpc.call_instance_list([node],
4759                                              [instance.hypervisor])[node]
4760     node_insts.Raise()
4761
4762     if instance.name not in node_insts.data:
4763       raise errors.OpExecError("Instance %s is not running." % instance.name)
4764
4765     logging.debug("Connecting to console of %s on %s", instance.name, node)
4766
4767     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4768     cluster = self.cfg.GetClusterInfo()
4769     # beparams and hvparams are passed separately, to avoid editing the
4770     # instance and then saving the defaults in the instance itself.
4771     hvparams = cluster.FillHV(instance)
4772     beparams = cluster.FillBE(instance)
4773     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4774
4775     # build ssh cmdline
4776     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4777
4778
4779 class LUReplaceDisks(LogicalUnit):
4780   """Replace the disks of an instance.
4781
4782   """
4783   HPATH = "mirrors-replace"
4784   HTYPE = constants.HTYPE_INSTANCE
4785   _OP_REQP = ["instance_name", "mode", "disks"]
4786   REQ_BGL = False
4787
4788   def CheckArguments(self):
4789     if not hasattr(self.op, "remote_node"):
4790       self.op.remote_node = None
4791     if not hasattr(self.op, "iallocator"):
4792       self.op.iallocator = None
4793
4794     # check for valid parameter combination
4795     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4796     if self.op.mode == constants.REPLACE_DISK_CHG:
4797       if cnt == 2:
4798         raise errors.OpPrereqError("When changing the secondary either an"
4799                                    " iallocator script must be used or the"
4800                                    " new node given")
4801       elif cnt == 0:
4802         raise errors.OpPrereqError("Give either the iallocator or the new"
4803                                    " secondary, not both")
4804     else: # not replacing the secondary
4805       if cnt != 2:
4806         raise errors.OpPrereqError("The iallocator and new node options can"
4807                                    " be used only when changing the"
4808                                    " secondary node")
4809
4810   def ExpandNames(self):
4811     self._ExpandAndLockInstance()
4812
4813     if self.op.iallocator is not None:
4814       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4815     elif self.op.remote_node is not None:
4816       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4817       if remote_node is None:
4818         raise errors.OpPrereqError("Node '%s' not known" %
4819                                    self.op.remote_node)
4820       self.op.remote_node = remote_node
4821       # Warning: do not remove the locking of the new secondary here
4822       # unless DRBD8.AddChildren is changed to work in parallel;
4823       # currently it doesn't since parallel invocations of
4824       # FindUnusedMinor will conflict
4825       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4826       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4827     else:
4828       self.needed_locks[locking.LEVEL_NODE] = []
4829       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4830
4831   def DeclareLocks(self, level):
4832     # If we're not already locking all nodes in the set we have to declare the
4833     # instance's primary/secondary nodes.
4834     if (level == locking.LEVEL_NODE and
4835         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4836       self._LockInstancesNodes()
4837
4838   def _RunAllocator(self):
4839     """Compute a new secondary node using an IAllocator.
4840
4841     """
4842     ial = IAllocator(self,
4843                      mode=constants.IALLOCATOR_MODE_RELOC,
4844                      name=self.op.instance_name,
4845                      relocate_from=[self.sec_node])
4846
4847     ial.Run(self.op.iallocator)
4848
4849     if not ial.success:
4850       raise errors.OpPrereqError("Can't compute nodes using"
4851                                  " iallocator '%s': %s" % (self.op.iallocator,
4852                                                            ial.info))
4853     if len(ial.nodes) != ial.required_nodes:
4854       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4855                                  " of nodes (%s), required %s" %
4856                                  (len(ial.nodes), ial.required_nodes))
4857     self.op.remote_node = ial.nodes[0]
4858     self.LogInfo("Selected new secondary for the instance: %s",
4859                  self.op.remote_node)
4860
4861   def BuildHooksEnv(self):
4862     """Build hooks env.
4863
4864     This runs on the master, the primary and all the secondaries.
4865
4866     """
4867     env = {
4868       "MODE": self.op.mode,
4869       "NEW_SECONDARY": self.op.remote_node,
4870       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4871       }
4872     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4873     nl = [
4874       self.cfg.GetMasterNode(),
4875       self.instance.primary_node,
4876       ]
4877     if self.op.remote_node is not None:
4878       nl.append(self.op.remote_node)
4879     return env, nl, nl
4880
4881   def CheckPrereq(self):
4882     """Check prerequisites.
4883
4884     This checks that the instance is in the cluster.
4885
4886     """
4887     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4888     assert instance is not None, \
4889       "Cannot retrieve locked instance %s" % self.op.instance_name
4890     self.instance = instance
4891
4892     if instance.disk_template != constants.DT_DRBD8:
4893       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4894                                  " instances")
4895
4896     if len(instance.secondary_nodes) != 1:
4897       raise errors.OpPrereqError("The instance has a strange layout,"
4898                                  " expected one secondary but found %d" %
4899                                  len(instance.secondary_nodes))
4900
4901     self.sec_node = instance.secondary_nodes[0]
4902
4903     if self.op.iallocator is not None:
4904       self._RunAllocator()
4905
4906     remote_node = self.op.remote_node
4907     if remote_node is not None:
4908       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4909       assert self.remote_node_info is not None, \
4910         "Cannot retrieve locked node %s" % remote_node
4911     else:
4912       self.remote_node_info = None
4913     if remote_node == instance.primary_node:
4914       raise errors.OpPrereqError("The specified node is the primary node of"
4915                                  " the instance.")
4916     elif remote_node == self.sec_node:
4917       raise errors.OpPrereqError("The specified node is already the"
4918                                  " secondary node of the instance.")
4919
4920     if self.op.mode == constants.REPLACE_DISK_PRI:
4921       n1 = self.tgt_node = instance.primary_node
4922       n2 = self.oth_node = self.sec_node
4923     elif self.op.mode == constants.REPLACE_DISK_SEC:
4924       n1 = self.tgt_node = self.sec_node
4925       n2 = self.oth_node = instance.primary_node
4926     elif self.op.mode == constants.REPLACE_DISK_CHG:
4927       n1 = self.new_node = remote_node
4928       n2 = self.oth_node = instance.primary_node
4929       self.tgt_node = self.sec_node
4930       _CheckNodeNotDrained(self, remote_node)
4931     else:
4932       raise errors.ProgrammerError("Unhandled disk replace mode")
4933
4934     _CheckNodeOnline(self, n1)
4935     _CheckNodeOnline(self, n2)
4936
4937     if not self.op.disks:
4938       self.op.disks = range(len(instance.disks))
4939
4940     for disk_idx in self.op.disks:
4941       instance.FindDisk(disk_idx)
4942
4943   def _ExecD8DiskOnly(self, feedback_fn):
4944     """Replace a disk on the primary or secondary for dbrd8.
4945
4946     The algorithm for replace is quite complicated:
4947
4948       1. for each disk to be replaced:
4949
4950         1. create new LVs on the target node with unique names
4951         1. detach old LVs from the drbd device
4952         1. rename old LVs to name_replaced.<time_t>
4953         1. rename new LVs to old LVs
4954         1. attach the new LVs (with the old names now) to the drbd device
4955
4956       1. wait for sync across all devices
4957
4958       1. for each modified disk:
4959
4960         1. remove old LVs (which have the name name_replaces.<time_t>)
4961
4962     Failures are not very well handled.
4963
4964     """
4965     steps_total = 6
4966     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4967     instance = self.instance
4968     iv_names = {}
4969     vgname = self.cfg.GetVGName()
4970     # start of work
4971     cfg = self.cfg
4972     tgt_node = self.tgt_node
4973     oth_node = self.oth_node
4974
4975     # Step: check device activation
4976     self.proc.LogStep(1, steps_total, "check device existence")
4977     info("checking volume groups")
4978     my_vg = cfg.GetVGName()
4979     results = self.rpc.call_vg_list([oth_node, tgt_node])
4980     if not results:
4981       raise errors.OpExecError("Can't list volume groups on the nodes")
4982     for node in oth_node, tgt_node:
4983       res = results[node]
4984       if res.failed or not res.data or my_vg not in res.data:
4985         raise errors.OpExecError("Volume group '%s' not found on %s" %
4986                                  (my_vg, node))
4987     for idx, dev in enumerate(instance.disks):
4988       if idx not in self.op.disks:
4989         continue
4990       for node in tgt_node, oth_node:
4991         info("checking disk/%d on %s" % (idx, node))
4992         cfg.SetDiskID(dev, node)
4993         result = self.rpc.call_blockdev_find(node, dev)
4994         msg = result.RemoteFailMsg()
4995         if not msg and not result.payload:
4996           msg = "disk not found"
4997         if msg:
4998           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
4999                                    (idx, node, msg))
5000
5001     # Step: check other node consistency
5002     self.proc.LogStep(2, steps_total, "check peer consistency")
5003     for idx, dev in enumerate(instance.disks):
5004       if idx not in self.op.disks:
5005         continue
5006       info("checking disk/%d consistency on %s" % (idx, oth_node))
5007       if not _CheckDiskConsistency(self, dev, oth_node,
5008                                    oth_node==instance.primary_node):
5009         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5010                                  " to replace disks on this node (%s)" %
5011                                  (oth_node, tgt_node))
5012
5013     # Step: create new storage
5014     self.proc.LogStep(3, steps_total, "allocate new storage")
5015     for idx, dev in enumerate(instance.disks):
5016       if idx not in self.op.disks:
5017         continue
5018       size = dev.size
5019       cfg.SetDiskID(dev, tgt_node)
5020       lv_names = [".disk%d_%s" % (idx, suf)
5021                   for suf in ["data", "meta"]]
5022       names = _GenerateUniqueNames(self, lv_names)
5023       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5024                              logical_id=(vgname, names[0]))
5025       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5026                              logical_id=(vgname, names[1]))
5027       new_lvs = [lv_data, lv_meta]
5028       old_lvs = dev.children
5029       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5030       info("creating new local storage on %s for %s" %
5031            (tgt_node, dev.iv_name))
5032       # we pass force_create=True to force the LVM creation
5033       for new_lv in new_lvs:
5034         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5035                         _GetInstanceInfoText(instance), False)
5036
5037     # Step: for each lv, detach+rename*2+attach
5038     self.proc.LogStep(4, steps_total, "change drbd configuration")
5039     for dev, old_lvs, new_lvs in iv_names.itervalues():
5040       info("detaching %s drbd from local storage" % dev.iv_name)
5041       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5042       result.Raise()
5043       if not result.data:
5044         raise errors.OpExecError("Can't detach drbd from local storage on node"
5045                                  " %s for device %s" % (tgt_node, dev.iv_name))
5046       #dev.children = []
5047       #cfg.Update(instance)
5048
5049       # ok, we created the new LVs, so now we know we have the needed
5050       # storage; as such, we proceed on the target node to rename
5051       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5052       # using the assumption that logical_id == physical_id (which in
5053       # turn is the unique_id on that node)
5054
5055       # FIXME(iustin): use a better name for the replaced LVs
5056       temp_suffix = int(time.time())
5057       ren_fn = lambda d, suff: (d.physical_id[0],
5058                                 d.physical_id[1] + "_replaced-%s" % suff)
5059       # build the rename list based on what LVs exist on the node
5060       rlist = []
5061       for to_ren in old_lvs:
5062         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5063         if not result.RemoteFailMsg() and result.payload:
5064           # device exists
5065           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5066
5067       info("renaming the old LVs on the target node")
5068       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5069       result.Raise()
5070       if not result.data:
5071         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5072       # now we rename the new LVs to the old LVs
5073       info("renaming the new LVs on the target node")
5074       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5075       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5076       result.Raise()
5077       if not result.data:
5078         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5079
5080       for old, new in zip(old_lvs, new_lvs):
5081         new.logical_id = old.logical_id
5082         cfg.SetDiskID(new, tgt_node)
5083
5084       for disk in old_lvs:
5085         disk.logical_id = ren_fn(disk, temp_suffix)
5086         cfg.SetDiskID(disk, tgt_node)
5087
5088       # now that the new lvs have the old name, we can add them to the device
5089       info("adding new mirror component on %s" % tgt_node)
5090       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5091       if result.failed or not result.data:
5092         for new_lv in new_lvs:
5093           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5094           if msg:
5095             warning("Can't rollback device %s: %s", dev, msg,
5096                     hint="cleanup manually the unused logical volumes")
5097         raise errors.OpExecError("Can't add local storage to drbd")
5098
5099       dev.children = new_lvs
5100       cfg.Update(instance)
5101
5102     # Step: wait for sync
5103
5104     # this can fail as the old devices are degraded and _WaitForSync
5105     # does a combined result over all disks, so we don't check its
5106     # return value
5107     self.proc.LogStep(5, steps_total, "sync devices")
5108     _WaitForSync(self, instance, unlock=True)
5109
5110     # so check manually all the devices
5111     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5112       cfg.SetDiskID(dev, instance.primary_node)
5113       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5114       msg = result.RemoteFailMsg()
5115       if not msg and not result.payload:
5116         msg = "disk not found"
5117       if msg:
5118         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5119                                  (name, msg))
5120       if result.payload[5]:
5121         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5122
5123     # Step: remove old storage
5124     self.proc.LogStep(6, steps_total, "removing old storage")
5125     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5126       info("remove logical volumes for %s" % name)
5127       for lv in old_lvs:
5128         cfg.SetDiskID(lv, tgt_node)
5129         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5130         if msg:
5131           warning("Can't remove old LV: %s" % msg,
5132                   hint="manually remove unused LVs")
5133           continue
5134
5135   def _ExecD8Secondary(self, feedback_fn):
5136     """Replace the secondary node for drbd8.
5137
5138     The algorithm for replace is quite complicated:
5139       - for all disks of the instance:
5140         - create new LVs on the new node with same names
5141         - shutdown the drbd device on the old secondary
5142         - disconnect the drbd network on the primary
5143         - create the drbd device on the new secondary
5144         - network attach the drbd on the primary, using an artifice:
5145           the drbd code for Attach() will connect to the network if it
5146           finds a device which is connected to the good local disks but
5147           not network enabled
5148       - wait for sync across all devices
5149       - remove all disks from the old secondary
5150
5151     Failures are not very well handled.
5152
5153     """
5154     steps_total = 6
5155     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5156     instance = self.instance
5157     iv_names = {}
5158     # start of work
5159     cfg = self.cfg
5160     old_node = self.tgt_node
5161     new_node = self.new_node
5162     pri_node = instance.primary_node
5163     nodes_ip = {
5164       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5165       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5166       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5167       }
5168
5169     # Step: check device activation
5170     self.proc.LogStep(1, steps_total, "check device existence")
5171     info("checking volume groups")
5172     my_vg = cfg.GetVGName()
5173     results = self.rpc.call_vg_list([pri_node, new_node])
5174     for node in pri_node, new_node:
5175       res = results[node]
5176       if res.failed or not res.data or my_vg not in res.data:
5177         raise errors.OpExecError("Volume group '%s' not found on %s" %
5178                                  (my_vg, node))
5179     for idx, dev in enumerate(instance.disks):
5180       if idx not in self.op.disks:
5181         continue
5182       info("checking disk/%d on %s" % (idx, pri_node))
5183       cfg.SetDiskID(dev, pri_node)
5184       result = self.rpc.call_blockdev_find(pri_node, dev)
5185       msg = result.RemoteFailMsg()
5186       if not msg and not result.payload:
5187         msg = "disk not found"
5188       if msg:
5189         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5190                                  (idx, pri_node, msg))
5191
5192     # Step: check other node consistency
5193     self.proc.LogStep(2, steps_total, "check peer consistency")
5194     for idx, dev in enumerate(instance.disks):
5195       if idx not in self.op.disks:
5196         continue
5197       info("checking disk/%d consistency on %s" % (idx, pri_node))
5198       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5199         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5200                                  " unsafe to replace the secondary" %
5201                                  pri_node)
5202
5203     # Step: create new storage
5204     self.proc.LogStep(3, steps_total, "allocate new storage")
5205     for idx, dev in enumerate(instance.disks):
5206       info("adding new local storage on %s for disk/%d" %
5207            (new_node, idx))
5208       # we pass force_create=True to force LVM creation
5209       for new_lv in dev.children:
5210         _CreateBlockDev(self, new_node, instance, new_lv, True,
5211                         _GetInstanceInfoText(instance), False)
5212
5213     # Step 4: dbrd minors and drbd setups changes
5214     # after this, we must manually remove the drbd minors on both the
5215     # error and the success paths
5216     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5217                                    instance.name)
5218     logging.debug("Allocated minors %s" % (minors,))
5219     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5220     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5221       size = dev.size
5222       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5223       # create new devices on new_node; note that we create two IDs:
5224       # one without port, so the drbd will be activated without
5225       # networking information on the new node at this stage, and one
5226       # with network, for the latter activation in step 4
5227       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5228       if pri_node == o_node1:
5229         p_minor = o_minor1
5230       else:
5231         p_minor = o_minor2
5232
5233       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5234       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5235
5236       iv_names[idx] = (dev, dev.children, new_net_id)
5237       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5238                     new_net_id)
5239       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5240                               logical_id=new_alone_id,
5241                               children=dev.children)
5242       try:
5243         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5244                               _GetInstanceInfoText(instance), False)
5245       except errors.BlockDeviceError:
5246         self.cfg.ReleaseDRBDMinors(instance.name)
5247         raise
5248
5249     for idx, dev in enumerate(instance.disks):
5250       # we have new devices, shutdown the drbd on the old secondary
5251       info("shutting down drbd for disk/%d on old node" % idx)
5252       cfg.SetDiskID(dev, old_node)
5253       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5254       if msg:
5255         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5256                 (idx, msg),
5257                 hint="Please cleanup this device manually as soon as possible")
5258
5259     info("detaching primary drbds from the network (=> standalone)")
5260     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5261                                                instance.disks)[pri_node]
5262
5263     msg = result.RemoteFailMsg()
5264     if msg:
5265       # detaches didn't succeed (unlikely)
5266       self.cfg.ReleaseDRBDMinors(instance.name)
5267       raise errors.OpExecError("Can't detach the disks from the network on"
5268                                " old node: %s" % (msg,))
5269
5270     # if we managed to detach at least one, we update all the disks of
5271     # the instance to point to the new secondary
5272     info("updating instance configuration")
5273     for dev, _, new_logical_id in iv_names.itervalues():
5274       dev.logical_id = new_logical_id
5275       cfg.SetDiskID(dev, pri_node)
5276     cfg.Update(instance)
5277
5278     # and now perform the drbd attach
5279     info("attaching primary drbds to new secondary (standalone => connected)")
5280     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5281                                            instance.disks, instance.name,
5282                                            False)
5283     for to_node, to_result in result.items():
5284       msg = to_result.RemoteFailMsg()
5285       if msg:
5286         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5287                 hint="please do a gnt-instance info to see the"
5288                 " status of disks")
5289
5290     # this can fail as the old devices are degraded and _WaitForSync
5291     # does a combined result over all disks, so we don't check its
5292     # return value
5293     self.proc.LogStep(5, steps_total, "sync devices")
5294     _WaitForSync(self, instance, unlock=True)
5295
5296     # so check manually all the devices
5297     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5298       cfg.SetDiskID(dev, pri_node)
5299       result = self.rpc.call_blockdev_find(pri_node, dev)
5300       msg = result.RemoteFailMsg()
5301       if not msg and not result.payload:
5302         msg = "disk not found"
5303       if msg:
5304         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5305                                  (idx, msg))
5306       if result.payload[5]:
5307         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5308
5309     self.proc.LogStep(6, steps_total, "removing old storage")
5310     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5311       info("remove logical volumes for disk/%d" % idx)
5312       for lv in old_lvs:
5313         cfg.SetDiskID(lv, old_node)
5314         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5315         if msg:
5316           warning("Can't remove LV on old secondary: %s", msg,
5317                   hint="Cleanup stale volumes by hand")
5318
5319   def Exec(self, feedback_fn):
5320     """Execute disk replacement.
5321
5322     This dispatches the disk replacement to the appropriate handler.
5323
5324     """
5325     instance = self.instance
5326
5327     # Activate the instance disks if we're replacing them on a down instance
5328     if not instance.admin_up:
5329       _StartInstanceDisks(self, instance, True)
5330
5331     if self.op.mode == constants.REPLACE_DISK_CHG:
5332       fn = self._ExecD8Secondary
5333     else:
5334       fn = self._ExecD8DiskOnly
5335
5336     ret = fn(feedback_fn)
5337
5338     # Deactivate the instance disks if we're replacing them on a down instance
5339     if not instance.admin_up:
5340       _SafeShutdownInstanceDisks(self, instance)
5341
5342     return ret
5343
5344
5345 class LUGrowDisk(LogicalUnit):
5346   """Grow a disk of an instance.
5347
5348   """
5349   HPATH = "disk-grow"
5350   HTYPE = constants.HTYPE_INSTANCE
5351   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5352   REQ_BGL = False
5353
5354   def ExpandNames(self):
5355     self._ExpandAndLockInstance()
5356     self.needed_locks[locking.LEVEL_NODE] = []
5357     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5358
5359   def DeclareLocks(self, level):
5360     if level == locking.LEVEL_NODE:
5361       self._LockInstancesNodes()
5362
5363   def BuildHooksEnv(self):
5364     """Build hooks env.
5365
5366     This runs on the master, the primary and all the secondaries.
5367
5368     """
5369     env = {
5370       "DISK": self.op.disk,
5371       "AMOUNT": self.op.amount,
5372       }
5373     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5374     nl = [
5375       self.cfg.GetMasterNode(),
5376       self.instance.primary_node,
5377       ]
5378     return env, nl, nl
5379
5380   def CheckPrereq(self):
5381     """Check prerequisites.
5382
5383     This checks that the instance is in the cluster.
5384
5385     """
5386     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5387     assert instance is not None, \
5388       "Cannot retrieve locked instance %s" % self.op.instance_name
5389     nodenames = list(instance.all_nodes)
5390     for node in nodenames:
5391       _CheckNodeOnline(self, node)
5392
5393
5394     self.instance = instance
5395
5396     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5397       raise errors.OpPrereqError("Instance's disk layout does not support"
5398                                  " growing.")
5399
5400     self.disk = instance.FindDisk(self.op.disk)
5401
5402     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5403                                        instance.hypervisor)
5404     for node in nodenames:
5405       info = nodeinfo[node]
5406       if info.failed or not info.data:
5407         raise errors.OpPrereqError("Cannot get current information"
5408                                    " from node '%s'" % node)
5409       vg_free = info.data.get('vg_free', None)
5410       if not isinstance(vg_free, int):
5411         raise errors.OpPrereqError("Can't compute free disk space on"
5412                                    " node %s" % node)
5413       if self.op.amount > vg_free:
5414         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5415                                    " %d MiB available, %d MiB required" %
5416                                    (node, vg_free, self.op.amount))
5417
5418   def Exec(self, feedback_fn):
5419     """Execute disk grow.
5420
5421     """
5422     instance = self.instance
5423     disk = self.disk
5424     for node in instance.all_nodes:
5425       self.cfg.SetDiskID(disk, node)
5426       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5427       msg = result.RemoteFailMsg()
5428       if msg:
5429         raise errors.OpExecError("Grow request failed to node %s: %s" %
5430                                  (node, msg))
5431     disk.RecordGrow(self.op.amount)
5432     self.cfg.Update(instance)
5433     if self.op.wait_for_sync:
5434       disk_abort = not _WaitForSync(self, instance)
5435       if disk_abort:
5436         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5437                              " status.\nPlease check the instance.")
5438
5439
5440 class LUQueryInstanceData(NoHooksLU):
5441   """Query runtime instance data.
5442
5443   """
5444   _OP_REQP = ["instances", "static"]
5445   REQ_BGL = False
5446
5447   def ExpandNames(self):
5448     self.needed_locks = {}
5449     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5450
5451     if not isinstance(self.op.instances, list):
5452       raise errors.OpPrereqError("Invalid argument type 'instances'")
5453
5454     if self.op.instances:
5455       self.wanted_names = []
5456       for name in self.op.instances:
5457         full_name = self.cfg.ExpandInstanceName(name)
5458         if full_name is None:
5459           raise errors.OpPrereqError("Instance '%s' not known" % name)
5460         self.wanted_names.append(full_name)
5461       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5462     else:
5463       self.wanted_names = None
5464       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5465
5466     self.needed_locks[locking.LEVEL_NODE] = []
5467     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5468
5469   def DeclareLocks(self, level):
5470     if level == locking.LEVEL_NODE:
5471       self._LockInstancesNodes()
5472
5473   def CheckPrereq(self):
5474     """Check prerequisites.
5475
5476     This only checks the optional instance list against the existing names.
5477
5478     """
5479     if self.wanted_names is None:
5480       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5481
5482     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5483                              in self.wanted_names]
5484     return
5485
5486   def _ComputeDiskStatus(self, instance, snode, dev):
5487     """Compute block device status.
5488
5489     """
5490     static = self.op.static
5491     if not static:
5492       self.cfg.SetDiskID(dev, instance.primary_node)
5493       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5494       msg = dev_pstatus.RemoteFailMsg()
5495       if msg:
5496         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5497                                  (instance.name, msg))
5498       dev_pstatus = dev_pstatus.payload
5499     else:
5500       dev_pstatus = None
5501
5502     if dev.dev_type in constants.LDS_DRBD:
5503       # we change the snode then (otherwise we use the one passed in)
5504       if dev.logical_id[0] == instance.primary_node:
5505         snode = dev.logical_id[1]
5506       else:
5507         snode = dev.logical_id[0]
5508
5509     if snode and not static:
5510       self.cfg.SetDiskID(dev, snode)
5511       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5512       msg = dev_sstatus.RemoteFailMsg()
5513       if msg:
5514         raise errors.OpExecError("Can't compute disk status for %s: %s" %
5515                                  (instance.name, msg))
5516       dev_sstatus = dev_sstatus.payload
5517     else:
5518       dev_sstatus = None
5519
5520     if dev.children:
5521       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5522                       for child in dev.children]
5523     else:
5524       dev_children = []
5525
5526     data = {
5527       "iv_name": dev.iv_name,
5528       "dev_type": dev.dev_type,
5529       "logical_id": dev.logical_id,
5530       "physical_id": dev.physical_id,
5531       "pstatus": dev_pstatus,
5532       "sstatus": dev_sstatus,
5533       "children": dev_children,
5534       "mode": dev.mode,
5535       }
5536
5537     return data
5538
5539   def Exec(self, feedback_fn):
5540     """Gather and return data"""
5541     result = {}
5542
5543     cluster = self.cfg.GetClusterInfo()
5544
5545     for instance in self.wanted_instances:
5546       if not self.op.static:
5547         remote_info = self.rpc.call_instance_info(instance.primary_node,
5548                                                   instance.name,
5549                                                   instance.hypervisor)
5550         remote_info.Raise()
5551         remote_info = remote_info.data
5552         if remote_info and "state" in remote_info:
5553           remote_state = "up"
5554         else:
5555           remote_state = "down"
5556       else:
5557         remote_state = None
5558       if instance.admin_up:
5559         config_state = "up"
5560       else:
5561         config_state = "down"
5562
5563       disks = [self._ComputeDiskStatus(instance, None, device)
5564                for device in instance.disks]
5565
5566       idict = {
5567         "name": instance.name,
5568         "config_state": config_state,
5569         "run_state": remote_state,
5570         "pnode": instance.primary_node,
5571         "snodes": instance.secondary_nodes,
5572         "os": instance.os,
5573         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5574         "disks": disks,
5575         "hypervisor": instance.hypervisor,
5576         "network_port": instance.network_port,
5577         "hv_instance": instance.hvparams,
5578         "hv_actual": cluster.FillHV(instance),
5579         "be_instance": instance.beparams,
5580         "be_actual": cluster.FillBE(instance),
5581         }
5582
5583       result[instance.name] = idict
5584
5585     return result
5586
5587
5588 class LUSetInstanceParams(LogicalUnit):
5589   """Modifies an instances's parameters.
5590
5591   """
5592   HPATH = "instance-modify"
5593   HTYPE = constants.HTYPE_INSTANCE
5594   _OP_REQP = ["instance_name"]
5595   REQ_BGL = False
5596
5597   def CheckArguments(self):
5598     if not hasattr(self.op, 'nics'):
5599       self.op.nics = []
5600     if not hasattr(self.op, 'disks'):
5601       self.op.disks = []
5602     if not hasattr(self.op, 'beparams'):
5603       self.op.beparams = {}
5604     if not hasattr(self.op, 'hvparams'):
5605       self.op.hvparams = {}
5606     self.op.force = getattr(self.op, "force", False)
5607     if not (self.op.nics or self.op.disks or
5608             self.op.hvparams or self.op.beparams):
5609       raise errors.OpPrereqError("No changes submitted")
5610
5611     # Disk validation
5612     disk_addremove = 0
5613     for disk_op, disk_dict in self.op.disks:
5614       if disk_op == constants.DDM_REMOVE:
5615         disk_addremove += 1
5616         continue
5617       elif disk_op == constants.DDM_ADD:
5618         disk_addremove += 1
5619       else:
5620         if not isinstance(disk_op, int):
5621           raise errors.OpPrereqError("Invalid disk index")
5622       if disk_op == constants.DDM_ADD:
5623         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5624         if mode not in constants.DISK_ACCESS_SET:
5625           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5626         size = disk_dict.get('size', None)
5627         if size is None:
5628           raise errors.OpPrereqError("Required disk parameter size missing")
5629         try:
5630           size = int(size)
5631         except ValueError, err:
5632           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5633                                      str(err))
5634         disk_dict['size'] = size
5635       else:
5636         # modification of disk
5637         if 'size' in disk_dict:
5638           raise errors.OpPrereqError("Disk size change not possible, use"
5639                                      " grow-disk")
5640
5641     if disk_addremove > 1:
5642       raise errors.OpPrereqError("Only one disk add or remove operation"
5643                                  " supported at a time")
5644
5645     # NIC validation
5646     nic_addremove = 0
5647     for nic_op, nic_dict in self.op.nics:
5648       if nic_op == constants.DDM_REMOVE:
5649         nic_addremove += 1
5650         continue
5651       elif nic_op == constants.DDM_ADD:
5652         nic_addremove += 1
5653       else:
5654         if not isinstance(nic_op, int):
5655           raise errors.OpPrereqError("Invalid nic index")
5656
5657       # nic_dict should be a dict
5658       nic_ip = nic_dict.get('ip', None)
5659       if nic_ip is not None:
5660         if nic_ip.lower() == "none":
5661           nic_dict['ip'] = None
5662         else:
5663           if not utils.IsValidIP(nic_ip):
5664             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5665       # we can only check None bridges and assign the default one
5666       nic_bridge = nic_dict.get('bridge', None)
5667       if nic_bridge is None:
5668         nic_dict['bridge'] = self.cfg.GetDefBridge()
5669       # but we can validate MACs
5670       nic_mac = nic_dict.get('mac', None)
5671       if nic_mac is not None:
5672         if self.cfg.IsMacInUse(nic_mac):
5673           raise errors.OpPrereqError("MAC address %s already in use"
5674                                      " in cluster" % nic_mac)
5675         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5676           if not utils.IsValidMac(nic_mac):
5677             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5678     if nic_addremove > 1:
5679       raise errors.OpPrereqError("Only one NIC add or remove operation"
5680                                  " supported at a time")
5681
5682   def ExpandNames(self):
5683     self._ExpandAndLockInstance()
5684     self.needed_locks[locking.LEVEL_NODE] = []
5685     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5686
5687   def DeclareLocks(self, level):
5688     if level == locking.LEVEL_NODE:
5689       self._LockInstancesNodes()
5690
5691   def BuildHooksEnv(self):
5692     """Build hooks env.
5693
5694     This runs on the master, primary and secondaries.
5695
5696     """
5697     args = dict()
5698     if constants.BE_MEMORY in self.be_new:
5699       args['memory'] = self.be_new[constants.BE_MEMORY]
5700     if constants.BE_VCPUS in self.be_new:
5701       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5702     # FIXME: readd disk/nic changes
5703     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5704     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5705     return env, nl, nl
5706
5707   def CheckPrereq(self):
5708     """Check prerequisites.
5709
5710     This only checks the instance list against the existing names.
5711
5712     """
5713     force = self.force = self.op.force
5714
5715     # checking the new params on the primary/secondary nodes
5716
5717     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5718     assert self.instance is not None, \
5719       "Cannot retrieve locked instance %s" % self.op.instance_name
5720     pnode = instance.primary_node
5721     nodelist = list(instance.all_nodes)
5722
5723     # hvparams processing
5724     if self.op.hvparams:
5725       i_hvdict = copy.deepcopy(instance.hvparams)
5726       for key, val in self.op.hvparams.iteritems():
5727         if val == constants.VALUE_DEFAULT:
5728           try:
5729             del i_hvdict[key]
5730           except KeyError:
5731             pass
5732         else:
5733           i_hvdict[key] = val
5734       cluster = self.cfg.GetClusterInfo()
5735       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5736       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5737                                 i_hvdict)
5738       # local check
5739       hypervisor.GetHypervisor(
5740         instance.hypervisor).CheckParameterSyntax(hv_new)
5741       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5742       self.hv_new = hv_new # the new actual values
5743       self.hv_inst = i_hvdict # the new dict (without defaults)
5744     else:
5745       self.hv_new = self.hv_inst = {}
5746
5747     # beparams processing
5748     if self.op.beparams:
5749       i_bedict = copy.deepcopy(instance.beparams)
5750       for key, val in self.op.beparams.iteritems():
5751         if val == constants.VALUE_DEFAULT:
5752           try:
5753             del i_bedict[key]
5754           except KeyError:
5755             pass
5756         else:
5757           i_bedict[key] = val
5758       cluster = self.cfg.GetClusterInfo()
5759       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5760       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5761                                 i_bedict)
5762       self.be_new = be_new # the new actual values
5763       self.be_inst = i_bedict # the new dict (without defaults)
5764     else:
5765       self.be_new = self.be_inst = {}
5766
5767     self.warn = []
5768
5769     if constants.BE_MEMORY in self.op.beparams and not self.force:
5770       mem_check_list = [pnode]
5771       if be_new[constants.BE_AUTO_BALANCE]:
5772         # either we changed auto_balance to yes or it was from before
5773         mem_check_list.extend(instance.secondary_nodes)
5774       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5775                                                   instance.hypervisor)
5776       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5777                                          instance.hypervisor)
5778       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5779         # Assume the primary node is unreachable and go ahead
5780         self.warn.append("Can't get info from primary node %s" % pnode)
5781       else:
5782         if not instance_info.failed and instance_info.data:
5783           current_mem = instance_info.data['memory']
5784         else:
5785           # Assume instance not running
5786           # (there is a slight race condition here, but it's not very probable,
5787           # and we have no other way to check)
5788           current_mem = 0
5789         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5790                     nodeinfo[pnode].data['memory_free'])
5791         if miss_mem > 0:
5792           raise errors.OpPrereqError("This change will prevent the instance"
5793                                      " from starting, due to %d MB of memory"
5794                                      " missing on its primary node" % miss_mem)
5795
5796       if be_new[constants.BE_AUTO_BALANCE]:
5797         for node, nres in nodeinfo.iteritems():
5798           if node not in instance.secondary_nodes:
5799             continue
5800           if nres.failed or not isinstance(nres.data, dict):
5801             self.warn.append("Can't get info from secondary node %s" % node)
5802           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5803             self.warn.append("Not enough memory to failover instance to"
5804                              " secondary node %s" % node)
5805
5806     # NIC processing
5807     for nic_op, nic_dict in self.op.nics:
5808       if nic_op == constants.DDM_REMOVE:
5809         if not instance.nics:
5810           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5811         continue
5812       if nic_op != constants.DDM_ADD:
5813         # an existing nic
5814         if nic_op < 0 or nic_op >= len(instance.nics):
5815           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5816                                      " are 0 to %d" %
5817                                      (nic_op, len(instance.nics)))
5818       nic_bridge = nic_dict.get('bridge', None)
5819       if nic_bridge is not None:
5820         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5821           msg = ("Bridge '%s' doesn't exist on one of"
5822                  " the instance nodes" % nic_bridge)
5823           if self.force:
5824             self.warn.append(msg)
5825           else:
5826             raise errors.OpPrereqError(msg)
5827
5828     # DISK processing
5829     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5830       raise errors.OpPrereqError("Disk operations not supported for"
5831                                  " diskless instances")
5832     for disk_op, disk_dict in self.op.disks:
5833       if disk_op == constants.DDM_REMOVE:
5834         if len(instance.disks) == 1:
5835           raise errors.OpPrereqError("Cannot remove the last disk of"
5836                                      " an instance")
5837         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5838         ins_l = ins_l[pnode]
5839         if ins_l.failed or not isinstance(ins_l.data, list):
5840           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5841         if instance.name in ins_l.data:
5842           raise errors.OpPrereqError("Instance is running, can't remove"
5843                                      " disks.")
5844
5845       if (disk_op == constants.DDM_ADD and
5846           len(instance.nics) >= constants.MAX_DISKS):
5847         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5848                                    " add more" % constants.MAX_DISKS)
5849       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5850         # an existing disk
5851         if disk_op < 0 or disk_op >= len(instance.disks):
5852           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5853                                      " are 0 to %d" %
5854                                      (disk_op, len(instance.disks)))
5855
5856     return
5857
5858   def Exec(self, feedback_fn):
5859     """Modifies an instance.
5860
5861     All parameters take effect only at the next restart of the instance.
5862
5863     """
5864     # Process here the warnings from CheckPrereq, as we don't have a
5865     # feedback_fn there.
5866     for warn in self.warn:
5867       feedback_fn("WARNING: %s" % warn)
5868
5869     result = []
5870     instance = self.instance
5871     # disk changes
5872     for disk_op, disk_dict in self.op.disks:
5873       if disk_op == constants.DDM_REMOVE:
5874         # remove the last disk
5875         device = instance.disks.pop()
5876         device_idx = len(instance.disks)
5877         for node, disk in device.ComputeNodeTree(instance.primary_node):
5878           self.cfg.SetDiskID(disk, node)
5879           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5880           if msg:
5881             self.LogWarning("Could not remove disk/%d on node %s: %s,"
5882                             " continuing anyway", device_idx, node, msg)
5883         result.append(("disk/%d" % device_idx, "remove"))
5884       elif disk_op == constants.DDM_ADD:
5885         # add a new disk
5886         if instance.disk_template == constants.DT_FILE:
5887           file_driver, file_path = instance.disks[0].logical_id
5888           file_path = os.path.dirname(file_path)
5889         else:
5890           file_driver = file_path = None
5891         disk_idx_base = len(instance.disks)
5892         new_disk = _GenerateDiskTemplate(self,
5893                                          instance.disk_template,
5894                                          instance.name, instance.primary_node,
5895                                          instance.secondary_nodes,
5896                                          [disk_dict],
5897                                          file_path,
5898                                          file_driver,
5899                                          disk_idx_base)[0]
5900         instance.disks.append(new_disk)
5901         info = _GetInstanceInfoText(instance)
5902
5903         logging.info("Creating volume %s for instance %s",
5904                      new_disk.iv_name, instance.name)
5905         # Note: this needs to be kept in sync with _CreateDisks
5906         #HARDCODE
5907         for node in instance.all_nodes:
5908           f_create = node == instance.primary_node
5909           try:
5910             _CreateBlockDev(self, node, instance, new_disk,
5911                             f_create, info, f_create)
5912           except errors.OpExecError, err:
5913             self.LogWarning("Failed to create volume %s (%s) on"
5914                             " node %s: %s",
5915                             new_disk.iv_name, new_disk, node, err)
5916         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5917                        (new_disk.size, new_disk.mode)))
5918       else:
5919         # change a given disk
5920         instance.disks[disk_op].mode = disk_dict['mode']
5921         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5922     # NIC changes
5923     for nic_op, nic_dict in self.op.nics:
5924       if nic_op == constants.DDM_REMOVE:
5925         # remove the last nic
5926         del instance.nics[-1]
5927         result.append(("nic.%d" % len(instance.nics), "remove"))
5928       elif nic_op == constants.DDM_ADD:
5929         # add a new nic
5930         if 'mac' not in nic_dict:
5931           mac = constants.VALUE_GENERATE
5932         else:
5933           mac = nic_dict['mac']
5934         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5935           mac = self.cfg.GenerateMAC()
5936         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5937                               bridge=nic_dict.get('bridge', None))
5938         instance.nics.append(new_nic)
5939         result.append(("nic.%d" % (len(instance.nics) - 1),
5940                        "add:mac=%s,ip=%s,bridge=%s" %
5941                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5942       else:
5943         # change a given nic
5944         for key in 'mac', 'ip', 'bridge':
5945           if key in nic_dict:
5946             setattr(instance.nics[nic_op], key, nic_dict[key])
5947             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5948
5949     # hvparams changes
5950     if self.op.hvparams:
5951       instance.hvparams = self.hv_inst
5952       for key, val in self.op.hvparams.iteritems():
5953         result.append(("hv/%s" % key, val))
5954
5955     # beparams changes
5956     if self.op.beparams:
5957       instance.beparams = self.be_inst
5958       for key, val in self.op.beparams.iteritems():
5959         result.append(("be/%s" % key, val))
5960
5961     self.cfg.Update(instance)
5962
5963     return result
5964
5965
5966 class LUQueryExports(NoHooksLU):
5967   """Query the exports list
5968
5969   """
5970   _OP_REQP = ['nodes']
5971   REQ_BGL = False
5972
5973   def ExpandNames(self):
5974     self.needed_locks = {}
5975     self.share_locks[locking.LEVEL_NODE] = 1
5976     if not self.op.nodes:
5977       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5978     else:
5979       self.needed_locks[locking.LEVEL_NODE] = \
5980         _GetWantedNodes(self, self.op.nodes)
5981
5982   def CheckPrereq(self):
5983     """Check prerequisites.
5984
5985     """
5986     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5987
5988   def Exec(self, feedback_fn):
5989     """Compute the list of all the exported system images.
5990
5991     @rtype: dict
5992     @return: a dictionary with the structure node->(export-list)
5993         where export-list is a list of the instances exported on
5994         that node.
5995
5996     """
5997     rpcresult = self.rpc.call_export_list(self.nodes)
5998     result = {}
5999     for node in rpcresult:
6000       if rpcresult[node].failed:
6001         result[node] = False
6002       else:
6003         result[node] = rpcresult[node].data
6004
6005     return result
6006
6007
6008 class LUExportInstance(LogicalUnit):
6009   """Export an instance to an image in the cluster.
6010
6011   """
6012   HPATH = "instance-export"
6013   HTYPE = constants.HTYPE_INSTANCE
6014   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6015   REQ_BGL = False
6016
6017   def ExpandNames(self):
6018     self._ExpandAndLockInstance()
6019     # FIXME: lock only instance primary and destination node
6020     #
6021     # Sad but true, for now we have do lock all nodes, as we don't know where
6022     # the previous export might be, and and in this LU we search for it and
6023     # remove it from its current node. In the future we could fix this by:
6024     #  - making a tasklet to search (share-lock all), then create the new one,
6025     #    then one to remove, after
6026     #  - removing the removal operation altoghether
6027     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6028
6029   def DeclareLocks(self, level):
6030     """Last minute lock declaration."""
6031     # All nodes are locked anyway, so nothing to do here.
6032
6033   def BuildHooksEnv(self):
6034     """Build hooks env.
6035
6036     This will run on the master, primary node and target node.
6037
6038     """
6039     env = {
6040       "EXPORT_NODE": self.op.target_node,
6041       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6042       }
6043     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6044     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6045           self.op.target_node]
6046     return env, nl, nl
6047
6048   def CheckPrereq(self):
6049     """Check prerequisites.
6050
6051     This checks that the instance and node names are valid.
6052
6053     """
6054     instance_name = self.op.instance_name
6055     self.instance = self.cfg.GetInstanceInfo(instance_name)
6056     assert self.instance is not None, \
6057           "Cannot retrieve locked instance %s" % self.op.instance_name
6058     _CheckNodeOnline(self, self.instance.primary_node)
6059
6060     self.dst_node = self.cfg.GetNodeInfo(
6061       self.cfg.ExpandNodeName(self.op.target_node))
6062
6063     if self.dst_node is None:
6064       # This is wrong node name, not a non-locked node
6065       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6066     _CheckNodeOnline(self, self.dst_node.name)
6067     _CheckNodeNotDrained(self, self.dst_node.name)
6068
6069     # instance disk type verification
6070     for disk in self.instance.disks:
6071       if disk.dev_type == constants.LD_FILE:
6072         raise errors.OpPrereqError("Export not supported for instances with"
6073                                    " file-based disks")
6074
6075   def Exec(self, feedback_fn):
6076     """Export an instance to an image in the cluster.
6077
6078     """
6079     instance = self.instance
6080     dst_node = self.dst_node
6081     src_node = instance.primary_node
6082     if self.op.shutdown:
6083       # shutdown the instance, but not the disks
6084       result = self.rpc.call_instance_shutdown(src_node, instance)
6085       result.Raise()
6086       if not result.data:
6087         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
6088                                  (instance.name, src_node))
6089
6090     vgname = self.cfg.GetVGName()
6091
6092     snap_disks = []
6093
6094     # set the disks ID correctly since call_instance_start needs the
6095     # correct drbd minor to create the symlinks
6096     for disk in instance.disks:
6097       self.cfg.SetDiskID(disk, src_node)
6098
6099     try:
6100       for disk in instance.disks:
6101         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6102         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6103         if new_dev_name.failed or not new_dev_name.data:
6104           self.LogWarning("Could not snapshot block device %s on node %s",
6105                           disk.logical_id[1], src_node)
6106           snap_disks.append(False)
6107         else:
6108           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6109                                  logical_id=(vgname, new_dev_name.data),
6110                                  physical_id=(vgname, new_dev_name.data),
6111                                  iv_name=disk.iv_name)
6112           snap_disks.append(new_dev)
6113
6114     finally:
6115       if self.op.shutdown and instance.admin_up:
6116         result = self.rpc.call_instance_start(src_node, instance, None)
6117         msg = result.RemoteFailMsg()
6118         if msg:
6119           _ShutdownInstanceDisks(self, instance)
6120           raise errors.OpExecError("Could not start instance: %s" % msg)
6121
6122     # TODO: check for size
6123
6124     cluster_name = self.cfg.GetClusterName()
6125     for idx, dev in enumerate(snap_disks):
6126       if dev:
6127         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6128                                                instance, cluster_name, idx)
6129         if result.failed or not result.data:
6130           self.LogWarning("Could not export block device %s from node %s to"
6131                           " node %s", dev.logical_id[1], src_node,
6132                           dst_node.name)
6133         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6134         if msg:
6135           self.LogWarning("Could not remove snapshot block device %s from node"
6136                           " %s: %s", dev.logical_id[1], src_node, msg)
6137
6138     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6139     if result.failed or not result.data:
6140       self.LogWarning("Could not finalize export for instance %s on node %s",
6141                       instance.name, dst_node.name)
6142
6143     nodelist = self.cfg.GetNodeList()
6144     nodelist.remove(dst_node.name)
6145
6146     # on one-node clusters nodelist will be empty after the removal
6147     # if we proceed the backup would be removed because OpQueryExports
6148     # substitutes an empty list with the full cluster node list.
6149     if nodelist:
6150       exportlist = self.rpc.call_export_list(nodelist)
6151       for node in exportlist:
6152         if exportlist[node].failed:
6153           continue
6154         if instance.name in exportlist[node].data:
6155           if not self.rpc.call_export_remove(node, instance.name):
6156             self.LogWarning("Could not remove older export for instance %s"
6157                             " on node %s", instance.name, node)
6158
6159
6160 class LURemoveExport(NoHooksLU):
6161   """Remove exports related to the named instance.
6162
6163   """
6164   _OP_REQP = ["instance_name"]
6165   REQ_BGL = False
6166
6167   def ExpandNames(self):
6168     self.needed_locks = {}
6169     # We need all nodes to be locked in order for RemoveExport to work, but we
6170     # don't need to lock the instance itself, as nothing will happen to it (and
6171     # we can remove exports also for a removed instance)
6172     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6173
6174   def CheckPrereq(self):
6175     """Check prerequisites.
6176     """
6177     pass
6178
6179   def Exec(self, feedback_fn):
6180     """Remove any export.
6181
6182     """
6183     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6184     # If the instance was not found we'll try with the name that was passed in.
6185     # This will only work if it was an FQDN, though.
6186     fqdn_warn = False
6187     if not instance_name:
6188       fqdn_warn = True
6189       instance_name = self.op.instance_name
6190
6191     exportlist = self.rpc.call_export_list(self.acquired_locks[
6192       locking.LEVEL_NODE])
6193     found = False
6194     for node in exportlist:
6195       if exportlist[node].failed:
6196         self.LogWarning("Failed to query node %s, continuing" % node)
6197         continue
6198       if instance_name in exportlist[node].data:
6199         found = True
6200         result = self.rpc.call_export_remove(node, instance_name)
6201         if result.failed or not result.data:
6202           logging.error("Could not remove export for instance %s"
6203                         " on node %s", instance_name, node)
6204
6205     if fqdn_warn and not found:
6206       feedback_fn("Export not found. If trying to remove an export belonging"
6207                   " to a deleted instance please use its Fully Qualified"
6208                   " Domain Name.")
6209
6210
6211 class TagsLU(NoHooksLU):
6212   """Generic tags LU.
6213
6214   This is an abstract class which is the parent of all the other tags LUs.
6215
6216   """
6217
6218   def ExpandNames(self):
6219     self.needed_locks = {}
6220     if self.op.kind == constants.TAG_NODE:
6221       name = self.cfg.ExpandNodeName(self.op.name)
6222       if name is None:
6223         raise errors.OpPrereqError("Invalid node name (%s)" %
6224                                    (self.op.name,))
6225       self.op.name = name
6226       self.needed_locks[locking.LEVEL_NODE] = name
6227     elif self.op.kind == constants.TAG_INSTANCE:
6228       name = self.cfg.ExpandInstanceName(self.op.name)
6229       if name is None:
6230         raise errors.OpPrereqError("Invalid instance name (%s)" %
6231                                    (self.op.name,))
6232       self.op.name = name
6233       self.needed_locks[locking.LEVEL_INSTANCE] = name
6234
6235   def CheckPrereq(self):
6236     """Check prerequisites.
6237
6238     """
6239     if self.op.kind == constants.TAG_CLUSTER:
6240       self.target = self.cfg.GetClusterInfo()
6241     elif self.op.kind == constants.TAG_NODE:
6242       self.target = self.cfg.GetNodeInfo(self.op.name)
6243     elif self.op.kind == constants.TAG_INSTANCE:
6244       self.target = self.cfg.GetInstanceInfo(self.op.name)
6245     else:
6246       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6247                                  str(self.op.kind))
6248
6249
6250 class LUGetTags(TagsLU):
6251   """Returns the tags of a given object.
6252
6253   """
6254   _OP_REQP = ["kind", "name"]
6255   REQ_BGL = False
6256
6257   def Exec(self, feedback_fn):
6258     """Returns the tag list.
6259
6260     """
6261     return list(self.target.GetTags())
6262
6263
6264 class LUSearchTags(NoHooksLU):
6265   """Searches the tags for a given pattern.
6266
6267   """
6268   _OP_REQP = ["pattern"]
6269   REQ_BGL = False
6270
6271   def ExpandNames(self):
6272     self.needed_locks = {}
6273
6274   def CheckPrereq(self):
6275     """Check prerequisites.
6276
6277     This checks the pattern passed for validity by compiling it.
6278
6279     """
6280     try:
6281       self.re = re.compile(self.op.pattern)
6282     except re.error, err:
6283       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6284                                  (self.op.pattern, err))
6285
6286   def Exec(self, feedback_fn):
6287     """Returns the tag list.
6288
6289     """
6290     cfg = self.cfg
6291     tgts = [("/cluster", cfg.GetClusterInfo())]
6292     ilist = cfg.GetAllInstancesInfo().values()
6293     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6294     nlist = cfg.GetAllNodesInfo().values()
6295     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6296     results = []
6297     for path, target in tgts:
6298       for tag in target.GetTags():
6299         if self.re.search(tag):
6300           results.append((path, tag))
6301     return results
6302
6303
6304 class LUAddTags(TagsLU):
6305   """Sets a tag on a given object.
6306
6307   """
6308   _OP_REQP = ["kind", "name", "tags"]
6309   REQ_BGL = False
6310
6311   def CheckPrereq(self):
6312     """Check prerequisites.
6313
6314     This checks the type and length of the tag name and value.
6315
6316     """
6317     TagsLU.CheckPrereq(self)
6318     for tag in self.op.tags:
6319       objects.TaggableObject.ValidateTag(tag)
6320
6321   def Exec(self, feedback_fn):
6322     """Sets the tag.
6323
6324     """
6325     try:
6326       for tag in self.op.tags:
6327         self.target.AddTag(tag)
6328     except errors.TagError, err:
6329       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6330     try:
6331       self.cfg.Update(self.target)
6332     except errors.ConfigurationError:
6333       raise errors.OpRetryError("There has been a modification to the"
6334                                 " config file and the operation has been"
6335                                 " aborted. Please retry.")
6336
6337
6338 class LUDelTags(TagsLU):
6339   """Delete a list of tags from a given object.
6340
6341   """
6342   _OP_REQP = ["kind", "name", "tags"]
6343   REQ_BGL = False
6344
6345   def CheckPrereq(self):
6346     """Check prerequisites.
6347
6348     This checks that we have the given tag.
6349
6350     """
6351     TagsLU.CheckPrereq(self)
6352     for tag in self.op.tags:
6353       objects.TaggableObject.ValidateTag(tag)
6354     del_tags = frozenset(self.op.tags)
6355     cur_tags = self.target.GetTags()
6356     if not del_tags <= cur_tags:
6357       diff_tags = del_tags - cur_tags
6358       diff_names = ["'%s'" % tag for tag in diff_tags]
6359       diff_names.sort()
6360       raise errors.OpPrereqError("Tag(s) %s not found" %
6361                                  (",".join(diff_names)))
6362
6363   def Exec(self, feedback_fn):
6364     """Remove the tag from the object.
6365
6366     """
6367     for tag in self.op.tags:
6368       self.target.RemoveTag(tag)
6369     try:
6370       self.cfg.Update(self.target)
6371     except errors.ConfigurationError:
6372       raise errors.OpRetryError("There has been a modification to the"
6373                                 " config file and the operation has been"
6374                                 " aborted. Please retry.")
6375
6376
6377 class LUTestDelay(NoHooksLU):
6378   """Sleep for a specified amount of time.
6379
6380   This LU sleeps on the master and/or nodes for a specified amount of
6381   time.
6382
6383   """
6384   _OP_REQP = ["duration", "on_master", "on_nodes"]
6385   REQ_BGL = False
6386
6387   def ExpandNames(self):
6388     """Expand names and set required locks.
6389
6390     This expands the node list, if any.
6391
6392     """
6393     self.needed_locks = {}
6394     if self.op.on_nodes:
6395       # _GetWantedNodes can be used here, but is not always appropriate to use
6396       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6397       # more information.
6398       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6399       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6400
6401   def CheckPrereq(self):
6402     """Check prerequisites.
6403
6404     """
6405
6406   def Exec(self, feedback_fn):
6407     """Do the actual sleep.
6408
6409     """
6410     if self.op.on_master:
6411       if not utils.TestDelay(self.op.duration):
6412         raise errors.OpExecError("Error during master delay test")
6413     if self.op.on_nodes:
6414       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6415       if not result:
6416         raise errors.OpExecError("Complete failure from rpc call")
6417       for node, node_result in result.items():
6418         node_result.Raise()
6419         if not node_result.data:
6420           raise errors.OpExecError("Failure during rpc call to node %s,"
6421                                    " result: %s" % (node, node_result.data))
6422
6423
6424 class IAllocator(object):
6425   """IAllocator framework.
6426
6427   An IAllocator instance has three sets of attributes:
6428     - cfg that is needed to query the cluster
6429     - input data (all members of the _KEYS class attribute are required)
6430     - four buffer attributes (in|out_data|text), that represent the
6431       input (to the external script) in text and data structure format,
6432       and the output from it, again in two formats
6433     - the result variables from the script (success, info, nodes) for
6434       easy usage
6435
6436   """
6437   _ALLO_KEYS = [
6438     "mem_size", "disks", "disk_template",
6439     "os", "tags", "nics", "vcpus", "hypervisor",
6440     ]
6441   _RELO_KEYS = [
6442     "relocate_from",
6443     ]
6444
6445   def __init__(self, lu, mode, name, **kwargs):
6446     self.lu = lu
6447     # init buffer variables
6448     self.in_text = self.out_text = self.in_data = self.out_data = None
6449     # init all input fields so that pylint is happy
6450     self.mode = mode
6451     self.name = name
6452     self.mem_size = self.disks = self.disk_template = None
6453     self.os = self.tags = self.nics = self.vcpus = None
6454     self.hypervisor = None
6455     self.relocate_from = None
6456     # computed fields
6457     self.required_nodes = None
6458     # init result fields
6459     self.success = self.info = self.nodes = None
6460     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6461       keyset = self._ALLO_KEYS
6462     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6463       keyset = self._RELO_KEYS
6464     else:
6465       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6466                                    " IAllocator" % self.mode)
6467     for key in kwargs:
6468       if key not in keyset:
6469         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6470                                      " IAllocator" % key)
6471       setattr(self, key, kwargs[key])
6472     for key in keyset:
6473       if key not in kwargs:
6474         raise errors.ProgrammerError("Missing input parameter '%s' to"
6475                                      " IAllocator" % key)
6476     self._BuildInputData()
6477
6478   def _ComputeClusterData(self):
6479     """Compute the generic allocator input data.
6480
6481     This is the data that is independent of the actual operation.
6482
6483     """
6484     cfg = self.lu.cfg
6485     cluster_info = cfg.GetClusterInfo()
6486     # cluster data
6487     data = {
6488       "version": 1,
6489       "cluster_name": cfg.GetClusterName(),
6490       "cluster_tags": list(cluster_info.GetTags()),
6491       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6492       # we don't have job IDs
6493       }
6494     iinfo = cfg.GetAllInstancesInfo().values()
6495     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6496
6497     # node data
6498     node_results = {}
6499     node_list = cfg.GetNodeList()
6500
6501     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6502       hypervisor_name = self.hypervisor
6503     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6504       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6505
6506     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6507                                            hypervisor_name)
6508     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6509                        cluster_info.enabled_hypervisors)
6510     for nname, nresult in node_data.items():
6511       # first fill in static (config-based) values
6512       ninfo = cfg.GetNodeInfo(nname)
6513       pnr = {
6514         "tags": list(ninfo.GetTags()),
6515         "primary_ip": ninfo.primary_ip,
6516         "secondary_ip": ninfo.secondary_ip,
6517         "offline": ninfo.offline,
6518         "drained": ninfo.drained,
6519         "master_candidate": ninfo.master_candidate,
6520         }
6521
6522       if not ninfo.offline:
6523         nresult.Raise()
6524         if not isinstance(nresult.data, dict):
6525           raise errors.OpExecError("Can't get data for node %s" % nname)
6526         remote_info = nresult.data
6527         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6528                      'vg_size', 'vg_free', 'cpu_total']:
6529           if attr not in remote_info:
6530             raise errors.OpExecError("Node '%s' didn't return attribute"
6531                                      " '%s'" % (nname, attr))
6532           try:
6533             remote_info[attr] = int(remote_info[attr])
6534           except ValueError, err:
6535             raise errors.OpExecError("Node '%s' returned invalid value"
6536                                      " for '%s': %s" % (nname, attr, err))
6537         # compute memory used by primary instances
6538         i_p_mem = i_p_up_mem = 0
6539         for iinfo, beinfo in i_list:
6540           if iinfo.primary_node == nname:
6541             i_p_mem += beinfo[constants.BE_MEMORY]
6542             if iinfo.name not in node_iinfo[nname].data:
6543               i_used_mem = 0
6544             else:
6545               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6546             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6547             remote_info['memory_free'] -= max(0, i_mem_diff)
6548
6549             if iinfo.admin_up:
6550               i_p_up_mem += beinfo[constants.BE_MEMORY]
6551
6552         # compute memory used by instances
6553         pnr_dyn = {
6554           "total_memory": remote_info['memory_total'],
6555           "reserved_memory": remote_info['memory_dom0'],
6556           "free_memory": remote_info['memory_free'],
6557           "total_disk": remote_info['vg_size'],
6558           "free_disk": remote_info['vg_free'],
6559           "total_cpus": remote_info['cpu_total'],
6560           "i_pri_memory": i_p_mem,
6561           "i_pri_up_memory": i_p_up_mem,
6562           }
6563         pnr.update(pnr_dyn)
6564
6565       node_results[nname] = pnr
6566     data["nodes"] = node_results
6567
6568     # instance data
6569     instance_data = {}
6570     for iinfo, beinfo in i_list:
6571       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6572                   for n in iinfo.nics]
6573       pir = {
6574         "tags": list(iinfo.GetTags()),
6575         "admin_up": iinfo.admin_up,
6576         "vcpus": beinfo[constants.BE_VCPUS],
6577         "memory": beinfo[constants.BE_MEMORY],
6578         "os": iinfo.os,
6579         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6580         "nics": nic_data,
6581         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6582         "disk_template": iinfo.disk_template,
6583         "hypervisor": iinfo.hypervisor,
6584         }
6585       instance_data[iinfo.name] = pir
6586
6587     data["instances"] = instance_data
6588
6589     self.in_data = data
6590
6591   def _AddNewInstance(self):
6592     """Add new instance data to allocator structure.
6593
6594     This in combination with _AllocatorGetClusterData will create the
6595     correct structure needed as input for the allocator.
6596
6597     The checks for the completeness of the opcode must have already been
6598     done.
6599
6600     """
6601     data = self.in_data
6602
6603     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6604
6605     if self.disk_template in constants.DTS_NET_MIRROR:
6606       self.required_nodes = 2
6607     else:
6608       self.required_nodes = 1
6609     request = {
6610       "type": "allocate",
6611       "name": self.name,
6612       "disk_template": self.disk_template,
6613       "tags": self.tags,
6614       "os": self.os,
6615       "vcpus": self.vcpus,
6616       "memory": self.mem_size,
6617       "disks": self.disks,
6618       "disk_space_total": disk_space,
6619       "nics": self.nics,
6620       "required_nodes": self.required_nodes,
6621       }
6622     data["request"] = request
6623
6624   def _AddRelocateInstance(self):
6625     """Add relocate instance data to allocator structure.
6626
6627     This in combination with _IAllocatorGetClusterData will create the
6628     correct structure needed as input for the allocator.
6629
6630     The checks for the completeness of the opcode must have already been
6631     done.
6632
6633     """
6634     instance = self.lu.cfg.GetInstanceInfo(self.name)
6635     if instance is None:
6636       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6637                                    " IAllocator" % self.name)
6638
6639     if instance.disk_template not in constants.DTS_NET_MIRROR:
6640       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6641
6642     if len(instance.secondary_nodes) != 1:
6643       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6644
6645     self.required_nodes = 1
6646     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6647     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6648
6649     request = {
6650       "type": "relocate",
6651       "name": self.name,
6652       "disk_space_total": disk_space,
6653       "required_nodes": self.required_nodes,
6654       "relocate_from": self.relocate_from,
6655       }
6656     self.in_data["request"] = request
6657
6658   def _BuildInputData(self):
6659     """Build input data structures.
6660
6661     """
6662     self._ComputeClusterData()
6663
6664     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6665       self._AddNewInstance()
6666     else:
6667       self._AddRelocateInstance()
6668
6669     self.in_text = serializer.Dump(self.in_data)
6670
6671   def Run(self, name, validate=True, call_fn=None):
6672     """Run an instance allocator and return the results.
6673
6674     """
6675     if call_fn is None:
6676       call_fn = self.lu.rpc.call_iallocator_runner
6677     data = self.in_text
6678
6679     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6680     result.Raise()
6681
6682     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6683       raise errors.OpExecError("Invalid result from master iallocator runner")
6684
6685     rcode, stdout, stderr, fail = result.data
6686
6687     if rcode == constants.IARUN_NOTFOUND:
6688       raise errors.OpExecError("Can't find allocator '%s'" % name)
6689     elif rcode == constants.IARUN_FAILURE:
6690       raise errors.OpExecError("Instance allocator call failed: %s,"
6691                                " output: %s" % (fail, stdout+stderr))
6692     self.out_text = stdout
6693     if validate:
6694       self._ValidateResult()
6695
6696   def _ValidateResult(self):
6697     """Process the allocator results.
6698
6699     This will process and if successful save the result in
6700     self.out_data and the other parameters.
6701
6702     """
6703     try:
6704       rdict = serializer.Load(self.out_text)
6705     except Exception, err:
6706       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6707
6708     if not isinstance(rdict, dict):
6709       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6710
6711     for key in "success", "info", "nodes":
6712       if key not in rdict:
6713         raise errors.OpExecError("Can't parse iallocator results:"
6714                                  " missing key '%s'" % key)
6715       setattr(self, key, rdict[key])
6716
6717     if not isinstance(rdict["nodes"], list):
6718       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6719                                " is not a list")
6720     self.out_data = rdict
6721
6722
6723 class LUTestAllocator(NoHooksLU):
6724   """Run allocator tests.
6725
6726   This LU runs the allocator tests
6727
6728   """
6729   _OP_REQP = ["direction", "mode", "name"]
6730
6731   def CheckPrereq(self):
6732     """Check prerequisites.
6733
6734     This checks the opcode parameters depending on the director and mode test.
6735
6736     """
6737     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6738       for attr in ["name", "mem_size", "disks", "disk_template",
6739                    "os", "tags", "nics", "vcpus"]:
6740         if not hasattr(self.op, attr):
6741           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6742                                      attr)
6743       iname = self.cfg.ExpandInstanceName(self.op.name)
6744       if iname is not None:
6745         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6746                                    iname)
6747       if not isinstance(self.op.nics, list):
6748         raise errors.OpPrereqError("Invalid parameter 'nics'")
6749       for row in self.op.nics:
6750         if (not isinstance(row, dict) or
6751             "mac" not in row or
6752             "ip" not in row or
6753             "bridge" not in row):
6754           raise errors.OpPrereqError("Invalid contents of the"
6755                                      " 'nics' parameter")
6756       if not isinstance(self.op.disks, list):
6757         raise errors.OpPrereqError("Invalid parameter 'disks'")
6758       for row in self.op.disks:
6759         if (not isinstance(row, dict) or
6760             "size" not in row or
6761             not isinstance(row["size"], int) or
6762             "mode" not in row or
6763             row["mode"] not in ['r', 'w']):
6764           raise errors.OpPrereqError("Invalid contents of the"
6765                                      " 'disks' parameter")
6766       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6767         self.op.hypervisor = self.cfg.GetHypervisorType()
6768     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6769       if not hasattr(self.op, "name"):
6770         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6771       fname = self.cfg.ExpandInstanceName(self.op.name)
6772       if fname is None:
6773         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6774                                    self.op.name)
6775       self.op.name = fname
6776       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6777     else:
6778       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6779                                  self.op.mode)
6780
6781     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6782       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6783         raise errors.OpPrereqError("Missing allocator name")
6784     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6785       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6786                                  self.op.direction)
6787
6788   def Exec(self, feedback_fn):
6789     """Run the allocator test.
6790
6791     """
6792     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6793       ial = IAllocator(self,
6794                        mode=self.op.mode,
6795                        name=self.op.name,
6796                        mem_size=self.op.mem_size,
6797                        disks=self.op.disks,
6798                        disk_template=self.op.disk_template,
6799                        os=self.op.os,
6800                        tags=self.op.tags,
6801                        nics=self.op.nics,
6802                        vcpus=self.op.vcpus,
6803                        hypervisor=self.op.hypervisor,
6804                        )
6805     else:
6806       ial = IAllocator(self,
6807                        mode=self.op.mode,
6808                        name=self.op.name,
6809                        relocate_from=list(self.relocate_from),
6810                        )
6811
6812     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6813       result = ial.in_text
6814     else:
6815       ial.Run(self.op.allocator, validate=False)
6816       result = ial.out_text
6817     return result