Allow modifying of default nic parameters
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384          msg = to_result.RemoteFailMsg()
1385          if msg:
1386            msg = ("Copy of file %s to node %s failed: %s" %
1387                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1388            self.proc.LogWarning(msg)
1389
1390     finally:
1391       result = self.rpc.call_node_start_master(master, False)
1392       if result.failed or not result.data:
1393         self.LogWarning("Could not re-enable the master role on"
1394                         " the master, please restart manually.")
1395
1396
1397 def _RecursiveCheckIfLVMBased(disk):
1398   """Check if the given disk or its children are lvm-based.
1399
1400   @type disk: L{objects.Disk}
1401   @param disk: the disk to check
1402   @rtype: booleean
1403   @return: boolean indicating whether a LD_LV dev_type was found or not
1404
1405   """
1406   if disk.children:
1407     for chdisk in disk.children:
1408       if _RecursiveCheckIfLVMBased(chdisk):
1409         return True
1410   return disk.dev_type == constants.LD_LV
1411
1412
1413 class LUSetClusterParams(LogicalUnit):
1414   """Change the parameters of the cluster.
1415
1416   """
1417   HPATH = "cluster-modify"
1418   HTYPE = constants.HTYPE_CLUSTER
1419   _OP_REQP = []
1420   REQ_BGL = False
1421
1422   def CheckArguments(self):
1423     """Check parameters
1424
1425     """
1426     if not hasattr(self.op, "candidate_pool_size"):
1427       self.op.candidate_pool_size = None
1428     if self.op.candidate_pool_size is not None:
1429       try:
1430         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1431       except (ValueError, TypeError), err:
1432         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1433                                    str(err))
1434       if self.op.candidate_pool_size < 1:
1435         raise errors.OpPrereqError("At least one master candidate needed")
1436
1437   def ExpandNames(self):
1438     # FIXME: in the future maybe other cluster params won't require checking on
1439     # all nodes to be modified.
1440     self.needed_locks = {
1441       locking.LEVEL_NODE: locking.ALL_SET,
1442     }
1443     self.share_locks[locking.LEVEL_NODE] = 1
1444
1445   def BuildHooksEnv(self):
1446     """Build hooks env.
1447
1448     """
1449     env = {
1450       "OP_TARGET": self.cfg.GetClusterName(),
1451       "NEW_VG_NAME": self.op.vg_name,
1452       }
1453     mn = self.cfg.GetMasterNode()
1454     return env, [mn], [mn]
1455
1456   def CheckPrereq(self):
1457     """Check prerequisites.
1458
1459     This checks whether the given params don't conflict and
1460     if the given volume group is valid.
1461
1462     """
1463     if self.op.vg_name is not None and not self.op.vg_name:
1464       instances = self.cfg.GetAllInstancesInfo().values()
1465       for inst in instances:
1466         for disk in inst.disks:
1467           if _RecursiveCheckIfLVMBased(disk):
1468             raise errors.OpPrereqError("Cannot disable lvm storage while"
1469                                        " lvm-based instances exist")
1470
1471     node_list = self.acquired_locks[locking.LEVEL_NODE]
1472
1473     # if vg_name not None, checks given volume group on all nodes
1474     if self.op.vg_name:
1475       vglist = self.rpc.call_vg_list(node_list)
1476       for node in node_list:
1477         if vglist[node].failed:
1478           # ignoring down node
1479           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1480           continue
1481         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1482                                               self.op.vg_name,
1483                                               constants.MIN_VG_SIZE)
1484         if vgstatus:
1485           raise errors.OpPrereqError("Error on node '%s': %s" %
1486                                      (node, vgstatus))
1487
1488     self.cluster = cluster = self.cfg.GetClusterInfo()
1489     # validate params changes
1490     if self.op.beparams:
1491       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1492       self.new_beparams = objects.FillDict(
1493         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1494
1495     if self.op.nicparams:
1496       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1497       self.new_nicparams = objects.FillDict(
1498         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1499       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1500
1501     # hypervisor list/parameters
1502     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1503     if self.op.hvparams:
1504       if not isinstance(self.op.hvparams, dict):
1505         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1506       for hv_name, hv_dict in self.op.hvparams.items():
1507         if hv_name not in self.new_hvparams:
1508           self.new_hvparams[hv_name] = hv_dict
1509         else:
1510           self.new_hvparams[hv_name].update(hv_dict)
1511
1512     if self.op.enabled_hypervisors is not None:
1513       self.hv_list = self.op.enabled_hypervisors
1514     else:
1515       self.hv_list = cluster.enabled_hypervisors
1516
1517     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1518       # either the enabled list has changed, or the parameters have, validate
1519       for hv_name, hv_params in self.new_hvparams.items():
1520         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1521             (self.op.enabled_hypervisors and
1522              hv_name in self.op.enabled_hypervisors)):
1523           # either this is a new hypervisor, or its parameters have changed
1524           hv_class = hypervisor.GetHypervisor(hv_name)
1525           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1526           hv_class.CheckParameterSyntax(hv_params)
1527           _CheckHVParams(self, node_list, hv_name, hv_params)
1528
1529   def Exec(self, feedback_fn):
1530     """Change the parameters of the cluster.
1531
1532     """
1533     if self.op.vg_name is not None:
1534       new_volume = self.op.vg_name
1535       if not new_volume:
1536         new_volume = None
1537       if new_volume != self.cfg.GetVGName():
1538         self.cfg.SetVGName(new_volume)
1539       else:
1540         feedback_fn("Cluster LVM configuration already in desired"
1541                     " state, not changing")
1542     if self.op.hvparams:
1543       self.cluster.hvparams = self.new_hvparams
1544     if self.op.enabled_hypervisors is not None:
1545       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1546     if self.op.beparams:
1547       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1548     if self.op.nicparams:
1549       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1550
1551     if self.op.candidate_pool_size is not None:
1552       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1553
1554     self.cfg.Update(self.cluster)
1555
1556     # we want to update nodes after the cluster so that if any errors
1557     # happen, we have recorded and saved the cluster info
1558     if self.op.candidate_pool_size is not None:
1559       _AdjustCandidatePool(self)
1560
1561
1562 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1563   """Distribute additional files which are part of the cluster configuration.
1564
1565   ConfigWriter takes care of distributing the config and ssconf files, but
1566   there are more files which should be distributed to all nodes. This function
1567   makes sure those are copied.
1568
1569   @param lu: calling logical unit
1570   @param additional_nodes: list of nodes not in the config to distribute to
1571
1572   """
1573   # 1. Gather target nodes
1574   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1575   dist_nodes = lu.cfg.GetNodeList()
1576   if additional_nodes is not None:
1577     dist_nodes.extend(additional_nodes)
1578   if myself.name in dist_nodes:
1579     dist_nodes.remove(myself.name)
1580   # 2. Gather files to distribute
1581   dist_files = set([constants.ETC_HOSTS,
1582                     constants.SSH_KNOWN_HOSTS_FILE,
1583                     constants.RAPI_CERT_FILE,
1584                     constants.RAPI_USERS_FILE,
1585                    ])
1586
1587   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1588   for hv_name in enabled_hypervisors:
1589     hv_class = hypervisor.GetHypervisor(hv_name)
1590     dist_files.update(hv_class.GetAncillaryFiles())
1591
1592   # 3. Perform the files upload
1593   for fname in dist_files:
1594     if os.path.exists(fname):
1595       result = lu.rpc.call_upload_file(dist_nodes, fname)
1596       for to_node, to_result in result.items():
1597          msg = to_result.RemoteFailMsg()
1598          if msg:
1599            msg = ("Copy of file %s to node %s failed: %s" %
1600                    (fname, to_node, msg))
1601            lu.proc.LogWarning(msg)
1602
1603
1604 class LURedistributeConfig(NoHooksLU):
1605   """Force the redistribution of cluster configuration.
1606
1607   This is a very simple LU.
1608
1609   """
1610   _OP_REQP = []
1611   REQ_BGL = False
1612
1613   def ExpandNames(self):
1614     self.needed_locks = {
1615       locking.LEVEL_NODE: locking.ALL_SET,
1616     }
1617     self.share_locks[locking.LEVEL_NODE] = 1
1618
1619   def CheckPrereq(self):
1620     """Check prerequisites.
1621
1622     """
1623
1624   def Exec(self, feedback_fn):
1625     """Redistribute the configuration.
1626
1627     """
1628     self.cfg.Update(self.cfg.GetClusterInfo())
1629     _RedistributeAncillaryFiles(self)
1630
1631
1632 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1633   """Sleep and poll for an instance's disk to sync.
1634
1635   """
1636   if not instance.disks:
1637     return True
1638
1639   if not oneshot:
1640     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1641
1642   node = instance.primary_node
1643
1644   for dev in instance.disks:
1645     lu.cfg.SetDiskID(dev, node)
1646
1647   retries = 0
1648   while True:
1649     max_time = 0
1650     done = True
1651     cumul_degraded = False
1652     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1653     if rstats.failed or not rstats.data:
1654       lu.LogWarning("Can't get any data from node %s", node)
1655       retries += 1
1656       if retries >= 10:
1657         raise errors.RemoteError("Can't contact node %s for mirror data,"
1658                                  " aborting." % node)
1659       time.sleep(6)
1660       continue
1661     rstats = rstats.data
1662     retries = 0
1663     for i, mstat in enumerate(rstats):
1664       if mstat is None:
1665         lu.LogWarning("Can't compute data for node %s/%s",
1666                            node, instance.disks[i].iv_name)
1667         continue
1668       # we ignore the ldisk parameter
1669       perc_done, est_time, is_degraded, _ = mstat
1670       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1671       if perc_done is not None:
1672         done = False
1673         if est_time is not None:
1674           rem_time = "%d estimated seconds remaining" % est_time
1675           max_time = est_time
1676         else:
1677           rem_time = "no time estimate"
1678         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1679                         (instance.disks[i].iv_name, perc_done, rem_time))
1680     if done or oneshot:
1681       break
1682
1683     time.sleep(min(60, max_time))
1684
1685   if done:
1686     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1687   return not cumul_degraded
1688
1689
1690 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1691   """Check that mirrors are not degraded.
1692
1693   The ldisk parameter, if True, will change the test from the
1694   is_degraded attribute (which represents overall non-ok status for
1695   the device(s)) to the ldisk (representing the local storage status).
1696
1697   """
1698   lu.cfg.SetDiskID(dev, node)
1699   if ldisk:
1700     idx = 6
1701   else:
1702     idx = 5
1703
1704   result = True
1705   if on_primary or dev.AssembleOnSecondary():
1706     rstats = lu.rpc.call_blockdev_find(node, dev)
1707     msg = rstats.RemoteFailMsg()
1708     if msg:
1709       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1710       result = False
1711     elif not rstats.payload:
1712       lu.LogWarning("Can't find disk on node %s", node)
1713       result = False
1714     else:
1715       result = result and (not rstats.payload[idx])
1716   if dev.children:
1717     for child in dev.children:
1718       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1719
1720   return result
1721
1722
1723 class LUDiagnoseOS(NoHooksLU):
1724   """Logical unit for OS diagnose/query.
1725
1726   """
1727   _OP_REQP = ["output_fields", "names"]
1728   REQ_BGL = False
1729   _FIELDS_STATIC = utils.FieldSet()
1730   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1731
1732   def ExpandNames(self):
1733     if self.op.names:
1734       raise errors.OpPrereqError("Selective OS query not supported")
1735
1736     _CheckOutputFields(static=self._FIELDS_STATIC,
1737                        dynamic=self._FIELDS_DYNAMIC,
1738                        selected=self.op.output_fields)
1739
1740     # Lock all nodes, in shared mode
1741     # Temporary removal of locks, should be reverted later
1742     # TODO: reintroduce locks when they are lighter-weight
1743     self.needed_locks = {}
1744     #self.share_locks[locking.LEVEL_NODE] = 1
1745     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1746
1747   def CheckPrereq(self):
1748     """Check prerequisites.
1749
1750     """
1751
1752   @staticmethod
1753   def _DiagnoseByOS(node_list, rlist):
1754     """Remaps a per-node return list into an a per-os per-node dictionary
1755
1756     @param node_list: a list with the names of all nodes
1757     @param rlist: a map with node names as keys and OS objects as values
1758
1759     @rtype: dict
1760     @return: a dictionary with osnames as keys and as value another map, with
1761         nodes as keys and list of OS objects as values, eg::
1762
1763           {"debian-etch": {"node1": [<object>,...],
1764                            "node2": [<object>,]}
1765           }
1766
1767     """
1768     all_os = {}
1769     # we build here the list of nodes that didn't fail the RPC (at RPC
1770     # level), so that nodes with a non-responding node daemon don't
1771     # make all OSes invalid
1772     good_nodes = [node_name for node_name in rlist
1773                   if not rlist[node_name].failed]
1774     for node_name, nr in rlist.iteritems():
1775       if nr.failed or not nr.data:
1776         continue
1777       for os_obj in nr.data:
1778         if os_obj.name not in all_os:
1779           # build a list of nodes for this os containing empty lists
1780           # for each node in node_list
1781           all_os[os_obj.name] = {}
1782           for nname in good_nodes:
1783             all_os[os_obj.name][nname] = []
1784         all_os[os_obj.name][node_name].append(os_obj)
1785     return all_os
1786
1787   def Exec(self, feedback_fn):
1788     """Compute the list of OSes.
1789
1790     """
1791     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1792     node_data = self.rpc.call_os_diagnose(valid_nodes)
1793     if node_data == False:
1794       raise errors.OpExecError("Can't gather the list of OSes")
1795     pol = self._DiagnoseByOS(valid_nodes, node_data)
1796     output = []
1797     for os_name, os_data in pol.iteritems():
1798       row = []
1799       for field in self.op.output_fields:
1800         if field == "name":
1801           val = os_name
1802         elif field == "valid":
1803           val = utils.all([osl and osl[0] for osl in os_data.values()])
1804         elif field == "node_status":
1805           val = {}
1806           for node_name, nos_list in os_data.iteritems():
1807             val[node_name] = [(v.status, v.path) for v in nos_list]
1808         else:
1809           raise errors.ParameterError(field)
1810         row.append(val)
1811       output.append(row)
1812
1813     return output
1814
1815
1816 class LURemoveNode(LogicalUnit):
1817   """Logical unit for removing a node.
1818
1819   """
1820   HPATH = "node-remove"
1821   HTYPE = constants.HTYPE_NODE
1822   _OP_REQP = ["node_name"]
1823
1824   def BuildHooksEnv(self):
1825     """Build hooks env.
1826
1827     This doesn't run on the target node in the pre phase as a failed
1828     node would then be impossible to remove.
1829
1830     """
1831     env = {
1832       "OP_TARGET": self.op.node_name,
1833       "NODE_NAME": self.op.node_name,
1834       }
1835     all_nodes = self.cfg.GetNodeList()
1836     all_nodes.remove(self.op.node_name)
1837     return env, all_nodes, all_nodes
1838
1839   def CheckPrereq(self):
1840     """Check prerequisites.
1841
1842     This checks:
1843      - the node exists in the configuration
1844      - it does not have primary or secondary instances
1845      - it's not the master
1846
1847     Any errors are signalled by raising errors.OpPrereqError.
1848
1849     """
1850     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1851     if node is None:
1852       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1853
1854     instance_list = self.cfg.GetInstanceList()
1855
1856     masternode = self.cfg.GetMasterNode()
1857     if node.name == masternode:
1858       raise errors.OpPrereqError("Node is the master node,"
1859                                  " you need to failover first.")
1860
1861     for instance_name in instance_list:
1862       instance = self.cfg.GetInstanceInfo(instance_name)
1863       if node.name in instance.all_nodes:
1864         raise errors.OpPrereqError("Instance %s is still running on the node,"
1865                                    " please remove first." % instance_name)
1866     self.op.node_name = node.name
1867     self.node = node
1868
1869   def Exec(self, feedback_fn):
1870     """Removes the node from the cluster.
1871
1872     """
1873     node = self.node
1874     logging.info("Stopping the node daemon and removing configs from node %s",
1875                  node.name)
1876
1877     self.context.RemoveNode(node.name)
1878
1879     self.rpc.call_node_leave_cluster(node.name)
1880
1881     # Promote nodes to master candidate as needed
1882     _AdjustCandidatePool(self)
1883
1884
1885 class LUQueryNodes(NoHooksLU):
1886   """Logical unit for querying nodes.
1887
1888   """
1889   _OP_REQP = ["output_fields", "names", "use_locking"]
1890   REQ_BGL = False
1891   _FIELDS_DYNAMIC = utils.FieldSet(
1892     "dtotal", "dfree",
1893     "mtotal", "mnode", "mfree",
1894     "bootid",
1895     "ctotal", "cnodes", "csockets",
1896     )
1897
1898   _FIELDS_STATIC = utils.FieldSet(
1899     "name", "pinst_cnt", "sinst_cnt",
1900     "pinst_list", "sinst_list",
1901     "pip", "sip", "tags",
1902     "serial_no",
1903     "master_candidate",
1904     "master",
1905     "offline",
1906     "drained",
1907     )
1908
1909   def ExpandNames(self):
1910     _CheckOutputFields(static=self._FIELDS_STATIC,
1911                        dynamic=self._FIELDS_DYNAMIC,
1912                        selected=self.op.output_fields)
1913
1914     self.needed_locks = {}
1915     self.share_locks[locking.LEVEL_NODE] = 1
1916
1917     if self.op.names:
1918       self.wanted = _GetWantedNodes(self, self.op.names)
1919     else:
1920       self.wanted = locking.ALL_SET
1921
1922     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1923     self.do_locking = self.do_node_query and self.op.use_locking
1924     if self.do_locking:
1925       # if we don't request only static fields, we need to lock the nodes
1926       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1927
1928
1929   def CheckPrereq(self):
1930     """Check prerequisites.
1931
1932     """
1933     # The validation of the node list is done in the _GetWantedNodes,
1934     # if non empty, and if empty, there's no validation to do
1935     pass
1936
1937   def Exec(self, feedback_fn):
1938     """Computes the list of nodes and their attributes.
1939
1940     """
1941     all_info = self.cfg.GetAllNodesInfo()
1942     if self.do_locking:
1943       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1944     elif self.wanted != locking.ALL_SET:
1945       nodenames = self.wanted
1946       missing = set(nodenames).difference(all_info.keys())
1947       if missing:
1948         raise errors.OpExecError(
1949           "Some nodes were removed before retrieving their data: %s" % missing)
1950     else:
1951       nodenames = all_info.keys()
1952
1953     nodenames = utils.NiceSort(nodenames)
1954     nodelist = [all_info[name] for name in nodenames]
1955
1956     # begin data gathering
1957
1958     if self.do_node_query:
1959       live_data = {}
1960       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1961                                           self.cfg.GetHypervisorType())
1962       for name in nodenames:
1963         nodeinfo = node_data[name]
1964         if not nodeinfo.failed and nodeinfo.data:
1965           nodeinfo = nodeinfo.data
1966           fn = utils.TryConvert
1967           live_data[name] = {
1968             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1969             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1970             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1971             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1972             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1973             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1974             "bootid": nodeinfo.get('bootid', None),
1975             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1976             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1977             }
1978         else:
1979           live_data[name] = {}
1980     else:
1981       live_data = dict.fromkeys(nodenames, {})
1982
1983     node_to_primary = dict([(name, set()) for name in nodenames])
1984     node_to_secondary = dict([(name, set()) for name in nodenames])
1985
1986     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1987                              "sinst_cnt", "sinst_list"))
1988     if inst_fields & frozenset(self.op.output_fields):
1989       instancelist = self.cfg.GetInstanceList()
1990
1991       for instance_name in instancelist:
1992         inst = self.cfg.GetInstanceInfo(instance_name)
1993         if inst.primary_node in node_to_primary:
1994           node_to_primary[inst.primary_node].add(inst.name)
1995         for secnode in inst.secondary_nodes:
1996           if secnode in node_to_secondary:
1997             node_to_secondary[secnode].add(inst.name)
1998
1999     master_node = self.cfg.GetMasterNode()
2000
2001     # end data gathering
2002
2003     output = []
2004     for node in nodelist:
2005       node_output = []
2006       for field in self.op.output_fields:
2007         if field == "name":
2008           val = node.name
2009         elif field == "pinst_list":
2010           val = list(node_to_primary[node.name])
2011         elif field == "sinst_list":
2012           val = list(node_to_secondary[node.name])
2013         elif field == "pinst_cnt":
2014           val = len(node_to_primary[node.name])
2015         elif field == "sinst_cnt":
2016           val = len(node_to_secondary[node.name])
2017         elif field == "pip":
2018           val = node.primary_ip
2019         elif field == "sip":
2020           val = node.secondary_ip
2021         elif field == "tags":
2022           val = list(node.GetTags())
2023         elif field == "serial_no":
2024           val = node.serial_no
2025         elif field == "master_candidate":
2026           val = node.master_candidate
2027         elif field == "master":
2028           val = node.name == master_node
2029         elif field == "offline":
2030           val = node.offline
2031         elif field == "drained":
2032           val = node.drained
2033         elif self._FIELDS_DYNAMIC.Matches(field):
2034           val = live_data[node.name].get(field, None)
2035         else:
2036           raise errors.ParameterError(field)
2037         node_output.append(val)
2038       output.append(node_output)
2039
2040     return output
2041
2042
2043 class LUQueryNodeVolumes(NoHooksLU):
2044   """Logical unit for getting volumes on node(s).
2045
2046   """
2047   _OP_REQP = ["nodes", "output_fields"]
2048   REQ_BGL = False
2049   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2050   _FIELDS_STATIC = utils.FieldSet("node")
2051
2052   def ExpandNames(self):
2053     _CheckOutputFields(static=self._FIELDS_STATIC,
2054                        dynamic=self._FIELDS_DYNAMIC,
2055                        selected=self.op.output_fields)
2056
2057     self.needed_locks = {}
2058     self.share_locks[locking.LEVEL_NODE] = 1
2059     if not self.op.nodes:
2060       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2061     else:
2062       self.needed_locks[locking.LEVEL_NODE] = \
2063         _GetWantedNodes(self, self.op.nodes)
2064
2065   def CheckPrereq(self):
2066     """Check prerequisites.
2067
2068     This checks that the fields required are valid output fields.
2069
2070     """
2071     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2072
2073   def Exec(self, feedback_fn):
2074     """Computes the list of nodes and their attributes.
2075
2076     """
2077     nodenames = self.nodes
2078     volumes = self.rpc.call_node_volumes(nodenames)
2079
2080     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2081              in self.cfg.GetInstanceList()]
2082
2083     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2084
2085     output = []
2086     for node in nodenames:
2087       if node not in volumes or volumes[node].failed or not volumes[node].data:
2088         continue
2089
2090       node_vols = volumes[node].data[:]
2091       node_vols.sort(key=lambda vol: vol['dev'])
2092
2093       for vol in node_vols:
2094         node_output = []
2095         for field in self.op.output_fields:
2096           if field == "node":
2097             val = node
2098           elif field == "phys":
2099             val = vol['dev']
2100           elif field == "vg":
2101             val = vol['vg']
2102           elif field == "name":
2103             val = vol['name']
2104           elif field == "size":
2105             val = int(float(vol['size']))
2106           elif field == "instance":
2107             for inst in ilist:
2108               if node not in lv_by_node[inst]:
2109                 continue
2110               if vol['name'] in lv_by_node[inst][node]:
2111                 val = inst.name
2112                 break
2113             else:
2114               val = '-'
2115           else:
2116             raise errors.ParameterError(field)
2117           node_output.append(str(val))
2118
2119         output.append(node_output)
2120
2121     return output
2122
2123
2124 class LUAddNode(LogicalUnit):
2125   """Logical unit for adding node to the cluster.
2126
2127   """
2128   HPATH = "node-add"
2129   HTYPE = constants.HTYPE_NODE
2130   _OP_REQP = ["node_name"]
2131
2132   def BuildHooksEnv(self):
2133     """Build hooks env.
2134
2135     This will run on all nodes before, and on all nodes + the new node after.
2136
2137     """
2138     env = {
2139       "OP_TARGET": self.op.node_name,
2140       "NODE_NAME": self.op.node_name,
2141       "NODE_PIP": self.op.primary_ip,
2142       "NODE_SIP": self.op.secondary_ip,
2143       }
2144     nodes_0 = self.cfg.GetNodeList()
2145     nodes_1 = nodes_0 + [self.op.node_name, ]
2146     return env, nodes_0, nodes_1
2147
2148   def CheckPrereq(self):
2149     """Check prerequisites.
2150
2151     This checks:
2152      - the new node is not already in the config
2153      - it is resolvable
2154      - its parameters (single/dual homed) matches the cluster
2155
2156     Any errors are signalled by raising errors.OpPrereqError.
2157
2158     """
2159     node_name = self.op.node_name
2160     cfg = self.cfg
2161
2162     dns_data = utils.HostInfo(node_name)
2163
2164     node = dns_data.name
2165     primary_ip = self.op.primary_ip = dns_data.ip
2166     secondary_ip = getattr(self.op, "secondary_ip", None)
2167     if secondary_ip is None:
2168       secondary_ip = primary_ip
2169     if not utils.IsValidIP(secondary_ip):
2170       raise errors.OpPrereqError("Invalid secondary IP given")
2171     self.op.secondary_ip = secondary_ip
2172
2173     node_list = cfg.GetNodeList()
2174     if not self.op.readd and node in node_list:
2175       raise errors.OpPrereqError("Node %s is already in the configuration" %
2176                                  node)
2177     elif self.op.readd and node not in node_list:
2178       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2179
2180     for existing_node_name in node_list:
2181       existing_node = cfg.GetNodeInfo(existing_node_name)
2182
2183       if self.op.readd and node == existing_node_name:
2184         if (existing_node.primary_ip != primary_ip or
2185             existing_node.secondary_ip != secondary_ip):
2186           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2187                                      " address configuration as before")
2188         continue
2189
2190       if (existing_node.primary_ip == primary_ip or
2191           existing_node.secondary_ip == primary_ip or
2192           existing_node.primary_ip == secondary_ip or
2193           existing_node.secondary_ip == secondary_ip):
2194         raise errors.OpPrereqError("New node ip address(es) conflict with"
2195                                    " existing node %s" % existing_node.name)
2196
2197     # check that the type of the node (single versus dual homed) is the
2198     # same as for the master
2199     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2200     master_singlehomed = myself.secondary_ip == myself.primary_ip
2201     newbie_singlehomed = secondary_ip == primary_ip
2202     if master_singlehomed != newbie_singlehomed:
2203       if master_singlehomed:
2204         raise errors.OpPrereqError("The master has no private ip but the"
2205                                    " new node has one")
2206       else:
2207         raise errors.OpPrereqError("The master has a private ip but the"
2208                                    " new node doesn't have one")
2209
2210     # checks reachablity
2211     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2212       raise errors.OpPrereqError("Node not reachable by ping")
2213
2214     if not newbie_singlehomed:
2215       # check reachability from my secondary ip to newbie's secondary ip
2216       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2217                            source=myself.secondary_ip):
2218         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2219                                    " based ping to noded port")
2220
2221     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2222     mc_now, _ = self.cfg.GetMasterCandidateStats()
2223     master_candidate = mc_now < cp_size
2224
2225     self.new_node = objects.Node(name=node,
2226                                  primary_ip=primary_ip,
2227                                  secondary_ip=secondary_ip,
2228                                  master_candidate=master_candidate,
2229                                  offline=False, drained=False)
2230
2231   def Exec(self, feedback_fn):
2232     """Adds the new node to the cluster.
2233
2234     """
2235     new_node = self.new_node
2236     node = new_node.name
2237
2238     # check connectivity
2239     result = self.rpc.call_version([node])[node]
2240     result.Raise()
2241     if result.data:
2242       if constants.PROTOCOL_VERSION == result.data:
2243         logging.info("Communication to node %s fine, sw version %s match",
2244                      node, result.data)
2245       else:
2246         raise errors.OpExecError("Version mismatch master version %s,"
2247                                  " node version %s" %
2248                                  (constants.PROTOCOL_VERSION, result.data))
2249     else:
2250       raise errors.OpExecError("Cannot get version from the new node")
2251
2252     # setup ssh on node
2253     logging.info("Copy ssh key to node %s", node)
2254     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2255     keyarray = []
2256     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2257                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2258                 priv_key, pub_key]
2259
2260     for i in keyfiles:
2261       f = open(i, 'r')
2262       try:
2263         keyarray.append(f.read())
2264       finally:
2265         f.close()
2266
2267     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2268                                     keyarray[2],
2269                                     keyarray[3], keyarray[4], keyarray[5])
2270
2271     msg = result.RemoteFailMsg()
2272     if msg:
2273       raise errors.OpExecError("Cannot transfer ssh keys to the"
2274                                " new node: %s" % msg)
2275
2276     # Add node to our /etc/hosts, and add key to known_hosts
2277     if self.cfg.GetClusterInfo().modify_etc_hosts:
2278       utils.AddHostToEtcHosts(new_node.name)
2279
2280     if new_node.secondary_ip != new_node.primary_ip:
2281       result = self.rpc.call_node_has_ip_address(new_node.name,
2282                                                  new_node.secondary_ip)
2283       if result.failed or not result.data:
2284         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2285                                  " you gave (%s). Please fix and re-run this"
2286                                  " command." % new_node.secondary_ip)
2287
2288     node_verify_list = [self.cfg.GetMasterNode()]
2289     node_verify_param = {
2290       'nodelist': [node],
2291       # TODO: do a node-net-test as well?
2292     }
2293
2294     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2295                                        self.cfg.GetClusterName())
2296     for verifier in node_verify_list:
2297       if result[verifier].failed or not result[verifier].data:
2298         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2299                                  " for remote verification" % verifier)
2300       if result[verifier].data['nodelist']:
2301         for failed in result[verifier].data['nodelist']:
2302           feedback_fn("ssh/hostname verification failed %s -> %s" %
2303                       (verifier, result[verifier].data['nodelist'][failed]))
2304         raise errors.OpExecError("ssh/hostname verification failed.")
2305
2306     if self.op.readd:
2307       _RedistributeAncillaryFiles(self)
2308       self.context.ReaddNode(new_node)
2309     else:
2310       _RedistributeAncillaryFiles(self, additional_nodes=node)
2311       self.context.AddNode(new_node)
2312
2313
2314 class LUSetNodeParams(LogicalUnit):
2315   """Modifies the parameters of a node.
2316
2317   """
2318   HPATH = "node-modify"
2319   HTYPE = constants.HTYPE_NODE
2320   _OP_REQP = ["node_name"]
2321   REQ_BGL = False
2322
2323   def CheckArguments(self):
2324     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2325     if node_name is None:
2326       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2327     self.op.node_name = node_name
2328     _CheckBooleanOpField(self.op, 'master_candidate')
2329     _CheckBooleanOpField(self.op, 'offline')
2330     _CheckBooleanOpField(self.op, 'drained')
2331     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2332     if all_mods.count(None) == 3:
2333       raise errors.OpPrereqError("Please pass at least one modification")
2334     if all_mods.count(True) > 1:
2335       raise errors.OpPrereqError("Can't set the node into more than one"
2336                                  " state at the same time")
2337
2338   def ExpandNames(self):
2339     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2340
2341   def BuildHooksEnv(self):
2342     """Build hooks env.
2343
2344     This runs on the master node.
2345
2346     """
2347     env = {
2348       "OP_TARGET": self.op.node_name,
2349       "MASTER_CANDIDATE": str(self.op.master_candidate),
2350       "OFFLINE": str(self.op.offline),
2351       "DRAINED": str(self.op.drained),
2352       }
2353     nl = [self.cfg.GetMasterNode(),
2354           self.op.node_name]
2355     return env, nl, nl
2356
2357   def CheckPrereq(self):
2358     """Check prerequisites.
2359
2360     This only checks the instance list against the existing names.
2361
2362     """
2363     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2364
2365     if ((self.op.master_candidate == False or self.op.offline == True or
2366          self.op.drained == True) and node.master_candidate):
2367       # we will demote the node from master_candidate
2368       if self.op.node_name == self.cfg.GetMasterNode():
2369         raise errors.OpPrereqError("The master node has to be a"
2370                                    " master candidate, online and not drained")
2371       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2372       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2373       if num_candidates <= cp_size:
2374         msg = ("Not enough master candidates (desired"
2375                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2376         if self.op.force:
2377           self.LogWarning(msg)
2378         else:
2379           raise errors.OpPrereqError(msg)
2380
2381     if (self.op.master_candidate == True and
2382         ((node.offline and not self.op.offline == False) or
2383          (node.drained and not self.op.drained == False))):
2384       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2385                                  " to master_candidate" % node.name)
2386
2387     return
2388
2389   def Exec(self, feedback_fn):
2390     """Modifies a node.
2391
2392     """
2393     node = self.node
2394
2395     result = []
2396     changed_mc = False
2397
2398     if self.op.offline is not None:
2399       node.offline = self.op.offline
2400       result.append(("offline", str(self.op.offline)))
2401       if self.op.offline == True:
2402         if node.master_candidate:
2403           node.master_candidate = False
2404           changed_mc = True
2405           result.append(("master_candidate", "auto-demotion due to offline"))
2406         if node.drained:
2407           node.drained = False
2408           result.append(("drained", "clear drained status due to offline"))
2409
2410     if self.op.master_candidate is not None:
2411       node.master_candidate = self.op.master_candidate
2412       changed_mc = True
2413       result.append(("master_candidate", str(self.op.master_candidate)))
2414       if self.op.master_candidate == False:
2415         rrc = self.rpc.call_node_demote_from_mc(node.name)
2416         msg = rrc.RemoteFailMsg()
2417         if msg:
2418           self.LogWarning("Node failed to demote itself: %s" % msg)
2419
2420     if self.op.drained is not None:
2421       node.drained = self.op.drained
2422       result.append(("drained", str(self.op.drained)))
2423       if self.op.drained == True:
2424         if node.master_candidate:
2425           node.master_candidate = False
2426           changed_mc = True
2427           result.append(("master_candidate", "auto-demotion due to drain"))
2428         if node.offline:
2429           node.offline = False
2430           result.append(("offline", "clear offline status due to drain"))
2431
2432     # this will trigger configuration file update, if needed
2433     self.cfg.Update(node)
2434     # this will trigger job queue propagation or cleanup
2435     if changed_mc:
2436       self.context.ReaddNode(node)
2437
2438     return result
2439
2440
2441 class LUPowercycleNode(NoHooksLU):
2442   """Powercycles a node.
2443
2444   """
2445   _OP_REQP = ["node_name", "force"]
2446   REQ_BGL = False
2447
2448   def CheckArguments(self):
2449     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2450     if node_name is None:
2451       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2452     self.op.node_name = node_name
2453     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2454       raise errors.OpPrereqError("The node is the master and the force"
2455                                  " parameter was not set")
2456
2457   def ExpandNames(self):
2458     """Locking for PowercycleNode.
2459
2460     This is a last-resource option and shouldn't block on other
2461     jobs. Therefore, we grab no locks.
2462
2463     """
2464     self.needed_locks = {}
2465
2466   def CheckPrereq(self):
2467     """Check prerequisites.
2468
2469     This LU has no prereqs.
2470
2471     """
2472     pass
2473
2474   def Exec(self, feedback_fn):
2475     """Reboots a node.
2476
2477     """
2478     result = self.rpc.call_node_powercycle(self.op.node_name,
2479                                            self.cfg.GetHypervisorType())
2480     msg = result.RemoteFailMsg()
2481     if msg:
2482       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2483     return result.payload
2484
2485
2486 class LUQueryClusterInfo(NoHooksLU):
2487   """Query cluster configuration.
2488
2489   """
2490   _OP_REQP = []
2491   REQ_BGL = False
2492
2493   def ExpandNames(self):
2494     self.needed_locks = {}
2495
2496   def CheckPrereq(self):
2497     """No prerequsites needed for this LU.
2498
2499     """
2500     pass
2501
2502   def Exec(self, feedback_fn):
2503     """Return cluster config.
2504
2505     """
2506     cluster = self.cfg.GetClusterInfo()
2507     result = {
2508       "software_version": constants.RELEASE_VERSION,
2509       "protocol_version": constants.PROTOCOL_VERSION,
2510       "config_version": constants.CONFIG_VERSION,
2511       "os_api_version": constants.OS_API_VERSION,
2512       "export_version": constants.EXPORT_VERSION,
2513       "architecture": (platform.architecture()[0], platform.machine()),
2514       "name": cluster.cluster_name,
2515       "master": cluster.master_node,
2516       "default_hypervisor": cluster.default_hypervisor,
2517       "enabled_hypervisors": cluster.enabled_hypervisors,
2518       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2519                         for hypervisor in cluster.enabled_hypervisors]),
2520       "beparams": cluster.beparams,
2521       "candidate_pool_size": cluster.candidate_pool_size,
2522       "default_bridge": cluster.default_bridge,
2523       "master_netdev": cluster.master_netdev,
2524       "volume_group_name": cluster.volume_group_name,
2525       "file_storage_dir": cluster.file_storage_dir,
2526       }
2527
2528     return result
2529
2530
2531 class LUQueryConfigValues(NoHooksLU):
2532   """Return configuration values.
2533
2534   """
2535   _OP_REQP = []
2536   REQ_BGL = False
2537   _FIELDS_DYNAMIC = utils.FieldSet()
2538   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2539
2540   def ExpandNames(self):
2541     self.needed_locks = {}
2542
2543     _CheckOutputFields(static=self._FIELDS_STATIC,
2544                        dynamic=self._FIELDS_DYNAMIC,
2545                        selected=self.op.output_fields)
2546
2547   def CheckPrereq(self):
2548     """No prerequisites.
2549
2550     """
2551     pass
2552
2553   def Exec(self, feedback_fn):
2554     """Dump a representation of the cluster config to the standard output.
2555
2556     """
2557     values = []
2558     for field in self.op.output_fields:
2559       if field == "cluster_name":
2560         entry = self.cfg.GetClusterName()
2561       elif field == "master_node":
2562         entry = self.cfg.GetMasterNode()
2563       elif field == "drain_flag":
2564         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2565       else:
2566         raise errors.ParameterError(field)
2567       values.append(entry)
2568     return values
2569
2570
2571 class LUActivateInstanceDisks(NoHooksLU):
2572   """Bring up an instance's disks.
2573
2574   """
2575   _OP_REQP = ["instance_name"]
2576   REQ_BGL = False
2577
2578   def ExpandNames(self):
2579     self._ExpandAndLockInstance()
2580     self.needed_locks[locking.LEVEL_NODE] = []
2581     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2582
2583   def DeclareLocks(self, level):
2584     if level == locking.LEVEL_NODE:
2585       self._LockInstancesNodes()
2586
2587   def CheckPrereq(self):
2588     """Check prerequisites.
2589
2590     This checks that the instance is in the cluster.
2591
2592     """
2593     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2594     assert self.instance is not None, \
2595       "Cannot retrieve locked instance %s" % self.op.instance_name
2596     _CheckNodeOnline(self, self.instance.primary_node)
2597
2598   def Exec(self, feedback_fn):
2599     """Activate the disks.
2600
2601     """
2602     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2603     if not disks_ok:
2604       raise errors.OpExecError("Cannot activate block devices")
2605
2606     return disks_info
2607
2608
2609 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2610   """Prepare the block devices for an instance.
2611
2612   This sets up the block devices on all nodes.
2613
2614   @type lu: L{LogicalUnit}
2615   @param lu: the logical unit on whose behalf we execute
2616   @type instance: L{objects.Instance}
2617   @param instance: the instance for whose disks we assemble
2618   @type ignore_secondaries: boolean
2619   @param ignore_secondaries: if true, errors on secondary nodes
2620       won't result in an error return from the function
2621   @return: False if the operation failed, otherwise a list of
2622       (host, instance_visible_name, node_visible_name)
2623       with the mapping from node devices to instance devices
2624
2625   """
2626   device_info = []
2627   disks_ok = True
2628   iname = instance.name
2629   # With the two passes mechanism we try to reduce the window of
2630   # opportunity for the race condition of switching DRBD to primary
2631   # before handshaking occured, but we do not eliminate it
2632
2633   # The proper fix would be to wait (with some limits) until the
2634   # connection has been made and drbd transitions from WFConnection
2635   # into any other network-connected state (Connected, SyncTarget,
2636   # SyncSource, etc.)
2637
2638   # 1st pass, assemble on all nodes in secondary mode
2639   for inst_disk in instance.disks:
2640     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2641       lu.cfg.SetDiskID(node_disk, node)
2642       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2643       msg = result.RemoteFailMsg()
2644       if msg:
2645         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2646                            " (is_primary=False, pass=1): %s",
2647                            inst_disk.iv_name, node, msg)
2648         if not ignore_secondaries:
2649           disks_ok = False
2650
2651   # FIXME: race condition on drbd migration to primary
2652
2653   # 2nd pass, do only the primary node
2654   for inst_disk in instance.disks:
2655     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2656       if node != instance.primary_node:
2657         continue
2658       lu.cfg.SetDiskID(node_disk, node)
2659       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2660       msg = result.RemoteFailMsg()
2661       if msg:
2662         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2663                            " (is_primary=True, pass=2): %s",
2664                            inst_disk.iv_name, node, msg)
2665         disks_ok = False
2666     device_info.append((instance.primary_node, inst_disk.iv_name,
2667                         result.payload))
2668
2669   # leave the disks configured for the primary node
2670   # this is a workaround that would be fixed better by
2671   # improving the logical/physical id handling
2672   for disk in instance.disks:
2673     lu.cfg.SetDiskID(disk, instance.primary_node)
2674
2675   return disks_ok, device_info
2676
2677
2678 def _StartInstanceDisks(lu, instance, force):
2679   """Start the disks of an instance.
2680
2681   """
2682   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2683                                            ignore_secondaries=force)
2684   if not disks_ok:
2685     _ShutdownInstanceDisks(lu, instance)
2686     if force is not None and not force:
2687       lu.proc.LogWarning("", hint="If the message above refers to a"
2688                          " secondary node,"
2689                          " you can retry the operation using '--force'.")
2690     raise errors.OpExecError("Disk consistency error")
2691
2692
2693 class LUDeactivateInstanceDisks(NoHooksLU):
2694   """Shutdown an instance's disks.
2695
2696   """
2697   _OP_REQP = ["instance_name"]
2698   REQ_BGL = False
2699
2700   def ExpandNames(self):
2701     self._ExpandAndLockInstance()
2702     self.needed_locks[locking.LEVEL_NODE] = []
2703     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2704
2705   def DeclareLocks(self, level):
2706     if level == locking.LEVEL_NODE:
2707       self._LockInstancesNodes()
2708
2709   def CheckPrereq(self):
2710     """Check prerequisites.
2711
2712     This checks that the instance is in the cluster.
2713
2714     """
2715     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2716     assert self.instance is not None, \
2717       "Cannot retrieve locked instance %s" % self.op.instance_name
2718
2719   def Exec(self, feedback_fn):
2720     """Deactivate the disks
2721
2722     """
2723     instance = self.instance
2724     _SafeShutdownInstanceDisks(self, instance)
2725
2726
2727 def _SafeShutdownInstanceDisks(lu, instance):
2728   """Shutdown block devices of an instance.
2729
2730   This function checks if an instance is running, before calling
2731   _ShutdownInstanceDisks.
2732
2733   """
2734   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2735                                       [instance.hypervisor])
2736   ins_l = ins_l[instance.primary_node]
2737   if ins_l.failed or not isinstance(ins_l.data, list):
2738     raise errors.OpExecError("Can't contact node '%s'" %
2739                              instance.primary_node)
2740
2741   if instance.name in ins_l.data:
2742     raise errors.OpExecError("Instance is running, can't shutdown"
2743                              " block devices.")
2744
2745   _ShutdownInstanceDisks(lu, instance)
2746
2747
2748 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2749   """Shutdown block devices of an instance.
2750
2751   This does the shutdown on all nodes of the instance.
2752
2753   If the ignore_primary is false, errors on the primary node are
2754   ignored.
2755
2756   """
2757   all_result = True
2758   for disk in instance.disks:
2759     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2760       lu.cfg.SetDiskID(top_disk, node)
2761       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2762       msg = result.RemoteFailMsg()
2763       if msg:
2764         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2765                       disk.iv_name, node, msg)
2766         if not ignore_primary or node != instance.primary_node:
2767           all_result = False
2768   return all_result
2769
2770
2771 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2772   """Checks if a node has enough free memory.
2773
2774   This function check if a given node has the needed amount of free
2775   memory. In case the node has less memory or we cannot get the
2776   information from the node, this function raise an OpPrereqError
2777   exception.
2778
2779   @type lu: C{LogicalUnit}
2780   @param lu: a logical unit from which we get configuration data
2781   @type node: C{str}
2782   @param node: the node to check
2783   @type reason: C{str}
2784   @param reason: string to use in the error message
2785   @type requested: C{int}
2786   @param requested: the amount of memory in MiB to check for
2787   @type hypervisor_name: C{str}
2788   @param hypervisor_name: the hypervisor to ask for memory stats
2789   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2790       we cannot check the node
2791
2792   """
2793   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2794   nodeinfo[node].Raise()
2795   free_mem = nodeinfo[node].data.get('memory_free')
2796   if not isinstance(free_mem, int):
2797     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2798                              " was '%s'" % (node, free_mem))
2799   if requested > free_mem:
2800     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2801                              " needed %s MiB, available %s MiB" %
2802                              (node, reason, requested, free_mem))
2803
2804
2805 class LUStartupInstance(LogicalUnit):
2806   """Starts an instance.
2807
2808   """
2809   HPATH = "instance-start"
2810   HTYPE = constants.HTYPE_INSTANCE
2811   _OP_REQP = ["instance_name", "force"]
2812   REQ_BGL = False
2813
2814   def ExpandNames(self):
2815     self._ExpandAndLockInstance()
2816
2817   def BuildHooksEnv(self):
2818     """Build hooks env.
2819
2820     This runs on master, primary and secondary nodes of the instance.
2821
2822     """
2823     env = {
2824       "FORCE": self.op.force,
2825       }
2826     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2827     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2828     return env, nl, nl
2829
2830   def CheckPrereq(self):
2831     """Check prerequisites.
2832
2833     This checks that the instance is in the cluster.
2834
2835     """
2836     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2837     assert self.instance is not None, \
2838       "Cannot retrieve locked instance %s" % self.op.instance_name
2839
2840     # extra beparams
2841     self.beparams = getattr(self.op, "beparams", {})
2842     if self.beparams:
2843       if not isinstance(self.beparams, dict):
2844         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2845                                    " dict" % (type(self.beparams), ))
2846       # fill the beparams dict
2847       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2848       self.op.beparams = self.beparams
2849
2850     # extra hvparams
2851     self.hvparams = getattr(self.op, "hvparams", {})
2852     if self.hvparams:
2853       if not isinstance(self.hvparams, dict):
2854         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2855                                    " dict" % (type(self.hvparams), ))
2856
2857       # check hypervisor parameter syntax (locally)
2858       cluster = self.cfg.GetClusterInfo()
2859       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2860       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2861                                     instance.hvparams)
2862       filled_hvp.update(self.hvparams)
2863       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2864       hv_type.CheckParameterSyntax(filled_hvp)
2865       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2866       self.op.hvparams = self.hvparams
2867
2868     _CheckNodeOnline(self, instance.primary_node)
2869
2870     bep = self.cfg.GetClusterInfo().FillBE(instance)
2871     # check bridges existance
2872     _CheckInstanceBridgesExist(self, instance)
2873
2874     remote_info = self.rpc.call_instance_info(instance.primary_node,
2875                                               instance.name,
2876                                               instance.hypervisor)
2877     remote_info.Raise()
2878     if not remote_info.data:
2879       _CheckNodeFreeMemory(self, instance.primary_node,
2880                            "starting instance %s" % instance.name,
2881                            bep[constants.BE_MEMORY], instance.hypervisor)
2882
2883   def Exec(self, feedback_fn):
2884     """Start the instance.
2885
2886     """
2887     instance = self.instance
2888     force = self.op.force
2889
2890     self.cfg.MarkInstanceUp(instance.name)
2891
2892     node_current = instance.primary_node
2893
2894     _StartInstanceDisks(self, instance, force)
2895
2896     result = self.rpc.call_instance_start(node_current, instance,
2897                                           self.hvparams, self.beparams)
2898     msg = result.RemoteFailMsg()
2899     if msg:
2900       _ShutdownInstanceDisks(self, instance)
2901       raise errors.OpExecError("Could not start instance: %s" % msg)
2902
2903
2904 class LURebootInstance(LogicalUnit):
2905   """Reboot an instance.
2906
2907   """
2908   HPATH = "instance-reboot"
2909   HTYPE = constants.HTYPE_INSTANCE
2910   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2911   REQ_BGL = False
2912
2913   def ExpandNames(self):
2914     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2915                                    constants.INSTANCE_REBOOT_HARD,
2916                                    constants.INSTANCE_REBOOT_FULL]:
2917       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2918                                   (constants.INSTANCE_REBOOT_SOFT,
2919                                    constants.INSTANCE_REBOOT_HARD,
2920                                    constants.INSTANCE_REBOOT_FULL))
2921     self._ExpandAndLockInstance()
2922
2923   def BuildHooksEnv(self):
2924     """Build hooks env.
2925
2926     This runs on master, primary and secondary nodes of the instance.
2927
2928     """
2929     env = {
2930       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2931       "REBOOT_TYPE": self.op.reboot_type,
2932       }
2933     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2934     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2935     return env, nl, nl
2936
2937   def CheckPrereq(self):
2938     """Check prerequisites.
2939
2940     This checks that the instance is in the cluster.
2941
2942     """
2943     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2944     assert self.instance is not None, \
2945       "Cannot retrieve locked instance %s" % self.op.instance_name
2946
2947     _CheckNodeOnline(self, instance.primary_node)
2948
2949     # check bridges existance
2950     _CheckInstanceBridgesExist(self, instance)
2951
2952   def Exec(self, feedback_fn):
2953     """Reboot the instance.
2954
2955     """
2956     instance = self.instance
2957     ignore_secondaries = self.op.ignore_secondaries
2958     reboot_type = self.op.reboot_type
2959
2960     node_current = instance.primary_node
2961
2962     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2963                        constants.INSTANCE_REBOOT_HARD]:
2964       for disk in instance.disks:
2965         self.cfg.SetDiskID(disk, node_current)
2966       result = self.rpc.call_instance_reboot(node_current, instance,
2967                                              reboot_type)
2968       msg = result.RemoteFailMsg()
2969       if msg:
2970         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2971     else:
2972       result = self.rpc.call_instance_shutdown(node_current, instance)
2973       msg = result.RemoteFailMsg()
2974       if msg:
2975         raise errors.OpExecError("Could not shutdown instance for"
2976                                  " full reboot: %s" % msg)
2977       _ShutdownInstanceDisks(self, instance)
2978       _StartInstanceDisks(self, instance, ignore_secondaries)
2979       result = self.rpc.call_instance_start(node_current, instance, None, None)
2980       msg = result.RemoteFailMsg()
2981       if msg:
2982         _ShutdownInstanceDisks(self, instance)
2983         raise errors.OpExecError("Could not start instance for"
2984                                  " full reboot: %s" % msg)
2985
2986     self.cfg.MarkInstanceUp(instance.name)
2987
2988
2989 class LUShutdownInstance(LogicalUnit):
2990   """Shutdown an instance.
2991
2992   """
2993   HPATH = "instance-stop"
2994   HTYPE = constants.HTYPE_INSTANCE
2995   _OP_REQP = ["instance_name"]
2996   REQ_BGL = False
2997
2998   def ExpandNames(self):
2999     self._ExpandAndLockInstance()
3000
3001   def BuildHooksEnv(self):
3002     """Build hooks env.
3003
3004     This runs on master, primary and secondary nodes of the instance.
3005
3006     """
3007     env = _BuildInstanceHookEnvByObject(self, self.instance)
3008     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3009     return env, nl, nl
3010
3011   def CheckPrereq(self):
3012     """Check prerequisites.
3013
3014     This checks that the instance is in the cluster.
3015
3016     """
3017     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3018     assert self.instance is not None, \
3019       "Cannot retrieve locked instance %s" % self.op.instance_name
3020     _CheckNodeOnline(self, self.instance.primary_node)
3021
3022   def Exec(self, feedback_fn):
3023     """Shutdown the instance.
3024
3025     """
3026     instance = self.instance
3027     node_current = instance.primary_node
3028     self.cfg.MarkInstanceDown(instance.name)
3029     result = self.rpc.call_instance_shutdown(node_current, instance)
3030     msg = result.RemoteFailMsg()
3031     if msg:
3032       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3033
3034     _ShutdownInstanceDisks(self, instance)
3035
3036
3037 class LUReinstallInstance(LogicalUnit):
3038   """Reinstall an instance.
3039
3040   """
3041   HPATH = "instance-reinstall"
3042   HTYPE = constants.HTYPE_INSTANCE
3043   _OP_REQP = ["instance_name"]
3044   REQ_BGL = False
3045
3046   def ExpandNames(self):
3047     self._ExpandAndLockInstance()
3048
3049   def BuildHooksEnv(self):
3050     """Build hooks env.
3051
3052     This runs on master, primary and secondary nodes of the instance.
3053
3054     """
3055     env = _BuildInstanceHookEnvByObject(self, self.instance)
3056     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3057     return env, nl, nl
3058
3059   def CheckPrereq(self):
3060     """Check prerequisites.
3061
3062     This checks that the instance is in the cluster and is not running.
3063
3064     """
3065     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3066     assert instance is not None, \
3067       "Cannot retrieve locked instance %s" % self.op.instance_name
3068     _CheckNodeOnline(self, instance.primary_node)
3069
3070     if instance.disk_template == constants.DT_DISKLESS:
3071       raise errors.OpPrereqError("Instance '%s' has no disks" %
3072                                  self.op.instance_name)
3073     if instance.admin_up:
3074       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3075                                  self.op.instance_name)
3076     remote_info = self.rpc.call_instance_info(instance.primary_node,
3077                                               instance.name,
3078                                               instance.hypervisor)
3079     remote_info.Raise()
3080     if remote_info.data:
3081       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3082                                  (self.op.instance_name,
3083                                   instance.primary_node))
3084
3085     self.op.os_type = getattr(self.op, "os_type", None)
3086     if self.op.os_type is not None:
3087       # OS verification
3088       pnode = self.cfg.GetNodeInfo(
3089         self.cfg.ExpandNodeName(instance.primary_node))
3090       if pnode is None:
3091         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3092                                    self.op.pnode)
3093       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3094       result.Raise()
3095       if not isinstance(result.data, objects.OS):
3096         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3097                                    " primary node"  % self.op.os_type)
3098
3099     self.instance = instance
3100
3101   def Exec(self, feedback_fn):
3102     """Reinstall the instance.
3103
3104     """
3105     inst = self.instance
3106
3107     if self.op.os_type is not None:
3108       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3109       inst.os = self.op.os_type
3110       self.cfg.Update(inst)
3111
3112     _StartInstanceDisks(self, inst, None)
3113     try:
3114       feedback_fn("Running the instance OS create scripts...")
3115       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3116       msg = result.RemoteFailMsg()
3117       if msg:
3118         raise errors.OpExecError("Could not install OS for instance %s"
3119                                  " on node %s: %s" %
3120                                  (inst.name, inst.primary_node, msg))
3121     finally:
3122       _ShutdownInstanceDisks(self, inst)
3123
3124
3125 class LURenameInstance(LogicalUnit):
3126   """Rename an instance.
3127
3128   """
3129   HPATH = "instance-rename"
3130   HTYPE = constants.HTYPE_INSTANCE
3131   _OP_REQP = ["instance_name", "new_name"]
3132
3133   def BuildHooksEnv(self):
3134     """Build hooks env.
3135
3136     This runs on master, primary and secondary nodes of the instance.
3137
3138     """
3139     env = _BuildInstanceHookEnvByObject(self, self.instance)
3140     env["INSTANCE_NEW_NAME"] = self.op.new_name
3141     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3142     return env, nl, nl
3143
3144   def CheckPrereq(self):
3145     """Check prerequisites.
3146
3147     This checks that the instance is in the cluster and is not running.
3148
3149     """
3150     instance = self.cfg.GetInstanceInfo(
3151       self.cfg.ExpandInstanceName(self.op.instance_name))
3152     if instance is None:
3153       raise errors.OpPrereqError("Instance '%s' not known" %
3154                                  self.op.instance_name)
3155     _CheckNodeOnline(self, instance.primary_node)
3156
3157     if instance.admin_up:
3158       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3159                                  self.op.instance_name)
3160     remote_info = self.rpc.call_instance_info(instance.primary_node,
3161                                               instance.name,
3162                                               instance.hypervisor)
3163     remote_info.Raise()
3164     if remote_info.data:
3165       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3166                                  (self.op.instance_name,
3167                                   instance.primary_node))
3168     self.instance = instance
3169
3170     # new name verification
3171     name_info = utils.HostInfo(self.op.new_name)
3172
3173     self.op.new_name = new_name = name_info.name
3174     instance_list = self.cfg.GetInstanceList()
3175     if new_name in instance_list:
3176       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3177                                  new_name)
3178
3179     if not getattr(self.op, "ignore_ip", False):
3180       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3181         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3182                                    (name_info.ip, new_name))
3183
3184
3185   def Exec(self, feedback_fn):
3186     """Reinstall the instance.
3187
3188     """
3189     inst = self.instance
3190     old_name = inst.name
3191
3192     if inst.disk_template == constants.DT_FILE:
3193       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3194
3195     self.cfg.RenameInstance(inst.name, self.op.new_name)
3196     # Change the instance lock. This is definitely safe while we hold the BGL
3197     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3198     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3199
3200     # re-read the instance from the configuration after rename
3201     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3202
3203     if inst.disk_template == constants.DT_FILE:
3204       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3205       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3206                                                      old_file_storage_dir,
3207                                                      new_file_storage_dir)
3208       result.Raise()
3209       if not result.data:
3210         raise errors.OpExecError("Could not connect to node '%s' to rename"
3211                                  " directory '%s' to '%s' (but the instance"
3212                                  " has been renamed in Ganeti)" % (
3213                                  inst.primary_node, old_file_storage_dir,
3214                                  new_file_storage_dir))
3215
3216       if not result.data[0]:
3217         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3218                                  " (but the instance has been renamed in"
3219                                  " Ganeti)" % (old_file_storage_dir,
3220                                                new_file_storage_dir))
3221
3222     _StartInstanceDisks(self, inst, None)
3223     try:
3224       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3225                                                  old_name)
3226       msg = result.RemoteFailMsg()
3227       if msg:
3228         msg = ("Could not run OS rename script for instance %s on node %s"
3229                " (but the instance has been renamed in Ganeti): %s" %
3230                (inst.name, inst.primary_node, msg))
3231         self.proc.LogWarning(msg)
3232     finally:
3233       _ShutdownInstanceDisks(self, inst)
3234
3235
3236 class LURemoveInstance(LogicalUnit):
3237   """Remove an instance.
3238
3239   """
3240   HPATH = "instance-remove"
3241   HTYPE = constants.HTYPE_INSTANCE
3242   _OP_REQP = ["instance_name", "ignore_failures"]
3243   REQ_BGL = False
3244
3245   def ExpandNames(self):
3246     self._ExpandAndLockInstance()
3247     self.needed_locks[locking.LEVEL_NODE] = []
3248     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3249
3250   def DeclareLocks(self, level):
3251     if level == locking.LEVEL_NODE:
3252       self._LockInstancesNodes()
3253
3254   def BuildHooksEnv(self):
3255     """Build hooks env.
3256
3257     This runs on master, primary and secondary nodes of the instance.
3258
3259     """
3260     env = _BuildInstanceHookEnvByObject(self, self.instance)
3261     nl = [self.cfg.GetMasterNode()]
3262     return env, nl, nl
3263
3264   def CheckPrereq(self):
3265     """Check prerequisites.
3266
3267     This checks that the instance is in the cluster.
3268
3269     """
3270     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3271     assert self.instance is not None, \
3272       "Cannot retrieve locked instance %s" % self.op.instance_name
3273
3274   def Exec(self, feedback_fn):
3275     """Remove the instance.
3276
3277     """
3278     instance = self.instance
3279     logging.info("Shutting down instance %s on node %s",
3280                  instance.name, instance.primary_node)
3281
3282     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3283     msg = result.RemoteFailMsg()
3284     if msg:
3285       if self.op.ignore_failures:
3286         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3287       else:
3288         raise errors.OpExecError("Could not shutdown instance %s on"
3289                                  " node %s: %s" %
3290                                  (instance.name, instance.primary_node, msg))
3291
3292     logging.info("Removing block devices for instance %s", instance.name)
3293
3294     if not _RemoveDisks(self, instance):
3295       if self.op.ignore_failures:
3296         feedback_fn("Warning: can't remove instance's disks")
3297       else:
3298         raise errors.OpExecError("Can't remove instance's disks")
3299
3300     logging.info("Removing instance %s out of cluster config", instance.name)
3301
3302     self.cfg.RemoveInstance(instance.name)
3303     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3304
3305
3306 class LUQueryInstances(NoHooksLU):
3307   """Logical unit for querying instances.
3308
3309   """
3310   _OP_REQP = ["output_fields", "names", "use_locking"]
3311   REQ_BGL = False
3312   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3313                                     "admin_state",
3314                                     "disk_template", "ip", "mac", "bridge",
3315                                     "sda_size", "sdb_size", "vcpus", "tags",
3316                                     "network_port", "beparams",
3317                                     r"(disk)\.(size)/([0-9]+)",
3318                                     r"(disk)\.(sizes)", "disk_usage",
3319                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3320                                     r"(nic)\.(macs|ips|bridges)",
3321                                     r"(disk|nic)\.(count)",
3322                                     "serial_no", "hypervisor", "hvparams",] +
3323                                   ["hv/%s" % name
3324                                    for name in constants.HVS_PARAMETERS] +
3325                                   ["be/%s" % name
3326                                    for name in constants.BES_PARAMETERS])
3327   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3328
3329
3330   def ExpandNames(self):
3331     _CheckOutputFields(static=self._FIELDS_STATIC,
3332                        dynamic=self._FIELDS_DYNAMIC,
3333                        selected=self.op.output_fields)
3334
3335     self.needed_locks = {}
3336     self.share_locks[locking.LEVEL_INSTANCE] = 1
3337     self.share_locks[locking.LEVEL_NODE] = 1
3338
3339     if self.op.names:
3340       self.wanted = _GetWantedInstances(self, self.op.names)
3341     else:
3342       self.wanted = locking.ALL_SET
3343
3344     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3345     self.do_locking = self.do_node_query and self.op.use_locking
3346     if self.do_locking:
3347       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3348       self.needed_locks[locking.LEVEL_NODE] = []
3349       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3350
3351   def DeclareLocks(self, level):
3352     if level == locking.LEVEL_NODE and self.do_locking:
3353       self._LockInstancesNodes()
3354
3355   def CheckPrereq(self):
3356     """Check prerequisites.
3357
3358     """
3359     pass
3360
3361   def Exec(self, feedback_fn):
3362     """Computes the list of nodes and their attributes.
3363
3364     """
3365     all_info = self.cfg.GetAllInstancesInfo()
3366     if self.wanted == locking.ALL_SET:
3367       # caller didn't specify instance names, so ordering is not important
3368       if self.do_locking:
3369         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3370       else:
3371         instance_names = all_info.keys()
3372       instance_names = utils.NiceSort(instance_names)
3373     else:
3374       # caller did specify names, so we must keep the ordering
3375       if self.do_locking:
3376         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3377       else:
3378         tgt_set = all_info.keys()
3379       missing = set(self.wanted).difference(tgt_set)
3380       if missing:
3381         raise errors.OpExecError("Some instances were removed before"
3382                                  " retrieving their data: %s" % missing)
3383       instance_names = self.wanted
3384
3385     instance_list = [all_info[iname] for iname in instance_names]
3386
3387     # begin data gathering
3388
3389     nodes = frozenset([inst.primary_node for inst in instance_list])
3390     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3391
3392     bad_nodes = []
3393     off_nodes = []
3394     if self.do_node_query:
3395       live_data = {}
3396       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3397       for name in nodes:
3398         result = node_data[name]
3399         if result.offline:
3400           # offline nodes will be in both lists
3401           off_nodes.append(name)
3402         if result.failed:
3403           bad_nodes.append(name)
3404         else:
3405           if result.data:
3406             live_data.update(result.data)
3407             # else no instance is alive
3408     else:
3409       live_data = dict([(name, {}) for name in instance_names])
3410
3411     # end data gathering
3412
3413     HVPREFIX = "hv/"
3414     BEPREFIX = "be/"
3415     output = []
3416     for instance in instance_list:
3417       iout = []
3418       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3419       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3420       for field in self.op.output_fields:
3421         st_match = self._FIELDS_STATIC.Matches(field)
3422         if field == "name":
3423           val = instance.name
3424         elif field == "os":
3425           val = instance.os
3426         elif field == "pnode":
3427           val = instance.primary_node
3428         elif field == "snodes":
3429           val = list(instance.secondary_nodes)
3430         elif field == "admin_state":
3431           val = instance.admin_up
3432         elif field == "oper_state":
3433           if instance.primary_node in bad_nodes:
3434             val = None
3435           else:
3436             val = bool(live_data.get(instance.name))
3437         elif field == "status":
3438           if instance.primary_node in off_nodes:
3439             val = "ERROR_nodeoffline"
3440           elif instance.primary_node in bad_nodes:
3441             val = "ERROR_nodedown"
3442           else:
3443             running = bool(live_data.get(instance.name))
3444             if running:
3445               if instance.admin_up:
3446                 val = "running"
3447               else:
3448                 val = "ERROR_up"
3449             else:
3450               if instance.admin_up:
3451                 val = "ERROR_down"
3452               else:
3453                 val = "ADMIN_down"
3454         elif field == "oper_ram":
3455           if instance.primary_node in bad_nodes:
3456             val = None
3457           elif instance.name in live_data:
3458             val = live_data[instance.name].get("memory", "?")
3459           else:
3460             val = "-"
3461         elif field == "disk_template":
3462           val = instance.disk_template
3463         elif field == "ip":
3464           val = instance.nics[0].ip
3465         elif field == "bridge":
3466           val = instance.nics[0].bridge
3467         elif field == "mac":
3468           val = instance.nics[0].mac
3469         elif field == "sda_size" or field == "sdb_size":
3470           idx = ord(field[2]) - ord('a')
3471           try:
3472             val = instance.FindDisk(idx).size
3473           except errors.OpPrereqError:
3474             val = None
3475         elif field == "disk_usage": # total disk usage per node
3476           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3477           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3478         elif field == "tags":
3479           val = list(instance.GetTags())
3480         elif field == "serial_no":
3481           val = instance.serial_no
3482         elif field == "network_port":
3483           val = instance.network_port
3484         elif field == "hypervisor":
3485           val = instance.hypervisor
3486         elif field == "hvparams":
3487           val = i_hv
3488         elif (field.startswith(HVPREFIX) and
3489               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3490           val = i_hv.get(field[len(HVPREFIX):], None)
3491         elif field == "beparams":
3492           val = i_be
3493         elif (field.startswith(BEPREFIX) and
3494               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3495           val = i_be.get(field[len(BEPREFIX):], None)
3496         elif st_match and st_match.groups():
3497           # matches a variable list
3498           st_groups = st_match.groups()
3499           if st_groups and st_groups[0] == "disk":
3500             if st_groups[1] == "count":
3501               val = len(instance.disks)
3502             elif st_groups[1] == "sizes":
3503               val = [disk.size for disk in instance.disks]
3504             elif st_groups[1] == "size":
3505               try:
3506                 val = instance.FindDisk(st_groups[2]).size
3507               except errors.OpPrereqError:
3508                 val = None
3509             else:
3510               assert False, "Unhandled disk parameter"
3511           elif st_groups[0] == "nic":
3512             if st_groups[1] == "count":
3513               val = len(instance.nics)
3514             elif st_groups[1] == "macs":
3515               val = [nic.mac for nic in instance.nics]
3516             elif st_groups[1] == "ips":
3517               val = [nic.ip for nic in instance.nics]
3518             elif st_groups[1] == "bridges":
3519               val = [nic.bridge for nic in instance.nics]
3520             else:
3521               # index-based item
3522               nic_idx = int(st_groups[2])
3523               if nic_idx >= len(instance.nics):
3524                 val = None
3525               else:
3526                 if st_groups[1] == "mac":
3527                   val = instance.nics[nic_idx].mac
3528                 elif st_groups[1] == "ip":
3529                   val = instance.nics[nic_idx].ip
3530                 elif st_groups[1] == "bridge":
3531                   val = instance.nics[nic_idx].bridge
3532                 else:
3533                   assert False, "Unhandled NIC parameter"
3534           else:
3535             assert False, "Unhandled variable parameter"
3536         else:
3537           raise errors.ParameterError(field)
3538         iout.append(val)
3539       output.append(iout)
3540
3541     return output
3542
3543
3544 class LUFailoverInstance(LogicalUnit):
3545   """Failover an instance.
3546
3547   """
3548   HPATH = "instance-failover"
3549   HTYPE = constants.HTYPE_INSTANCE
3550   _OP_REQP = ["instance_name", "ignore_consistency"]
3551   REQ_BGL = False
3552
3553   def ExpandNames(self):
3554     self._ExpandAndLockInstance()
3555     self.needed_locks[locking.LEVEL_NODE] = []
3556     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3557
3558   def DeclareLocks(self, level):
3559     if level == locking.LEVEL_NODE:
3560       self._LockInstancesNodes()
3561
3562   def BuildHooksEnv(self):
3563     """Build hooks env.
3564
3565     This runs on master, primary and secondary nodes of the instance.
3566
3567     """
3568     env = {
3569       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3570       }
3571     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3572     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3573     return env, nl, nl
3574
3575   def CheckPrereq(self):
3576     """Check prerequisites.
3577
3578     This checks that the instance is in the cluster.
3579
3580     """
3581     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3582     assert self.instance is not None, \
3583       "Cannot retrieve locked instance %s" % self.op.instance_name
3584
3585     bep = self.cfg.GetClusterInfo().FillBE(instance)
3586     if instance.disk_template not in constants.DTS_NET_MIRROR:
3587       raise errors.OpPrereqError("Instance's disk layout is not"
3588                                  " network mirrored, cannot failover.")
3589
3590     secondary_nodes = instance.secondary_nodes
3591     if not secondary_nodes:
3592       raise errors.ProgrammerError("no secondary node but using "
3593                                    "a mirrored disk template")
3594
3595     target_node = secondary_nodes[0]
3596     _CheckNodeOnline(self, target_node)
3597     _CheckNodeNotDrained(self, target_node)
3598     # check memory requirements on the secondary node
3599     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3600                          instance.name, bep[constants.BE_MEMORY],
3601                          instance.hypervisor)
3602
3603     # check bridge existance
3604     brlist = [nic.bridge for nic in instance.nics]
3605     result = self.rpc.call_bridges_exist(target_node, brlist)
3606     result.Raise()
3607     if not result.data:
3608       raise errors.OpPrereqError("One or more target bridges %s does not"
3609                                  " exist on destination node '%s'" %
3610                                  (brlist, target_node))
3611
3612   def Exec(self, feedback_fn):
3613     """Failover an instance.
3614
3615     The failover is done by shutting it down on its present node and
3616     starting it on the secondary.
3617
3618     """
3619     instance = self.instance
3620
3621     source_node = instance.primary_node
3622     target_node = instance.secondary_nodes[0]
3623
3624     feedback_fn("* checking disk consistency between source and target")
3625     for dev in instance.disks:
3626       # for drbd, these are drbd over lvm
3627       if not _CheckDiskConsistency(self, dev, target_node, False):
3628         if instance.admin_up and not self.op.ignore_consistency:
3629           raise errors.OpExecError("Disk %s is degraded on target node,"
3630                                    " aborting failover." % dev.iv_name)
3631
3632     feedback_fn("* shutting down instance on source node")
3633     logging.info("Shutting down instance %s on node %s",
3634                  instance.name, source_node)
3635
3636     result = self.rpc.call_instance_shutdown(source_node, instance)
3637     msg = result.RemoteFailMsg()
3638     if msg:
3639       if self.op.ignore_consistency:
3640         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3641                              " Proceeding anyway. Please make sure node"
3642                              " %s is down. Error details: %s",
3643                              instance.name, source_node, source_node, msg)
3644       else:
3645         raise errors.OpExecError("Could not shutdown instance %s on"
3646                                  " node %s: %s" %
3647                                  (instance.name, source_node, msg))
3648
3649     feedback_fn("* deactivating the instance's disks on source node")
3650     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3651       raise errors.OpExecError("Can't shut down the instance's disks.")
3652
3653     instance.primary_node = target_node
3654     # distribute new instance config to the other nodes
3655     self.cfg.Update(instance)
3656
3657     # Only start the instance if it's marked as up
3658     if instance.admin_up:
3659       feedback_fn("* activating the instance's disks on target node")
3660       logging.info("Starting instance %s on node %s",
3661                    instance.name, target_node)
3662
3663       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3664                                                ignore_secondaries=True)
3665       if not disks_ok:
3666         _ShutdownInstanceDisks(self, instance)
3667         raise errors.OpExecError("Can't activate the instance's disks")
3668
3669       feedback_fn("* starting the instance on the target node")
3670       result = self.rpc.call_instance_start(target_node, instance, None, None)
3671       msg = result.RemoteFailMsg()
3672       if msg:
3673         _ShutdownInstanceDisks(self, instance)
3674         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3675                                  (instance.name, target_node, msg))
3676
3677
3678 class LUMigrateInstance(LogicalUnit):
3679   """Migrate an instance.
3680
3681   This is migration without shutting down, compared to the failover,
3682   which is done with shutdown.
3683
3684   """
3685   HPATH = "instance-migrate"
3686   HTYPE = constants.HTYPE_INSTANCE
3687   _OP_REQP = ["instance_name", "live", "cleanup"]
3688
3689   REQ_BGL = False
3690
3691   def ExpandNames(self):
3692     self._ExpandAndLockInstance()
3693     self.needed_locks[locking.LEVEL_NODE] = []
3694     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3695
3696   def DeclareLocks(self, level):
3697     if level == locking.LEVEL_NODE:
3698       self._LockInstancesNodes()
3699
3700   def BuildHooksEnv(self):
3701     """Build hooks env.
3702
3703     This runs on master, primary and secondary nodes of the instance.
3704
3705     """
3706     env = _BuildInstanceHookEnvByObject(self, self.instance)
3707     env["MIGRATE_LIVE"] = self.op.live
3708     env["MIGRATE_CLEANUP"] = self.op.cleanup
3709     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3710     return env, nl, nl
3711
3712   def CheckPrereq(self):
3713     """Check prerequisites.
3714
3715     This checks that the instance is in the cluster.
3716
3717     """
3718     instance = self.cfg.GetInstanceInfo(
3719       self.cfg.ExpandInstanceName(self.op.instance_name))
3720     if instance is None:
3721       raise errors.OpPrereqError("Instance '%s' not known" %
3722                                  self.op.instance_name)
3723
3724     if instance.disk_template != constants.DT_DRBD8:
3725       raise errors.OpPrereqError("Instance's disk layout is not"
3726                                  " drbd8, cannot migrate.")
3727
3728     secondary_nodes = instance.secondary_nodes
3729     if not secondary_nodes:
3730       raise errors.ConfigurationError("No secondary node but using"
3731                                       " drbd8 disk template")
3732
3733     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3734
3735     target_node = secondary_nodes[0]
3736     # check memory requirements on the secondary node
3737     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3738                          instance.name, i_be[constants.BE_MEMORY],
3739                          instance.hypervisor)
3740
3741     # check bridge existance
3742     brlist = [nic.bridge for nic in instance.nics]
3743     result = self.rpc.call_bridges_exist(target_node, brlist)
3744     if result.failed or not result.data:
3745       raise errors.OpPrereqError("One or more target bridges %s does not"
3746                                  " exist on destination node '%s'" %
3747                                  (brlist, target_node))
3748
3749     if not self.op.cleanup:
3750       _CheckNodeNotDrained(self, target_node)
3751       result = self.rpc.call_instance_migratable(instance.primary_node,
3752                                                  instance)
3753       msg = result.RemoteFailMsg()
3754       if msg:
3755         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3756                                    msg)
3757
3758     self.instance = instance
3759
3760   def _WaitUntilSync(self):
3761     """Poll with custom rpc for disk sync.
3762
3763     This uses our own step-based rpc call.
3764
3765     """
3766     self.feedback_fn("* wait until resync is done")
3767     all_done = False
3768     while not all_done:
3769       all_done = True
3770       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3771                                             self.nodes_ip,
3772                                             self.instance.disks)
3773       min_percent = 100
3774       for node, nres in result.items():
3775         msg = nres.RemoteFailMsg()
3776         if msg:
3777           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3778                                    (node, msg))
3779         node_done, node_percent = nres.payload
3780         all_done = all_done and node_done
3781         if node_percent is not None:
3782           min_percent = min(min_percent, node_percent)
3783       if not all_done:
3784         if min_percent < 100:
3785           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3786         time.sleep(2)
3787
3788   def _EnsureSecondary(self, node):
3789     """Demote a node to secondary.
3790
3791     """
3792     self.feedback_fn("* switching node %s to secondary mode" % node)
3793
3794     for dev in self.instance.disks:
3795       self.cfg.SetDiskID(dev, node)
3796
3797     result = self.rpc.call_blockdev_close(node, self.instance.name,
3798                                           self.instance.disks)
3799     msg = result.RemoteFailMsg()
3800     if msg:
3801       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3802                                " error %s" % (node, msg))
3803
3804   def _GoStandalone(self):
3805     """Disconnect from the network.
3806
3807     """
3808     self.feedback_fn("* changing into standalone mode")
3809     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3810                                                self.instance.disks)
3811     for node, nres in result.items():
3812       msg = nres.RemoteFailMsg()
3813       if msg:
3814         raise errors.OpExecError("Cannot disconnect disks node %s,"
3815                                  " error %s" % (node, msg))
3816
3817   def _GoReconnect(self, multimaster):
3818     """Reconnect to the network.
3819
3820     """
3821     if multimaster:
3822       msg = "dual-master"
3823     else:
3824       msg = "single-master"
3825     self.feedback_fn("* changing disks into %s mode" % msg)
3826     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3827                                            self.instance.disks,
3828                                            self.instance.name, multimaster)
3829     for node, nres in result.items():
3830       msg = nres.RemoteFailMsg()
3831       if msg:
3832         raise errors.OpExecError("Cannot change disks config on node %s,"
3833                                  " error: %s" % (node, msg))
3834
3835   def _ExecCleanup(self):
3836     """Try to cleanup after a failed migration.
3837
3838     The cleanup is done by:
3839       - check that the instance is running only on one node
3840         (and update the config if needed)
3841       - change disks on its secondary node to secondary
3842       - wait until disks are fully synchronized
3843       - disconnect from the network
3844       - change disks into single-master mode
3845       - wait again until disks are fully synchronized
3846
3847     """
3848     instance = self.instance
3849     target_node = self.target_node
3850     source_node = self.source_node
3851
3852     # check running on only one node
3853     self.feedback_fn("* checking where the instance actually runs"
3854                      " (if this hangs, the hypervisor might be in"
3855                      " a bad state)")
3856     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3857     for node, result in ins_l.items():
3858       result.Raise()
3859       if not isinstance(result.data, list):
3860         raise errors.OpExecError("Can't contact node '%s'" % node)
3861
3862     runningon_source = instance.name in ins_l[source_node].data
3863     runningon_target = instance.name in ins_l[target_node].data
3864
3865     if runningon_source and runningon_target:
3866       raise errors.OpExecError("Instance seems to be running on two nodes,"
3867                                " or the hypervisor is confused. You will have"
3868                                " to ensure manually that it runs only on one"
3869                                " and restart this operation.")
3870
3871     if not (runningon_source or runningon_target):
3872       raise errors.OpExecError("Instance does not seem to be running at all."
3873                                " In this case, it's safer to repair by"
3874                                " running 'gnt-instance stop' to ensure disk"
3875                                " shutdown, and then restarting it.")
3876
3877     if runningon_target:
3878       # the migration has actually succeeded, we need to update the config
3879       self.feedback_fn("* instance running on secondary node (%s),"
3880                        " updating config" % target_node)
3881       instance.primary_node = target_node
3882       self.cfg.Update(instance)
3883       demoted_node = source_node
3884     else:
3885       self.feedback_fn("* instance confirmed to be running on its"
3886                        " primary node (%s)" % source_node)
3887       demoted_node = target_node
3888
3889     self._EnsureSecondary(demoted_node)
3890     try:
3891       self._WaitUntilSync()
3892     except errors.OpExecError:
3893       # we ignore here errors, since if the device is standalone, it
3894       # won't be able to sync
3895       pass
3896     self._GoStandalone()
3897     self._GoReconnect(False)
3898     self._WaitUntilSync()
3899
3900     self.feedback_fn("* done")
3901
3902   def _RevertDiskStatus(self):
3903     """Try to revert the disk status after a failed migration.
3904
3905     """
3906     target_node = self.target_node
3907     try:
3908       self._EnsureSecondary(target_node)
3909       self._GoStandalone()
3910       self._GoReconnect(False)
3911       self._WaitUntilSync()
3912     except errors.OpExecError, err:
3913       self.LogWarning("Migration failed and I can't reconnect the"
3914                       " drives: error '%s'\n"
3915                       "Please look and recover the instance status" %
3916                       str(err))
3917
3918   def _AbortMigration(self):
3919     """Call the hypervisor code to abort a started migration.
3920
3921     """
3922     instance = self.instance
3923     target_node = self.target_node
3924     migration_info = self.migration_info
3925
3926     abort_result = self.rpc.call_finalize_migration(target_node,
3927                                                     instance,
3928                                                     migration_info,
3929                                                     False)
3930     abort_msg = abort_result.RemoteFailMsg()
3931     if abort_msg:
3932       logging.error("Aborting migration failed on target node %s: %s" %
3933                     (target_node, abort_msg))
3934       # Don't raise an exception here, as we stil have to try to revert the
3935       # disk status, even if this step failed.
3936
3937   def _ExecMigration(self):
3938     """Migrate an instance.
3939
3940     The migrate is done by:
3941       - change the disks into dual-master mode
3942       - wait until disks are fully synchronized again
3943       - migrate the instance
3944       - change disks on the new secondary node (the old primary) to secondary
3945       - wait until disks are fully synchronized
3946       - change disks into single-master mode
3947
3948     """
3949     instance = self.instance
3950     target_node = self.target_node
3951     source_node = self.source_node
3952
3953     self.feedback_fn("* checking disk consistency between source and target")
3954     for dev in instance.disks:
3955       if not _CheckDiskConsistency(self, dev, target_node, False):
3956         raise errors.OpExecError("Disk %s is degraded or not fully"
3957                                  " synchronized on target node,"
3958                                  " aborting migrate." % dev.iv_name)
3959
3960     # First get the migration information from the remote node
3961     result = self.rpc.call_migration_info(source_node, instance)
3962     msg = result.RemoteFailMsg()
3963     if msg:
3964       log_err = ("Failed fetching source migration information from %s: %s" %
3965                  (source_node, msg))
3966       logging.error(log_err)
3967       raise errors.OpExecError(log_err)
3968
3969     self.migration_info = migration_info = result.payload
3970
3971     # Then switch the disks to master/master mode
3972     self._EnsureSecondary(target_node)
3973     self._GoStandalone()
3974     self._GoReconnect(True)
3975     self._WaitUntilSync()
3976
3977     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3978     result = self.rpc.call_accept_instance(target_node,
3979                                            instance,
3980                                            migration_info,
3981                                            self.nodes_ip[target_node])
3982
3983     msg = result.RemoteFailMsg()
3984     if msg:
3985       logging.error("Instance pre-migration failed, trying to revert"
3986                     " disk status: %s", msg)
3987       self._AbortMigration()
3988       self._RevertDiskStatus()
3989       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3990                                (instance.name, msg))
3991
3992     self.feedback_fn("* migrating instance to %s" % target_node)
3993     time.sleep(10)
3994     result = self.rpc.call_instance_migrate(source_node, instance,
3995                                             self.nodes_ip[target_node],
3996                                             self.op.live)
3997     msg = result.RemoteFailMsg()
3998     if msg:
3999       logging.error("Instance migration failed, trying to revert"
4000                     " disk status: %s", msg)
4001       self._AbortMigration()
4002       self._RevertDiskStatus()
4003       raise errors.OpExecError("Could not migrate instance %s: %s" %
4004                                (instance.name, msg))
4005     time.sleep(10)
4006
4007     instance.primary_node = target_node
4008     # distribute new instance config to the other nodes
4009     self.cfg.Update(instance)
4010
4011     result = self.rpc.call_finalize_migration(target_node,
4012                                               instance,
4013                                               migration_info,
4014                                               True)
4015     msg = result.RemoteFailMsg()
4016     if msg:
4017       logging.error("Instance migration succeeded, but finalization failed:"
4018                     " %s" % msg)
4019       raise errors.OpExecError("Could not finalize instance migration: %s" %
4020                                msg)
4021
4022     self._EnsureSecondary(source_node)
4023     self._WaitUntilSync()
4024     self._GoStandalone()
4025     self._GoReconnect(False)
4026     self._WaitUntilSync()
4027
4028     self.feedback_fn("* done")
4029
4030   def Exec(self, feedback_fn):
4031     """Perform the migration.
4032
4033     """
4034     self.feedback_fn = feedback_fn
4035
4036     self.source_node = self.instance.primary_node
4037     self.target_node = self.instance.secondary_nodes[0]
4038     self.all_nodes = [self.source_node, self.target_node]
4039     self.nodes_ip = {
4040       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4041       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4042       }
4043     if self.op.cleanup:
4044       return self._ExecCleanup()
4045     else:
4046       return self._ExecMigration()
4047
4048
4049 def _CreateBlockDev(lu, node, instance, device, force_create,
4050                     info, force_open):
4051   """Create a tree of block devices on a given node.
4052
4053   If this device type has to be created on secondaries, create it and
4054   all its children.
4055
4056   If not, just recurse to children keeping the same 'force' value.
4057
4058   @param lu: the lu on whose behalf we execute
4059   @param node: the node on which to create the device
4060   @type instance: L{objects.Instance}
4061   @param instance: the instance which owns the device
4062   @type device: L{objects.Disk}
4063   @param device: the device to create
4064   @type force_create: boolean
4065   @param force_create: whether to force creation of this device; this
4066       will be change to True whenever we find a device which has
4067       CreateOnSecondary() attribute
4068   @param info: the extra 'metadata' we should attach to the device
4069       (this will be represented as a LVM tag)
4070   @type force_open: boolean
4071   @param force_open: this parameter will be passes to the
4072       L{backend.BlockdevCreate} function where it specifies
4073       whether we run on primary or not, and it affects both
4074       the child assembly and the device own Open() execution
4075
4076   """
4077   if device.CreateOnSecondary():
4078     force_create = True
4079
4080   if device.children:
4081     for child in device.children:
4082       _CreateBlockDev(lu, node, instance, child, force_create,
4083                       info, force_open)
4084
4085   if not force_create:
4086     return
4087
4088   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4089
4090
4091 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4092   """Create a single block device on a given node.
4093
4094   This will not recurse over children of the device, so they must be
4095   created in advance.
4096
4097   @param lu: the lu on whose behalf we execute
4098   @param node: the node on which to create the device
4099   @type instance: L{objects.Instance}
4100   @param instance: the instance which owns the device
4101   @type device: L{objects.Disk}
4102   @param device: the device to create
4103   @param info: the extra 'metadata' we should attach to the device
4104       (this will be represented as a LVM tag)
4105   @type force_open: boolean
4106   @param force_open: this parameter will be passes to the
4107       L{backend.BlockdevCreate} function where it specifies
4108       whether we run on primary or not, and it affects both
4109       the child assembly and the device own Open() execution
4110
4111   """
4112   lu.cfg.SetDiskID(device, node)
4113   result = lu.rpc.call_blockdev_create(node, device, device.size,
4114                                        instance.name, force_open, info)
4115   msg = result.RemoteFailMsg()
4116   if msg:
4117     raise errors.OpExecError("Can't create block device %s on"
4118                              " node %s for instance %s: %s" %
4119                              (device, node, instance.name, msg))
4120   if device.physical_id is None:
4121     device.physical_id = result.payload
4122
4123
4124 def _GenerateUniqueNames(lu, exts):
4125   """Generate a suitable LV name.
4126
4127   This will generate a logical volume name for the given instance.
4128
4129   """
4130   results = []
4131   for val in exts:
4132     new_id = lu.cfg.GenerateUniqueID()
4133     results.append("%s%s" % (new_id, val))
4134   return results
4135
4136
4137 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4138                          p_minor, s_minor):
4139   """Generate a drbd8 device complete with its children.
4140
4141   """
4142   port = lu.cfg.AllocatePort()
4143   vgname = lu.cfg.GetVGName()
4144   shared_secret = lu.cfg.GenerateDRBDSecret()
4145   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4146                           logical_id=(vgname, names[0]))
4147   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4148                           logical_id=(vgname, names[1]))
4149   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4150                           logical_id=(primary, secondary, port,
4151                                       p_minor, s_minor,
4152                                       shared_secret),
4153                           children=[dev_data, dev_meta],
4154                           iv_name=iv_name)
4155   return drbd_dev
4156
4157
4158 def _GenerateDiskTemplate(lu, template_name,
4159                           instance_name, primary_node,
4160                           secondary_nodes, disk_info,
4161                           file_storage_dir, file_driver,
4162                           base_index):
4163   """Generate the entire disk layout for a given template type.
4164
4165   """
4166   #TODO: compute space requirements
4167
4168   vgname = lu.cfg.GetVGName()
4169   disk_count = len(disk_info)
4170   disks = []
4171   if template_name == constants.DT_DISKLESS:
4172     pass
4173   elif template_name == constants.DT_PLAIN:
4174     if len(secondary_nodes) != 0:
4175       raise errors.ProgrammerError("Wrong template configuration")
4176
4177     names = _GenerateUniqueNames(lu, [".disk%d" % i
4178                                       for i in range(disk_count)])
4179     for idx, disk in enumerate(disk_info):
4180       disk_index = idx + base_index
4181       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4182                               logical_id=(vgname, names[idx]),
4183                               iv_name="disk/%d" % disk_index,
4184                               mode=disk["mode"])
4185       disks.append(disk_dev)
4186   elif template_name == constants.DT_DRBD8:
4187     if len(secondary_nodes) != 1:
4188       raise errors.ProgrammerError("Wrong template configuration")
4189     remote_node = secondary_nodes[0]
4190     minors = lu.cfg.AllocateDRBDMinor(
4191       [primary_node, remote_node] * len(disk_info), instance_name)
4192
4193     names = []
4194     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4195                                                for i in range(disk_count)]):
4196       names.append(lv_prefix + "_data")
4197       names.append(lv_prefix + "_meta")
4198     for idx, disk in enumerate(disk_info):
4199       disk_index = idx + base_index
4200       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4201                                       disk["size"], names[idx*2:idx*2+2],
4202                                       "disk/%d" % disk_index,
4203                                       minors[idx*2], minors[idx*2+1])
4204       disk_dev.mode = disk["mode"]
4205       disks.append(disk_dev)
4206   elif template_name == constants.DT_FILE:
4207     if len(secondary_nodes) != 0:
4208       raise errors.ProgrammerError("Wrong template configuration")
4209
4210     for idx, disk in enumerate(disk_info):
4211       disk_index = idx + base_index
4212       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4213                               iv_name="disk/%d" % disk_index,
4214                               logical_id=(file_driver,
4215                                           "%s/disk%d" % (file_storage_dir,
4216                                                          disk_index)),
4217                               mode=disk["mode"])
4218       disks.append(disk_dev)
4219   else:
4220     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4221   return disks
4222
4223
4224 def _GetInstanceInfoText(instance):
4225   """Compute that text that should be added to the disk's metadata.
4226
4227   """
4228   return "originstname+%s" % instance.name
4229
4230
4231 def _CreateDisks(lu, instance):
4232   """Create all disks for an instance.
4233
4234   This abstracts away some work from AddInstance.
4235
4236   @type lu: L{LogicalUnit}
4237   @param lu: the logical unit on whose behalf we execute
4238   @type instance: L{objects.Instance}
4239   @param instance: the instance whose disks we should create
4240   @rtype: boolean
4241   @return: the success of the creation
4242
4243   """
4244   info = _GetInstanceInfoText(instance)
4245   pnode = instance.primary_node
4246
4247   if instance.disk_template == constants.DT_FILE:
4248     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4249     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4250
4251     if result.failed or not result.data:
4252       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4253
4254     if not result.data[0]:
4255       raise errors.OpExecError("Failed to create directory '%s'" %
4256                                file_storage_dir)
4257
4258   # Note: this needs to be kept in sync with adding of disks in
4259   # LUSetInstanceParams
4260   for device in instance.disks:
4261     logging.info("Creating volume %s for instance %s",
4262                  device.iv_name, instance.name)
4263     #HARDCODE
4264     for node in instance.all_nodes:
4265       f_create = node == pnode
4266       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4267
4268
4269 def _RemoveDisks(lu, instance):
4270   """Remove all disks for an instance.
4271
4272   This abstracts away some work from `AddInstance()` and
4273   `RemoveInstance()`. Note that in case some of the devices couldn't
4274   be removed, the removal will continue with the other ones (compare
4275   with `_CreateDisks()`).
4276
4277   @type lu: L{LogicalUnit}
4278   @param lu: the logical unit on whose behalf we execute
4279   @type instance: L{objects.Instance}
4280   @param instance: the instance whose disks we should remove
4281   @rtype: boolean
4282   @return: the success of the removal
4283
4284   """
4285   logging.info("Removing block devices for instance %s", instance.name)
4286
4287   all_result = True
4288   for device in instance.disks:
4289     for node, disk in device.ComputeNodeTree(instance.primary_node):
4290       lu.cfg.SetDiskID(disk, node)
4291       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4292       if msg:
4293         lu.LogWarning("Could not remove block device %s on node %s,"
4294                       " continuing anyway: %s", device.iv_name, node, msg)
4295         all_result = False
4296
4297   if instance.disk_template == constants.DT_FILE:
4298     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4299     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4300                                                  file_storage_dir)
4301     if result.failed or not result.data:
4302       logging.error("Could not remove directory '%s'", file_storage_dir)
4303       all_result = False
4304
4305   return all_result
4306
4307
4308 def _ComputeDiskSize(disk_template, disks):
4309   """Compute disk size requirements in the volume group
4310
4311   """
4312   # Required free disk space as a function of disk and swap space
4313   req_size_dict = {
4314     constants.DT_DISKLESS: None,
4315     constants.DT_PLAIN: sum(d["size"] for d in disks),
4316     # 128 MB are added for drbd metadata for each disk
4317     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4318     constants.DT_FILE: None,
4319   }
4320
4321   if disk_template not in req_size_dict:
4322     raise errors.ProgrammerError("Disk template '%s' size requirement"
4323                                  " is unknown" %  disk_template)
4324
4325   return req_size_dict[disk_template]
4326
4327
4328 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4329   """Hypervisor parameter validation.
4330
4331   This function abstract the hypervisor parameter validation to be
4332   used in both instance create and instance modify.
4333
4334   @type lu: L{LogicalUnit}
4335   @param lu: the logical unit for which we check
4336   @type nodenames: list
4337   @param nodenames: the list of nodes on which we should check
4338   @type hvname: string
4339   @param hvname: the name of the hypervisor we should use
4340   @type hvparams: dict
4341   @param hvparams: the parameters which we need to check
4342   @raise errors.OpPrereqError: if the parameters are not valid
4343
4344   """
4345   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4346                                                   hvname,
4347                                                   hvparams)
4348   for node in nodenames:
4349     info = hvinfo[node]
4350     if info.offline:
4351       continue
4352     msg = info.RemoteFailMsg()
4353     if msg:
4354       raise errors.OpPrereqError("Hypervisor parameter validation"
4355                                  " failed on node %s: %s" % (node, msg))
4356
4357
4358 class LUCreateInstance(LogicalUnit):
4359   """Create an instance.
4360
4361   """
4362   HPATH = "instance-add"
4363   HTYPE = constants.HTYPE_INSTANCE
4364   _OP_REQP = ["instance_name", "disks", "disk_template",
4365               "mode", "start",
4366               "wait_for_sync", "ip_check", "nics",
4367               "hvparams", "beparams"]
4368   REQ_BGL = False
4369
4370   def _ExpandNode(self, node):
4371     """Expands and checks one node name.
4372
4373     """
4374     node_full = self.cfg.ExpandNodeName(node)
4375     if node_full is None:
4376       raise errors.OpPrereqError("Unknown node %s" % node)
4377     return node_full
4378
4379   def ExpandNames(self):
4380     """ExpandNames for CreateInstance.
4381
4382     Figure out the right locks for instance creation.
4383
4384     """
4385     self.needed_locks = {}
4386
4387     # set optional parameters to none if they don't exist
4388     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4389       if not hasattr(self.op, attr):
4390         setattr(self.op, attr, None)
4391
4392     # cheap checks, mostly valid constants given
4393
4394     # verify creation mode
4395     if self.op.mode not in (constants.INSTANCE_CREATE,
4396                             constants.INSTANCE_IMPORT):
4397       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4398                                  self.op.mode)
4399
4400     # disk template and mirror node verification
4401     if self.op.disk_template not in constants.DISK_TEMPLATES:
4402       raise errors.OpPrereqError("Invalid disk template name")
4403
4404     if self.op.hypervisor is None:
4405       self.op.hypervisor = self.cfg.GetHypervisorType()
4406
4407     cluster = self.cfg.GetClusterInfo()
4408     enabled_hvs = cluster.enabled_hypervisors
4409     if self.op.hypervisor not in enabled_hvs:
4410       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4411                                  " cluster (%s)" % (self.op.hypervisor,
4412                                   ",".join(enabled_hvs)))
4413
4414     # check hypervisor parameter syntax (locally)
4415     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4416     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4417                                   self.op.hvparams)
4418     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4419     hv_type.CheckParameterSyntax(filled_hvp)
4420
4421     # fill and remember the beparams dict
4422     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4423     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4424                                     self.op.beparams)
4425
4426     #### instance parameters check
4427
4428     # instance name verification
4429     hostname1 = utils.HostInfo(self.op.instance_name)
4430     self.op.instance_name = instance_name = hostname1.name
4431
4432     # this is just a preventive check, but someone might still add this
4433     # instance in the meantime, and creation will fail at lock-add time
4434     if instance_name in self.cfg.GetInstanceList():
4435       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4436                                  instance_name)
4437
4438     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4439
4440     # NIC buildup
4441     self.nics = []
4442     for nic in self.op.nics:
4443       # ip validity checks
4444       ip = nic.get("ip", None)
4445       if ip is None or ip.lower() == "none":
4446         nic_ip = None
4447       elif ip.lower() == constants.VALUE_AUTO:
4448         nic_ip = hostname1.ip
4449       else:
4450         if not utils.IsValidIP(ip):
4451           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4452                                      " like a valid IP" % ip)
4453         nic_ip = ip
4454
4455       # MAC address verification
4456       mac = nic.get("mac", constants.VALUE_AUTO)
4457       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4458         if not utils.IsValidMac(mac.lower()):
4459           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4460                                      mac)
4461       # bridge verification
4462       bridge = nic.get("bridge", None)
4463       if bridge is None:
4464         bridge = self.cfg.GetDefBridge()
4465       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4466
4467     # disk checks/pre-build
4468     self.disks = []
4469     for disk in self.op.disks:
4470       mode = disk.get("mode", constants.DISK_RDWR)
4471       if mode not in constants.DISK_ACCESS_SET:
4472         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4473                                    mode)
4474       size = disk.get("size", None)
4475       if size is None:
4476         raise errors.OpPrereqError("Missing disk size")
4477       try:
4478         size = int(size)
4479       except ValueError:
4480         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4481       self.disks.append({"size": size, "mode": mode})
4482
4483     # used in CheckPrereq for ip ping check
4484     self.check_ip = hostname1.ip
4485
4486     # file storage checks
4487     if (self.op.file_driver and
4488         not self.op.file_driver in constants.FILE_DRIVER):
4489       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4490                                  self.op.file_driver)
4491
4492     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4493       raise errors.OpPrereqError("File storage directory path not absolute")
4494
4495     ### Node/iallocator related checks
4496     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4497       raise errors.OpPrereqError("One and only one of iallocator and primary"
4498                                  " node must be given")
4499
4500     if self.op.iallocator:
4501       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4502     else:
4503       self.op.pnode = self._ExpandNode(self.op.pnode)
4504       nodelist = [self.op.pnode]
4505       if self.op.snode is not None:
4506         self.op.snode = self._ExpandNode(self.op.snode)
4507         nodelist.append(self.op.snode)
4508       self.needed_locks[locking.LEVEL_NODE] = nodelist
4509
4510     # in case of import lock the source node too
4511     if self.op.mode == constants.INSTANCE_IMPORT:
4512       src_node = getattr(self.op, "src_node", None)
4513       src_path = getattr(self.op, "src_path", None)
4514
4515       if src_path is None:
4516         self.op.src_path = src_path = self.op.instance_name
4517
4518       if src_node is None:
4519         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4520         self.op.src_node = None
4521         if os.path.isabs(src_path):
4522           raise errors.OpPrereqError("Importing an instance from an absolute"
4523                                      " path requires a source node option.")
4524       else:
4525         self.op.src_node = src_node = self._ExpandNode(src_node)
4526         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4527           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4528         if not os.path.isabs(src_path):
4529           self.op.src_path = src_path = \
4530             os.path.join(constants.EXPORT_DIR, src_path)
4531
4532     else: # INSTANCE_CREATE
4533       if getattr(self.op, "os_type", None) is None:
4534         raise errors.OpPrereqError("No guest OS specified")
4535
4536   def _RunAllocator(self):
4537     """Run the allocator based on input opcode.
4538
4539     """
4540     nics = [n.ToDict() for n in self.nics]
4541     ial = IAllocator(self,
4542                      mode=constants.IALLOCATOR_MODE_ALLOC,
4543                      name=self.op.instance_name,
4544                      disk_template=self.op.disk_template,
4545                      tags=[],
4546                      os=self.op.os_type,
4547                      vcpus=self.be_full[constants.BE_VCPUS],
4548                      mem_size=self.be_full[constants.BE_MEMORY],
4549                      disks=self.disks,
4550                      nics=nics,
4551                      hypervisor=self.op.hypervisor,
4552                      )
4553
4554     ial.Run(self.op.iallocator)
4555
4556     if not ial.success:
4557       raise errors.OpPrereqError("Can't compute nodes using"
4558                                  " iallocator '%s': %s" % (self.op.iallocator,
4559                                                            ial.info))
4560     if len(ial.nodes) != ial.required_nodes:
4561       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4562                                  " of nodes (%s), required %s" %
4563                                  (self.op.iallocator, len(ial.nodes),
4564                                   ial.required_nodes))
4565     self.op.pnode = ial.nodes[0]
4566     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4567                  self.op.instance_name, self.op.iallocator,
4568                  ", ".join(ial.nodes))
4569     if ial.required_nodes == 2:
4570       self.op.snode = ial.nodes[1]
4571
4572   def BuildHooksEnv(self):
4573     """Build hooks env.
4574
4575     This runs on master, primary and secondary nodes of the instance.
4576
4577     """
4578     env = {
4579       "ADD_MODE": self.op.mode,
4580       }
4581     if self.op.mode == constants.INSTANCE_IMPORT:
4582       env["SRC_NODE"] = self.op.src_node
4583       env["SRC_PATH"] = self.op.src_path
4584       env["SRC_IMAGES"] = self.src_images
4585
4586     env.update(_BuildInstanceHookEnv(
4587       name=self.op.instance_name,
4588       primary_node=self.op.pnode,
4589       secondary_nodes=self.secondaries,
4590       status=self.op.start,
4591       os_type=self.op.os_type,
4592       memory=self.be_full[constants.BE_MEMORY],
4593       vcpus=self.be_full[constants.BE_VCPUS],
4594       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4595       disk_template=self.op.disk_template,
4596       disks=[(d["size"], d["mode"]) for d in self.disks],
4597     ))
4598
4599     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4600           self.secondaries)
4601     return env, nl, nl
4602
4603
4604   def CheckPrereq(self):
4605     """Check prerequisites.
4606
4607     """
4608     if (not self.cfg.GetVGName() and
4609         self.op.disk_template not in constants.DTS_NOT_LVM):
4610       raise errors.OpPrereqError("Cluster does not support lvm-based"
4611                                  " instances")
4612
4613     if self.op.mode == constants.INSTANCE_IMPORT:
4614       src_node = self.op.src_node
4615       src_path = self.op.src_path
4616
4617       if src_node is None:
4618         exp_list = self.rpc.call_export_list(
4619           self.acquired_locks[locking.LEVEL_NODE])
4620         found = False
4621         for node in exp_list:
4622           if not exp_list[node].failed and src_path in exp_list[node].data:
4623             found = True
4624             self.op.src_node = src_node = node
4625             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4626                                                        src_path)
4627             break
4628         if not found:
4629           raise errors.OpPrereqError("No export found for relative path %s" %
4630                                       src_path)
4631
4632       _CheckNodeOnline(self, src_node)
4633       result = self.rpc.call_export_info(src_node, src_path)
4634       result.Raise()
4635       if not result.data:
4636         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4637
4638       export_info = result.data
4639       if not export_info.has_section(constants.INISECT_EXP):
4640         raise errors.ProgrammerError("Corrupted export config")
4641
4642       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4643       if (int(ei_version) != constants.EXPORT_VERSION):
4644         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4645                                    (ei_version, constants.EXPORT_VERSION))
4646
4647       # Check that the new instance doesn't have less disks than the export
4648       instance_disks = len(self.disks)
4649       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4650       if instance_disks < export_disks:
4651         raise errors.OpPrereqError("Not enough disks to import."
4652                                    " (instance: %d, export: %d)" %
4653                                    (instance_disks, export_disks))
4654
4655       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4656       disk_images = []
4657       for idx in range(export_disks):
4658         option = 'disk%d_dump' % idx
4659         if export_info.has_option(constants.INISECT_INS, option):
4660           # FIXME: are the old os-es, disk sizes, etc. useful?
4661           export_name = export_info.get(constants.INISECT_INS, option)
4662           image = os.path.join(src_path, export_name)
4663           disk_images.append(image)
4664         else:
4665           disk_images.append(False)
4666
4667       self.src_images = disk_images
4668
4669       old_name = export_info.get(constants.INISECT_INS, 'name')
4670       # FIXME: int() here could throw a ValueError on broken exports
4671       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4672       if self.op.instance_name == old_name:
4673         for idx, nic in enumerate(self.nics):
4674           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4675             nic_mac_ini = 'nic%d_mac' % idx
4676             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4677
4678     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4679     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4680     if self.op.start and not self.op.ip_check:
4681       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4682                                  " adding an instance in start mode")
4683
4684     if self.op.ip_check:
4685       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4686         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4687                                    (self.check_ip, self.op.instance_name))
4688
4689     #### mac address generation
4690     # By generating here the mac address both the allocator and the hooks get
4691     # the real final mac address rather than the 'auto' or 'generate' value.
4692     # There is a race condition between the generation and the instance object
4693     # creation, which means that we know the mac is valid now, but we're not
4694     # sure it will be when we actually add the instance. If things go bad
4695     # adding the instance will abort because of a duplicate mac, and the
4696     # creation job will fail.
4697     for nic in self.nics:
4698       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4699         nic.mac = self.cfg.GenerateMAC()
4700
4701     #### allocator run
4702
4703     if self.op.iallocator is not None:
4704       self._RunAllocator()
4705
4706     #### node related checks
4707
4708     # check primary node
4709     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4710     assert self.pnode is not None, \
4711       "Cannot retrieve locked node %s" % self.op.pnode
4712     if pnode.offline:
4713       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4714                                  pnode.name)
4715     if pnode.drained:
4716       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4717                                  pnode.name)
4718
4719     self.secondaries = []
4720
4721     # mirror node verification
4722     if self.op.disk_template in constants.DTS_NET_MIRROR:
4723       if self.op.snode is None:
4724         raise errors.OpPrereqError("The networked disk templates need"
4725                                    " a mirror node")
4726       if self.op.snode == pnode.name:
4727         raise errors.OpPrereqError("The secondary node cannot be"
4728                                    " the primary node.")
4729       _CheckNodeOnline(self, self.op.snode)
4730       _CheckNodeNotDrained(self, self.op.snode)
4731       self.secondaries.append(self.op.snode)
4732
4733     nodenames = [pnode.name] + self.secondaries
4734
4735     req_size = _ComputeDiskSize(self.op.disk_template,
4736                                 self.disks)
4737
4738     # Check lv size requirements
4739     if req_size is not None:
4740       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4741                                          self.op.hypervisor)
4742       for node in nodenames:
4743         info = nodeinfo[node]
4744         info.Raise()
4745         info = info.data
4746         if not info:
4747           raise errors.OpPrereqError("Cannot get current information"
4748                                      " from node '%s'" % node)
4749         vg_free = info.get('vg_free', None)
4750         if not isinstance(vg_free, int):
4751           raise errors.OpPrereqError("Can't compute free disk space on"
4752                                      " node %s" % node)
4753         if req_size > info['vg_free']:
4754           raise errors.OpPrereqError("Not enough disk space on target node %s."
4755                                      " %d MB available, %d MB required" %
4756                                      (node, info['vg_free'], req_size))
4757
4758     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4759
4760     # os verification
4761     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4762     result.Raise()
4763     if not isinstance(result.data, objects.OS):
4764       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4765                                  " primary node"  % self.op.os_type)
4766
4767     # bridge check on primary node
4768     bridges = [n.bridge for n in self.nics]
4769     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4770     result.Raise()
4771     if not result.data:
4772       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4773                                  " exist on destination node '%s'" %
4774                                  (",".join(bridges), pnode.name))
4775
4776     # memory check on primary node
4777     if self.op.start:
4778       _CheckNodeFreeMemory(self, self.pnode.name,
4779                            "creating instance %s" % self.op.instance_name,
4780                            self.be_full[constants.BE_MEMORY],
4781                            self.op.hypervisor)
4782
4783   def Exec(self, feedback_fn):
4784     """Create and add the instance to the cluster.
4785
4786     """
4787     instance = self.op.instance_name
4788     pnode_name = self.pnode.name
4789
4790     ht_kind = self.op.hypervisor
4791     if ht_kind in constants.HTS_REQ_PORT:
4792       network_port = self.cfg.AllocatePort()
4793     else:
4794       network_port = None
4795
4796     ##if self.op.vnc_bind_address is None:
4797     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4798
4799     # this is needed because os.path.join does not accept None arguments
4800     if self.op.file_storage_dir is None:
4801       string_file_storage_dir = ""
4802     else:
4803       string_file_storage_dir = self.op.file_storage_dir
4804
4805     # build the full file storage dir path
4806     file_storage_dir = os.path.normpath(os.path.join(
4807                                         self.cfg.GetFileStorageDir(),
4808                                         string_file_storage_dir, instance))
4809
4810
4811     disks = _GenerateDiskTemplate(self,
4812                                   self.op.disk_template,
4813                                   instance, pnode_name,
4814                                   self.secondaries,
4815                                   self.disks,
4816                                   file_storage_dir,
4817                                   self.op.file_driver,
4818                                   0)
4819
4820     iobj = objects.Instance(name=instance, os=self.op.os_type,
4821                             primary_node=pnode_name,
4822                             nics=self.nics, disks=disks,
4823                             disk_template=self.op.disk_template,
4824                             admin_up=False,
4825                             network_port=network_port,
4826                             beparams=self.op.beparams,
4827                             hvparams=self.op.hvparams,
4828                             hypervisor=self.op.hypervisor,
4829                             )
4830
4831     feedback_fn("* creating instance disks...")
4832     try:
4833       _CreateDisks(self, iobj)
4834     except errors.OpExecError:
4835       self.LogWarning("Device creation failed, reverting...")
4836       try:
4837         _RemoveDisks(self, iobj)
4838       finally:
4839         self.cfg.ReleaseDRBDMinors(instance)
4840         raise
4841
4842     feedback_fn("adding instance %s to cluster config" % instance)
4843
4844     self.cfg.AddInstance(iobj)
4845     # Declare that we don't want to remove the instance lock anymore, as we've
4846     # added the instance to the config
4847     del self.remove_locks[locking.LEVEL_INSTANCE]
4848     # Unlock all the nodes
4849     if self.op.mode == constants.INSTANCE_IMPORT:
4850       nodes_keep = [self.op.src_node]
4851       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4852                        if node != self.op.src_node]
4853       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4854       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4855     else:
4856       self.context.glm.release(locking.LEVEL_NODE)
4857       del self.acquired_locks[locking.LEVEL_NODE]
4858
4859     if self.op.wait_for_sync:
4860       disk_abort = not _WaitForSync(self, iobj)
4861     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4862       # make sure the disks are not degraded (still sync-ing is ok)
4863       time.sleep(15)
4864       feedback_fn("* checking mirrors status")
4865       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4866     else:
4867       disk_abort = False
4868
4869     if disk_abort:
4870       _RemoveDisks(self, iobj)
4871       self.cfg.RemoveInstance(iobj.name)
4872       # Make sure the instance lock gets removed
4873       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4874       raise errors.OpExecError("There are some degraded disks for"
4875                                " this instance")
4876
4877     feedback_fn("creating os for instance %s on node %s" %
4878                 (instance, pnode_name))
4879
4880     if iobj.disk_template != constants.DT_DISKLESS:
4881       if self.op.mode == constants.INSTANCE_CREATE:
4882         feedback_fn("* running the instance OS create scripts...")
4883         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4884         msg = result.RemoteFailMsg()
4885         if msg:
4886           raise errors.OpExecError("Could not add os for instance %s"
4887                                    " on node %s: %s" %
4888                                    (instance, pnode_name, msg))
4889
4890       elif self.op.mode == constants.INSTANCE_IMPORT:
4891         feedback_fn("* running the instance OS import scripts...")
4892         src_node = self.op.src_node
4893         src_images = self.src_images
4894         cluster_name = self.cfg.GetClusterName()
4895         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4896                                                          src_node, src_images,
4897                                                          cluster_name)
4898         import_result.Raise()
4899         for idx, result in enumerate(import_result.data):
4900           if not result:
4901             self.LogWarning("Could not import the image %s for instance"
4902                             " %s, disk %d, on node %s" %
4903                             (src_images[idx], instance, idx, pnode_name))
4904       else:
4905         # also checked in the prereq part
4906         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4907                                      % self.op.mode)
4908
4909     if self.op.start:
4910       iobj.admin_up = True
4911       self.cfg.Update(iobj)
4912       logging.info("Starting instance %s on node %s", instance, pnode_name)
4913       feedback_fn("* starting instance...")
4914       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4915       msg = result.RemoteFailMsg()
4916       if msg:
4917         raise errors.OpExecError("Could not start instance: %s" % msg)
4918
4919
4920 class LUConnectConsole(NoHooksLU):
4921   """Connect to an instance's console.
4922
4923   This is somewhat special in that it returns the command line that
4924   you need to run on the master node in order to connect to the
4925   console.
4926
4927   """
4928   _OP_REQP = ["instance_name"]
4929   REQ_BGL = False
4930
4931   def ExpandNames(self):
4932     self._ExpandAndLockInstance()
4933
4934   def CheckPrereq(self):
4935     """Check prerequisites.
4936
4937     This checks that the instance is in the cluster.
4938
4939     """
4940     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4941     assert self.instance is not None, \
4942       "Cannot retrieve locked instance %s" % self.op.instance_name
4943     _CheckNodeOnline(self, self.instance.primary_node)
4944
4945   def Exec(self, feedback_fn):
4946     """Connect to the console of an instance
4947
4948     """
4949     instance = self.instance
4950     node = instance.primary_node
4951
4952     node_insts = self.rpc.call_instance_list([node],
4953                                              [instance.hypervisor])[node]
4954     node_insts.Raise()
4955
4956     if instance.name not in node_insts.data:
4957       raise errors.OpExecError("Instance %s is not running." % instance.name)
4958
4959     logging.debug("Connecting to console of %s on %s", instance.name, node)
4960
4961     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4962     cluster = self.cfg.GetClusterInfo()
4963     # beparams and hvparams are passed separately, to avoid editing the
4964     # instance and then saving the defaults in the instance itself.
4965     hvparams = cluster.FillHV(instance)
4966     beparams = cluster.FillBE(instance)
4967     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4968
4969     # build ssh cmdline
4970     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4971
4972
4973 class LUReplaceDisks(LogicalUnit):
4974   """Replace the disks of an instance.
4975
4976   """
4977   HPATH = "mirrors-replace"
4978   HTYPE = constants.HTYPE_INSTANCE
4979   _OP_REQP = ["instance_name", "mode", "disks"]
4980   REQ_BGL = False
4981
4982   def CheckArguments(self):
4983     if not hasattr(self.op, "remote_node"):
4984       self.op.remote_node = None
4985     if not hasattr(self.op, "iallocator"):
4986       self.op.iallocator = None
4987
4988     # check for valid parameter combination
4989     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4990     if self.op.mode == constants.REPLACE_DISK_CHG:
4991       if cnt == 2:
4992         raise errors.OpPrereqError("When changing the secondary either an"
4993                                    " iallocator script must be used or the"
4994                                    " new node given")
4995       elif cnt == 0:
4996         raise errors.OpPrereqError("Give either the iallocator or the new"
4997                                    " secondary, not both")
4998     else: # not replacing the secondary
4999       if cnt != 2:
5000         raise errors.OpPrereqError("The iallocator and new node options can"
5001                                    " be used only when changing the"
5002                                    " secondary node")
5003
5004   def ExpandNames(self):
5005     self._ExpandAndLockInstance()
5006
5007     if self.op.iallocator is not None:
5008       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5009     elif self.op.remote_node is not None:
5010       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5011       if remote_node is None:
5012         raise errors.OpPrereqError("Node '%s' not known" %
5013                                    self.op.remote_node)
5014       self.op.remote_node = remote_node
5015       # Warning: do not remove the locking of the new secondary here
5016       # unless DRBD8.AddChildren is changed to work in parallel;
5017       # currently it doesn't since parallel invocations of
5018       # FindUnusedMinor will conflict
5019       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5020       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5021     else:
5022       self.needed_locks[locking.LEVEL_NODE] = []
5023       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5024
5025   def DeclareLocks(self, level):
5026     # If we're not already locking all nodes in the set we have to declare the
5027     # instance's primary/secondary nodes.
5028     if (level == locking.LEVEL_NODE and
5029         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5030       self._LockInstancesNodes()
5031
5032   def _RunAllocator(self):
5033     """Compute a new secondary node using an IAllocator.
5034
5035     """
5036     ial = IAllocator(self,
5037                      mode=constants.IALLOCATOR_MODE_RELOC,
5038                      name=self.op.instance_name,
5039                      relocate_from=[self.sec_node])
5040
5041     ial.Run(self.op.iallocator)
5042
5043     if not ial.success:
5044       raise errors.OpPrereqError("Can't compute nodes using"
5045                                  " iallocator '%s': %s" % (self.op.iallocator,
5046                                                            ial.info))
5047     if len(ial.nodes) != ial.required_nodes:
5048       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5049                                  " of nodes (%s), required %s" %
5050                                  (len(ial.nodes), ial.required_nodes))
5051     self.op.remote_node = ial.nodes[0]
5052     self.LogInfo("Selected new secondary for the instance: %s",
5053                  self.op.remote_node)
5054
5055   def BuildHooksEnv(self):
5056     """Build hooks env.
5057
5058     This runs on the master, the primary and all the secondaries.
5059
5060     """
5061     env = {
5062       "MODE": self.op.mode,
5063       "NEW_SECONDARY": self.op.remote_node,
5064       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5065       }
5066     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5067     nl = [
5068       self.cfg.GetMasterNode(),
5069       self.instance.primary_node,
5070       ]
5071     if self.op.remote_node is not None:
5072       nl.append(self.op.remote_node)
5073     return env, nl, nl
5074
5075   def CheckPrereq(self):
5076     """Check prerequisites.
5077
5078     This checks that the instance is in the cluster.
5079
5080     """
5081     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5082     assert instance is not None, \
5083       "Cannot retrieve locked instance %s" % self.op.instance_name
5084     self.instance = instance
5085
5086     if instance.disk_template != constants.DT_DRBD8:
5087       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5088                                  " instances")
5089
5090     if len(instance.secondary_nodes) != 1:
5091       raise errors.OpPrereqError("The instance has a strange layout,"
5092                                  " expected one secondary but found %d" %
5093                                  len(instance.secondary_nodes))
5094
5095     self.sec_node = instance.secondary_nodes[0]
5096
5097     if self.op.iallocator is not None:
5098       self._RunAllocator()
5099
5100     remote_node = self.op.remote_node
5101     if remote_node is not None:
5102       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5103       assert self.remote_node_info is not None, \
5104         "Cannot retrieve locked node %s" % remote_node
5105     else:
5106       self.remote_node_info = None
5107     if remote_node == instance.primary_node:
5108       raise errors.OpPrereqError("The specified node is the primary node of"
5109                                  " the instance.")
5110     elif remote_node == self.sec_node:
5111       raise errors.OpPrereqError("The specified node is already the"
5112                                  " secondary node of the instance.")
5113
5114     if self.op.mode == constants.REPLACE_DISK_PRI:
5115       n1 = self.tgt_node = instance.primary_node
5116       n2 = self.oth_node = self.sec_node
5117     elif self.op.mode == constants.REPLACE_DISK_SEC:
5118       n1 = self.tgt_node = self.sec_node
5119       n2 = self.oth_node = instance.primary_node
5120     elif self.op.mode == constants.REPLACE_DISK_CHG:
5121       n1 = self.new_node = remote_node
5122       n2 = self.oth_node = instance.primary_node
5123       self.tgt_node = self.sec_node
5124       _CheckNodeNotDrained(self, remote_node)
5125     else:
5126       raise errors.ProgrammerError("Unhandled disk replace mode")
5127
5128     _CheckNodeOnline(self, n1)
5129     _CheckNodeOnline(self, n2)
5130
5131     if not self.op.disks:
5132       self.op.disks = range(len(instance.disks))
5133
5134     for disk_idx in self.op.disks:
5135       instance.FindDisk(disk_idx)
5136
5137   def _ExecD8DiskOnly(self, feedback_fn):
5138     """Replace a disk on the primary or secondary for dbrd8.
5139
5140     The algorithm for replace is quite complicated:
5141
5142       1. for each disk to be replaced:
5143
5144         1. create new LVs on the target node with unique names
5145         1. detach old LVs from the drbd device
5146         1. rename old LVs to name_replaced.<time_t>
5147         1. rename new LVs to old LVs
5148         1. attach the new LVs (with the old names now) to the drbd device
5149
5150       1. wait for sync across all devices
5151
5152       1. for each modified disk:
5153
5154         1. remove old LVs (which have the name name_replaces.<time_t>)
5155
5156     Failures are not very well handled.
5157
5158     """
5159     steps_total = 6
5160     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5161     instance = self.instance
5162     iv_names = {}
5163     vgname = self.cfg.GetVGName()
5164     # start of work
5165     cfg = self.cfg
5166     tgt_node = self.tgt_node
5167     oth_node = self.oth_node
5168
5169     # Step: check device activation
5170     self.proc.LogStep(1, steps_total, "check device existence")
5171     info("checking volume groups")
5172     my_vg = cfg.GetVGName()
5173     results = self.rpc.call_vg_list([oth_node, tgt_node])
5174     if not results:
5175       raise errors.OpExecError("Can't list volume groups on the nodes")
5176     for node in oth_node, tgt_node:
5177       res = results[node]
5178       if res.failed or not res.data or my_vg not in res.data:
5179         raise errors.OpExecError("Volume group '%s' not found on %s" %
5180                                  (my_vg, node))
5181     for idx, dev in enumerate(instance.disks):
5182       if idx not in self.op.disks:
5183         continue
5184       for node in tgt_node, oth_node:
5185         info("checking disk/%d on %s" % (idx, node))
5186         cfg.SetDiskID(dev, node)
5187         result = self.rpc.call_blockdev_find(node, dev)
5188         msg = result.RemoteFailMsg()
5189         if not msg and not result.payload:
5190           msg = "disk not found"
5191         if msg:
5192           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5193                                    (idx, node, msg))
5194
5195     # Step: check other node consistency
5196     self.proc.LogStep(2, steps_total, "check peer consistency")
5197     for idx, dev in enumerate(instance.disks):
5198       if idx not in self.op.disks:
5199         continue
5200       info("checking disk/%d consistency on %s" % (idx, oth_node))
5201       if not _CheckDiskConsistency(self, dev, oth_node,
5202                                    oth_node==instance.primary_node):
5203         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5204                                  " to replace disks on this node (%s)" %
5205                                  (oth_node, tgt_node))
5206
5207     # Step: create new storage
5208     self.proc.LogStep(3, steps_total, "allocate new storage")
5209     for idx, dev in enumerate(instance.disks):
5210       if idx not in self.op.disks:
5211         continue
5212       size = dev.size
5213       cfg.SetDiskID(dev, tgt_node)
5214       lv_names = [".disk%d_%s" % (idx, suf)
5215                   for suf in ["data", "meta"]]
5216       names = _GenerateUniqueNames(self, lv_names)
5217       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5218                              logical_id=(vgname, names[0]))
5219       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5220                              logical_id=(vgname, names[1]))
5221       new_lvs = [lv_data, lv_meta]
5222       old_lvs = dev.children
5223       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5224       info("creating new local storage on %s for %s" %
5225            (tgt_node, dev.iv_name))
5226       # we pass force_create=True to force the LVM creation
5227       for new_lv in new_lvs:
5228         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5229                         _GetInstanceInfoText(instance), False)
5230
5231     # Step: for each lv, detach+rename*2+attach
5232     self.proc.LogStep(4, steps_total, "change drbd configuration")
5233     for dev, old_lvs, new_lvs in iv_names.itervalues():
5234       info("detaching %s drbd from local storage" % dev.iv_name)
5235       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5236       msg = result.RemoteFailMsg()
5237       if msg:
5238         raise errors.OpExecError("Can't detach drbd from local storage on node"
5239                                  " %s for device %s: %s" %
5240                                  (tgt_node, dev.iv_name, msg))
5241       #dev.children = []
5242       #cfg.Update(instance)
5243
5244       # ok, we created the new LVs, so now we know we have the needed
5245       # storage; as such, we proceed on the target node to rename
5246       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5247       # using the assumption that logical_id == physical_id (which in
5248       # turn is the unique_id on that node)
5249
5250       # FIXME(iustin): use a better name for the replaced LVs
5251       temp_suffix = int(time.time())
5252       ren_fn = lambda d, suff: (d.physical_id[0],
5253                                 d.physical_id[1] + "_replaced-%s" % suff)
5254       # build the rename list based on what LVs exist on the node
5255       rlist = []
5256       for to_ren in old_lvs:
5257         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5258         if not result.RemoteFailMsg() and result.payload:
5259           # device exists
5260           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5261
5262       info("renaming the old LVs on the target node")
5263       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5264       msg = result.RemoteFailMsg()
5265       if msg:
5266         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5267                                  (tgt_node, msg))
5268       # now we rename the new LVs to the old LVs
5269       info("renaming the new LVs on the target node")
5270       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5271       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5272       msg = result.RemoteFailMsg()
5273       if msg:
5274         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5275                                  (tgt_node, msg))
5276
5277       for old, new in zip(old_lvs, new_lvs):
5278         new.logical_id = old.logical_id
5279         cfg.SetDiskID(new, tgt_node)
5280
5281       for disk in old_lvs:
5282         disk.logical_id = ren_fn(disk, temp_suffix)
5283         cfg.SetDiskID(disk, tgt_node)
5284
5285       # now that the new lvs have the old name, we can add them to the device
5286       info("adding new mirror component on %s" % tgt_node)
5287       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5288       msg = result.RemoteFailMsg()
5289       if msg:
5290         for new_lv in new_lvs:
5291           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5292           if msg:
5293             warning("Can't rollback device %s: %s", dev, msg,
5294                     hint="cleanup manually the unused logical volumes")
5295         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5296
5297       dev.children = new_lvs
5298       cfg.Update(instance)
5299
5300     # Step: wait for sync
5301
5302     # this can fail as the old devices are degraded and _WaitForSync
5303     # does a combined result over all disks, so we don't check its
5304     # return value
5305     self.proc.LogStep(5, steps_total, "sync devices")
5306     _WaitForSync(self, instance, unlock=True)
5307
5308     # so check manually all the devices
5309     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5310       cfg.SetDiskID(dev, instance.primary_node)
5311       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5312       msg = result.RemoteFailMsg()
5313       if not msg and not result.payload:
5314         msg = "disk not found"
5315       if msg:
5316         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5317                                  (name, msg))
5318       if result.payload[5]:
5319         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5320
5321     # Step: remove old storage
5322     self.proc.LogStep(6, steps_total, "removing old storage")
5323     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5324       info("remove logical volumes for %s" % name)
5325       for lv in old_lvs:
5326         cfg.SetDiskID(lv, tgt_node)
5327         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5328         if msg:
5329           warning("Can't remove old LV: %s" % msg,
5330                   hint="manually remove unused LVs")
5331           continue
5332
5333   def _ExecD8Secondary(self, feedback_fn):
5334     """Replace the secondary node for drbd8.
5335
5336     The algorithm for replace is quite complicated:
5337       - for all disks of the instance:
5338         - create new LVs on the new node with same names
5339         - shutdown the drbd device on the old secondary
5340         - disconnect the drbd network on the primary
5341         - create the drbd device on the new secondary
5342         - network attach the drbd on the primary, using an artifice:
5343           the drbd code for Attach() will connect to the network if it
5344           finds a device which is connected to the good local disks but
5345           not network enabled
5346       - wait for sync across all devices
5347       - remove all disks from the old secondary
5348
5349     Failures are not very well handled.
5350
5351     """
5352     steps_total = 6
5353     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5354     instance = self.instance
5355     iv_names = {}
5356     # start of work
5357     cfg = self.cfg
5358     old_node = self.tgt_node
5359     new_node = self.new_node
5360     pri_node = instance.primary_node
5361     nodes_ip = {
5362       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5363       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5364       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5365       }
5366
5367     # Step: check device activation
5368     self.proc.LogStep(1, steps_total, "check device existence")
5369     info("checking volume groups")
5370     my_vg = cfg.GetVGName()
5371     results = self.rpc.call_vg_list([pri_node, new_node])
5372     for node in pri_node, new_node:
5373       res = results[node]
5374       if res.failed or not res.data or my_vg not in res.data:
5375         raise errors.OpExecError("Volume group '%s' not found on %s" %
5376                                  (my_vg, node))
5377     for idx, dev in enumerate(instance.disks):
5378       if idx not in self.op.disks:
5379         continue
5380       info("checking disk/%d on %s" % (idx, pri_node))
5381       cfg.SetDiskID(dev, pri_node)
5382       result = self.rpc.call_blockdev_find(pri_node, dev)
5383       msg = result.RemoteFailMsg()
5384       if not msg and not result.payload:
5385         msg = "disk not found"
5386       if msg:
5387         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5388                                  (idx, pri_node, msg))
5389
5390     # Step: check other node consistency
5391     self.proc.LogStep(2, steps_total, "check peer consistency")
5392     for idx, dev in enumerate(instance.disks):
5393       if idx not in self.op.disks:
5394         continue
5395       info("checking disk/%d consistency on %s" % (idx, pri_node))
5396       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5397         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5398                                  " unsafe to replace the secondary" %
5399                                  pri_node)
5400
5401     # Step: create new storage
5402     self.proc.LogStep(3, steps_total, "allocate new storage")
5403     for idx, dev in enumerate(instance.disks):
5404       info("adding new local storage on %s for disk/%d" %
5405            (new_node, idx))
5406       # we pass force_create=True to force LVM creation
5407       for new_lv in dev.children:
5408         _CreateBlockDev(self, new_node, instance, new_lv, True,
5409                         _GetInstanceInfoText(instance), False)
5410
5411     # Step 4: dbrd minors and drbd setups changes
5412     # after this, we must manually remove the drbd minors on both the
5413     # error and the success paths
5414     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5415                                    instance.name)
5416     logging.debug("Allocated minors %s" % (minors,))
5417     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5418     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5419       size = dev.size
5420       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5421       # create new devices on new_node; note that we create two IDs:
5422       # one without port, so the drbd will be activated without
5423       # networking information on the new node at this stage, and one
5424       # with network, for the latter activation in step 4
5425       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5426       if pri_node == o_node1:
5427         p_minor = o_minor1
5428       else:
5429         p_minor = o_minor2
5430
5431       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5432       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5433
5434       iv_names[idx] = (dev, dev.children, new_net_id)
5435       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5436                     new_net_id)
5437       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5438                               logical_id=new_alone_id,
5439                               children=dev.children)
5440       try:
5441         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5442                               _GetInstanceInfoText(instance), False)
5443       except errors.GenericError:
5444         self.cfg.ReleaseDRBDMinors(instance.name)
5445         raise
5446
5447     for idx, dev in enumerate(instance.disks):
5448       # we have new devices, shutdown the drbd on the old secondary
5449       info("shutting down drbd for disk/%d on old node" % idx)
5450       cfg.SetDiskID(dev, old_node)
5451       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5452       if msg:
5453         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5454                 (idx, msg),
5455                 hint="Please cleanup this device manually as soon as possible")
5456
5457     info("detaching primary drbds from the network (=> standalone)")
5458     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5459                                                instance.disks)[pri_node]
5460
5461     msg = result.RemoteFailMsg()
5462     if msg:
5463       # detaches didn't succeed (unlikely)
5464       self.cfg.ReleaseDRBDMinors(instance.name)
5465       raise errors.OpExecError("Can't detach the disks from the network on"
5466                                " old node: %s" % (msg,))
5467
5468     # if we managed to detach at least one, we update all the disks of
5469     # the instance to point to the new secondary
5470     info("updating instance configuration")
5471     for dev, _, new_logical_id in iv_names.itervalues():
5472       dev.logical_id = new_logical_id
5473       cfg.SetDiskID(dev, pri_node)
5474     cfg.Update(instance)
5475
5476     # and now perform the drbd attach
5477     info("attaching primary drbds to new secondary (standalone => connected)")
5478     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5479                                            instance.disks, instance.name,
5480                                            False)
5481     for to_node, to_result in result.items():
5482       msg = to_result.RemoteFailMsg()
5483       if msg:
5484         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5485                 hint="please do a gnt-instance info to see the"
5486                 " status of disks")
5487
5488     # this can fail as the old devices are degraded and _WaitForSync
5489     # does a combined result over all disks, so we don't check its
5490     # return value
5491     self.proc.LogStep(5, steps_total, "sync devices")
5492     _WaitForSync(self, instance, unlock=True)
5493
5494     # so check manually all the devices
5495     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5496       cfg.SetDiskID(dev, pri_node)
5497       result = self.rpc.call_blockdev_find(pri_node, dev)
5498       msg = result.RemoteFailMsg()
5499       if not msg and not result.payload:
5500         msg = "disk not found"
5501       if msg:
5502         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5503                                  (idx, msg))
5504       if result.payload[5]:
5505         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5506
5507     self.proc.LogStep(6, steps_total, "removing old storage")
5508     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5509       info("remove logical volumes for disk/%d" % idx)
5510       for lv in old_lvs:
5511         cfg.SetDiskID(lv, old_node)
5512         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5513         if msg:
5514           warning("Can't remove LV on old secondary: %s", msg,
5515                   hint="Cleanup stale volumes by hand")
5516
5517   def Exec(self, feedback_fn):
5518     """Execute disk replacement.
5519
5520     This dispatches the disk replacement to the appropriate handler.
5521
5522     """
5523     instance = self.instance
5524
5525     # Activate the instance disks if we're replacing them on a down instance
5526     if not instance.admin_up:
5527       _StartInstanceDisks(self, instance, True)
5528
5529     if self.op.mode == constants.REPLACE_DISK_CHG:
5530       fn = self._ExecD8Secondary
5531     else:
5532       fn = self._ExecD8DiskOnly
5533
5534     ret = fn(feedback_fn)
5535
5536     # Deactivate the instance disks if we're replacing them on a down instance
5537     if not instance.admin_up:
5538       _SafeShutdownInstanceDisks(self, instance)
5539
5540     return ret
5541
5542
5543 class LUGrowDisk(LogicalUnit):
5544   """Grow a disk of an instance.
5545
5546   """
5547   HPATH = "disk-grow"
5548   HTYPE = constants.HTYPE_INSTANCE
5549   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5550   REQ_BGL = False
5551
5552   def ExpandNames(self):
5553     self._ExpandAndLockInstance()
5554     self.needed_locks[locking.LEVEL_NODE] = []
5555     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5556
5557   def DeclareLocks(self, level):
5558     if level == locking.LEVEL_NODE:
5559       self._LockInstancesNodes()
5560
5561   def BuildHooksEnv(self):
5562     """Build hooks env.
5563
5564     This runs on the master, the primary and all the secondaries.
5565
5566     """
5567     env = {
5568       "DISK": self.op.disk,
5569       "AMOUNT": self.op.amount,
5570       }
5571     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5572     nl = [
5573       self.cfg.GetMasterNode(),
5574       self.instance.primary_node,
5575       ]
5576     return env, nl, nl
5577
5578   def CheckPrereq(self):
5579     """Check prerequisites.
5580
5581     This checks that the instance is in the cluster.
5582
5583     """
5584     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5585     assert instance is not None, \
5586       "Cannot retrieve locked instance %s" % self.op.instance_name
5587     nodenames = list(instance.all_nodes)
5588     for node in nodenames:
5589       _CheckNodeOnline(self, node)
5590
5591
5592     self.instance = instance
5593
5594     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5595       raise errors.OpPrereqError("Instance's disk layout does not support"
5596                                  " growing.")
5597
5598     self.disk = instance.FindDisk(self.op.disk)
5599
5600     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5601                                        instance.hypervisor)
5602     for node in nodenames:
5603       info = nodeinfo[node]
5604       if info.failed or not info.data:
5605         raise errors.OpPrereqError("Cannot get current information"
5606                                    " from node '%s'" % node)
5607       vg_free = info.data.get('vg_free', None)
5608       if not isinstance(vg_free, int):
5609         raise errors.OpPrereqError("Can't compute free disk space on"
5610                                    " node %s" % node)
5611       if self.op.amount > vg_free:
5612         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5613                                    " %d MiB available, %d MiB required" %
5614                                    (node, vg_free, self.op.amount))
5615
5616   def Exec(self, feedback_fn):
5617     """Execute disk grow.
5618
5619     """
5620     instance = self.instance
5621     disk = self.disk
5622     for node in instance.all_nodes:
5623       self.cfg.SetDiskID(disk, node)
5624       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5625       msg = result.RemoteFailMsg()
5626       if msg:
5627         raise errors.OpExecError("Grow request failed to node %s: %s" %
5628                                  (node, msg))
5629     disk.RecordGrow(self.op.amount)
5630     self.cfg.Update(instance)
5631     if self.op.wait_for_sync:
5632       disk_abort = not _WaitForSync(self, instance)
5633       if disk_abort:
5634         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5635                              " status.\nPlease check the instance.")
5636
5637
5638 class LUQueryInstanceData(NoHooksLU):
5639   """Query runtime instance data.
5640
5641   """
5642   _OP_REQP = ["instances", "static"]
5643   REQ_BGL = False
5644
5645   def ExpandNames(self):
5646     self.needed_locks = {}
5647     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5648
5649     if not isinstance(self.op.instances, list):
5650       raise errors.OpPrereqError("Invalid argument type 'instances'")
5651
5652     if self.op.instances:
5653       self.wanted_names = []
5654       for name in self.op.instances:
5655         full_name = self.cfg.ExpandInstanceName(name)
5656         if full_name is None:
5657           raise errors.OpPrereqError("Instance '%s' not known" % name)
5658         self.wanted_names.append(full_name)
5659       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5660     else:
5661       self.wanted_names = None
5662       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5663
5664     self.needed_locks[locking.LEVEL_NODE] = []
5665     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5666
5667   def DeclareLocks(self, level):
5668     if level == locking.LEVEL_NODE:
5669       self._LockInstancesNodes()
5670
5671   def CheckPrereq(self):
5672     """Check prerequisites.
5673
5674     This only checks the optional instance list against the existing names.
5675
5676     """
5677     if self.wanted_names is None:
5678       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5679
5680     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5681                              in self.wanted_names]
5682     return
5683
5684   def _ComputeDiskStatus(self, instance, snode, dev):
5685     """Compute block device status.
5686
5687     """
5688     static = self.op.static
5689     if not static:
5690       self.cfg.SetDiskID(dev, instance.primary_node)
5691       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5692       if dev_pstatus.offline:
5693         dev_pstatus = None
5694       else:
5695         msg = dev_pstatus.RemoteFailMsg()
5696         if msg:
5697           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5698                                    (instance.name, msg))
5699         dev_pstatus = dev_pstatus.payload
5700     else:
5701       dev_pstatus = None
5702
5703     if dev.dev_type in constants.LDS_DRBD:
5704       # we change the snode then (otherwise we use the one passed in)
5705       if dev.logical_id[0] == instance.primary_node:
5706         snode = dev.logical_id[1]
5707       else:
5708         snode = dev.logical_id[0]
5709
5710     if snode and not static:
5711       self.cfg.SetDiskID(dev, snode)
5712       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5713       if dev_sstatus.offline:
5714         dev_sstatus = None
5715       else:
5716         msg = dev_sstatus.RemoteFailMsg()
5717         if msg:
5718           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5719                                    (instance.name, msg))
5720         dev_sstatus = dev_sstatus.payload
5721     else:
5722       dev_sstatus = None
5723
5724     if dev.children:
5725       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5726                       for child in dev.children]
5727     else:
5728       dev_children = []
5729
5730     data = {
5731       "iv_name": dev.iv_name,
5732       "dev_type": dev.dev_type,
5733       "logical_id": dev.logical_id,
5734       "physical_id": dev.physical_id,
5735       "pstatus": dev_pstatus,
5736       "sstatus": dev_sstatus,
5737       "children": dev_children,
5738       "mode": dev.mode,
5739       }
5740
5741     return data
5742
5743   def Exec(self, feedback_fn):
5744     """Gather and return data"""
5745     result = {}
5746
5747     cluster = self.cfg.GetClusterInfo()
5748
5749     for instance in self.wanted_instances:
5750       if not self.op.static:
5751         remote_info = self.rpc.call_instance_info(instance.primary_node,
5752                                                   instance.name,
5753                                                   instance.hypervisor)
5754         remote_info.Raise()
5755         remote_info = remote_info.data
5756         if remote_info and "state" in remote_info:
5757           remote_state = "up"
5758         else:
5759           remote_state = "down"
5760       else:
5761         remote_state = None
5762       if instance.admin_up:
5763         config_state = "up"
5764       else:
5765         config_state = "down"
5766
5767       disks = [self._ComputeDiskStatus(instance, None, device)
5768                for device in instance.disks]
5769
5770       idict = {
5771         "name": instance.name,
5772         "config_state": config_state,
5773         "run_state": remote_state,
5774         "pnode": instance.primary_node,
5775         "snodes": instance.secondary_nodes,
5776         "os": instance.os,
5777         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5778         "disks": disks,
5779         "hypervisor": instance.hypervisor,
5780         "network_port": instance.network_port,
5781         "hv_instance": instance.hvparams,
5782         "hv_actual": cluster.FillHV(instance),
5783         "be_instance": instance.beparams,
5784         "be_actual": cluster.FillBE(instance),
5785         }
5786
5787       result[instance.name] = idict
5788
5789     return result
5790
5791
5792 class LUSetInstanceParams(LogicalUnit):
5793   """Modifies an instances's parameters.
5794
5795   """
5796   HPATH = "instance-modify"
5797   HTYPE = constants.HTYPE_INSTANCE
5798   _OP_REQP = ["instance_name"]
5799   REQ_BGL = False
5800
5801   def CheckArguments(self):
5802     if not hasattr(self.op, 'nics'):
5803       self.op.nics = []
5804     if not hasattr(self.op, 'disks'):
5805       self.op.disks = []
5806     if not hasattr(self.op, 'beparams'):
5807       self.op.beparams = {}
5808     if not hasattr(self.op, 'hvparams'):
5809       self.op.hvparams = {}
5810     self.op.force = getattr(self.op, "force", False)
5811     if not (self.op.nics or self.op.disks or
5812             self.op.hvparams or self.op.beparams):
5813       raise errors.OpPrereqError("No changes submitted")
5814
5815     # Disk validation
5816     disk_addremove = 0
5817     for disk_op, disk_dict in self.op.disks:
5818       if disk_op == constants.DDM_REMOVE:
5819         disk_addremove += 1
5820         continue
5821       elif disk_op == constants.DDM_ADD:
5822         disk_addremove += 1
5823       else:
5824         if not isinstance(disk_op, int):
5825           raise errors.OpPrereqError("Invalid disk index")
5826       if disk_op == constants.DDM_ADD:
5827         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5828         if mode not in constants.DISK_ACCESS_SET:
5829           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5830         size = disk_dict.get('size', None)
5831         if size is None:
5832           raise errors.OpPrereqError("Required disk parameter size missing")
5833         try:
5834           size = int(size)
5835         except ValueError, err:
5836           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5837                                      str(err))
5838         disk_dict['size'] = size
5839       else:
5840         # modification of disk
5841         if 'size' in disk_dict:
5842           raise errors.OpPrereqError("Disk size change not possible, use"
5843                                      " grow-disk")
5844
5845     if disk_addremove > 1:
5846       raise errors.OpPrereqError("Only one disk add or remove operation"
5847                                  " supported at a time")
5848
5849     # NIC validation
5850     nic_addremove = 0
5851     for nic_op, nic_dict in self.op.nics:
5852       if nic_op == constants.DDM_REMOVE:
5853         nic_addremove += 1
5854         continue
5855       elif nic_op == constants.DDM_ADD:
5856         nic_addremove += 1
5857       else:
5858         if not isinstance(nic_op, int):
5859           raise errors.OpPrereqError("Invalid nic index")
5860
5861       # nic_dict should be a dict
5862       nic_ip = nic_dict.get('ip', None)
5863       if nic_ip is not None:
5864         if nic_ip.lower() == constants.VALUE_NONE:
5865           nic_dict['ip'] = None
5866         else:
5867           if not utils.IsValidIP(nic_ip):
5868             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5869
5870       if nic_op == constants.DDM_ADD:
5871         nic_bridge = nic_dict.get('bridge', None)
5872         if nic_bridge is None:
5873           nic_dict['bridge'] = self.cfg.GetDefBridge()
5874         nic_mac = nic_dict.get('mac', None)
5875         if nic_mac is None:
5876           nic_dict['mac'] = constants.VALUE_AUTO
5877
5878       if 'mac' in nic_dict:
5879         nic_mac = nic_dict['mac']
5880         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5881           if not utils.IsValidMac(nic_mac):
5882             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5883         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5884           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5885                                      " modifying an existing nic")
5886
5887     if nic_addremove > 1:
5888       raise errors.OpPrereqError("Only one NIC add or remove operation"
5889                                  " supported at a time")
5890
5891   def ExpandNames(self):
5892     self._ExpandAndLockInstance()
5893     self.needed_locks[locking.LEVEL_NODE] = []
5894     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5895
5896   def DeclareLocks(self, level):
5897     if level == locking.LEVEL_NODE:
5898       self._LockInstancesNodes()
5899
5900   def BuildHooksEnv(self):
5901     """Build hooks env.
5902
5903     This runs on the master, primary and secondaries.
5904
5905     """
5906     args = dict()
5907     if constants.BE_MEMORY in self.be_new:
5908       args['memory'] = self.be_new[constants.BE_MEMORY]
5909     if constants.BE_VCPUS in self.be_new:
5910       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5911     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5912     # information at all.
5913     if self.op.nics:
5914       args['nics'] = []
5915       nic_override = dict(self.op.nics)
5916       for idx, nic in enumerate(self.instance.nics):
5917         if idx in nic_override:
5918           this_nic_override = nic_override[idx]
5919         else:
5920           this_nic_override = {}
5921         if 'ip' in this_nic_override:
5922           ip = this_nic_override['ip']
5923         else:
5924           ip = nic.ip
5925         if 'bridge' in this_nic_override:
5926           bridge = this_nic_override['bridge']
5927         else:
5928           bridge = nic.bridge
5929         if 'mac' in this_nic_override:
5930           mac = this_nic_override['mac']
5931         else:
5932           mac = nic.mac
5933         args['nics'].append((ip, bridge, mac))
5934       if constants.DDM_ADD in nic_override:
5935         ip = nic_override[constants.DDM_ADD].get('ip', None)
5936         bridge = nic_override[constants.DDM_ADD]['bridge']
5937         mac = nic_override[constants.DDM_ADD]['mac']
5938         args['nics'].append((ip, bridge, mac))
5939       elif constants.DDM_REMOVE in nic_override:
5940         del args['nics'][-1]
5941
5942     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5943     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5944     return env, nl, nl
5945
5946   def CheckPrereq(self):
5947     """Check prerequisites.
5948
5949     This only checks the instance list against the existing names.
5950
5951     """
5952     force = self.force = self.op.force
5953
5954     # checking the new params on the primary/secondary nodes
5955
5956     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5957     assert self.instance is not None, \
5958       "Cannot retrieve locked instance %s" % self.op.instance_name
5959     pnode = instance.primary_node
5960     nodelist = list(instance.all_nodes)
5961
5962     # hvparams processing
5963     if self.op.hvparams:
5964       i_hvdict = copy.deepcopy(instance.hvparams)
5965       for key, val in self.op.hvparams.iteritems():
5966         if val == constants.VALUE_DEFAULT:
5967           try:
5968             del i_hvdict[key]
5969           except KeyError:
5970             pass
5971         else:
5972           i_hvdict[key] = val
5973       cluster = self.cfg.GetClusterInfo()
5974       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5975       hv_new = objects.FillDict(cluster.hvparams[instance.hypervisor],
5976                                 i_hvdict)
5977       # local check
5978       hypervisor.GetHypervisor(
5979         instance.hypervisor).CheckParameterSyntax(hv_new)
5980       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5981       self.hv_new = hv_new # the new actual values
5982       self.hv_inst = i_hvdict # the new dict (without defaults)
5983     else:
5984       self.hv_new = self.hv_inst = {}
5985
5986     # beparams processing
5987     if self.op.beparams:
5988       i_bedict = copy.deepcopy(instance.beparams)
5989       for key, val in self.op.beparams.iteritems():
5990         if val == constants.VALUE_DEFAULT:
5991           try:
5992             del i_bedict[key]
5993           except KeyError:
5994             pass
5995         else:
5996           i_bedict[key] = val
5997       cluster = self.cfg.GetClusterInfo()
5998       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5999       be_new = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
6000                                 i_bedict)
6001       self.be_new = be_new # the new actual values
6002       self.be_inst = i_bedict # the new dict (without defaults)
6003     else:
6004       self.be_new = self.be_inst = {}
6005
6006     self.warn = []
6007
6008     if constants.BE_MEMORY in self.op.beparams and not self.force:
6009       mem_check_list = [pnode]
6010       if be_new[constants.BE_AUTO_BALANCE]:
6011         # either we changed auto_balance to yes or it was from before
6012         mem_check_list.extend(instance.secondary_nodes)
6013       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6014                                                   instance.hypervisor)
6015       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6016                                          instance.hypervisor)
6017       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6018         # Assume the primary node is unreachable and go ahead
6019         self.warn.append("Can't get info from primary node %s" % pnode)
6020       else:
6021         if not instance_info.failed and instance_info.data:
6022           current_mem = int(instance_info.data['memory'])
6023         else:
6024           # Assume instance not running
6025           # (there is a slight race condition here, but it's not very probable,
6026           # and we have no other way to check)
6027           current_mem = 0
6028         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6029                     nodeinfo[pnode].data['memory_free'])
6030         if miss_mem > 0:
6031           raise errors.OpPrereqError("This change will prevent the instance"
6032                                      " from starting, due to %d MB of memory"
6033                                      " missing on its primary node" % miss_mem)
6034
6035       if be_new[constants.BE_AUTO_BALANCE]:
6036         for node, nres in nodeinfo.iteritems():
6037           if node not in instance.secondary_nodes:
6038             continue
6039           if nres.failed or not isinstance(nres.data, dict):
6040             self.warn.append("Can't get info from secondary node %s" % node)
6041           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6042             self.warn.append("Not enough memory to failover instance to"
6043                              " secondary node %s" % node)
6044
6045     # NIC processing
6046     for nic_op, nic_dict in self.op.nics:
6047       if nic_op == constants.DDM_REMOVE:
6048         if not instance.nics:
6049           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6050         continue
6051       if nic_op != constants.DDM_ADD:
6052         # an existing nic
6053         if nic_op < 0 or nic_op >= len(instance.nics):
6054           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6055                                      " are 0 to %d" %
6056                                      (nic_op, len(instance.nics)))
6057       if 'bridge' in nic_dict:
6058         nic_bridge = nic_dict['bridge']
6059         if nic_bridge is None:
6060           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6061         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6062           msg = ("Bridge '%s' doesn't exist on one of"
6063                  " the instance nodes" % nic_bridge)
6064           if self.force:
6065             self.warn.append(msg)
6066           else:
6067             raise errors.OpPrereqError(msg)
6068       if 'mac' in nic_dict:
6069         nic_mac = nic_dict['mac']
6070         if nic_mac is None:
6071           raise errors.OpPrereqError('Cannot set the nic mac to None')
6072         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6073           # otherwise generate the mac
6074           nic_dict['mac'] = self.cfg.GenerateMAC()
6075         else:
6076           # or validate/reserve the current one
6077           if self.cfg.IsMacInUse(nic_mac):
6078             raise errors.OpPrereqError("MAC address %s already in use"
6079                                        " in cluster" % nic_mac)
6080
6081     # DISK processing
6082     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6083       raise errors.OpPrereqError("Disk operations not supported for"
6084                                  " diskless instances")
6085     for disk_op, disk_dict in self.op.disks:
6086       if disk_op == constants.DDM_REMOVE:
6087         if len(instance.disks) == 1:
6088           raise errors.OpPrereqError("Cannot remove the last disk of"
6089                                      " an instance")
6090         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6091         ins_l = ins_l[pnode]
6092         if ins_l.failed or not isinstance(ins_l.data, list):
6093           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6094         if instance.name in ins_l.data:
6095           raise errors.OpPrereqError("Instance is running, can't remove"
6096                                      " disks.")
6097
6098       if (disk_op == constants.DDM_ADD and
6099           len(instance.nics) >= constants.MAX_DISKS):
6100         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6101                                    " add more" % constants.MAX_DISKS)
6102       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6103         # an existing disk
6104         if disk_op < 0 or disk_op >= len(instance.disks):
6105           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6106                                      " are 0 to %d" %
6107                                      (disk_op, len(instance.disks)))
6108
6109     return
6110
6111   def Exec(self, feedback_fn):
6112     """Modifies an instance.
6113
6114     All parameters take effect only at the next restart of the instance.
6115
6116     """
6117     # Process here the warnings from CheckPrereq, as we don't have a
6118     # feedback_fn there.
6119     for warn in self.warn:
6120       feedback_fn("WARNING: %s" % warn)
6121
6122     result = []
6123     instance = self.instance
6124     # disk changes
6125     for disk_op, disk_dict in self.op.disks:
6126       if disk_op == constants.DDM_REMOVE:
6127         # remove the last disk
6128         device = instance.disks.pop()
6129         device_idx = len(instance.disks)
6130         for node, disk in device.ComputeNodeTree(instance.primary_node):
6131           self.cfg.SetDiskID(disk, node)
6132           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6133           if msg:
6134             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6135                             " continuing anyway", device_idx, node, msg)
6136         result.append(("disk/%d" % device_idx, "remove"))
6137       elif disk_op == constants.DDM_ADD:
6138         # add a new disk
6139         if instance.disk_template == constants.DT_FILE:
6140           file_driver, file_path = instance.disks[0].logical_id
6141           file_path = os.path.dirname(file_path)
6142         else:
6143           file_driver = file_path = None
6144         disk_idx_base = len(instance.disks)
6145         new_disk = _GenerateDiskTemplate(self,
6146                                          instance.disk_template,
6147                                          instance.name, instance.primary_node,
6148                                          instance.secondary_nodes,
6149                                          [disk_dict],
6150                                          file_path,
6151                                          file_driver,
6152                                          disk_idx_base)[0]
6153         instance.disks.append(new_disk)
6154         info = _GetInstanceInfoText(instance)
6155
6156         logging.info("Creating volume %s for instance %s",
6157                      new_disk.iv_name, instance.name)
6158         # Note: this needs to be kept in sync with _CreateDisks
6159         #HARDCODE
6160         for node in instance.all_nodes:
6161           f_create = node == instance.primary_node
6162           try:
6163             _CreateBlockDev(self, node, instance, new_disk,
6164                             f_create, info, f_create)
6165           except errors.OpExecError, err:
6166             self.LogWarning("Failed to create volume %s (%s) on"
6167                             " node %s: %s",
6168                             new_disk.iv_name, new_disk, node, err)
6169         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6170                        (new_disk.size, new_disk.mode)))
6171       else:
6172         # change a given disk
6173         instance.disks[disk_op].mode = disk_dict['mode']
6174         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6175     # NIC changes
6176     for nic_op, nic_dict in self.op.nics:
6177       if nic_op == constants.DDM_REMOVE:
6178         # remove the last nic
6179         del instance.nics[-1]
6180         result.append(("nic.%d" % len(instance.nics), "remove"))
6181       elif nic_op == constants.DDM_ADD:
6182         # mac and bridge should be set, by now
6183         mac = nic_dict['mac']
6184         bridge = nic_dict['bridge']
6185         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6186                               bridge=bridge)
6187         instance.nics.append(new_nic)
6188         result.append(("nic.%d" % (len(instance.nics) - 1),
6189                        "add:mac=%s,ip=%s,bridge=%s" %
6190                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6191       else:
6192         # change a given nic
6193         for key in 'mac', 'ip', 'bridge':
6194           if key in nic_dict:
6195             setattr(instance.nics[nic_op], key, nic_dict[key])
6196             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6197
6198     # hvparams changes
6199     if self.op.hvparams:
6200       instance.hvparams = self.hv_inst
6201       for key, val in self.op.hvparams.iteritems():
6202         result.append(("hv/%s" % key, val))
6203
6204     # beparams changes
6205     if self.op.beparams:
6206       instance.beparams = self.be_inst
6207       for key, val in self.op.beparams.iteritems():
6208         result.append(("be/%s" % key, val))
6209
6210     self.cfg.Update(instance)
6211
6212     return result
6213
6214
6215 class LUQueryExports(NoHooksLU):
6216   """Query the exports list
6217
6218   """
6219   _OP_REQP = ['nodes']
6220   REQ_BGL = False
6221
6222   def ExpandNames(self):
6223     self.needed_locks = {}
6224     self.share_locks[locking.LEVEL_NODE] = 1
6225     if not self.op.nodes:
6226       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6227     else:
6228       self.needed_locks[locking.LEVEL_NODE] = \
6229         _GetWantedNodes(self, self.op.nodes)
6230
6231   def CheckPrereq(self):
6232     """Check prerequisites.
6233
6234     """
6235     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6236
6237   def Exec(self, feedback_fn):
6238     """Compute the list of all the exported system images.
6239
6240     @rtype: dict
6241     @return: a dictionary with the structure node->(export-list)
6242         where export-list is a list of the instances exported on
6243         that node.
6244
6245     """
6246     rpcresult = self.rpc.call_export_list(self.nodes)
6247     result = {}
6248     for node in rpcresult:
6249       if rpcresult[node].failed:
6250         result[node] = False
6251       else:
6252         result[node] = rpcresult[node].data
6253
6254     return result
6255
6256
6257 class LUExportInstance(LogicalUnit):
6258   """Export an instance to an image in the cluster.
6259
6260   """
6261   HPATH = "instance-export"
6262   HTYPE = constants.HTYPE_INSTANCE
6263   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6264   REQ_BGL = False
6265
6266   def ExpandNames(self):
6267     self._ExpandAndLockInstance()
6268     # FIXME: lock only instance primary and destination node
6269     #
6270     # Sad but true, for now we have do lock all nodes, as we don't know where
6271     # the previous export might be, and and in this LU we search for it and
6272     # remove it from its current node. In the future we could fix this by:
6273     #  - making a tasklet to search (share-lock all), then create the new one,
6274     #    then one to remove, after
6275     #  - removing the removal operation altoghether
6276     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6277
6278   def DeclareLocks(self, level):
6279     """Last minute lock declaration."""
6280     # All nodes are locked anyway, so nothing to do here.
6281
6282   def BuildHooksEnv(self):
6283     """Build hooks env.
6284
6285     This will run on the master, primary node and target node.
6286
6287     """
6288     env = {
6289       "EXPORT_NODE": self.op.target_node,
6290       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6291       }
6292     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6293     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6294           self.op.target_node]
6295     return env, nl, nl
6296
6297   def CheckPrereq(self):
6298     """Check prerequisites.
6299
6300     This checks that the instance and node names are valid.
6301
6302     """
6303     instance_name = self.op.instance_name
6304     self.instance = self.cfg.GetInstanceInfo(instance_name)
6305     assert self.instance is not None, \
6306           "Cannot retrieve locked instance %s" % self.op.instance_name
6307     _CheckNodeOnline(self, self.instance.primary_node)
6308
6309     self.dst_node = self.cfg.GetNodeInfo(
6310       self.cfg.ExpandNodeName(self.op.target_node))
6311
6312     if self.dst_node is None:
6313       # This is wrong node name, not a non-locked node
6314       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6315     _CheckNodeOnline(self, self.dst_node.name)
6316     _CheckNodeNotDrained(self, self.dst_node.name)
6317
6318     # instance disk type verification
6319     for disk in self.instance.disks:
6320       if disk.dev_type == constants.LD_FILE:
6321         raise errors.OpPrereqError("Export not supported for instances with"
6322                                    " file-based disks")
6323
6324   def Exec(self, feedback_fn):
6325     """Export an instance to an image in the cluster.
6326
6327     """
6328     instance = self.instance
6329     dst_node = self.dst_node
6330     src_node = instance.primary_node
6331     if self.op.shutdown:
6332       # shutdown the instance, but not the disks
6333       result = self.rpc.call_instance_shutdown(src_node, instance)
6334       msg = result.RemoteFailMsg()
6335       if msg:
6336         raise errors.OpExecError("Could not shutdown instance %s on"
6337                                  " node %s: %s" %
6338                                  (instance.name, src_node, msg))
6339
6340     vgname = self.cfg.GetVGName()
6341
6342     snap_disks = []
6343
6344     # set the disks ID correctly since call_instance_start needs the
6345     # correct drbd minor to create the symlinks
6346     for disk in instance.disks:
6347       self.cfg.SetDiskID(disk, src_node)
6348
6349     try:
6350       for disk in instance.disks:
6351         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6352         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6353         if new_dev_name.failed or not new_dev_name.data:
6354           self.LogWarning("Could not snapshot block device %s on node %s",
6355                           disk.logical_id[1], src_node)
6356           snap_disks.append(False)
6357         else:
6358           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6359                                  logical_id=(vgname, new_dev_name.data),
6360                                  physical_id=(vgname, new_dev_name.data),
6361                                  iv_name=disk.iv_name)
6362           snap_disks.append(new_dev)
6363
6364     finally:
6365       if self.op.shutdown and instance.admin_up:
6366         result = self.rpc.call_instance_start(src_node, instance, None, None)
6367         msg = result.RemoteFailMsg()
6368         if msg:
6369           _ShutdownInstanceDisks(self, instance)
6370           raise errors.OpExecError("Could not start instance: %s" % msg)
6371
6372     # TODO: check for size
6373
6374     cluster_name = self.cfg.GetClusterName()
6375     for idx, dev in enumerate(snap_disks):
6376       if dev:
6377         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6378                                                instance, cluster_name, idx)
6379         if result.failed or not result.data:
6380           self.LogWarning("Could not export block device %s from node %s to"
6381                           " node %s", dev.logical_id[1], src_node,
6382                           dst_node.name)
6383         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6384         if msg:
6385           self.LogWarning("Could not remove snapshot block device %s from node"
6386                           " %s: %s", dev.logical_id[1], src_node, msg)
6387
6388     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6389     if result.failed or not result.data:
6390       self.LogWarning("Could not finalize export for instance %s on node %s",
6391                       instance.name, dst_node.name)
6392
6393     nodelist = self.cfg.GetNodeList()
6394     nodelist.remove(dst_node.name)
6395
6396     # on one-node clusters nodelist will be empty after the removal
6397     # if we proceed the backup would be removed because OpQueryExports
6398     # substitutes an empty list with the full cluster node list.
6399     if nodelist:
6400       exportlist = self.rpc.call_export_list(nodelist)
6401       for node in exportlist:
6402         if exportlist[node].failed:
6403           continue
6404         if instance.name in exportlist[node].data:
6405           if not self.rpc.call_export_remove(node, instance.name):
6406             self.LogWarning("Could not remove older export for instance %s"
6407                             " on node %s", instance.name, node)
6408
6409
6410 class LURemoveExport(NoHooksLU):
6411   """Remove exports related to the named instance.
6412
6413   """
6414   _OP_REQP = ["instance_name"]
6415   REQ_BGL = False
6416
6417   def ExpandNames(self):
6418     self.needed_locks = {}
6419     # We need all nodes to be locked in order for RemoveExport to work, but we
6420     # don't need to lock the instance itself, as nothing will happen to it (and
6421     # we can remove exports also for a removed instance)
6422     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6423
6424   def CheckPrereq(self):
6425     """Check prerequisites.
6426     """
6427     pass
6428
6429   def Exec(self, feedback_fn):
6430     """Remove any export.
6431
6432     """
6433     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6434     # If the instance was not found we'll try with the name that was passed in.
6435     # This will only work if it was an FQDN, though.
6436     fqdn_warn = False
6437     if not instance_name:
6438       fqdn_warn = True
6439       instance_name = self.op.instance_name
6440
6441     exportlist = self.rpc.call_export_list(self.acquired_locks[
6442       locking.LEVEL_NODE])
6443     found = False
6444     for node in exportlist:
6445       if exportlist[node].failed:
6446         self.LogWarning("Failed to query node %s, continuing" % node)
6447         continue
6448       if instance_name in exportlist[node].data:
6449         found = True
6450         result = self.rpc.call_export_remove(node, instance_name)
6451         if result.failed or not result.data:
6452           logging.error("Could not remove export for instance %s"
6453                         " on node %s", instance_name, node)
6454
6455     if fqdn_warn and not found:
6456       feedback_fn("Export not found. If trying to remove an export belonging"
6457                   " to a deleted instance please use its Fully Qualified"
6458                   " Domain Name.")
6459
6460
6461 class TagsLU(NoHooksLU):
6462   """Generic tags LU.
6463
6464   This is an abstract class which is the parent of all the other tags LUs.
6465
6466   """
6467
6468   def ExpandNames(self):
6469     self.needed_locks = {}
6470     if self.op.kind == constants.TAG_NODE:
6471       name = self.cfg.ExpandNodeName(self.op.name)
6472       if name is None:
6473         raise errors.OpPrereqError("Invalid node name (%s)" %
6474                                    (self.op.name,))
6475       self.op.name = name
6476       self.needed_locks[locking.LEVEL_NODE] = name
6477     elif self.op.kind == constants.TAG_INSTANCE:
6478       name = self.cfg.ExpandInstanceName(self.op.name)
6479       if name is None:
6480         raise errors.OpPrereqError("Invalid instance name (%s)" %
6481                                    (self.op.name,))
6482       self.op.name = name
6483       self.needed_locks[locking.LEVEL_INSTANCE] = name
6484
6485   def CheckPrereq(self):
6486     """Check prerequisites.
6487
6488     """
6489     if self.op.kind == constants.TAG_CLUSTER:
6490       self.target = self.cfg.GetClusterInfo()
6491     elif self.op.kind == constants.TAG_NODE:
6492       self.target = self.cfg.GetNodeInfo(self.op.name)
6493     elif self.op.kind == constants.TAG_INSTANCE:
6494       self.target = self.cfg.GetInstanceInfo(self.op.name)
6495     else:
6496       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6497                                  str(self.op.kind))
6498
6499
6500 class LUGetTags(TagsLU):
6501   """Returns the tags of a given object.
6502
6503   """
6504   _OP_REQP = ["kind", "name"]
6505   REQ_BGL = False
6506
6507   def Exec(self, feedback_fn):
6508     """Returns the tag list.
6509
6510     """
6511     return list(self.target.GetTags())
6512
6513
6514 class LUSearchTags(NoHooksLU):
6515   """Searches the tags for a given pattern.
6516
6517   """
6518   _OP_REQP = ["pattern"]
6519   REQ_BGL = False
6520
6521   def ExpandNames(self):
6522     self.needed_locks = {}
6523
6524   def CheckPrereq(self):
6525     """Check prerequisites.
6526
6527     This checks the pattern passed for validity by compiling it.
6528
6529     """
6530     try:
6531       self.re = re.compile(self.op.pattern)
6532     except re.error, err:
6533       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6534                                  (self.op.pattern, err))
6535
6536   def Exec(self, feedback_fn):
6537     """Returns the tag list.
6538
6539     """
6540     cfg = self.cfg
6541     tgts = [("/cluster", cfg.GetClusterInfo())]
6542     ilist = cfg.GetAllInstancesInfo().values()
6543     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6544     nlist = cfg.GetAllNodesInfo().values()
6545     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6546     results = []
6547     for path, target in tgts:
6548       for tag in target.GetTags():
6549         if self.re.search(tag):
6550           results.append((path, tag))
6551     return results
6552
6553
6554 class LUAddTags(TagsLU):
6555   """Sets a tag on a given object.
6556
6557   """
6558   _OP_REQP = ["kind", "name", "tags"]
6559   REQ_BGL = False
6560
6561   def CheckPrereq(self):
6562     """Check prerequisites.
6563
6564     This checks the type and length of the tag name and value.
6565
6566     """
6567     TagsLU.CheckPrereq(self)
6568     for tag in self.op.tags:
6569       objects.TaggableObject.ValidateTag(tag)
6570
6571   def Exec(self, feedback_fn):
6572     """Sets the tag.
6573
6574     """
6575     try:
6576       for tag in self.op.tags:
6577         self.target.AddTag(tag)
6578     except errors.TagError, err:
6579       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6580     try:
6581       self.cfg.Update(self.target)
6582     except errors.ConfigurationError:
6583       raise errors.OpRetryError("There has been a modification to the"
6584                                 " config file and the operation has been"
6585                                 " aborted. Please retry.")
6586
6587
6588 class LUDelTags(TagsLU):
6589   """Delete a list of tags from a given object.
6590
6591   """
6592   _OP_REQP = ["kind", "name", "tags"]
6593   REQ_BGL = False
6594
6595   def CheckPrereq(self):
6596     """Check prerequisites.
6597
6598     This checks that we have the given tag.
6599
6600     """
6601     TagsLU.CheckPrereq(self)
6602     for tag in self.op.tags:
6603       objects.TaggableObject.ValidateTag(tag)
6604     del_tags = frozenset(self.op.tags)
6605     cur_tags = self.target.GetTags()
6606     if not del_tags <= cur_tags:
6607       diff_tags = del_tags - cur_tags
6608       diff_names = ["'%s'" % tag for tag in diff_tags]
6609       diff_names.sort()
6610       raise errors.OpPrereqError("Tag(s) %s not found" %
6611                                  (",".join(diff_names)))
6612
6613   def Exec(self, feedback_fn):
6614     """Remove the tag from the object.
6615
6616     """
6617     for tag in self.op.tags:
6618       self.target.RemoveTag(tag)
6619     try:
6620       self.cfg.Update(self.target)
6621     except errors.ConfigurationError:
6622       raise errors.OpRetryError("There has been a modification to the"
6623                                 " config file and the operation has been"
6624                                 " aborted. Please retry.")
6625
6626
6627 class LUTestDelay(NoHooksLU):
6628   """Sleep for a specified amount of time.
6629
6630   This LU sleeps on the master and/or nodes for a specified amount of
6631   time.
6632
6633   """
6634   _OP_REQP = ["duration", "on_master", "on_nodes"]
6635   REQ_BGL = False
6636
6637   def ExpandNames(self):
6638     """Expand names and set required locks.
6639
6640     This expands the node list, if any.
6641
6642     """
6643     self.needed_locks = {}
6644     if self.op.on_nodes:
6645       # _GetWantedNodes can be used here, but is not always appropriate to use
6646       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6647       # more information.
6648       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6649       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6650
6651   def CheckPrereq(self):
6652     """Check prerequisites.
6653
6654     """
6655
6656   def Exec(self, feedback_fn):
6657     """Do the actual sleep.
6658
6659     """
6660     if self.op.on_master:
6661       if not utils.TestDelay(self.op.duration):
6662         raise errors.OpExecError("Error during master delay test")
6663     if self.op.on_nodes:
6664       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6665       if not result:
6666         raise errors.OpExecError("Complete failure from rpc call")
6667       for node, node_result in result.items():
6668         node_result.Raise()
6669         if not node_result.data:
6670           raise errors.OpExecError("Failure during rpc call to node %s,"
6671                                    " result: %s" % (node, node_result.data))
6672
6673
6674 class IAllocator(object):
6675   """IAllocator framework.
6676
6677   An IAllocator instance has three sets of attributes:
6678     - cfg that is needed to query the cluster
6679     - input data (all members of the _KEYS class attribute are required)
6680     - four buffer attributes (in|out_data|text), that represent the
6681       input (to the external script) in text and data structure format,
6682       and the output from it, again in two formats
6683     - the result variables from the script (success, info, nodes) for
6684       easy usage
6685
6686   """
6687   _ALLO_KEYS = [
6688     "mem_size", "disks", "disk_template",
6689     "os", "tags", "nics", "vcpus", "hypervisor",
6690     ]
6691   _RELO_KEYS = [
6692     "relocate_from",
6693     ]
6694
6695   def __init__(self, lu, mode, name, **kwargs):
6696     self.lu = lu
6697     # init buffer variables
6698     self.in_text = self.out_text = self.in_data = self.out_data = None
6699     # init all input fields so that pylint is happy
6700     self.mode = mode
6701     self.name = name
6702     self.mem_size = self.disks = self.disk_template = None
6703     self.os = self.tags = self.nics = self.vcpus = None
6704     self.hypervisor = None
6705     self.relocate_from = None
6706     # computed fields
6707     self.required_nodes = None
6708     # init result fields
6709     self.success = self.info = self.nodes = None
6710     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6711       keyset = self._ALLO_KEYS
6712     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6713       keyset = self._RELO_KEYS
6714     else:
6715       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6716                                    " IAllocator" % self.mode)
6717     for key in kwargs:
6718       if key not in keyset:
6719         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6720                                      " IAllocator" % key)
6721       setattr(self, key, kwargs[key])
6722     for key in keyset:
6723       if key not in kwargs:
6724         raise errors.ProgrammerError("Missing input parameter '%s' to"
6725                                      " IAllocator" % key)
6726     self._BuildInputData()
6727
6728   def _ComputeClusterData(self):
6729     """Compute the generic allocator input data.
6730
6731     This is the data that is independent of the actual operation.
6732
6733     """
6734     cfg = self.lu.cfg
6735     cluster_info = cfg.GetClusterInfo()
6736     # cluster data
6737     data = {
6738       "version": constants.IALLOCATOR_VERSION,
6739       "cluster_name": cfg.GetClusterName(),
6740       "cluster_tags": list(cluster_info.GetTags()),
6741       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6742       # we don't have job IDs
6743       }
6744     iinfo = cfg.GetAllInstancesInfo().values()
6745     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6746
6747     # node data
6748     node_results = {}
6749     node_list = cfg.GetNodeList()
6750
6751     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6752       hypervisor_name = self.hypervisor
6753     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6754       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6755
6756     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6757                                            hypervisor_name)
6758     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6759                        cluster_info.enabled_hypervisors)
6760     for nname, nresult in node_data.items():
6761       # first fill in static (config-based) values
6762       ninfo = cfg.GetNodeInfo(nname)
6763       pnr = {
6764         "tags": list(ninfo.GetTags()),
6765         "primary_ip": ninfo.primary_ip,
6766         "secondary_ip": ninfo.secondary_ip,
6767         "offline": ninfo.offline,
6768         "drained": ninfo.drained,
6769         "master_candidate": ninfo.master_candidate,
6770         }
6771
6772       if not ninfo.offline:
6773         nresult.Raise()
6774         if not isinstance(nresult.data, dict):
6775           raise errors.OpExecError("Can't get data for node %s" % nname)
6776         remote_info = nresult.data
6777         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6778                      'vg_size', 'vg_free', 'cpu_total']:
6779           if attr not in remote_info:
6780             raise errors.OpExecError("Node '%s' didn't return attribute"
6781                                      " '%s'" % (nname, attr))
6782           try:
6783             remote_info[attr] = int(remote_info[attr])
6784           except ValueError, err:
6785             raise errors.OpExecError("Node '%s' returned invalid value"
6786                                      " for '%s': %s" % (nname, attr, err))
6787         # compute memory used by primary instances
6788         i_p_mem = i_p_up_mem = 0
6789         for iinfo, beinfo in i_list:
6790           if iinfo.primary_node == nname:
6791             i_p_mem += beinfo[constants.BE_MEMORY]
6792             if iinfo.name not in node_iinfo[nname].data:
6793               i_used_mem = 0
6794             else:
6795               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6796             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6797             remote_info['memory_free'] -= max(0, i_mem_diff)
6798
6799             if iinfo.admin_up:
6800               i_p_up_mem += beinfo[constants.BE_MEMORY]
6801
6802         # compute memory used by instances
6803         pnr_dyn = {
6804           "total_memory": remote_info['memory_total'],
6805           "reserved_memory": remote_info['memory_dom0'],
6806           "free_memory": remote_info['memory_free'],
6807           "total_disk": remote_info['vg_size'],
6808           "free_disk": remote_info['vg_free'],
6809           "total_cpus": remote_info['cpu_total'],
6810           "i_pri_memory": i_p_mem,
6811           "i_pri_up_memory": i_p_up_mem,
6812           }
6813         pnr.update(pnr_dyn)
6814
6815       node_results[nname] = pnr
6816     data["nodes"] = node_results
6817
6818     # instance data
6819     instance_data = {}
6820     for iinfo, beinfo in i_list:
6821       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6822                   for n in iinfo.nics]
6823       pir = {
6824         "tags": list(iinfo.GetTags()),
6825         "admin_up": iinfo.admin_up,
6826         "vcpus": beinfo[constants.BE_VCPUS],
6827         "memory": beinfo[constants.BE_MEMORY],
6828         "os": iinfo.os,
6829         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6830         "nics": nic_data,
6831         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6832         "disk_template": iinfo.disk_template,
6833         "hypervisor": iinfo.hypervisor,
6834         }
6835       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6836                                                  pir["disks"])
6837       instance_data[iinfo.name] = pir
6838
6839     data["instances"] = instance_data
6840
6841     self.in_data = data
6842
6843   def _AddNewInstance(self):
6844     """Add new instance data to allocator structure.
6845
6846     This in combination with _AllocatorGetClusterData will create the
6847     correct structure needed as input for the allocator.
6848
6849     The checks for the completeness of the opcode must have already been
6850     done.
6851
6852     """
6853     data = self.in_data
6854
6855     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6856
6857     if self.disk_template in constants.DTS_NET_MIRROR:
6858       self.required_nodes = 2
6859     else:
6860       self.required_nodes = 1
6861     request = {
6862       "type": "allocate",
6863       "name": self.name,
6864       "disk_template": self.disk_template,
6865       "tags": self.tags,
6866       "os": self.os,
6867       "vcpus": self.vcpus,
6868       "memory": self.mem_size,
6869       "disks": self.disks,
6870       "disk_space_total": disk_space,
6871       "nics": self.nics,
6872       "required_nodes": self.required_nodes,
6873       }
6874     data["request"] = request
6875
6876   def _AddRelocateInstance(self):
6877     """Add relocate instance data to allocator structure.
6878
6879     This in combination with _IAllocatorGetClusterData will create the
6880     correct structure needed as input for the allocator.
6881
6882     The checks for the completeness of the opcode must have already been
6883     done.
6884
6885     """
6886     instance = self.lu.cfg.GetInstanceInfo(self.name)
6887     if instance is None:
6888       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6889                                    " IAllocator" % self.name)
6890
6891     if instance.disk_template not in constants.DTS_NET_MIRROR:
6892       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6893
6894     if len(instance.secondary_nodes) != 1:
6895       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6896
6897     self.required_nodes = 1
6898     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6899     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6900
6901     request = {
6902       "type": "relocate",
6903       "name": self.name,
6904       "disk_space_total": disk_space,
6905       "required_nodes": self.required_nodes,
6906       "relocate_from": self.relocate_from,
6907       }
6908     self.in_data["request"] = request
6909
6910   def _BuildInputData(self):
6911     """Build input data structures.
6912
6913     """
6914     self._ComputeClusterData()
6915
6916     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6917       self._AddNewInstance()
6918     else:
6919       self._AddRelocateInstance()
6920
6921     self.in_text = serializer.Dump(self.in_data)
6922
6923   def Run(self, name, validate=True, call_fn=None):
6924     """Run an instance allocator and return the results.
6925
6926     """
6927     if call_fn is None:
6928       call_fn = self.lu.rpc.call_iallocator_runner
6929     data = self.in_text
6930
6931     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6932     result.Raise()
6933
6934     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6935       raise errors.OpExecError("Invalid result from master iallocator runner")
6936
6937     rcode, stdout, stderr, fail = result.data
6938
6939     if rcode == constants.IARUN_NOTFOUND:
6940       raise errors.OpExecError("Can't find allocator '%s'" % name)
6941     elif rcode == constants.IARUN_FAILURE:
6942       raise errors.OpExecError("Instance allocator call failed: %s,"
6943                                " output: %s" % (fail, stdout+stderr))
6944     self.out_text = stdout
6945     if validate:
6946       self._ValidateResult()
6947
6948   def _ValidateResult(self):
6949     """Process the allocator results.
6950
6951     This will process and if successful save the result in
6952     self.out_data and the other parameters.
6953
6954     """
6955     try:
6956       rdict = serializer.Load(self.out_text)
6957     except Exception, err:
6958       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6959
6960     if not isinstance(rdict, dict):
6961       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6962
6963     for key in "success", "info", "nodes":
6964       if key not in rdict:
6965         raise errors.OpExecError("Can't parse iallocator results:"
6966                                  " missing key '%s'" % key)
6967       setattr(self, key, rdict[key])
6968
6969     if not isinstance(rdict["nodes"], list):
6970       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6971                                " is not a list")
6972     self.out_data = rdict
6973
6974
6975 class LUTestAllocator(NoHooksLU):
6976   """Run allocator tests.
6977
6978   This LU runs the allocator tests
6979
6980   """
6981   _OP_REQP = ["direction", "mode", "name"]
6982
6983   def CheckPrereq(self):
6984     """Check prerequisites.
6985
6986     This checks the opcode parameters depending on the director and mode test.
6987
6988     """
6989     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6990       for attr in ["name", "mem_size", "disks", "disk_template",
6991                    "os", "tags", "nics", "vcpus"]:
6992         if not hasattr(self.op, attr):
6993           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6994                                      attr)
6995       iname = self.cfg.ExpandInstanceName(self.op.name)
6996       if iname is not None:
6997         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6998                                    iname)
6999       if not isinstance(self.op.nics, list):
7000         raise errors.OpPrereqError("Invalid parameter 'nics'")
7001       for row in self.op.nics:
7002         if (not isinstance(row, dict) or
7003             "mac" not in row or
7004             "ip" not in row or
7005             "bridge" not in row):
7006           raise errors.OpPrereqError("Invalid contents of the"
7007                                      " 'nics' parameter")
7008       if not isinstance(self.op.disks, list):
7009         raise errors.OpPrereqError("Invalid parameter 'disks'")
7010       for row in self.op.disks:
7011         if (not isinstance(row, dict) or
7012             "size" not in row or
7013             not isinstance(row["size"], int) or
7014             "mode" not in row or
7015             row["mode"] not in ['r', 'w']):
7016           raise errors.OpPrereqError("Invalid contents of the"
7017                                      " 'disks' parameter")
7018       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7019         self.op.hypervisor = self.cfg.GetHypervisorType()
7020     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7021       if not hasattr(self.op, "name"):
7022         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7023       fname = self.cfg.ExpandInstanceName(self.op.name)
7024       if fname is None:
7025         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7026                                    self.op.name)
7027       self.op.name = fname
7028       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7029     else:
7030       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7031                                  self.op.mode)
7032
7033     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7034       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7035         raise errors.OpPrereqError("Missing allocator name")
7036     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7037       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7038                                  self.op.direction)
7039
7040   def Exec(self, feedback_fn):
7041     """Run the allocator test.
7042
7043     """
7044     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7045       ial = IAllocator(self,
7046                        mode=self.op.mode,
7047                        name=self.op.name,
7048                        mem_size=self.op.mem_size,
7049                        disks=self.op.disks,
7050                        disk_template=self.op.disk_template,
7051                        os=self.op.os,
7052                        tags=self.op.tags,
7053                        nics=self.op.nics,
7054                        vcpus=self.op.vcpus,
7055                        hypervisor=self.op.hypervisor,
7056                        )
7057     else:
7058       ial = IAllocator(self,
7059                        mode=self.op.mode,
7060                        name=self.op.name,
7061                        relocate_from=list(self.relocate_from),
7062                        )
7063
7064     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7065       result = ial.in_text
7066     else:
7067       ial.Run(self.op.allocator, validate=False)
7068       result = ial.out_text
7069     return result