Show nic parameters in cluster info
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384          msg = to_result.RemoteFailMsg()
1385          if msg:
1386            msg = ("Copy of file %s to node %s failed: %s" %
1387                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1388            self.proc.LogWarning(msg)
1389
1390     finally:
1391       result = self.rpc.call_node_start_master(master, False)
1392       if result.failed or not result.data:
1393         self.LogWarning("Could not re-enable the master role on"
1394                         " the master, please restart manually.")
1395
1396
1397 def _RecursiveCheckIfLVMBased(disk):
1398   """Check if the given disk or its children are lvm-based.
1399
1400   @type disk: L{objects.Disk}
1401   @param disk: the disk to check
1402   @rtype: booleean
1403   @return: boolean indicating whether a LD_LV dev_type was found or not
1404
1405   """
1406   if disk.children:
1407     for chdisk in disk.children:
1408       if _RecursiveCheckIfLVMBased(chdisk):
1409         return True
1410   return disk.dev_type == constants.LD_LV
1411
1412
1413 class LUSetClusterParams(LogicalUnit):
1414   """Change the parameters of the cluster.
1415
1416   """
1417   HPATH = "cluster-modify"
1418   HTYPE = constants.HTYPE_CLUSTER
1419   _OP_REQP = []
1420   REQ_BGL = False
1421
1422   def CheckArguments(self):
1423     """Check parameters
1424
1425     """
1426     if not hasattr(self.op, "candidate_pool_size"):
1427       self.op.candidate_pool_size = None
1428     if self.op.candidate_pool_size is not None:
1429       try:
1430         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1431       except (ValueError, TypeError), err:
1432         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1433                                    str(err))
1434       if self.op.candidate_pool_size < 1:
1435         raise errors.OpPrereqError("At least one master candidate needed")
1436
1437   def ExpandNames(self):
1438     # FIXME: in the future maybe other cluster params won't require checking on
1439     # all nodes to be modified.
1440     self.needed_locks = {
1441       locking.LEVEL_NODE: locking.ALL_SET,
1442     }
1443     self.share_locks[locking.LEVEL_NODE] = 1
1444
1445   def BuildHooksEnv(self):
1446     """Build hooks env.
1447
1448     """
1449     env = {
1450       "OP_TARGET": self.cfg.GetClusterName(),
1451       "NEW_VG_NAME": self.op.vg_name,
1452       }
1453     mn = self.cfg.GetMasterNode()
1454     return env, [mn], [mn]
1455
1456   def CheckPrereq(self):
1457     """Check prerequisites.
1458
1459     This checks whether the given params don't conflict and
1460     if the given volume group is valid.
1461
1462     """
1463     if self.op.vg_name is not None and not self.op.vg_name:
1464       instances = self.cfg.GetAllInstancesInfo().values()
1465       for inst in instances:
1466         for disk in inst.disks:
1467           if _RecursiveCheckIfLVMBased(disk):
1468             raise errors.OpPrereqError("Cannot disable lvm storage while"
1469                                        " lvm-based instances exist")
1470
1471     node_list = self.acquired_locks[locking.LEVEL_NODE]
1472
1473     # if vg_name not None, checks given volume group on all nodes
1474     if self.op.vg_name:
1475       vglist = self.rpc.call_vg_list(node_list)
1476       for node in node_list:
1477         if vglist[node].failed:
1478           # ignoring down node
1479           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1480           continue
1481         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1482                                               self.op.vg_name,
1483                                               constants.MIN_VG_SIZE)
1484         if vgstatus:
1485           raise errors.OpPrereqError("Error on node '%s': %s" %
1486                                      (node, vgstatus))
1487
1488     self.cluster = cluster = self.cfg.GetClusterInfo()
1489     # validate params changes
1490     if self.op.beparams:
1491       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1492       self.new_beparams = objects.FillDict(
1493         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1494
1495     if self.op.nicparams:
1496       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1497       self.new_nicparams = objects.FillDict(
1498         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1499       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1500
1501     # hypervisor list/parameters
1502     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1503     if self.op.hvparams:
1504       if not isinstance(self.op.hvparams, dict):
1505         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1506       for hv_name, hv_dict in self.op.hvparams.items():
1507         if hv_name not in self.new_hvparams:
1508           self.new_hvparams[hv_name] = hv_dict
1509         else:
1510           self.new_hvparams[hv_name].update(hv_dict)
1511
1512     if self.op.enabled_hypervisors is not None:
1513       self.hv_list = self.op.enabled_hypervisors
1514     else:
1515       self.hv_list = cluster.enabled_hypervisors
1516
1517     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1518       # either the enabled list has changed, or the parameters have, validate
1519       for hv_name, hv_params in self.new_hvparams.items():
1520         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1521             (self.op.enabled_hypervisors and
1522              hv_name in self.op.enabled_hypervisors)):
1523           # either this is a new hypervisor, or its parameters have changed
1524           hv_class = hypervisor.GetHypervisor(hv_name)
1525           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1526           hv_class.CheckParameterSyntax(hv_params)
1527           _CheckHVParams(self, node_list, hv_name, hv_params)
1528
1529   def Exec(self, feedback_fn):
1530     """Change the parameters of the cluster.
1531
1532     """
1533     if self.op.vg_name is not None:
1534       new_volume = self.op.vg_name
1535       if not new_volume:
1536         new_volume = None
1537       if new_volume != self.cfg.GetVGName():
1538         self.cfg.SetVGName(new_volume)
1539       else:
1540         feedback_fn("Cluster LVM configuration already in desired"
1541                     " state, not changing")
1542     if self.op.hvparams:
1543       self.cluster.hvparams = self.new_hvparams
1544     if self.op.enabled_hypervisors is not None:
1545       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1546     if self.op.beparams:
1547       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1548     if self.op.nicparams:
1549       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1550
1551     if self.op.candidate_pool_size is not None:
1552       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1553
1554     self.cfg.Update(self.cluster)
1555
1556     # we want to update nodes after the cluster so that if any errors
1557     # happen, we have recorded and saved the cluster info
1558     if self.op.candidate_pool_size is not None:
1559       _AdjustCandidatePool(self)
1560
1561
1562 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1563   """Distribute additional files which are part of the cluster configuration.
1564
1565   ConfigWriter takes care of distributing the config and ssconf files, but
1566   there are more files which should be distributed to all nodes. This function
1567   makes sure those are copied.
1568
1569   @param lu: calling logical unit
1570   @param additional_nodes: list of nodes not in the config to distribute to
1571
1572   """
1573   # 1. Gather target nodes
1574   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1575   dist_nodes = lu.cfg.GetNodeList()
1576   if additional_nodes is not None:
1577     dist_nodes.extend(additional_nodes)
1578   if myself.name in dist_nodes:
1579     dist_nodes.remove(myself.name)
1580   # 2. Gather files to distribute
1581   dist_files = set([constants.ETC_HOSTS,
1582                     constants.SSH_KNOWN_HOSTS_FILE,
1583                     constants.RAPI_CERT_FILE,
1584                     constants.RAPI_USERS_FILE,
1585                    ])
1586
1587   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1588   for hv_name in enabled_hypervisors:
1589     hv_class = hypervisor.GetHypervisor(hv_name)
1590     dist_files.update(hv_class.GetAncillaryFiles())
1591
1592   # 3. Perform the files upload
1593   for fname in dist_files:
1594     if os.path.exists(fname):
1595       result = lu.rpc.call_upload_file(dist_nodes, fname)
1596       for to_node, to_result in result.items():
1597          msg = to_result.RemoteFailMsg()
1598          if msg:
1599            msg = ("Copy of file %s to node %s failed: %s" %
1600                    (fname, to_node, msg))
1601            lu.proc.LogWarning(msg)
1602
1603
1604 class LURedistributeConfig(NoHooksLU):
1605   """Force the redistribution of cluster configuration.
1606
1607   This is a very simple LU.
1608
1609   """
1610   _OP_REQP = []
1611   REQ_BGL = False
1612
1613   def ExpandNames(self):
1614     self.needed_locks = {
1615       locking.LEVEL_NODE: locking.ALL_SET,
1616     }
1617     self.share_locks[locking.LEVEL_NODE] = 1
1618
1619   def CheckPrereq(self):
1620     """Check prerequisites.
1621
1622     """
1623
1624   def Exec(self, feedback_fn):
1625     """Redistribute the configuration.
1626
1627     """
1628     self.cfg.Update(self.cfg.GetClusterInfo())
1629     _RedistributeAncillaryFiles(self)
1630
1631
1632 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1633   """Sleep and poll for an instance's disk to sync.
1634
1635   """
1636   if not instance.disks:
1637     return True
1638
1639   if not oneshot:
1640     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1641
1642   node = instance.primary_node
1643
1644   for dev in instance.disks:
1645     lu.cfg.SetDiskID(dev, node)
1646
1647   retries = 0
1648   while True:
1649     max_time = 0
1650     done = True
1651     cumul_degraded = False
1652     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1653     if rstats.failed or not rstats.data:
1654       lu.LogWarning("Can't get any data from node %s", node)
1655       retries += 1
1656       if retries >= 10:
1657         raise errors.RemoteError("Can't contact node %s for mirror data,"
1658                                  " aborting." % node)
1659       time.sleep(6)
1660       continue
1661     rstats = rstats.data
1662     retries = 0
1663     for i, mstat in enumerate(rstats):
1664       if mstat is None:
1665         lu.LogWarning("Can't compute data for node %s/%s",
1666                            node, instance.disks[i].iv_name)
1667         continue
1668       # we ignore the ldisk parameter
1669       perc_done, est_time, is_degraded, _ = mstat
1670       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1671       if perc_done is not None:
1672         done = False
1673         if est_time is not None:
1674           rem_time = "%d estimated seconds remaining" % est_time
1675           max_time = est_time
1676         else:
1677           rem_time = "no time estimate"
1678         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1679                         (instance.disks[i].iv_name, perc_done, rem_time))
1680     if done or oneshot:
1681       break
1682
1683     time.sleep(min(60, max_time))
1684
1685   if done:
1686     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1687   return not cumul_degraded
1688
1689
1690 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1691   """Check that mirrors are not degraded.
1692
1693   The ldisk parameter, if True, will change the test from the
1694   is_degraded attribute (which represents overall non-ok status for
1695   the device(s)) to the ldisk (representing the local storage status).
1696
1697   """
1698   lu.cfg.SetDiskID(dev, node)
1699   if ldisk:
1700     idx = 6
1701   else:
1702     idx = 5
1703
1704   result = True
1705   if on_primary or dev.AssembleOnSecondary():
1706     rstats = lu.rpc.call_blockdev_find(node, dev)
1707     msg = rstats.RemoteFailMsg()
1708     if msg:
1709       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1710       result = False
1711     elif not rstats.payload:
1712       lu.LogWarning("Can't find disk on node %s", node)
1713       result = False
1714     else:
1715       result = result and (not rstats.payload[idx])
1716   if dev.children:
1717     for child in dev.children:
1718       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1719
1720   return result
1721
1722
1723 class LUDiagnoseOS(NoHooksLU):
1724   """Logical unit for OS diagnose/query.
1725
1726   """
1727   _OP_REQP = ["output_fields", "names"]
1728   REQ_BGL = False
1729   _FIELDS_STATIC = utils.FieldSet()
1730   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1731
1732   def ExpandNames(self):
1733     if self.op.names:
1734       raise errors.OpPrereqError("Selective OS query not supported")
1735
1736     _CheckOutputFields(static=self._FIELDS_STATIC,
1737                        dynamic=self._FIELDS_DYNAMIC,
1738                        selected=self.op.output_fields)
1739
1740     # Lock all nodes, in shared mode
1741     # Temporary removal of locks, should be reverted later
1742     # TODO: reintroduce locks when they are lighter-weight
1743     self.needed_locks = {}
1744     #self.share_locks[locking.LEVEL_NODE] = 1
1745     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1746
1747   def CheckPrereq(self):
1748     """Check prerequisites.
1749
1750     """
1751
1752   @staticmethod
1753   def _DiagnoseByOS(node_list, rlist):
1754     """Remaps a per-node return list into an a per-os per-node dictionary
1755
1756     @param node_list: a list with the names of all nodes
1757     @param rlist: a map with node names as keys and OS objects as values
1758
1759     @rtype: dict
1760     @return: a dictionary with osnames as keys and as value another map, with
1761         nodes as keys and list of OS objects as values, eg::
1762
1763           {"debian-etch": {"node1": [<object>,...],
1764                            "node2": [<object>,]}
1765           }
1766
1767     """
1768     all_os = {}
1769     # we build here the list of nodes that didn't fail the RPC (at RPC
1770     # level), so that nodes with a non-responding node daemon don't
1771     # make all OSes invalid
1772     good_nodes = [node_name for node_name in rlist
1773                   if not rlist[node_name].failed]
1774     for node_name, nr in rlist.iteritems():
1775       if nr.failed or not nr.data:
1776         continue
1777       for os_obj in nr.data:
1778         if os_obj.name not in all_os:
1779           # build a list of nodes for this os containing empty lists
1780           # for each node in node_list
1781           all_os[os_obj.name] = {}
1782           for nname in good_nodes:
1783             all_os[os_obj.name][nname] = []
1784         all_os[os_obj.name][node_name].append(os_obj)
1785     return all_os
1786
1787   def Exec(self, feedback_fn):
1788     """Compute the list of OSes.
1789
1790     """
1791     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1792     node_data = self.rpc.call_os_diagnose(valid_nodes)
1793     if node_data == False:
1794       raise errors.OpExecError("Can't gather the list of OSes")
1795     pol = self._DiagnoseByOS(valid_nodes, node_data)
1796     output = []
1797     for os_name, os_data in pol.iteritems():
1798       row = []
1799       for field in self.op.output_fields:
1800         if field == "name":
1801           val = os_name
1802         elif field == "valid":
1803           val = utils.all([osl and osl[0] for osl in os_data.values()])
1804         elif field == "node_status":
1805           val = {}
1806           for node_name, nos_list in os_data.iteritems():
1807             val[node_name] = [(v.status, v.path) for v in nos_list]
1808         else:
1809           raise errors.ParameterError(field)
1810         row.append(val)
1811       output.append(row)
1812
1813     return output
1814
1815
1816 class LURemoveNode(LogicalUnit):
1817   """Logical unit for removing a node.
1818
1819   """
1820   HPATH = "node-remove"
1821   HTYPE = constants.HTYPE_NODE
1822   _OP_REQP = ["node_name"]
1823
1824   def BuildHooksEnv(self):
1825     """Build hooks env.
1826
1827     This doesn't run on the target node in the pre phase as a failed
1828     node would then be impossible to remove.
1829
1830     """
1831     env = {
1832       "OP_TARGET": self.op.node_name,
1833       "NODE_NAME": self.op.node_name,
1834       }
1835     all_nodes = self.cfg.GetNodeList()
1836     all_nodes.remove(self.op.node_name)
1837     return env, all_nodes, all_nodes
1838
1839   def CheckPrereq(self):
1840     """Check prerequisites.
1841
1842     This checks:
1843      - the node exists in the configuration
1844      - it does not have primary or secondary instances
1845      - it's not the master
1846
1847     Any errors are signalled by raising errors.OpPrereqError.
1848
1849     """
1850     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1851     if node is None:
1852       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1853
1854     instance_list = self.cfg.GetInstanceList()
1855
1856     masternode = self.cfg.GetMasterNode()
1857     if node.name == masternode:
1858       raise errors.OpPrereqError("Node is the master node,"
1859                                  " you need to failover first.")
1860
1861     for instance_name in instance_list:
1862       instance = self.cfg.GetInstanceInfo(instance_name)
1863       if node.name in instance.all_nodes:
1864         raise errors.OpPrereqError("Instance %s is still running on the node,"
1865                                    " please remove first." % instance_name)
1866     self.op.node_name = node.name
1867     self.node = node
1868
1869   def Exec(self, feedback_fn):
1870     """Removes the node from the cluster.
1871
1872     """
1873     node = self.node
1874     logging.info("Stopping the node daemon and removing configs from node %s",
1875                  node.name)
1876
1877     self.context.RemoveNode(node.name)
1878
1879     self.rpc.call_node_leave_cluster(node.name)
1880
1881     # Promote nodes to master candidate as needed
1882     _AdjustCandidatePool(self)
1883
1884
1885 class LUQueryNodes(NoHooksLU):
1886   """Logical unit for querying nodes.
1887
1888   """
1889   _OP_REQP = ["output_fields", "names", "use_locking"]
1890   REQ_BGL = False
1891   _FIELDS_DYNAMIC = utils.FieldSet(
1892     "dtotal", "dfree",
1893     "mtotal", "mnode", "mfree",
1894     "bootid",
1895     "ctotal", "cnodes", "csockets",
1896     )
1897
1898   _FIELDS_STATIC = utils.FieldSet(
1899     "name", "pinst_cnt", "sinst_cnt",
1900     "pinst_list", "sinst_list",
1901     "pip", "sip", "tags",
1902     "serial_no",
1903     "master_candidate",
1904     "master",
1905     "offline",
1906     "drained",
1907     )
1908
1909   def ExpandNames(self):
1910     _CheckOutputFields(static=self._FIELDS_STATIC,
1911                        dynamic=self._FIELDS_DYNAMIC,
1912                        selected=self.op.output_fields)
1913
1914     self.needed_locks = {}
1915     self.share_locks[locking.LEVEL_NODE] = 1
1916
1917     if self.op.names:
1918       self.wanted = _GetWantedNodes(self, self.op.names)
1919     else:
1920       self.wanted = locking.ALL_SET
1921
1922     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1923     self.do_locking = self.do_node_query and self.op.use_locking
1924     if self.do_locking:
1925       # if we don't request only static fields, we need to lock the nodes
1926       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1927
1928
1929   def CheckPrereq(self):
1930     """Check prerequisites.
1931
1932     """
1933     # The validation of the node list is done in the _GetWantedNodes,
1934     # if non empty, and if empty, there's no validation to do
1935     pass
1936
1937   def Exec(self, feedback_fn):
1938     """Computes the list of nodes and their attributes.
1939
1940     """
1941     all_info = self.cfg.GetAllNodesInfo()
1942     if self.do_locking:
1943       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1944     elif self.wanted != locking.ALL_SET:
1945       nodenames = self.wanted
1946       missing = set(nodenames).difference(all_info.keys())
1947       if missing:
1948         raise errors.OpExecError(
1949           "Some nodes were removed before retrieving their data: %s" % missing)
1950     else:
1951       nodenames = all_info.keys()
1952
1953     nodenames = utils.NiceSort(nodenames)
1954     nodelist = [all_info[name] for name in nodenames]
1955
1956     # begin data gathering
1957
1958     if self.do_node_query:
1959       live_data = {}
1960       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1961                                           self.cfg.GetHypervisorType())
1962       for name in nodenames:
1963         nodeinfo = node_data[name]
1964         if not nodeinfo.failed and nodeinfo.data:
1965           nodeinfo = nodeinfo.data
1966           fn = utils.TryConvert
1967           live_data[name] = {
1968             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1969             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1970             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1971             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1972             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1973             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1974             "bootid": nodeinfo.get('bootid', None),
1975             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1976             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1977             }
1978         else:
1979           live_data[name] = {}
1980     else:
1981       live_data = dict.fromkeys(nodenames, {})
1982
1983     node_to_primary = dict([(name, set()) for name in nodenames])
1984     node_to_secondary = dict([(name, set()) for name in nodenames])
1985
1986     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1987                              "sinst_cnt", "sinst_list"))
1988     if inst_fields & frozenset(self.op.output_fields):
1989       instancelist = self.cfg.GetInstanceList()
1990
1991       for instance_name in instancelist:
1992         inst = self.cfg.GetInstanceInfo(instance_name)
1993         if inst.primary_node in node_to_primary:
1994           node_to_primary[inst.primary_node].add(inst.name)
1995         for secnode in inst.secondary_nodes:
1996           if secnode in node_to_secondary:
1997             node_to_secondary[secnode].add(inst.name)
1998
1999     master_node = self.cfg.GetMasterNode()
2000
2001     # end data gathering
2002
2003     output = []
2004     for node in nodelist:
2005       node_output = []
2006       for field in self.op.output_fields:
2007         if field == "name":
2008           val = node.name
2009         elif field == "pinst_list":
2010           val = list(node_to_primary[node.name])
2011         elif field == "sinst_list":
2012           val = list(node_to_secondary[node.name])
2013         elif field == "pinst_cnt":
2014           val = len(node_to_primary[node.name])
2015         elif field == "sinst_cnt":
2016           val = len(node_to_secondary[node.name])
2017         elif field == "pip":
2018           val = node.primary_ip
2019         elif field == "sip":
2020           val = node.secondary_ip
2021         elif field == "tags":
2022           val = list(node.GetTags())
2023         elif field == "serial_no":
2024           val = node.serial_no
2025         elif field == "master_candidate":
2026           val = node.master_candidate
2027         elif field == "master":
2028           val = node.name == master_node
2029         elif field == "offline":
2030           val = node.offline
2031         elif field == "drained":
2032           val = node.drained
2033         elif self._FIELDS_DYNAMIC.Matches(field):
2034           val = live_data[node.name].get(field, None)
2035         else:
2036           raise errors.ParameterError(field)
2037         node_output.append(val)
2038       output.append(node_output)
2039
2040     return output
2041
2042
2043 class LUQueryNodeVolumes(NoHooksLU):
2044   """Logical unit for getting volumes on node(s).
2045
2046   """
2047   _OP_REQP = ["nodes", "output_fields"]
2048   REQ_BGL = False
2049   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2050   _FIELDS_STATIC = utils.FieldSet("node")
2051
2052   def ExpandNames(self):
2053     _CheckOutputFields(static=self._FIELDS_STATIC,
2054                        dynamic=self._FIELDS_DYNAMIC,
2055                        selected=self.op.output_fields)
2056
2057     self.needed_locks = {}
2058     self.share_locks[locking.LEVEL_NODE] = 1
2059     if not self.op.nodes:
2060       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2061     else:
2062       self.needed_locks[locking.LEVEL_NODE] = \
2063         _GetWantedNodes(self, self.op.nodes)
2064
2065   def CheckPrereq(self):
2066     """Check prerequisites.
2067
2068     This checks that the fields required are valid output fields.
2069
2070     """
2071     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2072
2073   def Exec(self, feedback_fn):
2074     """Computes the list of nodes and their attributes.
2075
2076     """
2077     nodenames = self.nodes
2078     volumes = self.rpc.call_node_volumes(nodenames)
2079
2080     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2081              in self.cfg.GetInstanceList()]
2082
2083     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2084
2085     output = []
2086     for node in nodenames:
2087       if node not in volumes or volumes[node].failed or not volumes[node].data:
2088         continue
2089
2090       node_vols = volumes[node].data[:]
2091       node_vols.sort(key=lambda vol: vol['dev'])
2092
2093       for vol in node_vols:
2094         node_output = []
2095         for field in self.op.output_fields:
2096           if field == "node":
2097             val = node
2098           elif field == "phys":
2099             val = vol['dev']
2100           elif field == "vg":
2101             val = vol['vg']
2102           elif field == "name":
2103             val = vol['name']
2104           elif field == "size":
2105             val = int(float(vol['size']))
2106           elif field == "instance":
2107             for inst in ilist:
2108               if node not in lv_by_node[inst]:
2109                 continue
2110               if vol['name'] in lv_by_node[inst][node]:
2111                 val = inst.name
2112                 break
2113             else:
2114               val = '-'
2115           else:
2116             raise errors.ParameterError(field)
2117           node_output.append(str(val))
2118
2119         output.append(node_output)
2120
2121     return output
2122
2123
2124 class LUAddNode(LogicalUnit):
2125   """Logical unit for adding node to the cluster.
2126
2127   """
2128   HPATH = "node-add"
2129   HTYPE = constants.HTYPE_NODE
2130   _OP_REQP = ["node_name"]
2131
2132   def BuildHooksEnv(self):
2133     """Build hooks env.
2134
2135     This will run on all nodes before, and on all nodes + the new node after.
2136
2137     """
2138     env = {
2139       "OP_TARGET": self.op.node_name,
2140       "NODE_NAME": self.op.node_name,
2141       "NODE_PIP": self.op.primary_ip,
2142       "NODE_SIP": self.op.secondary_ip,
2143       }
2144     nodes_0 = self.cfg.GetNodeList()
2145     nodes_1 = nodes_0 + [self.op.node_name, ]
2146     return env, nodes_0, nodes_1
2147
2148   def CheckPrereq(self):
2149     """Check prerequisites.
2150
2151     This checks:
2152      - the new node is not already in the config
2153      - it is resolvable
2154      - its parameters (single/dual homed) matches the cluster
2155
2156     Any errors are signalled by raising errors.OpPrereqError.
2157
2158     """
2159     node_name = self.op.node_name
2160     cfg = self.cfg
2161
2162     dns_data = utils.HostInfo(node_name)
2163
2164     node = dns_data.name
2165     primary_ip = self.op.primary_ip = dns_data.ip
2166     secondary_ip = getattr(self.op, "secondary_ip", None)
2167     if secondary_ip is None:
2168       secondary_ip = primary_ip
2169     if not utils.IsValidIP(secondary_ip):
2170       raise errors.OpPrereqError("Invalid secondary IP given")
2171     self.op.secondary_ip = secondary_ip
2172
2173     node_list = cfg.GetNodeList()
2174     if not self.op.readd and node in node_list:
2175       raise errors.OpPrereqError("Node %s is already in the configuration" %
2176                                  node)
2177     elif self.op.readd and node not in node_list:
2178       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2179
2180     for existing_node_name in node_list:
2181       existing_node = cfg.GetNodeInfo(existing_node_name)
2182
2183       if self.op.readd and node == existing_node_name:
2184         if (existing_node.primary_ip != primary_ip or
2185             existing_node.secondary_ip != secondary_ip):
2186           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2187                                      " address configuration as before")
2188         continue
2189
2190       if (existing_node.primary_ip == primary_ip or
2191           existing_node.secondary_ip == primary_ip or
2192           existing_node.primary_ip == secondary_ip or
2193           existing_node.secondary_ip == secondary_ip):
2194         raise errors.OpPrereqError("New node ip address(es) conflict with"
2195                                    " existing node %s" % existing_node.name)
2196
2197     # check that the type of the node (single versus dual homed) is the
2198     # same as for the master
2199     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2200     master_singlehomed = myself.secondary_ip == myself.primary_ip
2201     newbie_singlehomed = secondary_ip == primary_ip
2202     if master_singlehomed != newbie_singlehomed:
2203       if master_singlehomed:
2204         raise errors.OpPrereqError("The master has no private ip but the"
2205                                    " new node has one")
2206       else:
2207         raise errors.OpPrereqError("The master has a private ip but the"
2208                                    " new node doesn't have one")
2209
2210     # checks reachablity
2211     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2212       raise errors.OpPrereqError("Node not reachable by ping")
2213
2214     if not newbie_singlehomed:
2215       # check reachability from my secondary ip to newbie's secondary ip
2216       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2217                            source=myself.secondary_ip):
2218         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2219                                    " based ping to noded port")
2220
2221     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2222     mc_now, _ = self.cfg.GetMasterCandidateStats()
2223     master_candidate = mc_now < cp_size
2224
2225     self.new_node = objects.Node(name=node,
2226                                  primary_ip=primary_ip,
2227                                  secondary_ip=secondary_ip,
2228                                  master_candidate=master_candidate,
2229                                  offline=False, drained=False)
2230
2231   def Exec(self, feedback_fn):
2232     """Adds the new node to the cluster.
2233
2234     """
2235     new_node = self.new_node
2236     node = new_node.name
2237
2238     # check connectivity
2239     result = self.rpc.call_version([node])[node]
2240     result.Raise()
2241     if result.data:
2242       if constants.PROTOCOL_VERSION == result.data:
2243         logging.info("Communication to node %s fine, sw version %s match",
2244                      node, result.data)
2245       else:
2246         raise errors.OpExecError("Version mismatch master version %s,"
2247                                  " node version %s" %
2248                                  (constants.PROTOCOL_VERSION, result.data))
2249     else:
2250       raise errors.OpExecError("Cannot get version from the new node")
2251
2252     # setup ssh on node
2253     logging.info("Copy ssh key to node %s", node)
2254     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2255     keyarray = []
2256     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2257                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2258                 priv_key, pub_key]
2259
2260     for i in keyfiles:
2261       f = open(i, 'r')
2262       try:
2263         keyarray.append(f.read())
2264       finally:
2265         f.close()
2266
2267     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2268                                     keyarray[2],
2269                                     keyarray[3], keyarray[4], keyarray[5])
2270
2271     msg = result.RemoteFailMsg()
2272     if msg:
2273       raise errors.OpExecError("Cannot transfer ssh keys to the"
2274                                " new node: %s" % msg)
2275
2276     # Add node to our /etc/hosts, and add key to known_hosts
2277     if self.cfg.GetClusterInfo().modify_etc_hosts:
2278       utils.AddHostToEtcHosts(new_node.name)
2279
2280     if new_node.secondary_ip != new_node.primary_ip:
2281       result = self.rpc.call_node_has_ip_address(new_node.name,
2282                                                  new_node.secondary_ip)
2283       if result.failed or not result.data:
2284         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2285                                  " you gave (%s). Please fix and re-run this"
2286                                  " command." % new_node.secondary_ip)
2287
2288     node_verify_list = [self.cfg.GetMasterNode()]
2289     node_verify_param = {
2290       'nodelist': [node],
2291       # TODO: do a node-net-test as well?
2292     }
2293
2294     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2295                                        self.cfg.GetClusterName())
2296     for verifier in node_verify_list:
2297       if result[verifier].failed or not result[verifier].data:
2298         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2299                                  " for remote verification" % verifier)
2300       if result[verifier].data['nodelist']:
2301         for failed in result[verifier].data['nodelist']:
2302           feedback_fn("ssh/hostname verification failed %s -> %s" %
2303                       (verifier, result[verifier].data['nodelist'][failed]))
2304         raise errors.OpExecError("ssh/hostname verification failed.")
2305
2306     if self.op.readd:
2307       _RedistributeAncillaryFiles(self)
2308       self.context.ReaddNode(new_node)
2309     else:
2310       _RedistributeAncillaryFiles(self, additional_nodes=node)
2311       self.context.AddNode(new_node)
2312
2313
2314 class LUSetNodeParams(LogicalUnit):
2315   """Modifies the parameters of a node.
2316
2317   """
2318   HPATH = "node-modify"
2319   HTYPE = constants.HTYPE_NODE
2320   _OP_REQP = ["node_name"]
2321   REQ_BGL = False
2322
2323   def CheckArguments(self):
2324     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2325     if node_name is None:
2326       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2327     self.op.node_name = node_name
2328     _CheckBooleanOpField(self.op, 'master_candidate')
2329     _CheckBooleanOpField(self.op, 'offline')
2330     _CheckBooleanOpField(self.op, 'drained')
2331     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2332     if all_mods.count(None) == 3:
2333       raise errors.OpPrereqError("Please pass at least one modification")
2334     if all_mods.count(True) > 1:
2335       raise errors.OpPrereqError("Can't set the node into more than one"
2336                                  " state at the same time")
2337
2338   def ExpandNames(self):
2339     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2340
2341   def BuildHooksEnv(self):
2342     """Build hooks env.
2343
2344     This runs on the master node.
2345
2346     """
2347     env = {
2348       "OP_TARGET": self.op.node_name,
2349       "MASTER_CANDIDATE": str(self.op.master_candidate),
2350       "OFFLINE": str(self.op.offline),
2351       "DRAINED": str(self.op.drained),
2352       }
2353     nl = [self.cfg.GetMasterNode(),
2354           self.op.node_name]
2355     return env, nl, nl
2356
2357   def CheckPrereq(self):
2358     """Check prerequisites.
2359
2360     This only checks the instance list against the existing names.
2361
2362     """
2363     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2364
2365     if ((self.op.master_candidate == False or self.op.offline == True or
2366          self.op.drained == True) and node.master_candidate):
2367       # we will demote the node from master_candidate
2368       if self.op.node_name == self.cfg.GetMasterNode():
2369         raise errors.OpPrereqError("The master node has to be a"
2370                                    " master candidate, online and not drained")
2371       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2372       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2373       if num_candidates <= cp_size:
2374         msg = ("Not enough master candidates (desired"
2375                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2376         if self.op.force:
2377           self.LogWarning(msg)
2378         else:
2379           raise errors.OpPrereqError(msg)
2380
2381     if (self.op.master_candidate == True and
2382         ((node.offline and not self.op.offline == False) or
2383          (node.drained and not self.op.drained == False))):
2384       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2385                                  " to master_candidate" % node.name)
2386
2387     return
2388
2389   def Exec(self, feedback_fn):
2390     """Modifies a node.
2391
2392     """
2393     node = self.node
2394
2395     result = []
2396     changed_mc = False
2397
2398     if self.op.offline is not None:
2399       node.offline = self.op.offline
2400       result.append(("offline", str(self.op.offline)))
2401       if self.op.offline == True:
2402         if node.master_candidate:
2403           node.master_candidate = False
2404           changed_mc = True
2405           result.append(("master_candidate", "auto-demotion due to offline"))
2406         if node.drained:
2407           node.drained = False
2408           result.append(("drained", "clear drained status due to offline"))
2409
2410     if self.op.master_candidate is not None:
2411       node.master_candidate = self.op.master_candidate
2412       changed_mc = True
2413       result.append(("master_candidate", str(self.op.master_candidate)))
2414       if self.op.master_candidate == False:
2415         rrc = self.rpc.call_node_demote_from_mc(node.name)
2416         msg = rrc.RemoteFailMsg()
2417         if msg:
2418           self.LogWarning("Node failed to demote itself: %s" % msg)
2419
2420     if self.op.drained is not None:
2421       node.drained = self.op.drained
2422       result.append(("drained", str(self.op.drained)))
2423       if self.op.drained == True:
2424         if node.master_candidate:
2425           node.master_candidate = False
2426           changed_mc = True
2427           result.append(("master_candidate", "auto-demotion due to drain"))
2428         if node.offline:
2429           node.offline = False
2430           result.append(("offline", "clear offline status due to drain"))
2431
2432     # this will trigger configuration file update, if needed
2433     self.cfg.Update(node)
2434     # this will trigger job queue propagation or cleanup
2435     if changed_mc:
2436       self.context.ReaddNode(node)
2437
2438     return result
2439
2440
2441 class LUPowercycleNode(NoHooksLU):
2442   """Powercycles a node.
2443
2444   """
2445   _OP_REQP = ["node_name", "force"]
2446   REQ_BGL = False
2447
2448   def CheckArguments(self):
2449     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2450     if node_name is None:
2451       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2452     self.op.node_name = node_name
2453     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2454       raise errors.OpPrereqError("The node is the master and the force"
2455                                  " parameter was not set")
2456
2457   def ExpandNames(self):
2458     """Locking for PowercycleNode.
2459
2460     This is a last-resource option and shouldn't block on other
2461     jobs. Therefore, we grab no locks.
2462
2463     """
2464     self.needed_locks = {}
2465
2466   def CheckPrereq(self):
2467     """Check prerequisites.
2468
2469     This LU has no prereqs.
2470
2471     """
2472     pass
2473
2474   def Exec(self, feedback_fn):
2475     """Reboots a node.
2476
2477     """
2478     result = self.rpc.call_node_powercycle(self.op.node_name,
2479                                            self.cfg.GetHypervisorType())
2480     msg = result.RemoteFailMsg()
2481     if msg:
2482       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2483     return result.payload
2484
2485
2486 class LUQueryClusterInfo(NoHooksLU):
2487   """Query cluster configuration.
2488
2489   """
2490   _OP_REQP = []
2491   REQ_BGL = False
2492
2493   def ExpandNames(self):
2494     self.needed_locks = {}
2495
2496   def CheckPrereq(self):
2497     """No prerequsites needed for this LU.
2498
2499     """
2500     pass
2501
2502   def Exec(self, feedback_fn):
2503     """Return cluster config.
2504
2505     """
2506     cluster = self.cfg.GetClusterInfo()
2507     result = {
2508       "software_version": constants.RELEASE_VERSION,
2509       "protocol_version": constants.PROTOCOL_VERSION,
2510       "config_version": constants.CONFIG_VERSION,
2511       "os_api_version": constants.OS_API_VERSION,
2512       "export_version": constants.EXPORT_VERSION,
2513       "architecture": (platform.architecture()[0], platform.machine()),
2514       "name": cluster.cluster_name,
2515       "master": cluster.master_node,
2516       "default_hypervisor": cluster.default_hypervisor,
2517       "enabled_hypervisors": cluster.enabled_hypervisors,
2518       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2519                         for hypervisor in cluster.enabled_hypervisors]),
2520       "beparams": cluster.beparams,
2521       "nicparams": cluster.nicparams,
2522       "candidate_pool_size": cluster.candidate_pool_size,
2523       "default_bridge": cluster.default_bridge,
2524       "master_netdev": cluster.master_netdev,
2525       "volume_group_name": cluster.volume_group_name,
2526       "file_storage_dir": cluster.file_storage_dir,
2527       }
2528
2529     return result
2530
2531
2532 class LUQueryConfigValues(NoHooksLU):
2533   """Return configuration values.
2534
2535   """
2536   _OP_REQP = []
2537   REQ_BGL = False
2538   _FIELDS_DYNAMIC = utils.FieldSet()
2539   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2540
2541   def ExpandNames(self):
2542     self.needed_locks = {}
2543
2544     _CheckOutputFields(static=self._FIELDS_STATIC,
2545                        dynamic=self._FIELDS_DYNAMIC,
2546                        selected=self.op.output_fields)
2547
2548   def CheckPrereq(self):
2549     """No prerequisites.
2550
2551     """
2552     pass
2553
2554   def Exec(self, feedback_fn):
2555     """Dump a representation of the cluster config to the standard output.
2556
2557     """
2558     values = []
2559     for field in self.op.output_fields:
2560       if field == "cluster_name":
2561         entry = self.cfg.GetClusterName()
2562       elif field == "master_node":
2563         entry = self.cfg.GetMasterNode()
2564       elif field == "drain_flag":
2565         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2566       else:
2567         raise errors.ParameterError(field)
2568       values.append(entry)
2569     return values
2570
2571
2572 class LUActivateInstanceDisks(NoHooksLU):
2573   """Bring up an instance's disks.
2574
2575   """
2576   _OP_REQP = ["instance_name"]
2577   REQ_BGL = False
2578
2579   def ExpandNames(self):
2580     self._ExpandAndLockInstance()
2581     self.needed_locks[locking.LEVEL_NODE] = []
2582     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2583
2584   def DeclareLocks(self, level):
2585     if level == locking.LEVEL_NODE:
2586       self._LockInstancesNodes()
2587
2588   def CheckPrereq(self):
2589     """Check prerequisites.
2590
2591     This checks that the instance is in the cluster.
2592
2593     """
2594     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2595     assert self.instance is not None, \
2596       "Cannot retrieve locked instance %s" % self.op.instance_name
2597     _CheckNodeOnline(self, self.instance.primary_node)
2598
2599   def Exec(self, feedback_fn):
2600     """Activate the disks.
2601
2602     """
2603     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2604     if not disks_ok:
2605       raise errors.OpExecError("Cannot activate block devices")
2606
2607     return disks_info
2608
2609
2610 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2611   """Prepare the block devices for an instance.
2612
2613   This sets up the block devices on all nodes.
2614
2615   @type lu: L{LogicalUnit}
2616   @param lu: the logical unit on whose behalf we execute
2617   @type instance: L{objects.Instance}
2618   @param instance: the instance for whose disks we assemble
2619   @type ignore_secondaries: boolean
2620   @param ignore_secondaries: if true, errors on secondary nodes
2621       won't result in an error return from the function
2622   @return: False if the operation failed, otherwise a list of
2623       (host, instance_visible_name, node_visible_name)
2624       with the mapping from node devices to instance devices
2625
2626   """
2627   device_info = []
2628   disks_ok = True
2629   iname = instance.name
2630   # With the two passes mechanism we try to reduce the window of
2631   # opportunity for the race condition of switching DRBD to primary
2632   # before handshaking occured, but we do not eliminate it
2633
2634   # The proper fix would be to wait (with some limits) until the
2635   # connection has been made and drbd transitions from WFConnection
2636   # into any other network-connected state (Connected, SyncTarget,
2637   # SyncSource, etc.)
2638
2639   # 1st pass, assemble on all nodes in secondary mode
2640   for inst_disk in instance.disks:
2641     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2642       lu.cfg.SetDiskID(node_disk, node)
2643       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2644       msg = result.RemoteFailMsg()
2645       if msg:
2646         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2647                            " (is_primary=False, pass=1): %s",
2648                            inst_disk.iv_name, node, msg)
2649         if not ignore_secondaries:
2650           disks_ok = False
2651
2652   # FIXME: race condition on drbd migration to primary
2653
2654   # 2nd pass, do only the primary node
2655   for inst_disk in instance.disks:
2656     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2657       if node != instance.primary_node:
2658         continue
2659       lu.cfg.SetDiskID(node_disk, node)
2660       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2661       msg = result.RemoteFailMsg()
2662       if msg:
2663         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2664                            " (is_primary=True, pass=2): %s",
2665                            inst_disk.iv_name, node, msg)
2666         disks_ok = False
2667     device_info.append((instance.primary_node, inst_disk.iv_name,
2668                         result.payload))
2669
2670   # leave the disks configured for the primary node
2671   # this is a workaround that would be fixed better by
2672   # improving the logical/physical id handling
2673   for disk in instance.disks:
2674     lu.cfg.SetDiskID(disk, instance.primary_node)
2675
2676   return disks_ok, device_info
2677
2678
2679 def _StartInstanceDisks(lu, instance, force):
2680   """Start the disks of an instance.
2681
2682   """
2683   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2684                                            ignore_secondaries=force)
2685   if not disks_ok:
2686     _ShutdownInstanceDisks(lu, instance)
2687     if force is not None and not force:
2688       lu.proc.LogWarning("", hint="If the message above refers to a"
2689                          " secondary node,"
2690                          " you can retry the operation using '--force'.")
2691     raise errors.OpExecError("Disk consistency error")
2692
2693
2694 class LUDeactivateInstanceDisks(NoHooksLU):
2695   """Shutdown an instance's disks.
2696
2697   """
2698   _OP_REQP = ["instance_name"]
2699   REQ_BGL = False
2700
2701   def ExpandNames(self):
2702     self._ExpandAndLockInstance()
2703     self.needed_locks[locking.LEVEL_NODE] = []
2704     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2705
2706   def DeclareLocks(self, level):
2707     if level == locking.LEVEL_NODE:
2708       self._LockInstancesNodes()
2709
2710   def CheckPrereq(self):
2711     """Check prerequisites.
2712
2713     This checks that the instance is in the cluster.
2714
2715     """
2716     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717     assert self.instance is not None, \
2718       "Cannot retrieve locked instance %s" % self.op.instance_name
2719
2720   def Exec(self, feedback_fn):
2721     """Deactivate the disks
2722
2723     """
2724     instance = self.instance
2725     _SafeShutdownInstanceDisks(self, instance)
2726
2727
2728 def _SafeShutdownInstanceDisks(lu, instance):
2729   """Shutdown block devices of an instance.
2730
2731   This function checks if an instance is running, before calling
2732   _ShutdownInstanceDisks.
2733
2734   """
2735   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2736                                       [instance.hypervisor])
2737   ins_l = ins_l[instance.primary_node]
2738   if ins_l.failed or not isinstance(ins_l.data, list):
2739     raise errors.OpExecError("Can't contact node '%s'" %
2740                              instance.primary_node)
2741
2742   if instance.name in ins_l.data:
2743     raise errors.OpExecError("Instance is running, can't shutdown"
2744                              " block devices.")
2745
2746   _ShutdownInstanceDisks(lu, instance)
2747
2748
2749 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2750   """Shutdown block devices of an instance.
2751
2752   This does the shutdown on all nodes of the instance.
2753
2754   If the ignore_primary is false, errors on the primary node are
2755   ignored.
2756
2757   """
2758   all_result = True
2759   for disk in instance.disks:
2760     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2761       lu.cfg.SetDiskID(top_disk, node)
2762       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2763       msg = result.RemoteFailMsg()
2764       if msg:
2765         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2766                       disk.iv_name, node, msg)
2767         if not ignore_primary or node != instance.primary_node:
2768           all_result = False
2769   return all_result
2770
2771
2772 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2773   """Checks if a node has enough free memory.
2774
2775   This function check if a given node has the needed amount of free
2776   memory. In case the node has less memory or we cannot get the
2777   information from the node, this function raise an OpPrereqError
2778   exception.
2779
2780   @type lu: C{LogicalUnit}
2781   @param lu: a logical unit from which we get configuration data
2782   @type node: C{str}
2783   @param node: the node to check
2784   @type reason: C{str}
2785   @param reason: string to use in the error message
2786   @type requested: C{int}
2787   @param requested: the amount of memory in MiB to check for
2788   @type hypervisor_name: C{str}
2789   @param hypervisor_name: the hypervisor to ask for memory stats
2790   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2791       we cannot check the node
2792
2793   """
2794   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2795   nodeinfo[node].Raise()
2796   free_mem = nodeinfo[node].data.get('memory_free')
2797   if not isinstance(free_mem, int):
2798     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2799                              " was '%s'" % (node, free_mem))
2800   if requested > free_mem:
2801     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2802                              " needed %s MiB, available %s MiB" %
2803                              (node, reason, requested, free_mem))
2804
2805
2806 class LUStartupInstance(LogicalUnit):
2807   """Starts an instance.
2808
2809   """
2810   HPATH = "instance-start"
2811   HTYPE = constants.HTYPE_INSTANCE
2812   _OP_REQP = ["instance_name", "force"]
2813   REQ_BGL = False
2814
2815   def ExpandNames(self):
2816     self._ExpandAndLockInstance()
2817
2818   def BuildHooksEnv(self):
2819     """Build hooks env.
2820
2821     This runs on master, primary and secondary nodes of the instance.
2822
2823     """
2824     env = {
2825       "FORCE": self.op.force,
2826       }
2827     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2828     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2829     return env, nl, nl
2830
2831   def CheckPrereq(self):
2832     """Check prerequisites.
2833
2834     This checks that the instance is in the cluster.
2835
2836     """
2837     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2838     assert self.instance is not None, \
2839       "Cannot retrieve locked instance %s" % self.op.instance_name
2840
2841     # extra beparams
2842     self.beparams = getattr(self.op, "beparams", {})
2843     if self.beparams:
2844       if not isinstance(self.beparams, dict):
2845         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2846                                    " dict" % (type(self.beparams), ))
2847       # fill the beparams dict
2848       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2849       self.op.beparams = self.beparams
2850
2851     # extra hvparams
2852     self.hvparams = getattr(self.op, "hvparams", {})
2853     if self.hvparams:
2854       if not isinstance(self.hvparams, dict):
2855         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2856                                    " dict" % (type(self.hvparams), ))
2857
2858       # check hypervisor parameter syntax (locally)
2859       cluster = self.cfg.GetClusterInfo()
2860       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2861       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2862                                     instance.hvparams)
2863       filled_hvp.update(self.hvparams)
2864       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2865       hv_type.CheckParameterSyntax(filled_hvp)
2866       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2867       self.op.hvparams = self.hvparams
2868
2869     _CheckNodeOnline(self, instance.primary_node)
2870
2871     bep = self.cfg.GetClusterInfo().FillBE(instance)
2872     # check bridges existance
2873     _CheckInstanceBridgesExist(self, instance)
2874
2875     remote_info = self.rpc.call_instance_info(instance.primary_node,
2876                                               instance.name,
2877                                               instance.hypervisor)
2878     remote_info.Raise()
2879     if not remote_info.data:
2880       _CheckNodeFreeMemory(self, instance.primary_node,
2881                            "starting instance %s" % instance.name,
2882                            bep[constants.BE_MEMORY], instance.hypervisor)
2883
2884   def Exec(self, feedback_fn):
2885     """Start the instance.
2886
2887     """
2888     instance = self.instance
2889     force = self.op.force
2890
2891     self.cfg.MarkInstanceUp(instance.name)
2892
2893     node_current = instance.primary_node
2894
2895     _StartInstanceDisks(self, instance, force)
2896
2897     result = self.rpc.call_instance_start(node_current, instance,
2898                                           self.hvparams, self.beparams)
2899     msg = result.RemoteFailMsg()
2900     if msg:
2901       _ShutdownInstanceDisks(self, instance)
2902       raise errors.OpExecError("Could not start instance: %s" % msg)
2903
2904
2905 class LURebootInstance(LogicalUnit):
2906   """Reboot an instance.
2907
2908   """
2909   HPATH = "instance-reboot"
2910   HTYPE = constants.HTYPE_INSTANCE
2911   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2912   REQ_BGL = False
2913
2914   def ExpandNames(self):
2915     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2916                                    constants.INSTANCE_REBOOT_HARD,
2917                                    constants.INSTANCE_REBOOT_FULL]:
2918       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2919                                   (constants.INSTANCE_REBOOT_SOFT,
2920                                    constants.INSTANCE_REBOOT_HARD,
2921                                    constants.INSTANCE_REBOOT_FULL))
2922     self._ExpandAndLockInstance()
2923
2924   def BuildHooksEnv(self):
2925     """Build hooks env.
2926
2927     This runs on master, primary and secondary nodes of the instance.
2928
2929     """
2930     env = {
2931       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2932       "REBOOT_TYPE": self.op.reboot_type,
2933       }
2934     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2935     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2936     return env, nl, nl
2937
2938   def CheckPrereq(self):
2939     """Check prerequisites.
2940
2941     This checks that the instance is in the cluster.
2942
2943     """
2944     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2945     assert self.instance is not None, \
2946       "Cannot retrieve locked instance %s" % self.op.instance_name
2947
2948     _CheckNodeOnline(self, instance.primary_node)
2949
2950     # check bridges existance
2951     _CheckInstanceBridgesExist(self, instance)
2952
2953   def Exec(self, feedback_fn):
2954     """Reboot the instance.
2955
2956     """
2957     instance = self.instance
2958     ignore_secondaries = self.op.ignore_secondaries
2959     reboot_type = self.op.reboot_type
2960
2961     node_current = instance.primary_node
2962
2963     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2964                        constants.INSTANCE_REBOOT_HARD]:
2965       for disk in instance.disks:
2966         self.cfg.SetDiskID(disk, node_current)
2967       result = self.rpc.call_instance_reboot(node_current, instance,
2968                                              reboot_type)
2969       msg = result.RemoteFailMsg()
2970       if msg:
2971         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2972     else:
2973       result = self.rpc.call_instance_shutdown(node_current, instance)
2974       msg = result.RemoteFailMsg()
2975       if msg:
2976         raise errors.OpExecError("Could not shutdown instance for"
2977                                  " full reboot: %s" % msg)
2978       _ShutdownInstanceDisks(self, instance)
2979       _StartInstanceDisks(self, instance, ignore_secondaries)
2980       result = self.rpc.call_instance_start(node_current, instance, None, None)
2981       msg = result.RemoteFailMsg()
2982       if msg:
2983         _ShutdownInstanceDisks(self, instance)
2984         raise errors.OpExecError("Could not start instance for"
2985                                  " full reboot: %s" % msg)
2986
2987     self.cfg.MarkInstanceUp(instance.name)
2988
2989
2990 class LUShutdownInstance(LogicalUnit):
2991   """Shutdown an instance.
2992
2993   """
2994   HPATH = "instance-stop"
2995   HTYPE = constants.HTYPE_INSTANCE
2996   _OP_REQP = ["instance_name"]
2997   REQ_BGL = False
2998
2999   def ExpandNames(self):
3000     self._ExpandAndLockInstance()
3001
3002   def BuildHooksEnv(self):
3003     """Build hooks env.
3004
3005     This runs on master, primary and secondary nodes of the instance.
3006
3007     """
3008     env = _BuildInstanceHookEnvByObject(self, self.instance)
3009     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3010     return env, nl, nl
3011
3012   def CheckPrereq(self):
3013     """Check prerequisites.
3014
3015     This checks that the instance is in the cluster.
3016
3017     """
3018     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3019     assert self.instance is not None, \
3020       "Cannot retrieve locked instance %s" % self.op.instance_name
3021     _CheckNodeOnline(self, self.instance.primary_node)
3022
3023   def Exec(self, feedback_fn):
3024     """Shutdown the instance.
3025
3026     """
3027     instance = self.instance
3028     node_current = instance.primary_node
3029     self.cfg.MarkInstanceDown(instance.name)
3030     result = self.rpc.call_instance_shutdown(node_current, instance)
3031     msg = result.RemoteFailMsg()
3032     if msg:
3033       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3034
3035     _ShutdownInstanceDisks(self, instance)
3036
3037
3038 class LUReinstallInstance(LogicalUnit):
3039   """Reinstall an instance.
3040
3041   """
3042   HPATH = "instance-reinstall"
3043   HTYPE = constants.HTYPE_INSTANCE
3044   _OP_REQP = ["instance_name"]
3045   REQ_BGL = False
3046
3047   def ExpandNames(self):
3048     self._ExpandAndLockInstance()
3049
3050   def BuildHooksEnv(self):
3051     """Build hooks env.
3052
3053     This runs on master, primary and secondary nodes of the instance.
3054
3055     """
3056     env = _BuildInstanceHookEnvByObject(self, self.instance)
3057     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3058     return env, nl, nl
3059
3060   def CheckPrereq(self):
3061     """Check prerequisites.
3062
3063     This checks that the instance is in the cluster and is not running.
3064
3065     """
3066     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3067     assert instance is not None, \
3068       "Cannot retrieve locked instance %s" % self.op.instance_name
3069     _CheckNodeOnline(self, instance.primary_node)
3070
3071     if instance.disk_template == constants.DT_DISKLESS:
3072       raise errors.OpPrereqError("Instance '%s' has no disks" %
3073                                  self.op.instance_name)
3074     if instance.admin_up:
3075       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3076                                  self.op.instance_name)
3077     remote_info = self.rpc.call_instance_info(instance.primary_node,
3078                                               instance.name,
3079                                               instance.hypervisor)
3080     remote_info.Raise()
3081     if remote_info.data:
3082       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3083                                  (self.op.instance_name,
3084                                   instance.primary_node))
3085
3086     self.op.os_type = getattr(self.op, "os_type", None)
3087     if self.op.os_type is not None:
3088       # OS verification
3089       pnode = self.cfg.GetNodeInfo(
3090         self.cfg.ExpandNodeName(instance.primary_node))
3091       if pnode is None:
3092         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3093                                    self.op.pnode)
3094       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3095       result.Raise()
3096       if not isinstance(result.data, objects.OS):
3097         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3098                                    " primary node"  % self.op.os_type)
3099
3100     self.instance = instance
3101
3102   def Exec(self, feedback_fn):
3103     """Reinstall the instance.
3104
3105     """
3106     inst = self.instance
3107
3108     if self.op.os_type is not None:
3109       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3110       inst.os = self.op.os_type
3111       self.cfg.Update(inst)
3112
3113     _StartInstanceDisks(self, inst, None)
3114     try:
3115       feedback_fn("Running the instance OS create scripts...")
3116       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3117       msg = result.RemoteFailMsg()
3118       if msg:
3119         raise errors.OpExecError("Could not install OS for instance %s"
3120                                  " on node %s: %s" %
3121                                  (inst.name, inst.primary_node, msg))
3122     finally:
3123       _ShutdownInstanceDisks(self, inst)
3124
3125
3126 class LURenameInstance(LogicalUnit):
3127   """Rename an instance.
3128
3129   """
3130   HPATH = "instance-rename"
3131   HTYPE = constants.HTYPE_INSTANCE
3132   _OP_REQP = ["instance_name", "new_name"]
3133
3134   def BuildHooksEnv(self):
3135     """Build hooks env.
3136
3137     This runs on master, primary and secondary nodes of the instance.
3138
3139     """
3140     env = _BuildInstanceHookEnvByObject(self, self.instance)
3141     env["INSTANCE_NEW_NAME"] = self.op.new_name
3142     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3143     return env, nl, nl
3144
3145   def CheckPrereq(self):
3146     """Check prerequisites.
3147
3148     This checks that the instance is in the cluster and is not running.
3149
3150     """
3151     instance = self.cfg.GetInstanceInfo(
3152       self.cfg.ExpandInstanceName(self.op.instance_name))
3153     if instance is None:
3154       raise errors.OpPrereqError("Instance '%s' not known" %
3155                                  self.op.instance_name)
3156     _CheckNodeOnline(self, instance.primary_node)
3157
3158     if instance.admin_up:
3159       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3160                                  self.op.instance_name)
3161     remote_info = self.rpc.call_instance_info(instance.primary_node,
3162                                               instance.name,
3163                                               instance.hypervisor)
3164     remote_info.Raise()
3165     if remote_info.data:
3166       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3167                                  (self.op.instance_name,
3168                                   instance.primary_node))
3169     self.instance = instance
3170
3171     # new name verification
3172     name_info = utils.HostInfo(self.op.new_name)
3173
3174     self.op.new_name = new_name = name_info.name
3175     instance_list = self.cfg.GetInstanceList()
3176     if new_name in instance_list:
3177       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3178                                  new_name)
3179
3180     if not getattr(self.op, "ignore_ip", False):
3181       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3182         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3183                                    (name_info.ip, new_name))
3184
3185
3186   def Exec(self, feedback_fn):
3187     """Reinstall the instance.
3188
3189     """
3190     inst = self.instance
3191     old_name = inst.name
3192
3193     if inst.disk_template == constants.DT_FILE:
3194       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3195
3196     self.cfg.RenameInstance(inst.name, self.op.new_name)
3197     # Change the instance lock. This is definitely safe while we hold the BGL
3198     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3199     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3200
3201     # re-read the instance from the configuration after rename
3202     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3203
3204     if inst.disk_template == constants.DT_FILE:
3205       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3206       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3207                                                      old_file_storage_dir,
3208                                                      new_file_storage_dir)
3209       result.Raise()
3210       if not result.data:
3211         raise errors.OpExecError("Could not connect to node '%s' to rename"
3212                                  " directory '%s' to '%s' (but the instance"
3213                                  " has been renamed in Ganeti)" % (
3214                                  inst.primary_node, old_file_storage_dir,
3215                                  new_file_storage_dir))
3216
3217       if not result.data[0]:
3218         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3219                                  " (but the instance has been renamed in"
3220                                  " Ganeti)" % (old_file_storage_dir,
3221                                                new_file_storage_dir))
3222
3223     _StartInstanceDisks(self, inst, None)
3224     try:
3225       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3226                                                  old_name)
3227       msg = result.RemoteFailMsg()
3228       if msg:
3229         msg = ("Could not run OS rename script for instance %s on node %s"
3230                " (but the instance has been renamed in Ganeti): %s" %
3231                (inst.name, inst.primary_node, msg))
3232         self.proc.LogWarning(msg)
3233     finally:
3234       _ShutdownInstanceDisks(self, inst)
3235
3236
3237 class LURemoveInstance(LogicalUnit):
3238   """Remove an instance.
3239
3240   """
3241   HPATH = "instance-remove"
3242   HTYPE = constants.HTYPE_INSTANCE
3243   _OP_REQP = ["instance_name", "ignore_failures"]
3244   REQ_BGL = False
3245
3246   def ExpandNames(self):
3247     self._ExpandAndLockInstance()
3248     self.needed_locks[locking.LEVEL_NODE] = []
3249     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3250
3251   def DeclareLocks(self, level):
3252     if level == locking.LEVEL_NODE:
3253       self._LockInstancesNodes()
3254
3255   def BuildHooksEnv(self):
3256     """Build hooks env.
3257
3258     This runs on master, primary and secondary nodes of the instance.
3259
3260     """
3261     env = _BuildInstanceHookEnvByObject(self, self.instance)
3262     nl = [self.cfg.GetMasterNode()]
3263     return env, nl, nl
3264
3265   def CheckPrereq(self):
3266     """Check prerequisites.
3267
3268     This checks that the instance is in the cluster.
3269
3270     """
3271     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3272     assert self.instance is not None, \
3273       "Cannot retrieve locked instance %s" % self.op.instance_name
3274
3275   def Exec(self, feedback_fn):
3276     """Remove the instance.
3277
3278     """
3279     instance = self.instance
3280     logging.info("Shutting down instance %s on node %s",
3281                  instance.name, instance.primary_node)
3282
3283     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3284     msg = result.RemoteFailMsg()
3285     if msg:
3286       if self.op.ignore_failures:
3287         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3288       else:
3289         raise errors.OpExecError("Could not shutdown instance %s on"
3290                                  " node %s: %s" %
3291                                  (instance.name, instance.primary_node, msg))
3292
3293     logging.info("Removing block devices for instance %s", instance.name)
3294
3295     if not _RemoveDisks(self, instance):
3296       if self.op.ignore_failures:
3297         feedback_fn("Warning: can't remove instance's disks")
3298       else:
3299         raise errors.OpExecError("Can't remove instance's disks")
3300
3301     logging.info("Removing instance %s out of cluster config", instance.name)
3302
3303     self.cfg.RemoveInstance(instance.name)
3304     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3305
3306
3307 class LUQueryInstances(NoHooksLU):
3308   """Logical unit for querying instances.
3309
3310   """
3311   _OP_REQP = ["output_fields", "names", "use_locking"]
3312   REQ_BGL = False
3313   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3314                                     "admin_state",
3315                                     "disk_template", "ip", "mac", "bridge",
3316                                     "sda_size", "sdb_size", "vcpus", "tags",
3317                                     "network_port", "beparams",
3318                                     r"(disk)\.(size)/([0-9]+)",
3319                                     r"(disk)\.(sizes)", "disk_usage",
3320                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3321                                     r"(nic)\.(macs|ips|bridges)",
3322                                     r"(disk|nic)\.(count)",
3323                                     "serial_no", "hypervisor", "hvparams",] +
3324                                   ["hv/%s" % name
3325                                    for name in constants.HVS_PARAMETERS] +
3326                                   ["be/%s" % name
3327                                    for name in constants.BES_PARAMETERS])
3328   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3329
3330
3331   def ExpandNames(self):
3332     _CheckOutputFields(static=self._FIELDS_STATIC,
3333                        dynamic=self._FIELDS_DYNAMIC,
3334                        selected=self.op.output_fields)
3335
3336     self.needed_locks = {}
3337     self.share_locks[locking.LEVEL_INSTANCE] = 1
3338     self.share_locks[locking.LEVEL_NODE] = 1
3339
3340     if self.op.names:
3341       self.wanted = _GetWantedInstances(self, self.op.names)
3342     else:
3343       self.wanted = locking.ALL_SET
3344
3345     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3346     self.do_locking = self.do_node_query and self.op.use_locking
3347     if self.do_locking:
3348       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3349       self.needed_locks[locking.LEVEL_NODE] = []
3350       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3351
3352   def DeclareLocks(self, level):
3353     if level == locking.LEVEL_NODE and self.do_locking:
3354       self._LockInstancesNodes()
3355
3356   def CheckPrereq(self):
3357     """Check prerequisites.
3358
3359     """
3360     pass
3361
3362   def Exec(self, feedback_fn):
3363     """Computes the list of nodes and their attributes.
3364
3365     """
3366     all_info = self.cfg.GetAllInstancesInfo()
3367     if self.wanted == locking.ALL_SET:
3368       # caller didn't specify instance names, so ordering is not important
3369       if self.do_locking:
3370         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3371       else:
3372         instance_names = all_info.keys()
3373       instance_names = utils.NiceSort(instance_names)
3374     else:
3375       # caller did specify names, so we must keep the ordering
3376       if self.do_locking:
3377         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3378       else:
3379         tgt_set = all_info.keys()
3380       missing = set(self.wanted).difference(tgt_set)
3381       if missing:
3382         raise errors.OpExecError("Some instances were removed before"
3383                                  " retrieving their data: %s" % missing)
3384       instance_names = self.wanted
3385
3386     instance_list = [all_info[iname] for iname in instance_names]
3387
3388     # begin data gathering
3389
3390     nodes = frozenset([inst.primary_node for inst in instance_list])
3391     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3392
3393     bad_nodes = []
3394     off_nodes = []
3395     if self.do_node_query:
3396       live_data = {}
3397       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3398       for name in nodes:
3399         result = node_data[name]
3400         if result.offline:
3401           # offline nodes will be in both lists
3402           off_nodes.append(name)
3403         if result.failed:
3404           bad_nodes.append(name)
3405         else:
3406           if result.data:
3407             live_data.update(result.data)
3408             # else no instance is alive
3409     else:
3410       live_data = dict([(name, {}) for name in instance_names])
3411
3412     # end data gathering
3413
3414     HVPREFIX = "hv/"
3415     BEPREFIX = "be/"
3416     output = []
3417     for instance in instance_list:
3418       iout = []
3419       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3420       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3421       for field in self.op.output_fields:
3422         st_match = self._FIELDS_STATIC.Matches(field)
3423         if field == "name":
3424           val = instance.name
3425         elif field == "os":
3426           val = instance.os
3427         elif field == "pnode":
3428           val = instance.primary_node
3429         elif field == "snodes":
3430           val = list(instance.secondary_nodes)
3431         elif field == "admin_state":
3432           val = instance.admin_up
3433         elif field == "oper_state":
3434           if instance.primary_node in bad_nodes:
3435             val = None
3436           else:
3437             val = bool(live_data.get(instance.name))
3438         elif field == "status":
3439           if instance.primary_node in off_nodes:
3440             val = "ERROR_nodeoffline"
3441           elif instance.primary_node in bad_nodes:
3442             val = "ERROR_nodedown"
3443           else:
3444             running = bool(live_data.get(instance.name))
3445             if running:
3446               if instance.admin_up:
3447                 val = "running"
3448               else:
3449                 val = "ERROR_up"
3450             else:
3451               if instance.admin_up:
3452                 val = "ERROR_down"
3453               else:
3454                 val = "ADMIN_down"
3455         elif field == "oper_ram":
3456           if instance.primary_node in bad_nodes:
3457             val = None
3458           elif instance.name in live_data:
3459             val = live_data[instance.name].get("memory", "?")
3460           else:
3461             val = "-"
3462         elif field == "disk_template":
3463           val = instance.disk_template
3464         elif field == "ip":
3465           val = instance.nics[0].ip
3466         elif field == "bridge":
3467           val = instance.nics[0].bridge
3468         elif field == "mac":
3469           val = instance.nics[0].mac
3470         elif field == "sda_size" or field == "sdb_size":
3471           idx = ord(field[2]) - ord('a')
3472           try:
3473             val = instance.FindDisk(idx).size
3474           except errors.OpPrereqError:
3475             val = None
3476         elif field == "disk_usage": # total disk usage per node
3477           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3478           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3479         elif field == "tags":
3480           val = list(instance.GetTags())
3481         elif field == "serial_no":
3482           val = instance.serial_no
3483         elif field == "network_port":
3484           val = instance.network_port
3485         elif field == "hypervisor":
3486           val = instance.hypervisor
3487         elif field == "hvparams":
3488           val = i_hv
3489         elif (field.startswith(HVPREFIX) and
3490               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3491           val = i_hv.get(field[len(HVPREFIX):], None)
3492         elif field == "beparams":
3493           val = i_be
3494         elif (field.startswith(BEPREFIX) and
3495               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3496           val = i_be.get(field[len(BEPREFIX):], None)
3497         elif st_match and st_match.groups():
3498           # matches a variable list
3499           st_groups = st_match.groups()
3500           if st_groups and st_groups[0] == "disk":
3501             if st_groups[1] == "count":
3502               val = len(instance.disks)
3503             elif st_groups[1] == "sizes":
3504               val = [disk.size for disk in instance.disks]
3505             elif st_groups[1] == "size":
3506               try:
3507                 val = instance.FindDisk(st_groups[2]).size
3508               except errors.OpPrereqError:
3509                 val = None
3510             else:
3511               assert False, "Unhandled disk parameter"
3512           elif st_groups[0] == "nic":
3513             if st_groups[1] == "count":
3514               val = len(instance.nics)
3515             elif st_groups[1] == "macs":
3516               val = [nic.mac for nic in instance.nics]
3517             elif st_groups[1] == "ips":
3518               val = [nic.ip for nic in instance.nics]
3519             elif st_groups[1] == "bridges":
3520               val = [nic.bridge for nic in instance.nics]
3521             else:
3522               # index-based item
3523               nic_idx = int(st_groups[2])
3524               if nic_idx >= len(instance.nics):
3525                 val = None
3526               else:
3527                 if st_groups[1] == "mac":
3528                   val = instance.nics[nic_idx].mac
3529                 elif st_groups[1] == "ip":
3530                   val = instance.nics[nic_idx].ip
3531                 elif st_groups[1] == "bridge":
3532                   val = instance.nics[nic_idx].bridge
3533                 else:
3534                   assert False, "Unhandled NIC parameter"
3535           else:
3536             assert False, "Unhandled variable parameter"
3537         else:
3538           raise errors.ParameterError(field)
3539         iout.append(val)
3540       output.append(iout)
3541
3542     return output
3543
3544
3545 class LUFailoverInstance(LogicalUnit):
3546   """Failover an instance.
3547
3548   """
3549   HPATH = "instance-failover"
3550   HTYPE = constants.HTYPE_INSTANCE
3551   _OP_REQP = ["instance_name", "ignore_consistency"]
3552   REQ_BGL = False
3553
3554   def ExpandNames(self):
3555     self._ExpandAndLockInstance()
3556     self.needed_locks[locking.LEVEL_NODE] = []
3557     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3558
3559   def DeclareLocks(self, level):
3560     if level == locking.LEVEL_NODE:
3561       self._LockInstancesNodes()
3562
3563   def BuildHooksEnv(self):
3564     """Build hooks env.
3565
3566     This runs on master, primary and secondary nodes of the instance.
3567
3568     """
3569     env = {
3570       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3571       }
3572     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3573     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3574     return env, nl, nl
3575
3576   def CheckPrereq(self):
3577     """Check prerequisites.
3578
3579     This checks that the instance is in the cluster.
3580
3581     """
3582     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3583     assert self.instance is not None, \
3584       "Cannot retrieve locked instance %s" % self.op.instance_name
3585
3586     bep = self.cfg.GetClusterInfo().FillBE(instance)
3587     if instance.disk_template not in constants.DTS_NET_MIRROR:
3588       raise errors.OpPrereqError("Instance's disk layout is not"
3589                                  " network mirrored, cannot failover.")
3590
3591     secondary_nodes = instance.secondary_nodes
3592     if not secondary_nodes:
3593       raise errors.ProgrammerError("no secondary node but using "
3594                                    "a mirrored disk template")
3595
3596     target_node = secondary_nodes[0]
3597     _CheckNodeOnline(self, target_node)
3598     _CheckNodeNotDrained(self, target_node)
3599     # check memory requirements on the secondary node
3600     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3601                          instance.name, bep[constants.BE_MEMORY],
3602                          instance.hypervisor)
3603
3604     # check bridge existance
3605     brlist = [nic.bridge for nic in instance.nics]
3606     result = self.rpc.call_bridges_exist(target_node, brlist)
3607     result.Raise()
3608     if not result.data:
3609       raise errors.OpPrereqError("One or more target bridges %s does not"
3610                                  " exist on destination node '%s'" %
3611                                  (brlist, target_node))
3612
3613   def Exec(self, feedback_fn):
3614     """Failover an instance.
3615
3616     The failover is done by shutting it down on its present node and
3617     starting it on the secondary.
3618
3619     """
3620     instance = self.instance
3621
3622     source_node = instance.primary_node
3623     target_node = instance.secondary_nodes[0]
3624
3625     feedback_fn("* checking disk consistency between source and target")
3626     for dev in instance.disks:
3627       # for drbd, these are drbd over lvm
3628       if not _CheckDiskConsistency(self, dev, target_node, False):
3629         if instance.admin_up and not self.op.ignore_consistency:
3630           raise errors.OpExecError("Disk %s is degraded on target node,"
3631                                    " aborting failover." % dev.iv_name)
3632
3633     feedback_fn("* shutting down instance on source node")
3634     logging.info("Shutting down instance %s on node %s",
3635                  instance.name, source_node)
3636
3637     result = self.rpc.call_instance_shutdown(source_node, instance)
3638     msg = result.RemoteFailMsg()
3639     if msg:
3640       if self.op.ignore_consistency:
3641         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3642                              " Proceeding anyway. Please make sure node"
3643                              " %s is down. Error details: %s",
3644                              instance.name, source_node, source_node, msg)
3645       else:
3646         raise errors.OpExecError("Could not shutdown instance %s on"
3647                                  " node %s: %s" %
3648                                  (instance.name, source_node, msg))
3649
3650     feedback_fn("* deactivating the instance's disks on source node")
3651     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3652       raise errors.OpExecError("Can't shut down the instance's disks.")
3653
3654     instance.primary_node = target_node
3655     # distribute new instance config to the other nodes
3656     self.cfg.Update(instance)
3657
3658     # Only start the instance if it's marked as up
3659     if instance.admin_up:
3660       feedback_fn("* activating the instance's disks on target node")
3661       logging.info("Starting instance %s on node %s",
3662                    instance.name, target_node)
3663
3664       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3665                                                ignore_secondaries=True)
3666       if not disks_ok:
3667         _ShutdownInstanceDisks(self, instance)
3668         raise errors.OpExecError("Can't activate the instance's disks")
3669
3670       feedback_fn("* starting the instance on the target node")
3671       result = self.rpc.call_instance_start(target_node, instance, None, None)
3672       msg = result.RemoteFailMsg()
3673       if msg:
3674         _ShutdownInstanceDisks(self, instance)
3675         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3676                                  (instance.name, target_node, msg))
3677
3678
3679 class LUMigrateInstance(LogicalUnit):
3680   """Migrate an instance.
3681
3682   This is migration without shutting down, compared to the failover,
3683   which is done with shutdown.
3684
3685   """
3686   HPATH = "instance-migrate"
3687   HTYPE = constants.HTYPE_INSTANCE
3688   _OP_REQP = ["instance_name", "live", "cleanup"]
3689
3690   REQ_BGL = False
3691
3692   def ExpandNames(self):
3693     self._ExpandAndLockInstance()
3694     self.needed_locks[locking.LEVEL_NODE] = []
3695     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3696
3697   def DeclareLocks(self, level):
3698     if level == locking.LEVEL_NODE:
3699       self._LockInstancesNodes()
3700
3701   def BuildHooksEnv(self):
3702     """Build hooks env.
3703
3704     This runs on master, primary and secondary nodes of the instance.
3705
3706     """
3707     env = _BuildInstanceHookEnvByObject(self, self.instance)
3708     env["MIGRATE_LIVE"] = self.op.live
3709     env["MIGRATE_CLEANUP"] = self.op.cleanup
3710     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3711     return env, nl, nl
3712
3713   def CheckPrereq(self):
3714     """Check prerequisites.
3715
3716     This checks that the instance is in the cluster.
3717
3718     """
3719     instance = self.cfg.GetInstanceInfo(
3720       self.cfg.ExpandInstanceName(self.op.instance_name))
3721     if instance is None:
3722       raise errors.OpPrereqError("Instance '%s' not known" %
3723                                  self.op.instance_name)
3724
3725     if instance.disk_template != constants.DT_DRBD8:
3726       raise errors.OpPrereqError("Instance's disk layout is not"
3727                                  " drbd8, cannot migrate.")
3728
3729     secondary_nodes = instance.secondary_nodes
3730     if not secondary_nodes:
3731       raise errors.ConfigurationError("No secondary node but using"
3732                                       " drbd8 disk template")
3733
3734     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3735
3736     target_node = secondary_nodes[0]
3737     # check memory requirements on the secondary node
3738     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3739                          instance.name, i_be[constants.BE_MEMORY],
3740                          instance.hypervisor)
3741
3742     # check bridge existance
3743     brlist = [nic.bridge for nic in instance.nics]
3744     result = self.rpc.call_bridges_exist(target_node, brlist)
3745     if result.failed or not result.data:
3746       raise errors.OpPrereqError("One or more target bridges %s does not"
3747                                  " exist on destination node '%s'" %
3748                                  (brlist, target_node))
3749
3750     if not self.op.cleanup:
3751       _CheckNodeNotDrained(self, target_node)
3752       result = self.rpc.call_instance_migratable(instance.primary_node,
3753                                                  instance)
3754       msg = result.RemoteFailMsg()
3755       if msg:
3756         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3757                                    msg)
3758
3759     self.instance = instance
3760
3761   def _WaitUntilSync(self):
3762     """Poll with custom rpc for disk sync.
3763
3764     This uses our own step-based rpc call.
3765
3766     """
3767     self.feedback_fn("* wait until resync is done")
3768     all_done = False
3769     while not all_done:
3770       all_done = True
3771       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3772                                             self.nodes_ip,
3773                                             self.instance.disks)
3774       min_percent = 100
3775       for node, nres in result.items():
3776         msg = nres.RemoteFailMsg()
3777         if msg:
3778           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3779                                    (node, msg))
3780         node_done, node_percent = nres.payload
3781         all_done = all_done and node_done
3782         if node_percent is not None:
3783           min_percent = min(min_percent, node_percent)
3784       if not all_done:
3785         if min_percent < 100:
3786           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3787         time.sleep(2)
3788
3789   def _EnsureSecondary(self, node):
3790     """Demote a node to secondary.
3791
3792     """
3793     self.feedback_fn("* switching node %s to secondary mode" % node)
3794
3795     for dev in self.instance.disks:
3796       self.cfg.SetDiskID(dev, node)
3797
3798     result = self.rpc.call_blockdev_close(node, self.instance.name,
3799                                           self.instance.disks)
3800     msg = result.RemoteFailMsg()
3801     if msg:
3802       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3803                                " error %s" % (node, msg))
3804
3805   def _GoStandalone(self):
3806     """Disconnect from the network.
3807
3808     """
3809     self.feedback_fn("* changing into standalone mode")
3810     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3811                                                self.instance.disks)
3812     for node, nres in result.items():
3813       msg = nres.RemoteFailMsg()
3814       if msg:
3815         raise errors.OpExecError("Cannot disconnect disks node %s,"
3816                                  " error %s" % (node, msg))
3817
3818   def _GoReconnect(self, multimaster):
3819     """Reconnect to the network.
3820
3821     """
3822     if multimaster:
3823       msg = "dual-master"
3824     else:
3825       msg = "single-master"
3826     self.feedback_fn("* changing disks into %s mode" % msg)
3827     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3828                                            self.instance.disks,
3829                                            self.instance.name, multimaster)
3830     for node, nres in result.items():
3831       msg = nres.RemoteFailMsg()
3832       if msg:
3833         raise errors.OpExecError("Cannot change disks config on node %s,"
3834                                  " error: %s" % (node, msg))
3835
3836   def _ExecCleanup(self):
3837     """Try to cleanup after a failed migration.
3838
3839     The cleanup is done by:
3840       - check that the instance is running only on one node
3841         (and update the config if needed)
3842       - change disks on its secondary node to secondary
3843       - wait until disks are fully synchronized
3844       - disconnect from the network
3845       - change disks into single-master mode
3846       - wait again until disks are fully synchronized
3847
3848     """
3849     instance = self.instance
3850     target_node = self.target_node
3851     source_node = self.source_node
3852
3853     # check running on only one node
3854     self.feedback_fn("* checking where the instance actually runs"
3855                      " (if this hangs, the hypervisor might be in"
3856                      " a bad state)")
3857     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3858     for node, result in ins_l.items():
3859       result.Raise()
3860       if not isinstance(result.data, list):
3861         raise errors.OpExecError("Can't contact node '%s'" % node)
3862
3863     runningon_source = instance.name in ins_l[source_node].data
3864     runningon_target = instance.name in ins_l[target_node].data
3865
3866     if runningon_source and runningon_target:
3867       raise errors.OpExecError("Instance seems to be running on two nodes,"
3868                                " or the hypervisor is confused. You will have"
3869                                " to ensure manually that it runs only on one"
3870                                " and restart this operation.")
3871
3872     if not (runningon_source or runningon_target):
3873       raise errors.OpExecError("Instance does not seem to be running at all."
3874                                " In this case, it's safer to repair by"
3875                                " running 'gnt-instance stop' to ensure disk"
3876                                " shutdown, and then restarting it.")
3877
3878     if runningon_target:
3879       # the migration has actually succeeded, we need to update the config
3880       self.feedback_fn("* instance running on secondary node (%s),"
3881                        " updating config" % target_node)
3882       instance.primary_node = target_node
3883       self.cfg.Update(instance)
3884       demoted_node = source_node
3885     else:
3886       self.feedback_fn("* instance confirmed to be running on its"
3887                        " primary node (%s)" % source_node)
3888       demoted_node = target_node
3889
3890     self._EnsureSecondary(demoted_node)
3891     try:
3892       self._WaitUntilSync()
3893     except errors.OpExecError:
3894       # we ignore here errors, since if the device is standalone, it
3895       # won't be able to sync
3896       pass
3897     self._GoStandalone()
3898     self._GoReconnect(False)
3899     self._WaitUntilSync()
3900
3901     self.feedback_fn("* done")
3902
3903   def _RevertDiskStatus(self):
3904     """Try to revert the disk status after a failed migration.
3905
3906     """
3907     target_node = self.target_node
3908     try:
3909       self._EnsureSecondary(target_node)
3910       self._GoStandalone()
3911       self._GoReconnect(False)
3912       self._WaitUntilSync()
3913     except errors.OpExecError, err:
3914       self.LogWarning("Migration failed and I can't reconnect the"
3915                       " drives: error '%s'\n"
3916                       "Please look and recover the instance status" %
3917                       str(err))
3918
3919   def _AbortMigration(self):
3920     """Call the hypervisor code to abort a started migration.
3921
3922     """
3923     instance = self.instance
3924     target_node = self.target_node
3925     migration_info = self.migration_info
3926
3927     abort_result = self.rpc.call_finalize_migration(target_node,
3928                                                     instance,
3929                                                     migration_info,
3930                                                     False)
3931     abort_msg = abort_result.RemoteFailMsg()
3932     if abort_msg:
3933       logging.error("Aborting migration failed on target node %s: %s" %
3934                     (target_node, abort_msg))
3935       # Don't raise an exception here, as we stil have to try to revert the
3936       # disk status, even if this step failed.
3937
3938   def _ExecMigration(self):
3939     """Migrate an instance.
3940
3941     The migrate is done by:
3942       - change the disks into dual-master mode
3943       - wait until disks are fully synchronized again
3944       - migrate the instance
3945       - change disks on the new secondary node (the old primary) to secondary
3946       - wait until disks are fully synchronized
3947       - change disks into single-master mode
3948
3949     """
3950     instance = self.instance
3951     target_node = self.target_node
3952     source_node = self.source_node
3953
3954     self.feedback_fn("* checking disk consistency between source and target")
3955     for dev in instance.disks:
3956       if not _CheckDiskConsistency(self, dev, target_node, False):
3957         raise errors.OpExecError("Disk %s is degraded or not fully"
3958                                  " synchronized on target node,"
3959                                  " aborting migrate." % dev.iv_name)
3960
3961     # First get the migration information from the remote node
3962     result = self.rpc.call_migration_info(source_node, instance)
3963     msg = result.RemoteFailMsg()
3964     if msg:
3965       log_err = ("Failed fetching source migration information from %s: %s" %
3966                  (source_node, msg))
3967       logging.error(log_err)
3968       raise errors.OpExecError(log_err)
3969
3970     self.migration_info = migration_info = result.payload
3971
3972     # Then switch the disks to master/master mode
3973     self._EnsureSecondary(target_node)
3974     self._GoStandalone()
3975     self._GoReconnect(True)
3976     self._WaitUntilSync()
3977
3978     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3979     result = self.rpc.call_accept_instance(target_node,
3980                                            instance,
3981                                            migration_info,
3982                                            self.nodes_ip[target_node])
3983
3984     msg = result.RemoteFailMsg()
3985     if msg:
3986       logging.error("Instance pre-migration failed, trying to revert"
3987                     " disk status: %s", msg)
3988       self._AbortMigration()
3989       self._RevertDiskStatus()
3990       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3991                                (instance.name, msg))
3992
3993     self.feedback_fn("* migrating instance to %s" % target_node)
3994     time.sleep(10)
3995     result = self.rpc.call_instance_migrate(source_node, instance,
3996                                             self.nodes_ip[target_node],
3997                                             self.op.live)
3998     msg = result.RemoteFailMsg()
3999     if msg:
4000       logging.error("Instance migration failed, trying to revert"
4001                     " disk status: %s", msg)
4002       self._AbortMigration()
4003       self._RevertDiskStatus()
4004       raise errors.OpExecError("Could not migrate instance %s: %s" %
4005                                (instance.name, msg))
4006     time.sleep(10)
4007
4008     instance.primary_node = target_node
4009     # distribute new instance config to the other nodes
4010     self.cfg.Update(instance)
4011
4012     result = self.rpc.call_finalize_migration(target_node,
4013                                               instance,
4014                                               migration_info,
4015                                               True)
4016     msg = result.RemoteFailMsg()
4017     if msg:
4018       logging.error("Instance migration succeeded, but finalization failed:"
4019                     " %s" % msg)
4020       raise errors.OpExecError("Could not finalize instance migration: %s" %
4021                                msg)
4022
4023     self._EnsureSecondary(source_node)
4024     self._WaitUntilSync()
4025     self._GoStandalone()
4026     self._GoReconnect(False)
4027     self._WaitUntilSync()
4028
4029     self.feedback_fn("* done")
4030
4031   def Exec(self, feedback_fn):
4032     """Perform the migration.
4033
4034     """
4035     self.feedback_fn = feedback_fn
4036
4037     self.source_node = self.instance.primary_node
4038     self.target_node = self.instance.secondary_nodes[0]
4039     self.all_nodes = [self.source_node, self.target_node]
4040     self.nodes_ip = {
4041       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4042       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4043       }
4044     if self.op.cleanup:
4045       return self._ExecCleanup()
4046     else:
4047       return self._ExecMigration()
4048
4049
4050 def _CreateBlockDev(lu, node, instance, device, force_create,
4051                     info, force_open):
4052   """Create a tree of block devices on a given node.
4053
4054   If this device type has to be created on secondaries, create it and
4055   all its children.
4056
4057   If not, just recurse to children keeping the same 'force' value.
4058
4059   @param lu: the lu on whose behalf we execute
4060   @param node: the node on which to create the device
4061   @type instance: L{objects.Instance}
4062   @param instance: the instance which owns the device
4063   @type device: L{objects.Disk}
4064   @param device: the device to create
4065   @type force_create: boolean
4066   @param force_create: whether to force creation of this device; this
4067       will be change to True whenever we find a device which has
4068       CreateOnSecondary() attribute
4069   @param info: the extra 'metadata' we should attach to the device
4070       (this will be represented as a LVM tag)
4071   @type force_open: boolean
4072   @param force_open: this parameter will be passes to the
4073       L{backend.BlockdevCreate} function where it specifies
4074       whether we run on primary or not, and it affects both
4075       the child assembly and the device own Open() execution
4076
4077   """
4078   if device.CreateOnSecondary():
4079     force_create = True
4080
4081   if device.children:
4082     for child in device.children:
4083       _CreateBlockDev(lu, node, instance, child, force_create,
4084                       info, force_open)
4085
4086   if not force_create:
4087     return
4088
4089   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4090
4091
4092 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4093   """Create a single block device on a given node.
4094
4095   This will not recurse over children of the device, so they must be
4096   created in advance.
4097
4098   @param lu: the lu on whose behalf we execute
4099   @param node: the node on which to create the device
4100   @type instance: L{objects.Instance}
4101   @param instance: the instance which owns the device
4102   @type device: L{objects.Disk}
4103   @param device: the device to create
4104   @param info: the extra 'metadata' we should attach to the device
4105       (this will be represented as a LVM tag)
4106   @type force_open: boolean
4107   @param force_open: this parameter will be passes to the
4108       L{backend.BlockdevCreate} function where it specifies
4109       whether we run on primary or not, and it affects both
4110       the child assembly and the device own Open() execution
4111
4112   """
4113   lu.cfg.SetDiskID(device, node)
4114   result = lu.rpc.call_blockdev_create(node, device, device.size,
4115                                        instance.name, force_open, info)
4116   msg = result.RemoteFailMsg()
4117   if msg:
4118     raise errors.OpExecError("Can't create block device %s on"
4119                              " node %s for instance %s: %s" %
4120                              (device, node, instance.name, msg))
4121   if device.physical_id is None:
4122     device.physical_id = result.payload
4123
4124
4125 def _GenerateUniqueNames(lu, exts):
4126   """Generate a suitable LV name.
4127
4128   This will generate a logical volume name for the given instance.
4129
4130   """
4131   results = []
4132   for val in exts:
4133     new_id = lu.cfg.GenerateUniqueID()
4134     results.append("%s%s" % (new_id, val))
4135   return results
4136
4137
4138 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4139                          p_minor, s_minor):
4140   """Generate a drbd8 device complete with its children.
4141
4142   """
4143   port = lu.cfg.AllocatePort()
4144   vgname = lu.cfg.GetVGName()
4145   shared_secret = lu.cfg.GenerateDRBDSecret()
4146   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4147                           logical_id=(vgname, names[0]))
4148   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4149                           logical_id=(vgname, names[1]))
4150   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4151                           logical_id=(primary, secondary, port,
4152                                       p_minor, s_minor,
4153                                       shared_secret),
4154                           children=[dev_data, dev_meta],
4155                           iv_name=iv_name)
4156   return drbd_dev
4157
4158
4159 def _GenerateDiskTemplate(lu, template_name,
4160                           instance_name, primary_node,
4161                           secondary_nodes, disk_info,
4162                           file_storage_dir, file_driver,
4163                           base_index):
4164   """Generate the entire disk layout for a given template type.
4165
4166   """
4167   #TODO: compute space requirements
4168
4169   vgname = lu.cfg.GetVGName()
4170   disk_count = len(disk_info)
4171   disks = []
4172   if template_name == constants.DT_DISKLESS:
4173     pass
4174   elif template_name == constants.DT_PLAIN:
4175     if len(secondary_nodes) != 0:
4176       raise errors.ProgrammerError("Wrong template configuration")
4177
4178     names = _GenerateUniqueNames(lu, [".disk%d" % i
4179                                       for i in range(disk_count)])
4180     for idx, disk in enumerate(disk_info):
4181       disk_index = idx + base_index
4182       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4183                               logical_id=(vgname, names[idx]),
4184                               iv_name="disk/%d" % disk_index,
4185                               mode=disk["mode"])
4186       disks.append(disk_dev)
4187   elif template_name == constants.DT_DRBD8:
4188     if len(secondary_nodes) != 1:
4189       raise errors.ProgrammerError("Wrong template configuration")
4190     remote_node = secondary_nodes[0]
4191     minors = lu.cfg.AllocateDRBDMinor(
4192       [primary_node, remote_node] * len(disk_info), instance_name)
4193
4194     names = []
4195     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4196                                                for i in range(disk_count)]):
4197       names.append(lv_prefix + "_data")
4198       names.append(lv_prefix + "_meta")
4199     for idx, disk in enumerate(disk_info):
4200       disk_index = idx + base_index
4201       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4202                                       disk["size"], names[idx*2:idx*2+2],
4203                                       "disk/%d" % disk_index,
4204                                       minors[idx*2], minors[idx*2+1])
4205       disk_dev.mode = disk["mode"]
4206       disks.append(disk_dev)
4207   elif template_name == constants.DT_FILE:
4208     if len(secondary_nodes) != 0:
4209       raise errors.ProgrammerError("Wrong template configuration")
4210
4211     for idx, disk in enumerate(disk_info):
4212       disk_index = idx + base_index
4213       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4214                               iv_name="disk/%d" % disk_index,
4215                               logical_id=(file_driver,
4216                                           "%s/disk%d" % (file_storage_dir,
4217                                                          disk_index)),
4218                               mode=disk["mode"])
4219       disks.append(disk_dev)
4220   else:
4221     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4222   return disks
4223
4224
4225 def _GetInstanceInfoText(instance):
4226   """Compute that text that should be added to the disk's metadata.
4227
4228   """
4229   return "originstname+%s" % instance.name
4230
4231
4232 def _CreateDisks(lu, instance):
4233   """Create all disks for an instance.
4234
4235   This abstracts away some work from AddInstance.
4236
4237   @type lu: L{LogicalUnit}
4238   @param lu: the logical unit on whose behalf we execute
4239   @type instance: L{objects.Instance}
4240   @param instance: the instance whose disks we should create
4241   @rtype: boolean
4242   @return: the success of the creation
4243
4244   """
4245   info = _GetInstanceInfoText(instance)
4246   pnode = instance.primary_node
4247
4248   if instance.disk_template == constants.DT_FILE:
4249     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4250     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4251
4252     if result.failed or not result.data:
4253       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4254
4255     if not result.data[0]:
4256       raise errors.OpExecError("Failed to create directory '%s'" %
4257                                file_storage_dir)
4258
4259   # Note: this needs to be kept in sync with adding of disks in
4260   # LUSetInstanceParams
4261   for device in instance.disks:
4262     logging.info("Creating volume %s for instance %s",
4263                  device.iv_name, instance.name)
4264     #HARDCODE
4265     for node in instance.all_nodes:
4266       f_create = node == pnode
4267       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4268
4269
4270 def _RemoveDisks(lu, instance):
4271   """Remove all disks for an instance.
4272
4273   This abstracts away some work from `AddInstance()` and
4274   `RemoveInstance()`. Note that in case some of the devices couldn't
4275   be removed, the removal will continue with the other ones (compare
4276   with `_CreateDisks()`).
4277
4278   @type lu: L{LogicalUnit}
4279   @param lu: the logical unit on whose behalf we execute
4280   @type instance: L{objects.Instance}
4281   @param instance: the instance whose disks we should remove
4282   @rtype: boolean
4283   @return: the success of the removal
4284
4285   """
4286   logging.info("Removing block devices for instance %s", instance.name)
4287
4288   all_result = True
4289   for device in instance.disks:
4290     for node, disk in device.ComputeNodeTree(instance.primary_node):
4291       lu.cfg.SetDiskID(disk, node)
4292       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4293       if msg:
4294         lu.LogWarning("Could not remove block device %s on node %s,"
4295                       " continuing anyway: %s", device.iv_name, node, msg)
4296         all_result = False
4297
4298   if instance.disk_template == constants.DT_FILE:
4299     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4300     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4301                                                  file_storage_dir)
4302     if result.failed or not result.data:
4303       logging.error("Could not remove directory '%s'", file_storage_dir)
4304       all_result = False
4305
4306   return all_result
4307
4308
4309 def _ComputeDiskSize(disk_template, disks):
4310   """Compute disk size requirements in the volume group
4311
4312   """
4313   # Required free disk space as a function of disk and swap space
4314   req_size_dict = {
4315     constants.DT_DISKLESS: None,
4316     constants.DT_PLAIN: sum(d["size"] for d in disks),
4317     # 128 MB are added for drbd metadata for each disk
4318     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4319     constants.DT_FILE: None,
4320   }
4321
4322   if disk_template not in req_size_dict:
4323     raise errors.ProgrammerError("Disk template '%s' size requirement"
4324                                  " is unknown" %  disk_template)
4325
4326   return req_size_dict[disk_template]
4327
4328
4329 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4330   """Hypervisor parameter validation.
4331
4332   This function abstract the hypervisor parameter validation to be
4333   used in both instance create and instance modify.
4334
4335   @type lu: L{LogicalUnit}
4336   @param lu: the logical unit for which we check
4337   @type nodenames: list
4338   @param nodenames: the list of nodes on which we should check
4339   @type hvname: string
4340   @param hvname: the name of the hypervisor we should use
4341   @type hvparams: dict
4342   @param hvparams: the parameters which we need to check
4343   @raise errors.OpPrereqError: if the parameters are not valid
4344
4345   """
4346   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4347                                                   hvname,
4348                                                   hvparams)
4349   for node in nodenames:
4350     info = hvinfo[node]
4351     if info.offline:
4352       continue
4353     msg = info.RemoteFailMsg()
4354     if msg:
4355       raise errors.OpPrereqError("Hypervisor parameter validation"
4356                                  " failed on node %s: %s" % (node, msg))
4357
4358
4359 class LUCreateInstance(LogicalUnit):
4360   """Create an instance.
4361
4362   """
4363   HPATH = "instance-add"
4364   HTYPE = constants.HTYPE_INSTANCE
4365   _OP_REQP = ["instance_name", "disks", "disk_template",
4366               "mode", "start",
4367               "wait_for_sync", "ip_check", "nics",
4368               "hvparams", "beparams"]
4369   REQ_BGL = False
4370
4371   def _ExpandNode(self, node):
4372     """Expands and checks one node name.
4373
4374     """
4375     node_full = self.cfg.ExpandNodeName(node)
4376     if node_full is None:
4377       raise errors.OpPrereqError("Unknown node %s" % node)
4378     return node_full
4379
4380   def ExpandNames(self):
4381     """ExpandNames for CreateInstance.
4382
4383     Figure out the right locks for instance creation.
4384
4385     """
4386     self.needed_locks = {}
4387
4388     # set optional parameters to none if they don't exist
4389     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4390       if not hasattr(self.op, attr):
4391         setattr(self.op, attr, None)
4392
4393     # cheap checks, mostly valid constants given
4394
4395     # verify creation mode
4396     if self.op.mode not in (constants.INSTANCE_CREATE,
4397                             constants.INSTANCE_IMPORT):
4398       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4399                                  self.op.mode)
4400
4401     # disk template and mirror node verification
4402     if self.op.disk_template not in constants.DISK_TEMPLATES:
4403       raise errors.OpPrereqError("Invalid disk template name")
4404
4405     if self.op.hypervisor is None:
4406       self.op.hypervisor = self.cfg.GetHypervisorType()
4407
4408     cluster = self.cfg.GetClusterInfo()
4409     enabled_hvs = cluster.enabled_hypervisors
4410     if self.op.hypervisor not in enabled_hvs:
4411       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4412                                  " cluster (%s)" % (self.op.hypervisor,
4413                                   ",".join(enabled_hvs)))
4414
4415     # check hypervisor parameter syntax (locally)
4416     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4417     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4418                                   self.op.hvparams)
4419     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4420     hv_type.CheckParameterSyntax(filled_hvp)
4421
4422     # fill and remember the beparams dict
4423     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4424     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4425                                     self.op.beparams)
4426
4427     #### instance parameters check
4428
4429     # instance name verification
4430     hostname1 = utils.HostInfo(self.op.instance_name)
4431     self.op.instance_name = instance_name = hostname1.name
4432
4433     # this is just a preventive check, but someone might still add this
4434     # instance in the meantime, and creation will fail at lock-add time
4435     if instance_name in self.cfg.GetInstanceList():
4436       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4437                                  instance_name)
4438
4439     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4440
4441     # NIC buildup
4442     self.nics = []
4443     for nic in self.op.nics:
4444       # ip validity checks
4445       ip = nic.get("ip", None)
4446       if ip is None or ip.lower() == "none":
4447         nic_ip = None
4448       elif ip.lower() == constants.VALUE_AUTO:
4449         nic_ip = hostname1.ip
4450       else:
4451         if not utils.IsValidIP(ip):
4452           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4453                                      " like a valid IP" % ip)
4454         nic_ip = ip
4455
4456       # MAC address verification
4457       mac = nic.get("mac", constants.VALUE_AUTO)
4458       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4459         if not utils.IsValidMac(mac.lower()):
4460           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4461                                      mac)
4462       # bridge verification
4463       bridge = nic.get("bridge", None)
4464       if bridge is None:
4465         bridge = self.cfg.GetDefBridge()
4466       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4467
4468     # disk checks/pre-build
4469     self.disks = []
4470     for disk in self.op.disks:
4471       mode = disk.get("mode", constants.DISK_RDWR)
4472       if mode not in constants.DISK_ACCESS_SET:
4473         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4474                                    mode)
4475       size = disk.get("size", None)
4476       if size is None:
4477         raise errors.OpPrereqError("Missing disk size")
4478       try:
4479         size = int(size)
4480       except ValueError:
4481         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4482       self.disks.append({"size": size, "mode": mode})
4483
4484     # used in CheckPrereq for ip ping check
4485     self.check_ip = hostname1.ip
4486
4487     # file storage checks
4488     if (self.op.file_driver and
4489         not self.op.file_driver in constants.FILE_DRIVER):
4490       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4491                                  self.op.file_driver)
4492
4493     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4494       raise errors.OpPrereqError("File storage directory path not absolute")
4495
4496     ### Node/iallocator related checks
4497     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4498       raise errors.OpPrereqError("One and only one of iallocator and primary"
4499                                  " node must be given")
4500
4501     if self.op.iallocator:
4502       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4503     else:
4504       self.op.pnode = self._ExpandNode(self.op.pnode)
4505       nodelist = [self.op.pnode]
4506       if self.op.snode is not None:
4507         self.op.snode = self._ExpandNode(self.op.snode)
4508         nodelist.append(self.op.snode)
4509       self.needed_locks[locking.LEVEL_NODE] = nodelist
4510
4511     # in case of import lock the source node too
4512     if self.op.mode == constants.INSTANCE_IMPORT:
4513       src_node = getattr(self.op, "src_node", None)
4514       src_path = getattr(self.op, "src_path", None)
4515
4516       if src_path is None:
4517         self.op.src_path = src_path = self.op.instance_name
4518
4519       if src_node is None:
4520         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4521         self.op.src_node = None
4522         if os.path.isabs(src_path):
4523           raise errors.OpPrereqError("Importing an instance from an absolute"
4524                                      " path requires a source node option.")
4525       else:
4526         self.op.src_node = src_node = self._ExpandNode(src_node)
4527         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4528           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4529         if not os.path.isabs(src_path):
4530           self.op.src_path = src_path = \
4531             os.path.join(constants.EXPORT_DIR, src_path)
4532
4533     else: # INSTANCE_CREATE
4534       if getattr(self.op, "os_type", None) is None:
4535         raise errors.OpPrereqError("No guest OS specified")
4536
4537   def _RunAllocator(self):
4538     """Run the allocator based on input opcode.
4539
4540     """
4541     nics = [n.ToDict() for n in self.nics]
4542     ial = IAllocator(self,
4543                      mode=constants.IALLOCATOR_MODE_ALLOC,
4544                      name=self.op.instance_name,
4545                      disk_template=self.op.disk_template,
4546                      tags=[],
4547                      os=self.op.os_type,
4548                      vcpus=self.be_full[constants.BE_VCPUS],
4549                      mem_size=self.be_full[constants.BE_MEMORY],
4550                      disks=self.disks,
4551                      nics=nics,
4552                      hypervisor=self.op.hypervisor,
4553                      )
4554
4555     ial.Run(self.op.iallocator)
4556
4557     if not ial.success:
4558       raise errors.OpPrereqError("Can't compute nodes using"
4559                                  " iallocator '%s': %s" % (self.op.iallocator,
4560                                                            ial.info))
4561     if len(ial.nodes) != ial.required_nodes:
4562       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4563                                  " of nodes (%s), required %s" %
4564                                  (self.op.iallocator, len(ial.nodes),
4565                                   ial.required_nodes))
4566     self.op.pnode = ial.nodes[0]
4567     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4568                  self.op.instance_name, self.op.iallocator,
4569                  ", ".join(ial.nodes))
4570     if ial.required_nodes == 2:
4571       self.op.snode = ial.nodes[1]
4572
4573   def BuildHooksEnv(self):
4574     """Build hooks env.
4575
4576     This runs on master, primary and secondary nodes of the instance.
4577
4578     """
4579     env = {
4580       "ADD_MODE": self.op.mode,
4581       }
4582     if self.op.mode == constants.INSTANCE_IMPORT:
4583       env["SRC_NODE"] = self.op.src_node
4584       env["SRC_PATH"] = self.op.src_path
4585       env["SRC_IMAGES"] = self.src_images
4586
4587     env.update(_BuildInstanceHookEnv(
4588       name=self.op.instance_name,
4589       primary_node=self.op.pnode,
4590       secondary_nodes=self.secondaries,
4591       status=self.op.start,
4592       os_type=self.op.os_type,
4593       memory=self.be_full[constants.BE_MEMORY],
4594       vcpus=self.be_full[constants.BE_VCPUS],
4595       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4596       disk_template=self.op.disk_template,
4597       disks=[(d["size"], d["mode"]) for d in self.disks],
4598     ))
4599
4600     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4601           self.secondaries)
4602     return env, nl, nl
4603
4604
4605   def CheckPrereq(self):
4606     """Check prerequisites.
4607
4608     """
4609     if (not self.cfg.GetVGName() and
4610         self.op.disk_template not in constants.DTS_NOT_LVM):
4611       raise errors.OpPrereqError("Cluster does not support lvm-based"
4612                                  " instances")
4613
4614     if self.op.mode == constants.INSTANCE_IMPORT:
4615       src_node = self.op.src_node
4616       src_path = self.op.src_path
4617
4618       if src_node is None:
4619         exp_list = self.rpc.call_export_list(
4620           self.acquired_locks[locking.LEVEL_NODE])
4621         found = False
4622         for node in exp_list:
4623           if not exp_list[node].failed and src_path in exp_list[node].data:
4624             found = True
4625             self.op.src_node = src_node = node
4626             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4627                                                        src_path)
4628             break
4629         if not found:
4630           raise errors.OpPrereqError("No export found for relative path %s" %
4631                                       src_path)
4632
4633       _CheckNodeOnline(self, src_node)
4634       result = self.rpc.call_export_info(src_node, src_path)
4635       result.Raise()
4636       if not result.data:
4637         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4638
4639       export_info = result.data
4640       if not export_info.has_section(constants.INISECT_EXP):
4641         raise errors.ProgrammerError("Corrupted export config")
4642
4643       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4644       if (int(ei_version) != constants.EXPORT_VERSION):
4645         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4646                                    (ei_version, constants.EXPORT_VERSION))
4647
4648       # Check that the new instance doesn't have less disks than the export
4649       instance_disks = len(self.disks)
4650       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4651       if instance_disks < export_disks:
4652         raise errors.OpPrereqError("Not enough disks to import."
4653                                    " (instance: %d, export: %d)" %
4654                                    (instance_disks, export_disks))
4655
4656       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4657       disk_images = []
4658       for idx in range(export_disks):
4659         option = 'disk%d_dump' % idx
4660         if export_info.has_option(constants.INISECT_INS, option):
4661           # FIXME: are the old os-es, disk sizes, etc. useful?
4662           export_name = export_info.get(constants.INISECT_INS, option)
4663           image = os.path.join(src_path, export_name)
4664           disk_images.append(image)
4665         else:
4666           disk_images.append(False)
4667
4668       self.src_images = disk_images
4669
4670       old_name = export_info.get(constants.INISECT_INS, 'name')
4671       # FIXME: int() here could throw a ValueError on broken exports
4672       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4673       if self.op.instance_name == old_name:
4674         for idx, nic in enumerate(self.nics):
4675           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4676             nic_mac_ini = 'nic%d_mac' % idx
4677             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4678
4679     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4680     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4681     if self.op.start and not self.op.ip_check:
4682       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4683                                  " adding an instance in start mode")
4684
4685     if self.op.ip_check:
4686       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4687         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4688                                    (self.check_ip, self.op.instance_name))
4689
4690     #### mac address generation
4691     # By generating here the mac address both the allocator and the hooks get
4692     # the real final mac address rather than the 'auto' or 'generate' value.
4693     # There is a race condition between the generation and the instance object
4694     # creation, which means that we know the mac is valid now, but we're not
4695     # sure it will be when we actually add the instance. If things go bad
4696     # adding the instance will abort because of a duplicate mac, and the
4697     # creation job will fail.
4698     for nic in self.nics:
4699       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4700         nic.mac = self.cfg.GenerateMAC()
4701
4702     #### allocator run
4703
4704     if self.op.iallocator is not None:
4705       self._RunAllocator()
4706
4707     #### node related checks
4708
4709     # check primary node
4710     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4711     assert self.pnode is not None, \
4712       "Cannot retrieve locked node %s" % self.op.pnode
4713     if pnode.offline:
4714       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4715                                  pnode.name)
4716     if pnode.drained:
4717       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4718                                  pnode.name)
4719
4720     self.secondaries = []
4721
4722     # mirror node verification
4723     if self.op.disk_template in constants.DTS_NET_MIRROR:
4724       if self.op.snode is None:
4725         raise errors.OpPrereqError("The networked disk templates need"
4726                                    " a mirror node")
4727       if self.op.snode == pnode.name:
4728         raise errors.OpPrereqError("The secondary node cannot be"
4729                                    " the primary node.")
4730       _CheckNodeOnline(self, self.op.snode)
4731       _CheckNodeNotDrained(self, self.op.snode)
4732       self.secondaries.append(self.op.snode)
4733
4734     nodenames = [pnode.name] + self.secondaries
4735
4736     req_size = _ComputeDiskSize(self.op.disk_template,
4737                                 self.disks)
4738
4739     # Check lv size requirements
4740     if req_size is not None:
4741       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4742                                          self.op.hypervisor)
4743       for node in nodenames:
4744         info = nodeinfo[node]
4745         info.Raise()
4746         info = info.data
4747         if not info:
4748           raise errors.OpPrereqError("Cannot get current information"
4749                                      " from node '%s'" % node)
4750         vg_free = info.get('vg_free', None)
4751         if not isinstance(vg_free, int):
4752           raise errors.OpPrereqError("Can't compute free disk space on"
4753                                      " node %s" % node)
4754         if req_size > info['vg_free']:
4755           raise errors.OpPrereqError("Not enough disk space on target node %s."
4756                                      " %d MB available, %d MB required" %
4757                                      (node, info['vg_free'], req_size))
4758
4759     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4760
4761     # os verification
4762     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4763     result.Raise()
4764     if not isinstance(result.data, objects.OS):
4765       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4766                                  " primary node"  % self.op.os_type)
4767
4768     # bridge check on primary node
4769     bridges = [n.bridge for n in self.nics]
4770     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4771     result.Raise()
4772     if not result.data:
4773       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4774                                  " exist on destination node '%s'" %
4775                                  (",".join(bridges), pnode.name))
4776
4777     # memory check on primary node
4778     if self.op.start:
4779       _CheckNodeFreeMemory(self, self.pnode.name,
4780                            "creating instance %s" % self.op.instance_name,
4781                            self.be_full[constants.BE_MEMORY],
4782                            self.op.hypervisor)
4783
4784   def Exec(self, feedback_fn):
4785     """Create and add the instance to the cluster.
4786
4787     """
4788     instance = self.op.instance_name
4789     pnode_name = self.pnode.name
4790
4791     ht_kind = self.op.hypervisor
4792     if ht_kind in constants.HTS_REQ_PORT:
4793       network_port = self.cfg.AllocatePort()
4794     else:
4795       network_port = None
4796
4797     ##if self.op.vnc_bind_address is None:
4798     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4799
4800     # this is needed because os.path.join does not accept None arguments
4801     if self.op.file_storage_dir is None:
4802       string_file_storage_dir = ""
4803     else:
4804       string_file_storage_dir = self.op.file_storage_dir
4805
4806     # build the full file storage dir path
4807     file_storage_dir = os.path.normpath(os.path.join(
4808                                         self.cfg.GetFileStorageDir(),
4809                                         string_file_storage_dir, instance))
4810
4811
4812     disks = _GenerateDiskTemplate(self,
4813                                   self.op.disk_template,
4814                                   instance, pnode_name,
4815                                   self.secondaries,
4816                                   self.disks,
4817                                   file_storage_dir,
4818                                   self.op.file_driver,
4819                                   0)
4820
4821     iobj = objects.Instance(name=instance, os=self.op.os_type,
4822                             primary_node=pnode_name,
4823                             nics=self.nics, disks=disks,
4824                             disk_template=self.op.disk_template,
4825                             admin_up=False,
4826                             network_port=network_port,
4827                             beparams=self.op.beparams,
4828                             hvparams=self.op.hvparams,
4829                             hypervisor=self.op.hypervisor,
4830                             )
4831
4832     feedback_fn("* creating instance disks...")
4833     try:
4834       _CreateDisks(self, iobj)
4835     except errors.OpExecError:
4836       self.LogWarning("Device creation failed, reverting...")
4837       try:
4838         _RemoveDisks(self, iobj)
4839       finally:
4840         self.cfg.ReleaseDRBDMinors(instance)
4841         raise
4842
4843     feedback_fn("adding instance %s to cluster config" % instance)
4844
4845     self.cfg.AddInstance(iobj)
4846     # Declare that we don't want to remove the instance lock anymore, as we've
4847     # added the instance to the config
4848     del self.remove_locks[locking.LEVEL_INSTANCE]
4849     # Unlock all the nodes
4850     if self.op.mode == constants.INSTANCE_IMPORT:
4851       nodes_keep = [self.op.src_node]
4852       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4853                        if node != self.op.src_node]
4854       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4855       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4856     else:
4857       self.context.glm.release(locking.LEVEL_NODE)
4858       del self.acquired_locks[locking.LEVEL_NODE]
4859
4860     if self.op.wait_for_sync:
4861       disk_abort = not _WaitForSync(self, iobj)
4862     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4863       # make sure the disks are not degraded (still sync-ing is ok)
4864       time.sleep(15)
4865       feedback_fn("* checking mirrors status")
4866       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4867     else:
4868       disk_abort = False
4869
4870     if disk_abort:
4871       _RemoveDisks(self, iobj)
4872       self.cfg.RemoveInstance(iobj.name)
4873       # Make sure the instance lock gets removed
4874       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4875       raise errors.OpExecError("There are some degraded disks for"
4876                                " this instance")
4877
4878     feedback_fn("creating os for instance %s on node %s" %
4879                 (instance, pnode_name))
4880
4881     if iobj.disk_template != constants.DT_DISKLESS:
4882       if self.op.mode == constants.INSTANCE_CREATE:
4883         feedback_fn("* running the instance OS create scripts...")
4884         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4885         msg = result.RemoteFailMsg()
4886         if msg:
4887           raise errors.OpExecError("Could not add os for instance %s"
4888                                    " on node %s: %s" %
4889                                    (instance, pnode_name, msg))
4890
4891       elif self.op.mode == constants.INSTANCE_IMPORT:
4892         feedback_fn("* running the instance OS import scripts...")
4893         src_node = self.op.src_node
4894         src_images = self.src_images
4895         cluster_name = self.cfg.GetClusterName()
4896         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4897                                                          src_node, src_images,
4898                                                          cluster_name)
4899         import_result.Raise()
4900         for idx, result in enumerate(import_result.data):
4901           if not result:
4902             self.LogWarning("Could not import the image %s for instance"
4903                             " %s, disk %d, on node %s" %
4904                             (src_images[idx], instance, idx, pnode_name))
4905       else:
4906         # also checked in the prereq part
4907         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4908                                      % self.op.mode)
4909
4910     if self.op.start:
4911       iobj.admin_up = True
4912       self.cfg.Update(iobj)
4913       logging.info("Starting instance %s on node %s", instance, pnode_name)
4914       feedback_fn("* starting instance...")
4915       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4916       msg = result.RemoteFailMsg()
4917       if msg:
4918         raise errors.OpExecError("Could not start instance: %s" % msg)
4919
4920
4921 class LUConnectConsole(NoHooksLU):
4922   """Connect to an instance's console.
4923
4924   This is somewhat special in that it returns the command line that
4925   you need to run on the master node in order to connect to the
4926   console.
4927
4928   """
4929   _OP_REQP = ["instance_name"]
4930   REQ_BGL = False
4931
4932   def ExpandNames(self):
4933     self._ExpandAndLockInstance()
4934
4935   def CheckPrereq(self):
4936     """Check prerequisites.
4937
4938     This checks that the instance is in the cluster.
4939
4940     """
4941     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4942     assert self.instance is not None, \
4943       "Cannot retrieve locked instance %s" % self.op.instance_name
4944     _CheckNodeOnline(self, self.instance.primary_node)
4945
4946   def Exec(self, feedback_fn):
4947     """Connect to the console of an instance
4948
4949     """
4950     instance = self.instance
4951     node = instance.primary_node
4952
4953     node_insts = self.rpc.call_instance_list([node],
4954                                              [instance.hypervisor])[node]
4955     node_insts.Raise()
4956
4957     if instance.name not in node_insts.data:
4958       raise errors.OpExecError("Instance %s is not running." % instance.name)
4959
4960     logging.debug("Connecting to console of %s on %s", instance.name, node)
4961
4962     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4963     cluster = self.cfg.GetClusterInfo()
4964     # beparams and hvparams are passed separately, to avoid editing the
4965     # instance and then saving the defaults in the instance itself.
4966     hvparams = cluster.FillHV(instance)
4967     beparams = cluster.FillBE(instance)
4968     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4969
4970     # build ssh cmdline
4971     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4972
4973
4974 class LUReplaceDisks(LogicalUnit):
4975   """Replace the disks of an instance.
4976
4977   """
4978   HPATH = "mirrors-replace"
4979   HTYPE = constants.HTYPE_INSTANCE
4980   _OP_REQP = ["instance_name", "mode", "disks"]
4981   REQ_BGL = False
4982
4983   def CheckArguments(self):
4984     if not hasattr(self.op, "remote_node"):
4985       self.op.remote_node = None
4986     if not hasattr(self.op, "iallocator"):
4987       self.op.iallocator = None
4988
4989     # check for valid parameter combination
4990     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4991     if self.op.mode == constants.REPLACE_DISK_CHG:
4992       if cnt == 2:
4993         raise errors.OpPrereqError("When changing the secondary either an"
4994                                    " iallocator script must be used or the"
4995                                    " new node given")
4996       elif cnt == 0:
4997         raise errors.OpPrereqError("Give either the iallocator or the new"
4998                                    " secondary, not both")
4999     else: # not replacing the secondary
5000       if cnt != 2:
5001         raise errors.OpPrereqError("The iallocator and new node options can"
5002                                    " be used only when changing the"
5003                                    " secondary node")
5004
5005   def ExpandNames(self):
5006     self._ExpandAndLockInstance()
5007
5008     if self.op.iallocator is not None:
5009       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5010     elif self.op.remote_node is not None:
5011       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5012       if remote_node is None:
5013         raise errors.OpPrereqError("Node '%s' not known" %
5014                                    self.op.remote_node)
5015       self.op.remote_node = remote_node
5016       # Warning: do not remove the locking of the new secondary here
5017       # unless DRBD8.AddChildren is changed to work in parallel;
5018       # currently it doesn't since parallel invocations of
5019       # FindUnusedMinor will conflict
5020       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5021       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5022     else:
5023       self.needed_locks[locking.LEVEL_NODE] = []
5024       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5025
5026   def DeclareLocks(self, level):
5027     # If we're not already locking all nodes in the set we have to declare the
5028     # instance's primary/secondary nodes.
5029     if (level == locking.LEVEL_NODE and
5030         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5031       self._LockInstancesNodes()
5032
5033   def _RunAllocator(self):
5034     """Compute a new secondary node using an IAllocator.
5035
5036     """
5037     ial = IAllocator(self,
5038                      mode=constants.IALLOCATOR_MODE_RELOC,
5039                      name=self.op.instance_name,
5040                      relocate_from=[self.sec_node])
5041
5042     ial.Run(self.op.iallocator)
5043
5044     if not ial.success:
5045       raise errors.OpPrereqError("Can't compute nodes using"
5046                                  " iallocator '%s': %s" % (self.op.iallocator,
5047                                                            ial.info))
5048     if len(ial.nodes) != ial.required_nodes:
5049       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5050                                  " of nodes (%s), required %s" %
5051                                  (len(ial.nodes), ial.required_nodes))
5052     self.op.remote_node = ial.nodes[0]
5053     self.LogInfo("Selected new secondary for the instance: %s",
5054                  self.op.remote_node)
5055
5056   def BuildHooksEnv(self):
5057     """Build hooks env.
5058
5059     This runs on the master, the primary and all the secondaries.
5060
5061     """
5062     env = {
5063       "MODE": self.op.mode,
5064       "NEW_SECONDARY": self.op.remote_node,
5065       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5066       }
5067     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5068     nl = [
5069       self.cfg.GetMasterNode(),
5070       self.instance.primary_node,
5071       ]
5072     if self.op.remote_node is not None:
5073       nl.append(self.op.remote_node)
5074     return env, nl, nl
5075
5076   def CheckPrereq(self):
5077     """Check prerequisites.
5078
5079     This checks that the instance is in the cluster.
5080
5081     """
5082     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5083     assert instance is not None, \
5084       "Cannot retrieve locked instance %s" % self.op.instance_name
5085     self.instance = instance
5086
5087     if instance.disk_template != constants.DT_DRBD8:
5088       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5089                                  " instances")
5090
5091     if len(instance.secondary_nodes) != 1:
5092       raise errors.OpPrereqError("The instance has a strange layout,"
5093                                  " expected one secondary but found %d" %
5094                                  len(instance.secondary_nodes))
5095
5096     self.sec_node = instance.secondary_nodes[0]
5097
5098     if self.op.iallocator is not None:
5099       self._RunAllocator()
5100
5101     remote_node = self.op.remote_node
5102     if remote_node is not None:
5103       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5104       assert self.remote_node_info is not None, \
5105         "Cannot retrieve locked node %s" % remote_node
5106     else:
5107       self.remote_node_info = None
5108     if remote_node == instance.primary_node:
5109       raise errors.OpPrereqError("The specified node is the primary node of"
5110                                  " the instance.")
5111     elif remote_node == self.sec_node:
5112       raise errors.OpPrereqError("The specified node is already the"
5113                                  " secondary node of the instance.")
5114
5115     if self.op.mode == constants.REPLACE_DISK_PRI:
5116       n1 = self.tgt_node = instance.primary_node
5117       n2 = self.oth_node = self.sec_node
5118     elif self.op.mode == constants.REPLACE_DISK_SEC:
5119       n1 = self.tgt_node = self.sec_node
5120       n2 = self.oth_node = instance.primary_node
5121     elif self.op.mode == constants.REPLACE_DISK_CHG:
5122       n1 = self.new_node = remote_node
5123       n2 = self.oth_node = instance.primary_node
5124       self.tgt_node = self.sec_node
5125       _CheckNodeNotDrained(self, remote_node)
5126     else:
5127       raise errors.ProgrammerError("Unhandled disk replace mode")
5128
5129     _CheckNodeOnline(self, n1)
5130     _CheckNodeOnline(self, n2)
5131
5132     if not self.op.disks:
5133       self.op.disks = range(len(instance.disks))
5134
5135     for disk_idx in self.op.disks:
5136       instance.FindDisk(disk_idx)
5137
5138   def _ExecD8DiskOnly(self, feedback_fn):
5139     """Replace a disk on the primary or secondary for dbrd8.
5140
5141     The algorithm for replace is quite complicated:
5142
5143       1. for each disk to be replaced:
5144
5145         1. create new LVs on the target node with unique names
5146         1. detach old LVs from the drbd device
5147         1. rename old LVs to name_replaced.<time_t>
5148         1. rename new LVs to old LVs
5149         1. attach the new LVs (with the old names now) to the drbd device
5150
5151       1. wait for sync across all devices
5152
5153       1. for each modified disk:
5154
5155         1. remove old LVs (which have the name name_replaces.<time_t>)
5156
5157     Failures are not very well handled.
5158
5159     """
5160     steps_total = 6
5161     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5162     instance = self.instance
5163     iv_names = {}
5164     vgname = self.cfg.GetVGName()
5165     # start of work
5166     cfg = self.cfg
5167     tgt_node = self.tgt_node
5168     oth_node = self.oth_node
5169
5170     # Step: check device activation
5171     self.proc.LogStep(1, steps_total, "check device existence")
5172     info("checking volume groups")
5173     my_vg = cfg.GetVGName()
5174     results = self.rpc.call_vg_list([oth_node, tgt_node])
5175     if not results:
5176       raise errors.OpExecError("Can't list volume groups on the nodes")
5177     for node in oth_node, tgt_node:
5178       res = results[node]
5179       if res.failed or not res.data or my_vg not in res.data:
5180         raise errors.OpExecError("Volume group '%s' not found on %s" %
5181                                  (my_vg, node))
5182     for idx, dev in enumerate(instance.disks):
5183       if idx not in self.op.disks:
5184         continue
5185       for node in tgt_node, oth_node:
5186         info("checking disk/%d on %s" % (idx, node))
5187         cfg.SetDiskID(dev, node)
5188         result = self.rpc.call_blockdev_find(node, dev)
5189         msg = result.RemoteFailMsg()
5190         if not msg and not result.payload:
5191           msg = "disk not found"
5192         if msg:
5193           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5194                                    (idx, node, msg))
5195
5196     # Step: check other node consistency
5197     self.proc.LogStep(2, steps_total, "check peer consistency")
5198     for idx, dev in enumerate(instance.disks):
5199       if idx not in self.op.disks:
5200         continue
5201       info("checking disk/%d consistency on %s" % (idx, oth_node))
5202       if not _CheckDiskConsistency(self, dev, oth_node,
5203                                    oth_node==instance.primary_node):
5204         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5205                                  " to replace disks on this node (%s)" %
5206                                  (oth_node, tgt_node))
5207
5208     # Step: create new storage
5209     self.proc.LogStep(3, steps_total, "allocate new storage")
5210     for idx, dev in enumerate(instance.disks):
5211       if idx not in self.op.disks:
5212         continue
5213       size = dev.size
5214       cfg.SetDiskID(dev, tgt_node)
5215       lv_names = [".disk%d_%s" % (idx, suf)
5216                   for suf in ["data", "meta"]]
5217       names = _GenerateUniqueNames(self, lv_names)
5218       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5219                              logical_id=(vgname, names[0]))
5220       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5221                              logical_id=(vgname, names[1]))
5222       new_lvs = [lv_data, lv_meta]
5223       old_lvs = dev.children
5224       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5225       info("creating new local storage on %s for %s" %
5226            (tgt_node, dev.iv_name))
5227       # we pass force_create=True to force the LVM creation
5228       for new_lv in new_lvs:
5229         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5230                         _GetInstanceInfoText(instance), False)
5231
5232     # Step: for each lv, detach+rename*2+attach
5233     self.proc.LogStep(4, steps_total, "change drbd configuration")
5234     for dev, old_lvs, new_lvs in iv_names.itervalues():
5235       info("detaching %s drbd from local storage" % dev.iv_name)
5236       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5237       msg = result.RemoteFailMsg()
5238       if msg:
5239         raise errors.OpExecError("Can't detach drbd from local storage on node"
5240                                  " %s for device %s: %s" %
5241                                  (tgt_node, dev.iv_name, msg))
5242       #dev.children = []
5243       #cfg.Update(instance)
5244
5245       # ok, we created the new LVs, so now we know we have the needed
5246       # storage; as such, we proceed on the target node to rename
5247       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5248       # using the assumption that logical_id == physical_id (which in
5249       # turn is the unique_id on that node)
5250
5251       # FIXME(iustin): use a better name for the replaced LVs
5252       temp_suffix = int(time.time())
5253       ren_fn = lambda d, suff: (d.physical_id[0],
5254                                 d.physical_id[1] + "_replaced-%s" % suff)
5255       # build the rename list based on what LVs exist on the node
5256       rlist = []
5257       for to_ren in old_lvs:
5258         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5259         if not result.RemoteFailMsg() and result.payload:
5260           # device exists
5261           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5262
5263       info("renaming the old LVs on the target node")
5264       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5265       msg = result.RemoteFailMsg()
5266       if msg:
5267         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5268                                  (tgt_node, msg))
5269       # now we rename the new LVs to the old LVs
5270       info("renaming the new LVs on the target node")
5271       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5272       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5273       msg = result.RemoteFailMsg()
5274       if msg:
5275         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5276                                  (tgt_node, msg))
5277
5278       for old, new in zip(old_lvs, new_lvs):
5279         new.logical_id = old.logical_id
5280         cfg.SetDiskID(new, tgt_node)
5281
5282       for disk in old_lvs:
5283         disk.logical_id = ren_fn(disk, temp_suffix)
5284         cfg.SetDiskID(disk, tgt_node)
5285
5286       # now that the new lvs have the old name, we can add them to the device
5287       info("adding new mirror component on %s" % tgt_node)
5288       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5289       msg = result.RemoteFailMsg()
5290       if msg:
5291         for new_lv in new_lvs:
5292           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5293           if msg:
5294             warning("Can't rollback device %s: %s", dev, msg,
5295                     hint="cleanup manually the unused logical volumes")
5296         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5297
5298       dev.children = new_lvs
5299       cfg.Update(instance)
5300
5301     # Step: wait for sync
5302
5303     # this can fail as the old devices are degraded and _WaitForSync
5304     # does a combined result over all disks, so we don't check its
5305     # return value
5306     self.proc.LogStep(5, steps_total, "sync devices")
5307     _WaitForSync(self, instance, unlock=True)
5308
5309     # so check manually all the devices
5310     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5311       cfg.SetDiskID(dev, instance.primary_node)
5312       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5313       msg = result.RemoteFailMsg()
5314       if not msg and not result.payload:
5315         msg = "disk not found"
5316       if msg:
5317         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5318                                  (name, msg))
5319       if result.payload[5]:
5320         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5321
5322     # Step: remove old storage
5323     self.proc.LogStep(6, steps_total, "removing old storage")
5324     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5325       info("remove logical volumes for %s" % name)
5326       for lv in old_lvs:
5327         cfg.SetDiskID(lv, tgt_node)
5328         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5329         if msg:
5330           warning("Can't remove old LV: %s" % msg,
5331                   hint="manually remove unused LVs")
5332           continue
5333
5334   def _ExecD8Secondary(self, feedback_fn):
5335     """Replace the secondary node for drbd8.
5336
5337     The algorithm for replace is quite complicated:
5338       - for all disks of the instance:
5339         - create new LVs on the new node with same names
5340         - shutdown the drbd device on the old secondary
5341         - disconnect the drbd network on the primary
5342         - create the drbd device on the new secondary
5343         - network attach the drbd on the primary, using an artifice:
5344           the drbd code for Attach() will connect to the network if it
5345           finds a device which is connected to the good local disks but
5346           not network enabled
5347       - wait for sync across all devices
5348       - remove all disks from the old secondary
5349
5350     Failures are not very well handled.
5351
5352     """
5353     steps_total = 6
5354     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5355     instance = self.instance
5356     iv_names = {}
5357     # start of work
5358     cfg = self.cfg
5359     old_node = self.tgt_node
5360     new_node = self.new_node
5361     pri_node = instance.primary_node
5362     nodes_ip = {
5363       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5364       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5365       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5366       }
5367
5368     # Step: check device activation
5369     self.proc.LogStep(1, steps_total, "check device existence")
5370     info("checking volume groups")
5371     my_vg = cfg.GetVGName()
5372     results = self.rpc.call_vg_list([pri_node, new_node])
5373     for node in pri_node, new_node:
5374       res = results[node]
5375       if res.failed or not res.data or my_vg not in res.data:
5376         raise errors.OpExecError("Volume group '%s' not found on %s" %
5377                                  (my_vg, node))
5378     for idx, dev in enumerate(instance.disks):
5379       if idx not in self.op.disks:
5380         continue
5381       info("checking disk/%d on %s" % (idx, pri_node))
5382       cfg.SetDiskID(dev, pri_node)
5383       result = self.rpc.call_blockdev_find(pri_node, dev)
5384       msg = result.RemoteFailMsg()
5385       if not msg and not result.payload:
5386         msg = "disk not found"
5387       if msg:
5388         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5389                                  (idx, pri_node, msg))
5390
5391     # Step: check other node consistency
5392     self.proc.LogStep(2, steps_total, "check peer consistency")
5393     for idx, dev in enumerate(instance.disks):
5394       if idx not in self.op.disks:
5395         continue
5396       info("checking disk/%d consistency on %s" % (idx, pri_node))
5397       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5398         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5399                                  " unsafe to replace the secondary" %
5400                                  pri_node)
5401
5402     # Step: create new storage
5403     self.proc.LogStep(3, steps_total, "allocate new storage")
5404     for idx, dev in enumerate(instance.disks):
5405       info("adding new local storage on %s for disk/%d" %
5406            (new_node, idx))
5407       # we pass force_create=True to force LVM creation
5408       for new_lv in dev.children:
5409         _CreateBlockDev(self, new_node, instance, new_lv, True,
5410                         _GetInstanceInfoText(instance), False)
5411
5412     # Step 4: dbrd minors and drbd setups changes
5413     # after this, we must manually remove the drbd minors on both the
5414     # error and the success paths
5415     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5416                                    instance.name)
5417     logging.debug("Allocated minors %s" % (minors,))
5418     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5419     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5420       size = dev.size
5421       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5422       # create new devices on new_node; note that we create two IDs:
5423       # one without port, so the drbd will be activated without
5424       # networking information on the new node at this stage, and one
5425       # with network, for the latter activation in step 4
5426       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5427       if pri_node == o_node1:
5428         p_minor = o_minor1
5429       else:
5430         p_minor = o_minor2
5431
5432       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5433       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5434
5435       iv_names[idx] = (dev, dev.children, new_net_id)
5436       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5437                     new_net_id)
5438       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5439                               logical_id=new_alone_id,
5440                               children=dev.children)
5441       try:
5442         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5443                               _GetInstanceInfoText(instance), False)
5444       except errors.GenericError:
5445         self.cfg.ReleaseDRBDMinors(instance.name)
5446         raise
5447
5448     for idx, dev in enumerate(instance.disks):
5449       # we have new devices, shutdown the drbd on the old secondary
5450       info("shutting down drbd for disk/%d on old node" % idx)
5451       cfg.SetDiskID(dev, old_node)
5452       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5453       if msg:
5454         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5455                 (idx, msg),
5456                 hint="Please cleanup this device manually as soon as possible")
5457
5458     info("detaching primary drbds from the network (=> standalone)")
5459     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5460                                                instance.disks)[pri_node]
5461
5462     msg = result.RemoteFailMsg()
5463     if msg:
5464       # detaches didn't succeed (unlikely)
5465       self.cfg.ReleaseDRBDMinors(instance.name)
5466       raise errors.OpExecError("Can't detach the disks from the network on"
5467                                " old node: %s" % (msg,))
5468
5469     # if we managed to detach at least one, we update all the disks of
5470     # the instance to point to the new secondary
5471     info("updating instance configuration")
5472     for dev, _, new_logical_id in iv_names.itervalues():
5473       dev.logical_id = new_logical_id
5474       cfg.SetDiskID(dev, pri_node)
5475     cfg.Update(instance)
5476
5477     # and now perform the drbd attach
5478     info("attaching primary drbds to new secondary (standalone => connected)")
5479     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5480                                            instance.disks, instance.name,
5481                                            False)
5482     for to_node, to_result in result.items():
5483       msg = to_result.RemoteFailMsg()
5484       if msg:
5485         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5486                 hint="please do a gnt-instance info to see the"
5487                 " status of disks")
5488
5489     # this can fail as the old devices are degraded and _WaitForSync
5490     # does a combined result over all disks, so we don't check its
5491     # return value
5492     self.proc.LogStep(5, steps_total, "sync devices")
5493     _WaitForSync(self, instance, unlock=True)
5494
5495     # so check manually all the devices
5496     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5497       cfg.SetDiskID(dev, pri_node)
5498       result = self.rpc.call_blockdev_find(pri_node, dev)
5499       msg = result.RemoteFailMsg()
5500       if not msg and not result.payload:
5501         msg = "disk not found"
5502       if msg:
5503         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5504                                  (idx, msg))
5505       if result.payload[5]:
5506         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5507
5508     self.proc.LogStep(6, steps_total, "removing old storage")
5509     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5510       info("remove logical volumes for disk/%d" % idx)
5511       for lv in old_lvs:
5512         cfg.SetDiskID(lv, old_node)
5513         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5514         if msg:
5515           warning("Can't remove LV on old secondary: %s", msg,
5516                   hint="Cleanup stale volumes by hand")
5517
5518   def Exec(self, feedback_fn):
5519     """Execute disk replacement.
5520
5521     This dispatches the disk replacement to the appropriate handler.
5522
5523     """
5524     instance = self.instance
5525
5526     # Activate the instance disks if we're replacing them on a down instance
5527     if not instance.admin_up:
5528       _StartInstanceDisks(self, instance, True)
5529
5530     if self.op.mode == constants.REPLACE_DISK_CHG:
5531       fn = self._ExecD8Secondary
5532     else:
5533       fn = self._ExecD8DiskOnly
5534
5535     ret = fn(feedback_fn)
5536
5537     # Deactivate the instance disks if we're replacing them on a down instance
5538     if not instance.admin_up:
5539       _SafeShutdownInstanceDisks(self, instance)
5540
5541     return ret
5542
5543
5544 class LUGrowDisk(LogicalUnit):
5545   """Grow a disk of an instance.
5546
5547   """
5548   HPATH = "disk-grow"
5549   HTYPE = constants.HTYPE_INSTANCE
5550   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5551   REQ_BGL = False
5552
5553   def ExpandNames(self):
5554     self._ExpandAndLockInstance()
5555     self.needed_locks[locking.LEVEL_NODE] = []
5556     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5557
5558   def DeclareLocks(self, level):
5559     if level == locking.LEVEL_NODE:
5560       self._LockInstancesNodes()
5561
5562   def BuildHooksEnv(self):
5563     """Build hooks env.
5564
5565     This runs on the master, the primary and all the secondaries.
5566
5567     """
5568     env = {
5569       "DISK": self.op.disk,
5570       "AMOUNT": self.op.amount,
5571       }
5572     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5573     nl = [
5574       self.cfg.GetMasterNode(),
5575       self.instance.primary_node,
5576       ]
5577     return env, nl, nl
5578
5579   def CheckPrereq(self):
5580     """Check prerequisites.
5581
5582     This checks that the instance is in the cluster.
5583
5584     """
5585     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5586     assert instance is not None, \
5587       "Cannot retrieve locked instance %s" % self.op.instance_name
5588     nodenames = list(instance.all_nodes)
5589     for node in nodenames:
5590       _CheckNodeOnline(self, node)
5591
5592
5593     self.instance = instance
5594
5595     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5596       raise errors.OpPrereqError("Instance's disk layout does not support"
5597                                  " growing.")
5598
5599     self.disk = instance.FindDisk(self.op.disk)
5600
5601     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5602                                        instance.hypervisor)
5603     for node in nodenames:
5604       info = nodeinfo[node]
5605       if info.failed or not info.data:
5606         raise errors.OpPrereqError("Cannot get current information"
5607                                    " from node '%s'" % node)
5608       vg_free = info.data.get('vg_free', None)
5609       if not isinstance(vg_free, int):
5610         raise errors.OpPrereqError("Can't compute free disk space on"
5611                                    " node %s" % node)
5612       if self.op.amount > vg_free:
5613         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5614                                    " %d MiB available, %d MiB required" %
5615                                    (node, vg_free, self.op.amount))
5616
5617   def Exec(self, feedback_fn):
5618     """Execute disk grow.
5619
5620     """
5621     instance = self.instance
5622     disk = self.disk
5623     for node in instance.all_nodes:
5624       self.cfg.SetDiskID(disk, node)
5625       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5626       msg = result.RemoteFailMsg()
5627       if msg:
5628         raise errors.OpExecError("Grow request failed to node %s: %s" %
5629                                  (node, msg))
5630     disk.RecordGrow(self.op.amount)
5631     self.cfg.Update(instance)
5632     if self.op.wait_for_sync:
5633       disk_abort = not _WaitForSync(self, instance)
5634       if disk_abort:
5635         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5636                              " status.\nPlease check the instance.")
5637
5638
5639 class LUQueryInstanceData(NoHooksLU):
5640   """Query runtime instance data.
5641
5642   """
5643   _OP_REQP = ["instances", "static"]
5644   REQ_BGL = False
5645
5646   def ExpandNames(self):
5647     self.needed_locks = {}
5648     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5649
5650     if not isinstance(self.op.instances, list):
5651       raise errors.OpPrereqError("Invalid argument type 'instances'")
5652
5653     if self.op.instances:
5654       self.wanted_names = []
5655       for name in self.op.instances:
5656         full_name = self.cfg.ExpandInstanceName(name)
5657         if full_name is None:
5658           raise errors.OpPrereqError("Instance '%s' not known" % name)
5659         self.wanted_names.append(full_name)
5660       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5661     else:
5662       self.wanted_names = None
5663       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5664
5665     self.needed_locks[locking.LEVEL_NODE] = []
5666     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5667
5668   def DeclareLocks(self, level):
5669     if level == locking.LEVEL_NODE:
5670       self._LockInstancesNodes()
5671
5672   def CheckPrereq(self):
5673     """Check prerequisites.
5674
5675     This only checks the optional instance list against the existing names.
5676
5677     """
5678     if self.wanted_names is None:
5679       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5680
5681     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5682                              in self.wanted_names]
5683     return
5684
5685   def _ComputeDiskStatus(self, instance, snode, dev):
5686     """Compute block device status.
5687
5688     """
5689     static = self.op.static
5690     if not static:
5691       self.cfg.SetDiskID(dev, instance.primary_node)
5692       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5693       if dev_pstatus.offline:
5694         dev_pstatus = None
5695       else:
5696         msg = dev_pstatus.RemoteFailMsg()
5697         if msg:
5698           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5699                                    (instance.name, msg))
5700         dev_pstatus = dev_pstatus.payload
5701     else:
5702       dev_pstatus = None
5703
5704     if dev.dev_type in constants.LDS_DRBD:
5705       # we change the snode then (otherwise we use the one passed in)
5706       if dev.logical_id[0] == instance.primary_node:
5707         snode = dev.logical_id[1]
5708       else:
5709         snode = dev.logical_id[0]
5710
5711     if snode and not static:
5712       self.cfg.SetDiskID(dev, snode)
5713       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5714       if dev_sstatus.offline:
5715         dev_sstatus = None
5716       else:
5717         msg = dev_sstatus.RemoteFailMsg()
5718         if msg:
5719           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5720                                    (instance.name, msg))
5721         dev_sstatus = dev_sstatus.payload
5722     else:
5723       dev_sstatus = None
5724
5725     if dev.children:
5726       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5727                       for child in dev.children]
5728     else:
5729       dev_children = []
5730
5731     data = {
5732       "iv_name": dev.iv_name,
5733       "dev_type": dev.dev_type,
5734       "logical_id": dev.logical_id,
5735       "physical_id": dev.physical_id,
5736       "pstatus": dev_pstatus,
5737       "sstatus": dev_sstatus,
5738       "children": dev_children,
5739       "mode": dev.mode,
5740       }
5741
5742     return data
5743
5744   def Exec(self, feedback_fn):
5745     """Gather and return data"""
5746     result = {}
5747
5748     cluster = self.cfg.GetClusterInfo()
5749
5750     for instance in self.wanted_instances:
5751       if not self.op.static:
5752         remote_info = self.rpc.call_instance_info(instance.primary_node,
5753                                                   instance.name,
5754                                                   instance.hypervisor)
5755         remote_info.Raise()
5756         remote_info = remote_info.data
5757         if remote_info and "state" in remote_info:
5758           remote_state = "up"
5759         else:
5760           remote_state = "down"
5761       else:
5762         remote_state = None
5763       if instance.admin_up:
5764         config_state = "up"
5765       else:
5766         config_state = "down"
5767
5768       disks = [self._ComputeDiskStatus(instance, None, device)
5769                for device in instance.disks]
5770
5771       idict = {
5772         "name": instance.name,
5773         "config_state": config_state,
5774         "run_state": remote_state,
5775         "pnode": instance.primary_node,
5776         "snodes": instance.secondary_nodes,
5777         "os": instance.os,
5778         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5779         "disks": disks,
5780         "hypervisor": instance.hypervisor,
5781         "network_port": instance.network_port,
5782         "hv_instance": instance.hvparams,
5783         "hv_actual": cluster.FillHV(instance),
5784         "be_instance": instance.beparams,
5785         "be_actual": cluster.FillBE(instance),
5786         }
5787
5788       result[instance.name] = idict
5789
5790     return result
5791
5792
5793 class LUSetInstanceParams(LogicalUnit):
5794   """Modifies an instances's parameters.
5795
5796   """
5797   HPATH = "instance-modify"
5798   HTYPE = constants.HTYPE_INSTANCE
5799   _OP_REQP = ["instance_name"]
5800   REQ_BGL = False
5801
5802   def CheckArguments(self):
5803     if not hasattr(self.op, 'nics'):
5804       self.op.nics = []
5805     if not hasattr(self.op, 'disks'):
5806       self.op.disks = []
5807     if not hasattr(self.op, 'beparams'):
5808       self.op.beparams = {}
5809     if not hasattr(self.op, 'hvparams'):
5810       self.op.hvparams = {}
5811     self.op.force = getattr(self.op, "force", False)
5812     if not (self.op.nics or self.op.disks or
5813             self.op.hvparams or self.op.beparams):
5814       raise errors.OpPrereqError("No changes submitted")
5815
5816     # Disk validation
5817     disk_addremove = 0
5818     for disk_op, disk_dict in self.op.disks:
5819       if disk_op == constants.DDM_REMOVE:
5820         disk_addremove += 1
5821         continue
5822       elif disk_op == constants.DDM_ADD:
5823         disk_addremove += 1
5824       else:
5825         if not isinstance(disk_op, int):
5826           raise errors.OpPrereqError("Invalid disk index")
5827       if disk_op == constants.DDM_ADD:
5828         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5829         if mode not in constants.DISK_ACCESS_SET:
5830           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5831         size = disk_dict.get('size', None)
5832         if size is None:
5833           raise errors.OpPrereqError("Required disk parameter size missing")
5834         try:
5835           size = int(size)
5836         except ValueError, err:
5837           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5838                                      str(err))
5839         disk_dict['size'] = size
5840       else:
5841         # modification of disk
5842         if 'size' in disk_dict:
5843           raise errors.OpPrereqError("Disk size change not possible, use"
5844                                      " grow-disk")
5845
5846     if disk_addremove > 1:
5847       raise errors.OpPrereqError("Only one disk add or remove operation"
5848                                  " supported at a time")
5849
5850     # NIC validation
5851     nic_addremove = 0
5852     for nic_op, nic_dict in self.op.nics:
5853       if nic_op == constants.DDM_REMOVE:
5854         nic_addremove += 1
5855         continue
5856       elif nic_op == constants.DDM_ADD:
5857         nic_addremove += 1
5858       else:
5859         if not isinstance(nic_op, int):
5860           raise errors.OpPrereqError("Invalid nic index")
5861
5862       # nic_dict should be a dict
5863       nic_ip = nic_dict.get('ip', None)
5864       if nic_ip is not None:
5865         if nic_ip.lower() == constants.VALUE_NONE:
5866           nic_dict['ip'] = None
5867         else:
5868           if not utils.IsValidIP(nic_ip):
5869             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5870
5871       if nic_op == constants.DDM_ADD:
5872         nic_bridge = nic_dict.get('bridge', None)
5873         if nic_bridge is None:
5874           nic_dict['bridge'] = self.cfg.GetDefBridge()
5875         nic_mac = nic_dict.get('mac', None)
5876         if nic_mac is None:
5877           nic_dict['mac'] = constants.VALUE_AUTO
5878
5879       if 'mac' in nic_dict:
5880         nic_mac = nic_dict['mac']
5881         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5882           if not utils.IsValidMac(nic_mac):
5883             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5884         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5885           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5886                                      " modifying an existing nic")
5887
5888     if nic_addremove > 1:
5889       raise errors.OpPrereqError("Only one NIC add or remove operation"
5890                                  " supported at a time")
5891
5892   def ExpandNames(self):
5893     self._ExpandAndLockInstance()
5894     self.needed_locks[locking.LEVEL_NODE] = []
5895     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5896
5897   def DeclareLocks(self, level):
5898     if level == locking.LEVEL_NODE:
5899       self._LockInstancesNodes()
5900
5901   def BuildHooksEnv(self):
5902     """Build hooks env.
5903
5904     This runs on the master, primary and secondaries.
5905
5906     """
5907     args = dict()
5908     if constants.BE_MEMORY in self.be_new:
5909       args['memory'] = self.be_new[constants.BE_MEMORY]
5910     if constants.BE_VCPUS in self.be_new:
5911       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5912     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5913     # information at all.
5914     if self.op.nics:
5915       args['nics'] = []
5916       nic_override = dict(self.op.nics)
5917       for idx, nic in enumerate(self.instance.nics):
5918         if idx in nic_override:
5919           this_nic_override = nic_override[idx]
5920         else:
5921           this_nic_override = {}
5922         if 'ip' in this_nic_override:
5923           ip = this_nic_override['ip']
5924         else:
5925           ip = nic.ip
5926         if 'bridge' in this_nic_override:
5927           bridge = this_nic_override['bridge']
5928         else:
5929           bridge = nic.bridge
5930         if 'mac' in this_nic_override:
5931           mac = this_nic_override['mac']
5932         else:
5933           mac = nic.mac
5934         args['nics'].append((ip, bridge, mac))
5935       if constants.DDM_ADD in nic_override:
5936         ip = nic_override[constants.DDM_ADD].get('ip', None)
5937         bridge = nic_override[constants.DDM_ADD]['bridge']
5938         mac = nic_override[constants.DDM_ADD]['mac']
5939         args['nics'].append((ip, bridge, mac))
5940       elif constants.DDM_REMOVE in nic_override:
5941         del args['nics'][-1]
5942
5943     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5944     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5945     return env, nl, nl
5946
5947   def CheckPrereq(self):
5948     """Check prerequisites.
5949
5950     This only checks the instance list against the existing names.
5951
5952     """
5953     force = self.force = self.op.force
5954
5955     # checking the new params on the primary/secondary nodes
5956
5957     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5958     assert self.instance is not None, \
5959       "Cannot retrieve locked instance %s" % self.op.instance_name
5960     pnode = instance.primary_node
5961     nodelist = list(instance.all_nodes)
5962
5963     # hvparams processing
5964     if self.op.hvparams:
5965       i_hvdict = copy.deepcopy(instance.hvparams)
5966       for key, val in self.op.hvparams.iteritems():
5967         if val == constants.VALUE_DEFAULT:
5968           try:
5969             del i_hvdict[key]
5970           except KeyError:
5971             pass
5972         else:
5973           i_hvdict[key] = val
5974       cluster = self.cfg.GetClusterInfo()
5975       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5976       hv_new = objects.FillDict(cluster.hvparams[instance.hypervisor],
5977                                 i_hvdict)
5978       # local check
5979       hypervisor.GetHypervisor(
5980         instance.hypervisor).CheckParameterSyntax(hv_new)
5981       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5982       self.hv_new = hv_new # the new actual values
5983       self.hv_inst = i_hvdict # the new dict (without defaults)
5984     else:
5985       self.hv_new = self.hv_inst = {}
5986
5987     # beparams processing
5988     if self.op.beparams:
5989       i_bedict = copy.deepcopy(instance.beparams)
5990       for key, val in self.op.beparams.iteritems():
5991         if val == constants.VALUE_DEFAULT:
5992           try:
5993             del i_bedict[key]
5994           except KeyError:
5995             pass
5996         else:
5997           i_bedict[key] = val
5998       cluster = self.cfg.GetClusterInfo()
5999       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6000       be_new = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
6001                                 i_bedict)
6002       self.be_new = be_new # the new actual values
6003       self.be_inst = i_bedict # the new dict (without defaults)
6004     else:
6005       self.be_new = self.be_inst = {}
6006
6007     self.warn = []
6008
6009     if constants.BE_MEMORY in self.op.beparams and not self.force:
6010       mem_check_list = [pnode]
6011       if be_new[constants.BE_AUTO_BALANCE]:
6012         # either we changed auto_balance to yes or it was from before
6013         mem_check_list.extend(instance.secondary_nodes)
6014       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6015                                                   instance.hypervisor)
6016       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6017                                          instance.hypervisor)
6018       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6019         # Assume the primary node is unreachable and go ahead
6020         self.warn.append("Can't get info from primary node %s" % pnode)
6021       else:
6022         if not instance_info.failed and instance_info.data:
6023           current_mem = int(instance_info.data['memory'])
6024         else:
6025           # Assume instance not running
6026           # (there is a slight race condition here, but it's not very probable,
6027           # and we have no other way to check)
6028           current_mem = 0
6029         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6030                     nodeinfo[pnode].data['memory_free'])
6031         if miss_mem > 0:
6032           raise errors.OpPrereqError("This change will prevent the instance"
6033                                      " from starting, due to %d MB of memory"
6034                                      " missing on its primary node" % miss_mem)
6035
6036       if be_new[constants.BE_AUTO_BALANCE]:
6037         for node, nres in nodeinfo.iteritems():
6038           if node not in instance.secondary_nodes:
6039             continue
6040           if nres.failed or not isinstance(nres.data, dict):
6041             self.warn.append("Can't get info from secondary node %s" % node)
6042           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6043             self.warn.append("Not enough memory to failover instance to"
6044                              " secondary node %s" % node)
6045
6046     # NIC processing
6047     for nic_op, nic_dict in self.op.nics:
6048       if nic_op == constants.DDM_REMOVE:
6049         if not instance.nics:
6050           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6051         continue
6052       if nic_op != constants.DDM_ADD:
6053         # an existing nic
6054         if nic_op < 0 or nic_op >= len(instance.nics):
6055           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6056                                      " are 0 to %d" %
6057                                      (nic_op, len(instance.nics)))
6058       if 'bridge' in nic_dict:
6059         nic_bridge = nic_dict['bridge']
6060         if nic_bridge is None:
6061           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6062         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6063           msg = ("Bridge '%s' doesn't exist on one of"
6064                  " the instance nodes" % nic_bridge)
6065           if self.force:
6066             self.warn.append(msg)
6067           else:
6068             raise errors.OpPrereqError(msg)
6069       if 'mac' in nic_dict:
6070         nic_mac = nic_dict['mac']
6071         if nic_mac is None:
6072           raise errors.OpPrereqError('Cannot set the nic mac to None')
6073         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6074           # otherwise generate the mac
6075           nic_dict['mac'] = self.cfg.GenerateMAC()
6076         else:
6077           # or validate/reserve the current one
6078           if self.cfg.IsMacInUse(nic_mac):
6079             raise errors.OpPrereqError("MAC address %s already in use"
6080                                        " in cluster" % nic_mac)
6081
6082     # DISK processing
6083     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6084       raise errors.OpPrereqError("Disk operations not supported for"
6085                                  " diskless instances")
6086     for disk_op, disk_dict in self.op.disks:
6087       if disk_op == constants.DDM_REMOVE:
6088         if len(instance.disks) == 1:
6089           raise errors.OpPrereqError("Cannot remove the last disk of"
6090                                      " an instance")
6091         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6092         ins_l = ins_l[pnode]
6093         if ins_l.failed or not isinstance(ins_l.data, list):
6094           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6095         if instance.name in ins_l.data:
6096           raise errors.OpPrereqError("Instance is running, can't remove"
6097                                      " disks.")
6098
6099       if (disk_op == constants.DDM_ADD and
6100           len(instance.nics) >= constants.MAX_DISKS):
6101         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6102                                    " add more" % constants.MAX_DISKS)
6103       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6104         # an existing disk
6105         if disk_op < 0 or disk_op >= len(instance.disks):
6106           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6107                                      " are 0 to %d" %
6108                                      (disk_op, len(instance.disks)))
6109
6110     return
6111
6112   def Exec(self, feedback_fn):
6113     """Modifies an instance.
6114
6115     All parameters take effect only at the next restart of the instance.
6116
6117     """
6118     # Process here the warnings from CheckPrereq, as we don't have a
6119     # feedback_fn there.
6120     for warn in self.warn:
6121       feedback_fn("WARNING: %s" % warn)
6122
6123     result = []
6124     instance = self.instance
6125     # disk changes
6126     for disk_op, disk_dict in self.op.disks:
6127       if disk_op == constants.DDM_REMOVE:
6128         # remove the last disk
6129         device = instance.disks.pop()
6130         device_idx = len(instance.disks)
6131         for node, disk in device.ComputeNodeTree(instance.primary_node):
6132           self.cfg.SetDiskID(disk, node)
6133           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6134           if msg:
6135             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6136                             " continuing anyway", device_idx, node, msg)
6137         result.append(("disk/%d" % device_idx, "remove"))
6138       elif disk_op == constants.DDM_ADD:
6139         # add a new disk
6140         if instance.disk_template == constants.DT_FILE:
6141           file_driver, file_path = instance.disks[0].logical_id
6142           file_path = os.path.dirname(file_path)
6143         else:
6144           file_driver = file_path = None
6145         disk_idx_base = len(instance.disks)
6146         new_disk = _GenerateDiskTemplate(self,
6147                                          instance.disk_template,
6148                                          instance.name, instance.primary_node,
6149                                          instance.secondary_nodes,
6150                                          [disk_dict],
6151                                          file_path,
6152                                          file_driver,
6153                                          disk_idx_base)[0]
6154         instance.disks.append(new_disk)
6155         info = _GetInstanceInfoText(instance)
6156
6157         logging.info("Creating volume %s for instance %s",
6158                      new_disk.iv_name, instance.name)
6159         # Note: this needs to be kept in sync with _CreateDisks
6160         #HARDCODE
6161         for node in instance.all_nodes:
6162           f_create = node == instance.primary_node
6163           try:
6164             _CreateBlockDev(self, node, instance, new_disk,
6165                             f_create, info, f_create)
6166           except errors.OpExecError, err:
6167             self.LogWarning("Failed to create volume %s (%s) on"
6168                             " node %s: %s",
6169                             new_disk.iv_name, new_disk, node, err)
6170         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6171                        (new_disk.size, new_disk.mode)))
6172       else:
6173         # change a given disk
6174         instance.disks[disk_op].mode = disk_dict['mode']
6175         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6176     # NIC changes
6177     for nic_op, nic_dict in self.op.nics:
6178       if nic_op == constants.DDM_REMOVE:
6179         # remove the last nic
6180         del instance.nics[-1]
6181         result.append(("nic.%d" % len(instance.nics), "remove"))
6182       elif nic_op == constants.DDM_ADD:
6183         # mac and bridge should be set, by now
6184         mac = nic_dict['mac']
6185         bridge = nic_dict['bridge']
6186         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6187                               bridge=bridge)
6188         instance.nics.append(new_nic)
6189         result.append(("nic.%d" % (len(instance.nics) - 1),
6190                        "add:mac=%s,ip=%s,bridge=%s" %
6191                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6192       else:
6193         # change a given nic
6194         for key in 'mac', 'ip', 'bridge':
6195           if key in nic_dict:
6196             setattr(instance.nics[nic_op], key, nic_dict[key])
6197             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6198
6199     # hvparams changes
6200     if self.op.hvparams:
6201       instance.hvparams = self.hv_inst
6202       for key, val in self.op.hvparams.iteritems():
6203         result.append(("hv/%s" % key, val))
6204
6205     # beparams changes
6206     if self.op.beparams:
6207       instance.beparams = self.be_inst
6208       for key, val in self.op.beparams.iteritems():
6209         result.append(("be/%s" % key, val))
6210
6211     self.cfg.Update(instance)
6212
6213     return result
6214
6215
6216 class LUQueryExports(NoHooksLU):
6217   """Query the exports list
6218
6219   """
6220   _OP_REQP = ['nodes']
6221   REQ_BGL = False
6222
6223   def ExpandNames(self):
6224     self.needed_locks = {}
6225     self.share_locks[locking.LEVEL_NODE] = 1
6226     if not self.op.nodes:
6227       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6228     else:
6229       self.needed_locks[locking.LEVEL_NODE] = \
6230         _GetWantedNodes(self, self.op.nodes)
6231
6232   def CheckPrereq(self):
6233     """Check prerequisites.
6234
6235     """
6236     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6237
6238   def Exec(self, feedback_fn):
6239     """Compute the list of all the exported system images.
6240
6241     @rtype: dict
6242     @return: a dictionary with the structure node->(export-list)
6243         where export-list is a list of the instances exported on
6244         that node.
6245
6246     """
6247     rpcresult = self.rpc.call_export_list(self.nodes)
6248     result = {}
6249     for node in rpcresult:
6250       if rpcresult[node].failed:
6251         result[node] = False
6252       else:
6253         result[node] = rpcresult[node].data
6254
6255     return result
6256
6257
6258 class LUExportInstance(LogicalUnit):
6259   """Export an instance to an image in the cluster.
6260
6261   """
6262   HPATH = "instance-export"
6263   HTYPE = constants.HTYPE_INSTANCE
6264   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6265   REQ_BGL = False
6266
6267   def ExpandNames(self):
6268     self._ExpandAndLockInstance()
6269     # FIXME: lock only instance primary and destination node
6270     #
6271     # Sad but true, for now we have do lock all nodes, as we don't know where
6272     # the previous export might be, and and in this LU we search for it and
6273     # remove it from its current node. In the future we could fix this by:
6274     #  - making a tasklet to search (share-lock all), then create the new one,
6275     #    then one to remove, after
6276     #  - removing the removal operation altoghether
6277     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6278
6279   def DeclareLocks(self, level):
6280     """Last minute lock declaration."""
6281     # All nodes are locked anyway, so nothing to do here.
6282
6283   def BuildHooksEnv(self):
6284     """Build hooks env.
6285
6286     This will run on the master, primary node and target node.
6287
6288     """
6289     env = {
6290       "EXPORT_NODE": self.op.target_node,
6291       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6292       }
6293     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6294     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6295           self.op.target_node]
6296     return env, nl, nl
6297
6298   def CheckPrereq(self):
6299     """Check prerequisites.
6300
6301     This checks that the instance and node names are valid.
6302
6303     """
6304     instance_name = self.op.instance_name
6305     self.instance = self.cfg.GetInstanceInfo(instance_name)
6306     assert self.instance is not None, \
6307           "Cannot retrieve locked instance %s" % self.op.instance_name
6308     _CheckNodeOnline(self, self.instance.primary_node)
6309
6310     self.dst_node = self.cfg.GetNodeInfo(
6311       self.cfg.ExpandNodeName(self.op.target_node))
6312
6313     if self.dst_node is None:
6314       # This is wrong node name, not a non-locked node
6315       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6316     _CheckNodeOnline(self, self.dst_node.name)
6317     _CheckNodeNotDrained(self, self.dst_node.name)
6318
6319     # instance disk type verification
6320     for disk in self.instance.disks:
6321       if disk.dev_type == constants.LD_FILE:
6322         raise errors.OpPrereqError("Export not supported for instances with"
6323                                    " file-based disks")
6324
6325   def Exec(self, feedback_fn):
6326     """Export an instance to an image in the cluster.
6327
6328     """
6329     instance = self.instance
6330     dst_node = self.dst_node
6331     src_node = instance.primary_node
6332     if self.op.shutdown:
6333       # shutdown the instance, but not the disks
6334       result = self.rpc.call_instance_shutdown(src_node, instance)
6335       msg = result.RemoteFailMsg()
6336       if msg:
6337         raise errors.OpExecError("Could not shutdown instance %s on"
6338                                  " node %s: %s" %
6339                                  (instance.name, src_node, msg))
6340
6341     vgname = self.cfg.GetVGName()
6342
6343     snap_disks = []
6344
6345     # set the disks ID correctly since call_instance_start needs the
6346     # correct drbd minor to create the symlinks
6347     for disk in instance.disks:
6348       self.cfg.SetDiskID(disk, src_node)
6349
6350     try:
6351       for disk in instance.disks:
6352         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6353         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6354         if new_dev_name.failed or not new_dev_name.data:
6355           self.LogWarning("Could not snapshot block device %s on node %s",
6356                           disk.logical_id[1], src_node)
6357           snap_disks.append(False)
6358         else:
6359           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6360                                  logical_id=(vgname, new_dev_name.data),
6361                                  physical_id=(vgname, new_dev_name.data),
6362                                  iv_name=disk.iv_name)
6363           snap_disks.append(new_dev)
6364
6365     finally:
6366       if self.op.shutdown and instance.admin_up:
6367         result = self.rpc.call_instance_start(src_node, instance, None, None)
6368         msg = result.RemoteFailMsg()
6369         if msg:
6370           _ShutdownInstanceDisks(self, instance)
6371           raise errors.OpExecError("Could not start instance: %s" % msg)
6372
6373     # TODO: check for size
6374
6375     cluster_name = self.cfg.GetClusterName()
6376     for idx, dev in enumerate(snap_disks):
6377       if dev:
6378         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6379                                                instance, cluster_name, idx)
6380         if result.failed or not result.data:
6381           self.LogWarning("Could not export block device %s from node %s to"
6382                           " node %s", dev.logical_id[1], src_node,
6383                           dst_node.name)
6384         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6385         if msg:
6386           self.LogWarning("Could not remove snapshot block device %s from node"
6387                           " %s: %s", dev.logical_id[1], src_node, msg)
6388
6389     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6390     if result.failed or not result.data:
6391       self.LogWarning("Could not finalize export for instance %s on node %s",
6392                       instance.name, dst_node.name)
6393
6394     nodelist = self.cfg.GetNodeList()
6395     nodelist.remove(dst_node.name)
6396
6397     # on one-node clusters nodelist will be empty after the removal
6398     # if we proceed the backup would be removed because OpQueryExports
6399     # substitutes an empty list with the full cluster node list.
6400     if nodelist:
6401       exportlist = self.rpc.call_export_list(nodelist)
6402       for node in exportlist:
6403         if exportlist[node].failed:
6404           continue
6405         if instance.name in exportlist[node].data:
6406           if not self.rpc.call_export_remove(node, instance.name):
6407             self.LogWarning("Could not remove older export for instance %s"
6408                             " on node %s", instance.name, node)
6409
6410
6411 class LURemoveExport(NoHooksLU):
6412   """Remove exports related to the named instance.
6413
6414   """
6415   _OP_REQP = ["instance_name"]
6416   REQ_BGL = False
6417
6418   def ExpandNames(self):
6419     self.needed_locks = {}
6420     # We need all nodes to be locked in order for RemoveExport to work, but we
6421     # don't need to lock the instance itself, as nothing will happen to it (and
6422     # we can remove exports also for a removed instance)
6423     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6424
6425   def CheckPrereq(self):
6426     """Check prerequisites.
6427     """
6428     pass
6429
6430   def Exec(self, feedback_fn):
6431     """Remove any export.
6432
6433     """
6434     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6435     # If the instance was not found we'll try with the name that was passed in.
6436     # This will only work if it was an FQDN, though.
6437     fqdn_warn = False
6438     if not instance_name:
6439       fqdn_warn = True
6440       instance_name = self.op.instance_name
6441
6442     exportlist = self.rpc.call_export_list(self.acquired_locks[
6443       locking.LEVEL_NODE])
6444     found = False
6445     for node in exportlist:
6446       if exportlist[node].failed:
6447         self.LogWarning("Failed to query node %s, continuing" % node)
6448         continue
6449       if instance_name in exportlist[node].data:
6450         found = True
6451         result = self.rpc.call_export_remove(node, instance_name)
6452         if result.failed or not result.data:
6453           logging.error("Could not remove export for instance %s"
6454                         " on node %s", instance_name, node)
6455
6456     if fqdn_warn and not found:
6457       feedback_fn("Export not found. If trying to remove an export belonging"
6458                   " to a deleted instance please use its Fully Qualified"
6459                   " Domain Name.")
6460
6461
6462 class TagsLU(NoHooksLU):
6463   """Generic tags LU.
6464
6465   This is an abstract class which is the parent of all the other tags LUs.
6466
6467   """
6468
6469   def ExpandNames(self):
6470     self.needed_locks = {}
6471     if self.op.kind == constants.TAG_NODE:
6472       name = self.cfg.ExpandNodeName(self.op.name)
6473       if name is None:
6474         raise errors.OpPrereqError("Invalid node name (%s)" %
6475                                    (self.op.name,))
6476       self.op.name = name
6477       self.needed_locks[locking.LEVEL_NODE] = name
6478     elif self.op.kind == constants.TAG_INSTANCE:
6479       name = self.cfg.ExpandInstanceName(self.op.name)
6480       if name is None:
6481         raise errors.OpPrereqError("Invalid instance name (%s)" %
6482                                    (self.op.name,))
6483       self.op.name = name
6484       self.needed_locks[locking.LEVEL_INSTANCE] = name
6485
6486   def CheckPrereq(self):
6487     """Check prerequisites.
6488
6489     """
6490     if self.op.kind == constants.TAG_CLUSTER:
6491       self.target = self.cfg.GetClusterInfo()
6492     elif self.op.kind == constants.TAG_NODE:
6493       self.target = self.cfg.GetNodeInfo(self.op.name)
6494     elif self.op.kind == constants.TAG_INSTANCE:
6495       self.target = self.cfg.GetInstanceInfo(self.op.name)
6496     else:
6497       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6498                                  str(self.op.kind))
6499
6500
6501 class LUGetTags(TagsLU):
6502   """Returns the tags of a given object.
6503
6504   """
6505   _OP_REQP = ["kind", "name"]
6506   REQ_BGL = False
6507
6508   def Exec(self, feedback_fn):
6509     """Returns the tag list.
6510
6511     """
6512     return list(self.target.GetTags())
6513
6514
6515 class LUSearchTags(NoHooksLU):
6516   """Searches the tags for a given pattern.
6517
6518   """
6519   _OP_REQP = ["pattern"]
6520   REQ_BGL = False
6521
6522   def ExpandNames(self):
6523     self.needed_locks = {}
6524
6525   def CheckPrereq(self):
6526     """Check prerequisites.
6527
6528     This checks the pattern passed for validity by compiling it.
6529
6530     """
6531     try:
6532       self.re = re.compile(self.op.pattern)
6533     except re.error, err:
6534       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6535                                  (self.op.pattern, err))
6536
6537   def Exec(self, feedback_fn):
6538     """Returns the tag list.
6539
6540     """
6541     cfg = self.cfg
6542     tgts = [("/cluster", cfg.GetClusterInfo())]
6543     ilist = cfg.GetAllInstancesInfo().values()
6544     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6545     nlist = cfg.GetAllNodesInfo().values()
6546     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6547     results = []
6548     for path, target in tgts:
6549       for tag in target.GetTags():
6550         if self.re.search(tag):
6551           results.append((path, tag))
6552     return results
6553
6554
6555 class LUAddTags(TagsLU):
6556   """Sets a tag on a given object.
6557
6558   """
6559   _OP_REQP = ["kind", "name", "tags"]
6560   REQ_BGL = False
6561
6562   def CheckPrereq(self):
6563     """Check prerequisites.
6564
6565     This checks the type and length of the tag name and value.
6566
6567     """
6568     TagsLU.CheckPrereq(self)
6569     for tag in self.op.tags:
6570       objects.TaggableObject.ValidateTag(tag)
6571
6572   def Exec(self, feedback_fn):
6573     """Sets the tag.
6574
6575     """
6576     try:
6577       for tag in self.op.tags:
6578         self.target.AddTag(tag)
6579     except errors.TagError, err:
6580       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6581     try:
6582       self.cfg.Update(self.target)
6583     except errors.ConfigurationError:
6584       raise errors.OpRetryError("There has been a modification to the"
6585                                 " config file and the operation has been"
6586                                 " aborted. Please retry.")
6587
6588
6589 class LUDelTags(TagsLU):
6590   """Delete a list of tags from a given object.
6591
6592   """
6593   _OP_REQP = ["kind", "name", "tags"]
6594   REQ_BGL = False
6595
6596   def CheckPrereq(self):
6597     """Check prerequisites.
6598
6599     This checks that we have the given tag.
6600
6601     """
6602     TagsLU.CheckPrereq(self)
6603     for tag in self.op.tags:
6604       objects.TaggableObject.ValidateTag(tag)
6605     del_tags = frozenset(self.op.tags)
6606     cur_tags = self.target.GetTags()
6607     if not del_tags <= cur_tags:
6608       diff_tags = del_tags - cur_tags
6609       diff_names = ["'%s'" % tag for tag in diff_tags]
6610       diff_names.sort()
6611       raise errors.OpPrereqError("Tag(s) %s not found" %
6612                                  (",".join(diff_names)))
6613
6614   def Exec(self, feedback_fn):
6615     """Remove the tag from the object.
6616
6617     """
6618     for tag in self.op.tags:
6619       self.target.RemoveTag(tag)
6620     try:
6621       self.cfg.Update(self.target)
6622     except errors.ConfigurationError:
6623       raise errors.OpRetryError("There has been a modification to the"
6624                                 " config file and the operation has been"
6625                                 " aborted. Please retry.")
6626
6627
6628 class LUTestDelay(NoHooksLU):
6629   """Sleep for a specified amount of time.
6630
6631   This LU sleeps on the master and/or nodes for a specified amount of
6632   time.
6633
6634   """
6635   _OP_REQP = ["duration", "on_master", "on_nodes"]
6636   REQ_BGL = False
6637
6638   def ExpandNames(self):
6639     """Expand names and set required locks.
6640
6641     This expands the node list, if any.
6642
6643     """
6644     self.needed_locks = {}
6645     if self.op.on_nodes:
6646       # _GetWantedNodes can be used here, but is not always appropriate to use
6647       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6648       # more information.
6649       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6650       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6651
6652   def CheckPrereq(self):
6653     """Check prerequisites.
6654
6655     """
6656
6657   def Exec(self, feedback_fn):
6658     """Do the actual sleep.
6659
6660     """
6661     if self.op.on_master:
6662       if not utils.TestDelay(self.op.duration):
6663         raise errors.OpExecError("Error during master delay test")
6664     if self.op.on_nodes:
6665       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6666       if not result:
6667         raise errors.OpExecError("Complete failure from rpc call")
6668       for node, node_result in result.items():
6669         node_result.Raise()
6670         if not node_result.data:
6671           raise errors.OpExecError("Failure during rpc call to node %s,"
6672                                    " result: %s" % (node, node_result.data))
6673
6674
6675 class IAllocator(object):
6676   """IAllocator framework.
6677
6678   An IAllocator instance has three sets of attributes:
6679     - cfg that is needed to query the cluster
6680     - input data (all members of the _KEYS class attribute are required)
6681     - four buffer attributes (in|out_data|text), that represent the
6682       input (to the external script) in text and data structure format,
6683       and the output from it, again in two formats
6684     - the result variables from the script (success, info, nodes) for
6685       easy usage
6686
6687   """
6688   _ALLO_KEYS = [
6689     "mem_size", "disks", "disk_template",
6690     "os", "tags", "nics", "vcpus", "hypervisor",
6691     ]
6692   _RELO_KEYS = [
6693     "relocate_from",
6694     ]
6695
6696   def __init__(self, lu, mode, name, **kwargs):
6697     self.lu = lu
6698     # init buffer variables
6699     self.in_text = self.out_text = self.in_data = self.out_data = None
6700     # init all input fields so that pylint is happy
6701     self.mode = mode
6702     self.name = name
6703     self.mem_size = self.disks = self.disk_template = None
6704     self.os = self.tags = self.nics = self.vcpus = None
6705     self.hypervisor = None
6706     self.relocate_from = None
6707     # computed fields
6708     self.required_nodes = None
6709     # init result fields
6710     self.success = self.info = self.nodes = None
6711     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6712       keyset = self._ALLO_KEYS
6713     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6714       keyset = self._RELO_KEYS
6715     else:
6716       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6717                                    " IAllocator" % self.mode)
6718     for key in kwargs:
6719       if key not in keyset:
6720         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6721                                      " IAllocator" % key)
6722       setattr(self, key, kwargs[key])
6723     for key in keyset:
6724       if key not in kwargs:
6725         raise errors.ProgrammerError("Missing input parameter '%s' to"
6726                                      " IAllocator" % key)
6727     self._BuildInputData()
6728
6729   def _ComputeClusterData(self):
6730     """Compute the generic allocator input data.
6731
6732     This is the data that is independent of the actual operation.
6733
6734     """
6735     cfg = self.lu.cfg
6736     cluster_info = cfg.GetClusterInfo()
6737     # cluster data
6738     data = {
6739       "version": constants.IALLOCATOR_VERSION,
6740       "cluster_name": cfg.GetClusterName(),
6741       "cluster_tags": list(cluster_info.GetTags()),
6742       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6743       # we don't have job IDs
6744       }
6745     iinfo = cfg.GetAllInstancesInfo().values()
6746     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6747
6748     # node data
6749     node_results = {}
6750     node_list = cfg.GetNodeList()
6751
6752     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6753       hypervisor_name = self.hypervisor
6754     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6755       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6756
6757     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6758                                            hypervisor_name)
6759     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6760                        cluster_info.enabled_hypervisors)
6761     for nname, nresult in node_data.items():
6762       # first fill in static (config-based) values
6763       ninfo = cfg.GetNodeInfo(nname)
6764       pnr = {
6765         "tags": list(ninfo.GetTags()),
6766         "primary_ip": ninfo.primary_ip,
6767         "secondary_ip": ninfo.secondary_ip,
6768         "offline": ninfo.offline,
6769         "drained": ninfo.drained,
6770         "master_candidate": ninfo.master_candidate,
6771         }
6772
6773       if not ninfo.offline:
6774         nresult.Raise()
6775         if not isinstance(nresult.data, dict):
6776           raise errors.OpExecError("Can't get data for node %s" % nname)
6777         remote_info = nresult.data
6778         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6779                      'vg_size', 'vg_free', 'cpu_total']:
6780           if attr not in remote_info:
6781             raise errors.OpExecError("Node '%s' didn't return attribute"
6782                                      " '%s'" % (nname, attr))
6783           try:
6784             remote_info[attr] = int(remote_info[attr])
6785           except ValueError, err:
6786             raise errors.OpExecError("Node '%s' returned invalid value"
6787                                      " for '%s': %s" % (nname, attr, err))
6788         # compute memory used by primary instances
6789         i_p_mem = i_p_up_mem = 0
6790         for iinfo, beinfo in i_list:
6791           if iinfo.primary_node == nname:
6792             i_p_mem += beinfo[constants.BE_MEMORY]
6793             if iinfo.name not in node_iinfo[nname].data:
6794               i_used_mem = 0
6795             else:
6796               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6797             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6798             remote_info['memory_free'] -= max(0, i_mem_diff)
6799
6800             if iinfo.admin_up:
6801               i_p_up_mem += beinfo[constants.BE_MEMORY]
6802
6803         # compute memory used by instances
6804         pnr_dyn = {
6805           "total_memory": remote_info['memory_total'],
6806           "reserved_memory": remote_info['memory_dom0'],
6807           "free_memory": remote_info['memory_free'],
6808           "total_disk": remote_info['vg_size'],
6809           "free_disk": remote_info['vg_free'],
6810           "total_cpus": remote_info['cpu_total'],
6811           "i_pri_memory": i_p_mem,
6812           "i_pri_up_memory": i_p_up_mem,
6813           }
6814         pnr.update(pnr_dyn)
6815
6816       node_results[nname] = pnr
6817     data["nodes"] = node_results
6818
6819     # instance data
6820     instance_data = {}
6821     for iinfo, beinfo in i_list:
6822       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6823                   for n in iinfo.nics]
6824       pir = {
6825         "tags": list(iinfo.GetTags()),
6826         "admin_up": iinfo.admin_up,
6827         "vcpus": beinfo[constants.BE_VCPUS],
6828         "memory": beinfo[constants.BE_MEMORY],
6829         "os": iinfo.os,
6830         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6831         "nics": nic_data,
6832         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6833         "disk_template": iinfo.disk_template,
6834         "hypervisor": iinfo.hypervisor,
6835         }
6836       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6837                                                  pir["disks"])
6838       instance_data[iinfo.name] = pir
6839
6840     data["instances"] = instance_data
6841
6842     self.in_data = data
6843
6844   def _AddNewInstance(self):
6845     """Add new instance data to allocator structure.
6846
6847     This in combination with _AllocatorGetClusterData will create the
6848     correct structure needed as input for the allocator.
6849
6850     The checks for the completeness of the opcode must have already been
6851     done.
6852
6853     """
6854     data = self.in_data
6855
6856     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6857
6858     if self.disk_template in constants.DTS_NET_MIRROR:
6859       self.required_nodes = 2
6860     else:
6861       self.required_nodes = 1
6862     request = {
6863       "type": "allocate",
6864       "name": self.name,
6865       "disk_template": self.disk_template,
6866       "tags": self.tags,
6867       "os": self.os,
6868       "vcpus": self.vcpus,
6869       "memory": self.mem_size,
6870       "disks": self.disks,
6871       "disk_space_total": disk_space,
6872       "nics": self.nics,
6873       "required_nodes": self.required_nodes,
6874       }
6875     data["request"] = request
6876
6877   def _AddRelocateInstance(self):
6878     """Add relocate instance data to allocator structure.
6879
6880     This in combination with _IAllocatorGetClusterData will create the
6881     correct structure needed as input for the allocator.
6882
6883     The checks for the completeness of the opcode must have already been
6884     done.
6885
6886     """
6887     instance = self.lu.cfg.GetInstanceInfo(self.name)
6888     if instance is None:
6889       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6890                                    " IAllocator" % self.name)
6891
6892     if instance.disk_template not in constants.DTS_NET_MIRROR:
6893       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6894
6895     if len(instance.secondary_nodes) != 1:
6896       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6897
6898     self.required_nodes = 1
6899     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6900     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6901
6902     request = {
6903       "type": "relocate",
6904       "name": self.name,
6905       "disk_space_total": disk_space,
6906       "required_nodes": self.required_nodes,
6907       "relocate_from": self.relocate_from,
6908       }
6909     self.in_data["request"] = request
6910
6911   def _BuildInputData(self):
6912     """Build input data structures.
6913
6914     """
6915     self._ComputeClusterData()
6916
6917     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6918       self._AddNewInstance()
6919     else:
6920       self._AddRelocateInstance()
6921
6922     self.in_text = serializer.Dump(self.in_data)
6923
6924   def Run(self, name, validate=True, call_fn=None):
6925     """Run an instance allocator and return the results.
6926
6927     """
6928     if call_fn is None:
6929       call_fn = self.lu.rpc.call_iallocator_runner
6930     data = self.in_text
6931
6932     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6933     result.Raise()
6934
6935     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6936       raise errors.OpExecError("Invalid result from master iallocator runner")
6937
6938     rcode, stdout, stderr, fail = result.data
6939
6940     if rcode == constants.IARUN_NOTFOUND:
6941       raise errors.OpExecError("Can't find allocator '%s'" % name)
6942     elif rcode == constants.IARUN_FAILURE:
6943       raise errors.OpExecError("Instance allocator call failed: %s,"
6944                                " output: %s" % (fail, stdout+stderr))
6945     self.out_text = stdout
6946     if validate:
6947       self._ValidateResult()
6948
6949   def _ValidateResult(self):
6950     """Process the allocator results.
6951
6952     This will process and if successful save the result in
6953     self.out_data and the other parameters.
6954
6955     """
6956     try:
6957       rdict = serializer.Load(self.out_text)
6958     except Exception, err:
6959       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6960
6961     if not isinstance(rdict, dict):
6962       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6963
6964     for key in "success", "info", "nodes":
6965       if key not in rdict:
6966         raise errors.OpExecError("Can't parse iallocator results:"
6967                                  " missing key '%s'" % key)
6968       setattr(self, key, rdict[key])
6969
6970     if not isinstance(rdict["nodes"], list):
6971       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6972                                " is not a list")
6973     self.out_data = rdict
6974
6975
6976 class LUTestAllocator(NoHooksLU):
6977   """Run allocator tests.
6978
6979   This LU runs the allocator tests
6980
6981   """
6982   _OP_REQP = ["direction", "mode", "name"]
6983
6984   def CheckPrereq(self):
6985     """Check prerequisites.
6986
6987     This checks the opcode parameters depending on the director and mode test.
6988
6989     """
6990     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6991       for attr in ["name", "mem_size", "disks", "disk_template",
6992                    "os", "tags", "nics", "vcpus"]:
6993         if not hasattr(self.op, attr):
6994           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6995                                      attr)
6996       iname = self.cfg.ExpandInstanceName(self.op.name)
6997       if iname is not None:
6998         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6999                                    iname)
7000       if not isinstance(self.op.nics, list):
7001         raise errors.OpPrereqError("Invalid parameter 'nics'")
7002       for row in self.op.nics:
7003         if (not isinstance(row, dict) or
7004             "mac" not in row or
7005             "ip" not in row or
7006             "bridge" not in row):
7007           raise errors.OpPrereqError("Invalid contents of the"
7008                                      " 'nics' parameter")
7009       if not isinstance(self.op.disks, list):
7010         raise errors.OpPrereqError("Invalid parameter 'disks'")
7011       for row in self.op.disks:
7012         if (not isinstance(row, dict) or
7013             "size" not in row or
7014             not isinstance(row["size"], int) or
7015             "mode" not in row or
7016             row["mode"] not in ['r', 'w']):
7017           raise errors.OpPrereqError("Invalid contents of the"
7018                                      " 'disks' parameter")
7019       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7020         self.op.hypervisor = self.cfg.GetHypervisorType()
7021     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7022       if not hasattr(self.op, "name"):
7023         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7024       fname = self.cfg.ExpandInstanceName(self.op.name)
7025       if fname is None:
7026         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7027                                    self.op.name)
7028       self.op.name = fname
7029       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7030     else:
7031       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7032                                  self.op.mode)
7033
7034     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7035       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7036         raise errors.OpPrereqError("Missing allocator name")
7037     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7038       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7039                                  self.op.direction)
7040
7041   def Exec(self, feedback_fn):
7042     """Run the allocator test.
7043
7044     """
7045     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7046       ial = IAllocator(self,
7047                        mode=self.op.mode,
7048                        name=self.op.name,
7049                        mem_size=self.op.mem_size,
7050                        disks=self.op.disks,
7051                        disk_template=self.op.disk_template,
7052                        os=self.op.os,
7053                        tags=self.op.tags,
7054                        nics=self.op.nics,
7055                        vcpus=self.op.vcpus,
7056                        hypervisor=self.op.hypervisor,
7057                        )
7058     else:
7059       ial = IAllocator(self,
7060                        mode=self.op.mode,
7061                        name=self.op.name,
7062                        relocate_from=list(self.relocate_from),
7063                        )
7064
7065     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7066       result = ial.in_text
7067     else:
7068       ial.Run(self.op.allocator, validate=False)
7069       result = ial.out_text
7070     return result