repair-size: ensure child disks have sane sizes
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, bridge, mac) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517       env["INSTANCE_NIC%d_MAC" % idx] = mac
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541   """Builds instance related env variables for hooks from an object.
542
543   @type lu: L{LogicalUnit}
544   @param lu: the logical unit on whose behalf we execute
545   @type instance: L{objects.Instance}
546   @param instance: the instance for which we should build the
547       environment
548   @type override: dict
549   @param override: dictionary with key/values that will override
550       our values
551   @rtype: dict
552   @return: the hook environment dictionary
553
554   """
555   cluster = lu.cfg.GetClusterInfo()
556   bep = cluster.FillBE(instance)
557   hvp = cluster.FillHV(instance)
558   args = {
559     'name': instance.name,
560     'primary_node': instance.primary_node,
561     'secondary_nodes': instance.secondary_nodes,
562     'os_type': instance.os,
563     'status': instance.admin_up,
564     'memory': bep[constants.BE_MEMORY],
565     'vcpus': bep[constants.BE_VCPUS],
566     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567     'disk_template': instance.disk_template,
568     'disks': [(disk.size, disk.mode) for disk in instance.disks],
569     'bep': bep,
570     'hvp': hvp,
571     'hypervisor': instance.hypervisor,
572   }
573   if override:
574     args.update(override)
575   return _BuildInstanceHookEnv(**args)
576
577
578 def _AdjustCandidatePool(lu):
579   """Adjust the candidate pool after node operations.
580
581   """
582   mod_list = lu.cfg.MaintainCandidatePool()
583   if mod_list:
584     lu.LogInfo("Promoted nodes to master candidate role: %s",
585                ", ".join(node.name for node in mod_list))
586     for name in mod_list:
587       lu.context.ReaddNode(name)
588   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589   if mc_now > mc_max:
590     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591                (mc_now, mc_max))
592
593
594 def _CheckInstanceBridgesExist(lu, instance):
595   """Check that the brigdes needed by an instance exist.
596
597   """
598   # check bridges existance
599   brlist = [nic.bridge for nic in instance.nics]
600   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601   result.Raise()
602   if not result.data:
603     raise errors.OpPrereqError("One or more target bridges %s does not"
604                                " exist on destination node '%s'" %
605                                (brlist, instance.primary_node))
606
607
608 class LUDestroyCluster(NoHooksLU):
609   """Logical unit for destroying the cluster.
610
611   """
612   _OP_REQP = []
613
614   def CheckPrereq(self):
615     """Check prerequisites.
616
617     This checks whether the cluster is empty.
618
619     Any errors are signalled by raising errors.OpPrereqError.
620
621     """
622     master = self.cfg.GetMasterNode()
623
624     nodelist = self.cfg.GetNodeList()
625     if len(nodelist) != 1 or nodelist[0] != master:
626       raise errors.OpPrereqError("There are still %d node(s) in"
627                                  " this cluster." % (len(nodelist) - 1))
628     instancelist = self.cfg.GetInstanceList()
629     if instancelist:
630       raise errors.OpPrereqError("There are still %d instance(s) in"
631                                  " this cluster." % len(instancelist))
632
633   def Exec(self, feedback_fn):
634     """Destroys the cluster.
635
636     """
637     master = self.cfg.GetMasterNode()
638     result = self.rpc.call_node_stop_master(master, False)
639     result.Raise()
640     if not result.data:
641       raise errors.OpExecError("Could not disable the master role")
642     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643     utils.CreateBackup(priv_key)
644     utils.CreateBackup(pub_key)
645     return master
646
647
648 class LUVerifyCluster(LogicalUnit):
649   """Verifies the cluster status.
650
651   """
652   HPATH = "cluster-verify"
653   HTYPE = constants.HTYPE_CLUSTER
654   _OP_REQP = ["skip_checks"]
655   REQ_BGL = False
656
657   def ExpandNames(self):
658     self.needed_locks = {
659       locking.LEVEL_NODE: locking.ALL_SET,
660       locking.LEVEL_INSTANCE: locking.ALL_SET,
661     }
662     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663
664   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665                   node_result, feedback_fn, master_files,
666                   drbd_map, vg_name):
667     """Run multiple tests against a node.
668
669     Test list:
670
671       - compares ganeti version
672       - checks vg existance and size > 20G
673       - checks config file checksum
674       - checks ssh to other nodes
675
676     @type nodeinfo: L{objects.Node}
677     @param nodeinfo: the node to check
678     @param file_list: required list of files
679     @param local_cksum: dictionary of local files and their checksums
680     @param node_result: the results from the node
681     @param feedback_fn: function used to accumulate results
682     @param master_files: list of files that only masters should have
683     @param drbd_map: the useddrbd minors for this node, in
684         form of minor: (instance, must_exist) which correspond to instances
685         and their running status
686     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687
688     """
689     node = nodeinfo.name
690
691     # main result, node_result should be a non-empty dict
692     if not node_result or not isinstance(node_result, dict):
693       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694       return True
695
696     # compares ganeti version
697     local_version = constants.PROTOCOL_VERSION
698     remote_version = node_result.get('version', None)
699     if not (remote_version and isinstance(remote_version, (list, tuple)) and
700             len(remote_version) == 2):
701       feedback_fn("  - ERROR: connection to %s failed" % (node))
702       return True
703
704     if local_version != remote_version[0]:
705       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706                   " node %s %s" % (local_version, node, remote_version[0]))
707       return True
708
709     # node seems compatible, we can actually try to look into its results
710
711     bad = False
712
713     # full package version
714     if constants.RELEASE_VERSION != remote_version[1]:
715       feedback_fn("  - WARNING: software version mismatch: master %s,"
716                   " node %s %s" %
717                   (constants.RELEASE_VERSION, node, remote_version[1]))
718
719     # checks vg existence and size > 20G
720     if vg_name is not None:
721       vglist = node_result.get(constants.NV_VGLIST, None)
722       if not vglist:
723         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724                         (node,))
725         bad = True
726       else:
727         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728                                               constants.MIN_VG_SIZE)
729         if vgstatus:
730           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731           bad = True
732
733     # checks config file checksum
734
735     remote_cksum = node_result.get(constants.NV_FILELIST, None)
736     if not isinstance(remote_cksum, dict):
737       bad = True
738       feedback_fn("  - ERROR: node hasn't returned file checksum data")
739     else:
740       for file_name in file_list:
741         node_is_mc = nodeinfo.master_candidate
742         must_have_file = file_name not in master_files
743         if file_name not in remote_cksum:
744           if node_is_mc or must_have_file:
745             bad = True
746             feedback_fn("  - ERROR: file '%s' missing" % file_name)
747         elif remote_cksum[file_name] != local_cksum[file_name]:
748           if node_is_mc or must_have_file:
749             bad = True
750             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751           else:
752             # not candidate and this is not a must-have file
753             bad = True
754             feedback_fn("  - ERROR: file '%s' should not exist on non master"
755                         " candidates (and the file is outdated)" % file_name)
756         else:
757           # all good, except non-master/non-must have combination
758           if not node_is_mc and not must_have_file:
759             feedback_fn("  - ERROR: file '%s' should not exist on non master"
760                         " candidates" % file_name)
761
762     # checks ssh to any
763
764     if constants.NV_NODELIST not in node_result:
765       bad = True
766       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767     else:
768       if node_result[constants.NV_NODELIST]:
769         bad = True
770         for node in node_result[constants.NV_NODELIST]:
771           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772                           (node, node_result[constants.NV_NODELIST][node]))
773
774     if constants.NV_NODENETTEST not in node_result:
775       bad = True
776       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777     else:
778       if node_result[constants.NV_NODENETTEST]:
779         bad = True
780         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781         for node in nlist:
782           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783                           (node, node_result[constants.NV_NODENETTEST][node]))
784
785     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786     if isinstance(hyp_result, dict):
787       for hv_name, hv_result in hyp_result.iteritems():
788         if hv_result is not None:
789           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790                       (hv_name, hv_result))
791
792     # check used drbd list
793     if vg_name is not None:
794       used_minors = node_result.get(constants.NV_DRBDLIST, [])
795       if not isinstance(used_minors, (tuple, list)):
796         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797                     str(used_minors))
798       else:
799         for minor, (iname, must_exist) in drbd_map.items():
800           if minor not in used_minors and must_exist:
801             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802                         " not active" % (minor, iname))
803             bad = True
804         for minor in used_minors:
805           if minor not in drbd_map:
806             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807                         minor)
808             bad = True
809
810     return bad
811
812   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813                       node_instance, feedback_fn, n_offline):
814     """Verify an instance.
815
816     This function checks to see if the required block devices are
817     available on the instance's node.
818
819     """
820     bad = False
821
822     node_current = instanceconfig.primary_node
823
824     node_vol_should = {}
825     instanceconfig.MapLVsByNode(node_vol_should)
826
827     for node in node_vol_should:
828       if node in n_offline:
829         # ignore missing volumes on offline nodes
830         continue
831       for volume in node_vol_should[node]:
832         if node not in node_vol_is or volume not in node_vol_is[node]:
833           feedback_fn("  - ERROR: volume %s missing on node %s" %
834                           (volume, node))
835           bad = True
836
837     if instanceconfig.admin_up:
838       if ((node_current not in node_instance or
839           not instance in node_instance[node_current]) and
840           node_current not in n_offline):
841         feedback_fn("  - ERROR: instance %s not running on node %s" %
842                         (instance, node_current))
843         bad = True
844
845     for node in node_instance:
846       if (not node == node_current):
847         if instance in node_instance[node]:
848           feedback_fn("  - ERROR: instance %s should not run on node %s" %
849                           (instance, node))
850           bad = True
851
852     return bad
853
854   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855     """Verify if there are any unknown volumes in the cluster.
856
857     The .os, .swap and backup volumes are ignored. All other volumes are
858     reported as unknown.
859
860     """
861     bad = False
862
863     for node in node_vol_is:
864       for volume in node_vol_is[node]:
865         if node not in node_vol_should or volume not in node_vol_should[node]:
866           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867                       (volume, node))
868           bad = True
869     return bad
870
871   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872     """Verify the list of running instances.
873
874     This checks what instances are running but unknown to the cluster.
875
876     """
877     bad = False
878     for node in node_instance:
879       for runninginstance in node_instance[node]:
880         if runninginstance not in instancelist:
881           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882                           (runninginstance, node))
883           bad = True
884     return bad
885
886   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887     """Verify N+1 Memory Resilience.
888
889     Check that if one single node dies we can still start all the instances it
890     was primary for.
891
892     """
893     bad = False
894
895     for node, nodeinfo in node_info.iteritems():
896       # This code checks that every node which is now listed as secondary has
897       # enough memory to host all instances it is supposed to should a single
898       # other node in the cluster fail.
899       # FIXME: not ready for failover to an arbitrary node
900       # FIXME: does not support file-backed instances
901       # WARNING: we currently take into account down instances as well as up
902       # ones, considering that even if they're down someone might want to start
903       # them even in the event of a node failure.
904       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905         needed_mem = 0
906         for instance in instances:
907           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908           if bep[constants.BE_AUTO_BALANCE]:
909             needed_mem += bep[constants.BE_MEMORY]
910         if nodeinfo['mfree'] < needed_mem:
911           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912                       " failovers should node %s fail" % (node, prinode))
913           bad = True
914     return bad
915
916   def CheckPrereq(self):
917     """Check prerequisites.
918
919     Transform the list of checks we're going to skip into a set and check that
920     all its members are valid.
921
922     """
923     self.skip_set = frozenset(self.op.skip_checks)
924     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925       raise errors.OpPrereqError("Invalid checks to be skipped specified")
926
927   def BuildHooksEnv(self):
928     """Build hooks env.
929
930     Cluster-Verify hooks just rone in the post phase and their failure makes
931     the output be logged in the verify output and the verification to fail.
932
933     """
934     all_nodes = self.cfg.GetNodeList()
935     env = {
936       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937       }
938     for node in self.cfg.GetAllNodesInfo().values():
939       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940
941     return env, [], all_nodes
942
943   def Exec(self, feedback_fn):
944     """Verify integrity of cluster, performing various test on nodes.
945
946     """
947     bad = False
948     feedback_fn("* Verifying global settings")
949     for msg in self.cfg.VerifyConfig():
950       feedback_fn("  - ERROR: %s" % msg)
951
952     vg_name = self.cfg.GetVGName()
953     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954     nodelist = utils.NiceSort(self.cfg.GetNodeList())
955     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958                         for iname in instancelist)
959     i_non_redundant = [] # Non redundant instances
960     i_non_a_balanced = [] # Non auto-balanced instances
961     n_offline = [] # List of offline nodes
962     n_drained = [] # List of nodes being drained
963     node_volume = {}
964     node_instance = {}
965     node_info = {}
966     instance_cfg = {}
967
968     # FIXME: verify OS list
969     # do local checksums
970     master_files = [constants.CLUSTER_CONF_FILE]
971
972     file_names = ssconf.SimpleStore().GetFileList()
973     file_names.append(constants.SSL_CERT_FILE)
974     file_names.append(constants.RAPI_CERT_FILE)
975     file_names.extend(master_files)
976
977     local_checksums = utils.FingerprintFiles(file_names)
978
979     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980     node_verify_param = {
981       constants.NV_FILELIST: file_names,
982       constants.NV_NODELIST: [node.name for node in nodeinfo
983                               if not node.offline],
984       constants.NV_HYPERVISOR: hypervisors,
985       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986                                   node.secondary_ip) for node in nodeinfo
987                                  if not node.offline],
988       constants.NV_INSTANCELIST: hypervisors,
989       constants.NV_VERSION: None,
990       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991       }
992     if vg_name is not None:
993       node_verify_param[constants.NV_VGLIST] = None
994       node_verify_param[constants.NV_LVLIST] = vg_name
995       node_verify_param[constants.NV_DRBDLIST] = None
996     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997                                            self.cfg.GetClusterName())
998
999     cluster = self.cfg.GetClusterInfo()
1000     master_node = self.cfg.GetMasterNode()
1001     all_drbd_map = self.cfg.ComputeDRBDMap()
1002
1003     for node_i in nodeinfo:
1004       node = node_i.name
1005       nresult = all_nvinfo[node].data
1006
1007       if node_i.offline:
1008         feedback_fn("* Skipping offline node %s" % (node,))
1009         n_offline.append(node)
1010         continue
1011
1012       if node == master_node:
1013         ntype = "master"
1014       elif node_i.master_candidate:
1015         ntype = "master candidate"
1016       elif node_i.drained:
1017         ntype = "drained"
1018         n_drained.append(node)
1019       else:
1020         ntype = "regular"
1021       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022
1023       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025         bad = True
1026         continue
1027
1028       node_drbd = {}
1029       for minor, instance in all_drbd_map[node].items():
1030         if instance not in instanceinfo:
1031           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032                       instance)
1033           # ghost instance should not be running, but otherwise we
1034           # don't give double warnings (both ghost instance and
1035           # unallocated minor in use)
1036           node_drbd[minor] = (instance, False)
1037         else:
1038           instance = instanceinfo[instance]
1039           node_drbd[minor] = (instance.name, instance.admin_up)
1040       result = self._VerifyNode(node_i, file_names, local_checksums,
1041                                 nresult, feedback_fn, master_files,
1042                                 node_drbd, vg_name)
1043       bad = bad or result
1044
1045       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046       if vg_name is None:
1047         node_volume[node] = {}
1048       elif isinstance(lvdata, basestring):
1049         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050                     (node, utils.SafeEncode(lvdata)))
1051         bad = True
1052         node_volume[node] = {}
1053       elif not isinstance(lvdata, dict):
1054         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055         bad = True
1056         continue
1057       else:
1058         node_volume[node] = lvdata
1059
1060       # node_instance
1061       idata = nresult.get(constants.NV_INSTANCELIST, None)
1062       if not isinstance(idata, list):
1063         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064                     (node,))
1065         bad = True
1066         continue
1067
1068       node_instance[node] = idata
1069
1070       # node_info
1071       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072       if not isinstance(nodeinfo, dict):
1073         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074         bad = True
1075         continue
1076
1077       try:
1078         node_info[node] = {
1079           "mfree": int(nodeinfo['memory_free']),
1080           "pinst": [],
1081           "sinst": [],
1082           # dictionary holding all instances this node is secondary for,
1083           # grouped by their primary node. Each key is a cluster node, and each
1084           # value is a list of instances which have the key as primary and the
1085           # current node as secondary.  this is handy to calculate N+1 memory
1086           # availability if you can only failover from a primary to its
1087           # secondary.
1088           "sinst-by-pnode": {},
1089         }
1090         # FIXME: devise a free space model for file based instances as well
1091         if vg_name is not None:
1092           if (constants.NV_VGLIST not in nresult or
1093               vg_name not in nresult[constants.NV_VGLIST]):
1094             feedback_fn("  - ERROR: node %s didn't return data for the"
1095                         " volume group '%s' - it is either missing or broken" %
1096                         (node, vg_name))
1097             bad = True
1098             continue
1099           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100       except (ValueError, KeyError):
1101         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102                     " from node %s" % (node,))
1103         bad = True
1104         continue
1105
1106     node_vol_should = {}
1107
1108     for instance in instancelist:
1109       feedback_fn("* Verifying instance %s" % instance)
1110       inst_config = instanceinfo[instance]
1111       result =  self._VerifyInstance(instance, inst_config, node_volume,
1112                                      node_instance, feedback_fn, n_offline)
1113       bad = bad or result
1114       inst_nodes_offline = []
1115
1116       inst_config.MapLVsByNode(node_vol_should)
1117
1118       instance_cfg[instance] = inst_config
1119
1120       pnode = inst_config.primary_node
1121       if pnode in node_info:
1122         node_info[pnode]['pinst'].append(instance)
1123       elif pnode not in n_offline:
1124         feedback_fn("  - ERROR: instance %s, connection to primary node"
1125                     " %s failed" % (instance, pnode))
1126         bad = True
1127
1128       if pnode in n_offline:
1129         inst_nodes_offline.append(pnode)
1130
1131       # If the instance is non-redundant we cannot survive losing its primary
1132       # node, so we are not N+1 compliant. On the other hand we have no disk
1133       # templates with more than one secondary so that situation is not well
1134       # supported either.
1135       # FIXME: does not support file-backed instances
1136       if len(inst_config.secondary_nodes) == 0:
1137         i_non_redundant.append(instance)
1138       elif len(inst_config.secondary_nodes) > 1:
1139         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140                     % instance)
1141
1142       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143         i_non_a_balanced.append(instance)
1144
1145       for snode in inst_config.secondary_nodes:
1146         if snode in node_info:
1147           node_info[snode]['sinst'].append(instance)
1148           if pnode not in node_info[snode]['sinst-by-pnode']:
1149             node_info[snode]['sinst-by-pnode'][pnode] = []
1150           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151         elif snode not in n_offline:
1152           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153                       " %s failed" % (instance, snode))
1154           bad = True
1155         if snode in n_offline:
1156           inst_nodes_offline.append(snode)
1157
1158       if inst_nodes_offline:
1159         # warn that the instance lives on offline nodes, and set bad=True
1160         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161                     ", ".join(inst_nodes_offline))
1162         bad = True
1163
1164     feedback_fn("* Verifying orphan volumes")
1165     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166                                        feedback_fn)
1167     bad = bad or result
1168
1169     feedback_fn("* Verifying remaining instances")
1170     result = self._VerifyOrphanInstances(instancelist, node_instance,
1171                                          feedback_fn)
1172     bad = bad or result
1173
1174     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175       feedback_fn("* Verifying N+1 Memory redundancy")
1176       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177       bad = bad or result
1178
1179     feedback_fn("* Other Notes")
1180     if i_non_redundant:
1181       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182                   % len(i_non_redundant))
1183
1184     if i_non_a_balanced:
1185       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186                   % len(i_non_a_balanced))
1187
1188     if n_offline:
1189       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190
1191     if n_drained:
1192       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193
1194     return not bad
1195
1196   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197     """Analize the post-hooks' result
1198
1199     This method analyses the hook result, handles it, and sends some
1200     nicely-formatted feedback back to the user.
1201
1202     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204     @param hooks_results: the results of the multi-node hooks rpc call
1205     @param feedback_fn: function used send feedback back to the caller
1206     @param lu_result: previous Exec result
1207     @return: the new Exec result, based on the previous result
1208         and hook results
1209
1210     """
1211     # We only really run POST phase hooks, and are only interested in
1212     # their results
1213     if phase == constants.HOOKS_PHASE_POST:
1214       # Used to change hooks' output to proper indentation
1215       indent_re = re.compile('^', re.M)
1216       feedback_fn("* Hooks Results")
1217       if not hooks_results:
1218         feedback_fn("  - ERROR: general communication failure")
1219         lu_result = 1
1220       else:
1221         for node_name in hooks_results:
1222           show_node_header = True
1223           res = hooks_results[node_name]
1224           if res.failed or res.data is False or not isinstance(res.data, list):
1225             if res.offline:
1226               # no need to warn or set fail return value
1227               continue
1228             feedback_fn("    Communication failure in hooks execution")
1229             lu_result = 1
1230             continue
1231           for script, hkr, output in res.data:
1232             if hkr == constants.HKR_FAIL:
1233               # The node header is only shown once, if there are
1234               # failing hooks on that node
1235               if show_node_header:
1236                 feedback_fn("  Node %s:" % node_name)
1237                 show_node_header = False
1238               feedback_fn("    ERROR: Script %s failed, output:" % script)
1239               output = indent_re.sub('      ', output)
1240               feedback_fn("%s" % output)
1241               lu_result = 1
1242
1243       return lu_result
1244
1245
1246 class LUVerifyDisks(NoHooksLU):
1247   """Verifies the cluster disks status.
1248
1249   """
1250   _OP_REQP = []
1251   REQ_BGL = False
1252
1253   def ExpandNames(self):
1254     self.needed_locks = {
1255       locking.LEVEL_NODE: locking.ALL_SET,
1256       locking.LEVEL_INSTANCE: locking.ALL_SET,
1257     }
1258     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This has no prerequisites.
1264
1265     """
1266     pass
1267
1268   def Exec(self, feedback_fn):
1269     """Verify integrity of cluster disks.
1270
1271     """
1272     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1273
1274     vg_name = self.cfg.GetVGName()
1275     nodes = utils.NiceSort(self.cfg.GetNodeList())
1276     instances = [self.cfg.GetInstanceInfo(name)
1277                  for name in self.cfg.GetInstanceList()]
1278
1279     nv_dict = {}
1280     for inst in instances:
1281       inst_lvs = {}
1282       if (not inst.admin_up or
1283           inst.disk_template not in constants.DTS_NET_MIRROR):
1284         continue
1285       inst.MapLVsByNode(inst_lvs)
1286       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287       for node, vol_list in inst_lvs.iteritems():
1288         for vol in vol_list:
1289           nv_dict[(node, vol)] = inst
1290
1291     if not nv_dict:
1292       return result
1293
1294     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295
1296     to_act = set()
1297     for node in nodes:
1298       # node_volume
1299       lvs = node_lvs[node]
1300       if lvs.failed:
1301         if not lvs.offline:
1302           self.LogWarning("Connection to node %s failed: %s" %
1303                           (node, lvs.data))
1304         continue
1305       lvs = lvs.data
1306       if isinstance(lvs, basestring):
1307         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308         res_nlvm[node] = lvs
1309         continue
1310       elif not isinstance(lvs, dict):
1311         logging.warning("Connection to node %s failed or invalid data"
1312                         " returned", node)
1313         res_nodes.append(node)
1314         continue
1315
1316       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317         inst = nv_dict.pop((node, lv_name), None)
1318         if (not lv_online and inst is not None
1319             and inst.name not in res_instances):
1320           res_instances.append(inst.name)
1321
1322     # any leftover items in nv_dict are missing LVs, let's arrange the
1323     # data better
1324     for key, inst in nv_dict.iteritems():
1325       if inst.name not in res_missing:
1326         res_missing[inst.name] = []
1327       res_missing[inst.name].append(key)
1328
1329     return result
1330
1331
1332 class LURepairDiskSizes(NoHooksLU):
1333   """Verifies the cluster disks sizes.
1334
1335   """
1336   _OP_REQP = ["instances"]
1337   REQ_BGL = False
1338
1339   def ExpandNames(self):
1340
1341     if not isinstance(self.op.instances, list):
1342       raise errors.OpPrereqError("Invalid argument type 'instances'")
1343
1344     if self.op.instances:
1345       self.wanted_names = []
1346       for name in self.op.instances:
1347         full_name = self.cfg.ExpandInstanceName(name)
1348         if full_name is None:
1349           raise errors.OpPrereqError("Instance '%s' not known" % name)
1350         self.wanted_names.append(full_name)
1351       self.needed_locks = {
1352         locking.LEVEL_NODE: [],
1353         locking.LEVEL_INSTANCE: self.wanted_names,
1354         }
1355       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1356     else:
1357       self.wanted_names = None
1358       self.needed_locks = {
1359         locking.LEVEL_NODE: locking.ALL_SET,
1360         locking.LEVEL_INSTANCE: locking.ALL_SET,
1361         }
1362     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1363
1364   def DeclareLocks(self, level):
1365     if level == locking.LEVEL_NODE and self.wanted_names is not None:
1366       self._LockInstancesNodes(primary_only=True)
1367
1368   def CheckPrereq(self):
1369     """Check prerequisites.
1370
1371     This only checks the optional instance list against the existing names.
1372
1373     """
1374     if self.wanted_names is None:
1375       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1376
1377     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1378                              in self.wanted_names]
1379
1380   def _EnsureChildSizes(self, disk):
1381     """Ensure children of the disk have the needed disk size.
1382
1383     This is valid mainly for DRBD8 and fixes an issue where the
1384     children have smaller disk size.
1385
1386     @param disk: an L{ganeti.objects.Disk} object
1387
1388     """
1389     if disk.dev_type == constants.LD_DRBD8:
1390       assert disk.children, "Empty children for DRBD8?"
1391       fchild = disk.children[0]
1392       mismatch = fchild.size < disk.size
1393       if mismatch:
1394         self.LogInfo("Child disk has size %d, parent %d, fixing",
1395                      fchild.size, disk.size)
1396         fchild.size = disk.size
1397
1398       # and we recurse on this child only, not on the metadev
1399       return self._EnsureChildSizes(fchild) or mismatch
1400     else:
1401       return False
1402
1403   def Exec(self, feedback_fn):
1404     """Verify the size of cluster disks.
1405
1406     """
1407     # TODO: check child disks too
1408     # TODO: check differences in size between primary/secondary nodes
1409     per_node_disks = {}
1410     for instance in self.wanted_instances:
1411       pnode = instance.primary_node
1412       if pnode not in per_node_disks:
1413         per_node_disks[pnode] = []
1414       for idx, disk in enumerate(instance.disks):
1415         per_node_disks[pnode].append((instance, idx, disk))
1416
1417     changed = []
1418     for node, dskl in per_node_disks.items():
1419       newl = [v[2].Copy() for v in dskl]
1420       for dsk in newl:
1421         self.cfg.SetDiskID(dsk, node)
1422       result = self.rpc.call_blockdev_getsizes(node, newl)
1423       if result.failed:
1424         self.LogWarning("Failure in blockdev_getsizes call to node"
1425                         " %s, ignoring", node)
1426         continue
1427       if len(result.data) != len(dskl):
1428         self.LogWarning("Invalid result from node %s, ignoring node results",
1429                         node)
1430         continue
1431       for ((instance, idx, disk), size) in zip(dskl, result.data):
1432         if size is None:
1433           self.LogWarning("Disk %d of instance %s did not return size"
1434                           " information, ignoring", idx, instance.name)
1435           continue
1436         if not isinstance(size, (int, long)):
1437           self.LogWarning("Disk %d of instance %s did not return valid"
1438                           " size information, ignoring", idx, instance.name)
1439           continue
1440         size = size >> 20
1441         if size != disk.size:
1442           self.LogInfo("Disk %d of instance %s has mismatched size,"
1443                        " correcting: recorded %d, actual %d", idx,
1444                        instance.name, disk.size, size)
1445           disk.size = size
1446           self.cfg.Update(instance)
1447           changed.append((instance.name, idx, size))
1448         if self._EnsureChildSizes(disk):
1449           self.cfg.Update(instance)
1450           changed.append((instance.name, idx, disk.size))
1451     return changed
1452
1453
1454 class LURenameCluster(LogicalUnit):
1455   """Rename the cluster.
1456
1457   """
1458   HPATH = "cluster-rename"
1459   HTYPE = constants.HTYPE_CLUSTER
1460   _OP_REQP = ["name"]
1461
1462   def BuildHooksEnv(self):
1463     """Build hooks env.
1464
1465     """
1466     env = {
1467       "OP_TARGET": self.cfg.GetClusterName(),
1468       "NEW_NAME": self.op.name,
1469       }
1470     mn = self.cfg.GetMasterNode()
1471     return env, [mn], [mn]
1472
1473   def CheckPrereq(self):
1474     """Verify that the passed name is a valid one.
1475
1476     """
1477     hostname = utils.HostInfo(self.op.name)
1478
1479     new_name = hostname.name
1480     self.ip = new_ip = hostname.ip
1481     old_name = self.cfg.GetClusterName()
1482     old_ip = self.cfg.GetMasterIP()
1483     if new_name == old_name and new_ip == old_ip:
1484       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1485                                  " cluster has changed")
1486     if new_ip != old_ip:
1487       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1488         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1489                                    " reachable on the network. Aborting." %
1490                                    new_ip)
1491
1492     self.op.name = new_name
1493
1494   def Exec(self, feedback_fn):
1495     """Rename the cluster.
1496
1497     """
1498     clustername = self.op.name
1499     ip = self.ip
1500
1501     # shutdown the master IP
1502     master = self.cfg.GetMasterNode()
1503     result = self.rpc.call_node_stop_master(master, False)
1504     if result.failed or not result.data:
1505       raise errors.OpExecError("Could not disable the master role")
1506
1507     try:
1508       cluster = self.cfg.GetClusterInfo()
1509       cluster.cluster_name = clustername
1510       cluster.master_ip = ip
1511       self.cfg.Update(cluster)
1512
1513       # update the known hosts file
1514       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1515       node_list = self.cfg.GetNodeList()
1516       try:
1517         node_list.remove(master)
1518       except ValueError:
1519         pass
1520       result = self.rpc.call_upload_file(node_list,
1521                                          constants.SSH_KNOWN_HOSTS_FILE)
1522       for to_node, to_result in result.iteritems():
1523         if to_result.failed or not to_result.data:
1524           logging.error("Copy of file %s to node %s failed",
1525                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1526
1527     finally:
1528       result = self.rpc.call_node_start_master(master, False, False)
1529       if result.failed or not result.data:
1530         self.LogWarning("Could not re-enable the master role on"
1531                         " the master, please restart manually.")
1532
1533
1534 def _RecursiveCheckIfLVMBased(disk):
1535   """Check if the given disk or its children are lvm-based.
1536
1537   @type disk: L{objects.Disk}
1538   @param disk: the disk to check
1539   @rtype: booleean
1540   @return: boolean indicating whether a LD_LV dev_type was found or not
1541
1542   """
1543   if disk.children:
1544     for chdisk in disk.children:
1545       if _RecursiveCheckIfLVMBased(chdisk):
1546         return True
1547   return disk.dev_type == constants.LD_LV
1548
1549
1550 class LUSetClusterParams(LogicalUnit):
1551   """Change the parameters of the cluster.
1552
1553   """
1554   HPATH = "cluster-modify"
1555   HTYPE = constants.HTYPE_CLUSTER
1556   _OP_REQP = []
1557   REQ_BGL = False
1558
1559   def CheckArguments(self):
1560     """Check parameters
1561
1562     """
1563     if not hasattr(self.op, "candidate_pool_size"):
1564       self.op.candidate_pool_size = None
1565     if self.op.candidate_pool_size is not None:
1566       try:
1567         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1568       except (ValueError, TypeError), err:
1569         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1570                                    str(err))
1571       if self.op.candidate_pool_size < 1:
1572         raise errors.OpPrereqError("At least one master candidate needed")
1573
1574   def ExpandNames(self):
1575     # FIXME: in the future maybe other cluster params won't require checking on
1576     # all nodes to be modified.
1577     self.needed_locks = {
1578       locking.LEVEL_NODE: locking.ALL_SET,
1579     }
1580     self.share_locks[locking.LEVEL_NODE] = 1
1581
1582   def BuildHooksEnv(self):
1583     """Build hooks env.
1584
1585     """
1586     env = {
1587       "OP_TARGET": self.cfg.GetClusterName(),
1588       "NEW_VG_NAME": self.op.vg_name,
1589       }
1590     mn = self.cfg.GetMasterNode()
1591     return env, [mn], [mn]
1592
1593   def CheckPrereq(self):
1594     """Check prerequisites.
1595
1596     This checks whether the given params don't conflict and
1597     if the given volume group is valid.
1598
1599     """
1600     if self.op.vg_name is not None and not self.op.vg_name:
1601       instances = self.cfg.GetAllInstancesInfo().values()
1602       for inst in instances:
1603         for disk in inst.disks:
1604           if _RecursiveCheckIfLVMBased(disk):
1605             raise errors.OpPrereqError("Cannot disable lvm storage while"
1606                                        " lvm-based instances exist")
1607
1608     node_list = self.acquired_locks[locking.LEVEL_NODE]
1609
1610     # if vg_name not None, checks given volume group on all nodes
1611     if self.op.vg_name:
1612       vglist = self.rpc.call_vg_list(node_list)
1613       for node in node_list:
1614         if vglist[node].failed:
1615           # ignoring down node
1616           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1617           continue
1618         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1619                                               self.op.vg_name,
1620                                               constants.MIN_VG_SIZE)
1621         if vgstatus:
1622           raise errors.OpPrereqError("Error on node '%s': %s" %
1623                                      (node, vgstatus))
1624
1625     self.cluster = cluster = self.cfg.GetClusterInfo()
1626     # validate beparams changes
1627     if self.op.beparams:
1628       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1629       self.new_beparams = cluster.FillDict(
1630         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1631
1632     # hypervisor list/parameters
1633     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1634     if self.op.hvparams:
1635       if not isinstance(self.op.hvparams, dict):
1636         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1637       for hv_name, hv_dict in self.op.hvparams.items():
1638         if hv_name not in self.new_hvparams:
1639           self.new_hvparams[hv_name] = hv_dict
1640         else:
1641           self.new_hvparams[hv_name].update(hv_dict)
1642
1643     if self.op.enabled_hypervisors is not None:
1644       self.hv_list = self.op.enabled_hypervisors
1645     else:
1646       self.hv_list = cluster.enabled_hypervisors
1647
1648     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1649       # either the enabled list has changed, or the parameters have, validate
1650       for hv_name, hv_params in self.new_hvparams.items():
1651         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1652             (self.op.enabled_hypervisors and
1653              hv_name in self.op.enabled_hypervisors)):
1654           # either this is a new hypervisor, or its parameters have changed
1655           hv_class = hypervisor.GetHypervisor(hv_name)
1656           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1657           hv_class.CheckParameterSyntax(hv_params)
1658           _CheckHVParams(self, node_list, hv_name, hv_params)
1659
1660   def Exec(self, feedback_fn):
1661     """Change the parameters of the cluster.
1662
1663     """
1664     if self.op.vg_name is not None:
1665       new_volume = self.op.vg_name
1666       if not new_volume:
1667         new_volume = None
1668       if new_volume != self.cfg.GetVGName():
1669         self.cfg.SetVGName(new_volume)
1670       else:
1671         feedback_fn("Cluster LVM configuration already in desired"
1672                     " state, not changing")
1673     if self.op.hvparams:
1674       self.cluster.hvparams = self.new_hvparams
1675     if self.op.enabled_hypervisors is not None:
1676       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1677     if self.op.beparams:
1678       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1679     if self.op.candidate_pool_size is not None:
1680       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1681       # we need to update the pool size here, otherwise the save will fail
1682       _AdjustCandidatePool(self)
1683
1684     self.cfg.Update(self.cluster)
1685
1686
1687 class LURedistributeConfig(NoHooksLU):
1688   """Force the redistribution of cluster configuration.
1689
1690   This is a very simple LU.
1691
1692   """
1693   _OP_REQP = []
1694   REQ_BGL = False
1695
1696   def ExpandNames(self):
1697     self.needed_locks = {
1698       locking.LEVEL_NODE: locking.ALL_SET,
1699     }
1700     self.share_locks[locking.LEVEL_NODE] = 1
1701
1702   def CheckPrereq(self):
1703     """Check prerequisites.
1704
1705     """
1706
1707   def Exec(self, feedback_fn):
1708     """Redistribute the configuration.
1709
1710     """
1711     self.cfg.Update(self.cfg.GetClusterInfo())
1712
1713
1714 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1715   """Sleep and poll for an instance's disk to sync.
1716
1717   """
1718   if not instance.disks:
1719     return True
1720
1721   if not oneshot:
1722     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1723
1724   node = instance.primary_node
1725
1726   for dev in instance.disks:
1727     lu.cfg.SetDiskID(dev, node)
1728
1729   retries = 0
1730   degr_retries = 10 # in seconds, as we sleep 1 second each time
1731   while True:
1732     max_time = 0
1733     done = True
1734     cumul_degraded = False
1735     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1736     if rstats.failed or not rstats.data:
1737       lu.LogWarning("Can't get any data from node %s", node)
1738       retries += 1
1739       if retries >= 10:
1740         raise errors.RemoteError("Can't contact node %s for mirror data,"
1741                                  " aborting." % node)
1742       time.sleep(6)
1743       continue
1744     rstats = rstats.data
1745     retries = 0
1746     for i, mstat in enumerate(rstats):
1747       if mstat is None:
1748         lu.LogWarning("Can't compute data for node %s/%s",
1749                            node, instance.disks[i].iv_name)
1750         continue
1751       # we ignore the ldisk parameter
1752       perc_done, est_time, is_degraded, _ = mstat
1753       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1754       if perc_done is not None:
1755         done = False
1756         if est_time is not None:
1757           rem_time = "%d estimated seconds remaining" % est_time
1758           max_time = est_time
1759         else:
1760           rem_time = "no time estimate"
1761         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1762                         (instance.disks[i].iv_name, perc_done, rem_time))
1763
1764     # if we're done but degraded, let's do a few small retries, to
1765     # make sure we see a stable and not transient situation; therefore
1766     # we force restart of the loop
1767     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1768       logging.info("Degraded disks found, %d retries left", degr_retries)
1769       degr_retries -= 1
1770       time.sleep(1)
1771       continue
1772
1773     if done or oneshot:
1774       break
1775
1776     time.sleep(min(60, max_time))
1777
1778   if done:
1779     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1780   return not cumul_degraded
1781
1782
1783 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1784   """Check that mirrors are not degraded.
1785
1786   The ldisk parameter, if True, will change the test from the
1787   is_degraded attribute (which represents overall non-ok status for
1788   the device(s)) to the ldisk (representing the local storage status).
1789
1790   """
1791   lu.cfg.SetDiskID(dev, node)
1792   if ldisk:
1793     idx = 6
1794   else:
1795     idx = 5
1796
1797   result = True
1798   if on_primary or dev.AssembleOnSecondary():
1799     rstats = lu.rpc.call_blockdev_find(node, dev)
1800     msg = rstats.RemoteFailMsg()
1801     if msg:
1802       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1803       result = False
1804     elif not rstats.payload:
1805       lu.LogWarning("Can't find disk on node %s", node)
1806       result = False
1807     else:
1808       result = result and (not rstats.payload[idx])
1809   if dev.children:
1810     for child in dev.children:
1811       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1812
1813   return result
1814
1815
1816 class LUDiagnoseOS(NoHooksLU):
1817   """Logical unit for OS diagnose/query.
1818
1819   """
1820   _OP_REQP = ["output_fields", "names"]
1821   REQ_BGL = False
1822   _FIELDS_STATIC = utils.FieldSet()
1823   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1824
1825   def ExpandNames(self):
1826     if self.op.names:
1827       raise errors.OpPrereqError("Selective OS query not supported")
1828
1829     _CheckOutputFields(static=self._FIELDS_STATIC,
1830                        dynamic=self._FIELDS_DYNAMIC,
1831                        selected=self.op.output_fields)
1832
1833     # Lock all nodes, in shared mode
1834     # Temporary removal of locks, should be reverted later
1835     # TODO: reintroduce locks when they are lighter-weight
1836     self.needed_locks = {}
1837     #self.share_locks[locking.LEVEL_NODE] = 1
1838     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1839
1840   def CheckPrereq(self):
1841     """Check prerequisites.
1842
1843     """
1844
1845   @staticmethod
1846   def _DiagnoseByOS(node_list, rlist):
1847     """Remaps a per-node return list into an a per-os per-node dictionary
1848
1849     @param node_list: a list with the names of all nodes
1850     @param rlist: a map with node names as keys and OS objects as values
1851
1852     @rtype: dict
1853     @return: a dictionary with osnames as keys and as value another map, with
1854         nodes as keys and list of OS objects as values, eg::
1855
1856           {"debian-etch": {"node1": [<object>,...],
1857                            "node2": [<object>,]}
1858           }
1859
1860     """
1861     all_os = {}
1862     # we build here the list of nodes that didn't fail the RPC (at RPC
1863     # level), so that nodes with a non-responding node daemon don't
1864     # make all OSes invalid
1865     good_nodes = [node_name for node_name in rlist
1866                   if not rlist[node_name].failed]
1867     for node_name, nr in rlist.iteritems():
1868       if nr.failed or not nr.data:
1869         continue
1870       for os_obj in nr.data:
1871         if os_obj.name not in all_os:
1872           # build a list of nodes for this os containing empty lists
1873           # for each node in node_list
1874           all_os[os_obj.name] = {}
1875           for nname in good_nodes:
1876             all_os[os_obj.name][nname] = []
1877         all_os[os_obj.name][node_name].append(os_obj)
1878     return all_os
1879
1880   def Exec(self, feedback_fn):
1881     """Compute the list of OSes.
1882
1883     """
1884     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1885     node_data = self.rpc.call_os_diagnose(valid_nodes)
1886     if node_data == False:
1887       raise errors.OpExecError("Can't gather the list of OSes")
1888     pol = self._DiagnoseByOS(valid_nodes, node_data)
1889     output = []
1890     for os_name, os_data in pol.iteritems():
1891       row = []
1892       for field in self.op.output_fields:
1893         if field == "name":
1894           val = os_name
1895         elif field == "valid":
1896           val = utils.all([osl and osl[0] for osl in os_data.values()])
1897         elif field == "node_status":
1898           val = {}
1899           for node_name, nos_list in os_data.iteritems():
1900             val[node_name] = [(v.status, v.path) for v in nos_list]
1901         else:
1902           raise errors.ParameterError(field)
1903         row.append(val)
1904       output.append(row)
1905
1906     return output
1907
1908
1909 class LURemoveNode(LogicalUnit):
1910   """Logical unit for removing a node.
1911
1912   """
1913   HPATH = "node-remove"
1914   HTYPE = constants.HTYPE_NODE
1915   _OP_REQP = ["node_name"]
1916
1917   def BuildHooksEnv(self):
1918     """Build hooks env.
1919
1920     This doesn't run on the target node in the pre phase as a failed
1921     node would then be impossible to remove.
1922
1923     """
1924     env = {
1925       "OP_TARGET": self.op.node_name,
1926       "NODE_NAME": self.op.node_name,
1927       }
1928     all_nodes = self.cfg.GetNodeList()
1929     all_nodes.remove(self.op.node_name)
1930     return env, all_nodes, all_nodes
1931
1932   def CheckPrereq(self):
1933     """Check prerequisites.
1934
1935     This checks:
1936      - the node exists in the configuration
1937      - it does not have primary or secondary instances
1938      - it's not the master
1939
1940     Any errors are signalled by raising errors.OpPrereqError.
1941
1942     """
1943     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1944     if node is None:
1945       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1946
1947     instance_list = self.cfg.GetInstanceList()
1948
1949     masternode = self.cfg.GetMasterNode()
1950     if node.name == masternode:
1951       raise errors.OpPrereqError("Node is the master node,"
1952                                  " you need to failover first.")
1953
1954     for instance_name in instance_list:
1955       instance = self.cfg.GetInstanceInfo(instance_name)
1956       if node.name in instance.all_nodes:
1957         raise errors.OpPrereqError("Instance %s is still running on the node,"
1958                                    " please remove first." % instance_name)
1959     self.op.node_name = node.name
1960     self.node = node
1961
1962   def Exec(self, feedback_fn):
1963     """Removes the node from the cluster.
1964
1965     """
1966     node = self.node
1967     logging.info("Stopping the node daemon and removing configs from node %s",
1968                  node.name)
1969
1970     self.context.RemoveNode(node.name)
1971
1972     self.rpc.call_node_leave_cluster(node.name)
1973
1974     # Promote nodes to master candidate as needed
1975     _AdjustCandidatePool(self)
1976
1977
1978 class LUQueryNodes(NoHooksLU):
1979   """Logical unit for querying nodes.
1980
1981   """
1982   _OP_REQP = ["output_fields", "names", "use_locking"]
1983   REQ_BGL = False
1984   _FIELDS_DYNAMIC = utils.FieldSet(
1985     "dtotal", "dfree",
1986     "mtotal", "mnode", "mfree",
1987     "bootid",
1988     "ctotal", "cnodes", "csockets",
1989     )
1990
1991   _FIELDS_STATIC = utils.FieldSet(
1992     "name", "pinst_cnt", "sinst_cnt",
1993     "pinst_list", "sinst_list",
1994     "pip", "sip", "tags",
1995     "serial_no",
1996     "master_candidate",
1997     "master",
1998     "offline",
1999     "drained",
2000     "role",
2001     )
2002
2003   def ExpandNames(self):
2004     _CheckOutputFields(static=self._FIELDS_STATIC,
2005                        dynamic=self._FIELDS_DYNAMIC,
2006                        selected=self.op.output_fields)
2007
2008     self.needed_locks = {}
2009     self.share_locks[locking.LEVEL_NODE] = 1
2010
2011     if self.op.names:
2012       self.wanted = _GetWantedNodes(self, self.op.names)
2013     else:
2014       self.wanted = locking.ALL_SET
2015
2016     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2017     self.do_locking = self.do_node_query and self.op.use_locking
2018     if self.do_locking:
2019       # if we don't request only static fields, we need to lock the nodes
2020       self.needed_locks[locking.LEVEL_NODE] = self.wanted
2021
2022
2023   def CheckPrereq(self):
2024     """Check prerequisites.
2025
2026     """
2027     # The validation of the node list is done in the _GetWantedNodes,
2028     # if non empty, and if empty, there's no validation to do
2029     pass
2030
2031   def Exec(self, feedback_fn):
2032     """Computes the list of nodes and their attributes.
2033
2034     """
2035     all_info = self.cfg.GetAllNodesInfo()
2036     if self.do_locking:
2037       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2038     elif self.wanted != locking.ALL_SET:
2039       nodenames = self.wanted
2040       missing = set(nodenames).difference(all_info.keys())
2041       if missing:
2042         raise errors.OpExecError(
2043           "Some nodes were removed before retrieving their data: %s" % missing)
2044     else:
2045       nodenames = all_info.keys()
2046
2047     nodenames = utils.NiceSort(nodenames)
2048     nodelist = [all_info[name] for name in nodenames]
2049
2050     # begin data gathering
2051
2052     if self.do_node_query:
2053       live_data = {}
2054       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2055                                           self.cfg.GetHypervisorType())
2056       for name in nodenames:
2057         nodeinfo = node_data[name]
2058         if not nodeinfo.failed and nodeinfo.data:
2059           nodeinfo = nodeinfo.data
2060           fn = utils.TryConvert
2061           live_data[name] = {
2062             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2063             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2064             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2065             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2066             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2067             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2068             "bootid": nodeinfo.get('bootid', None),
2069             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2070             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2071             }
2072         else:
2073           live_data[name] = {}
2074     else:
2075       live_data = dict.fromkeys(nodenames, {})
2076
2077     node_to_primary = dict([(name, set()) for name in nodenames])
2078     node_to_secondary = dict([(name, set()) for name in nodenames])
2079
2080     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2081                              "sinst_cnt", "sinst_list"))
2082     if inst_fields & frozenset(self.op.output_fields):
2083       instancelist = self.cfg.GetInstanceList()
2084
2085       for instance_name in instancelist:
2086         inst = self.cfg.GetInstanceInfo(instance_name)
2087         if inst.primary_node in node_to_primary:
2088           node_to_primary[inst.primary_node].add(inst.name)
2089         for secnode in inst.secondary_nodes:
2090           if secnode in node_to_secondary:
2091             node_to_secondary[secnode].add(inst.name)
2092
2093     master_node = self.cfg.GetMasterNode()
2094
2095     # end data gathering
2096
2097     output = []
2098     for node in nodelist:
2099       node_output = []
2100       for field in self.op.output_fields:
2101         if field == "name":
2102           val = node.name
2103         elif field == "pinst_list":
2104           val = list(node_to_primary[node.name])
2105         elif field == "sinst_list":
2106           val = list(node_to_secondary[node.name])
2107         elif field == "pinst_cnt":
2108           val = len(node_to_primary[node.name])
2109         elif field == "sinst_cnt":
2110           val = len(node_to_secondary[node.name])
2111         elif field == "pip":
2112           val = node.primary_ip
2113         elif field == "sip":
2114           val = node.secondary_ip
2115         elif field == "tags":
2116           val = list(node.GetTags())
2117         elif field == "serial_no":
2118           val = node.serial_no
2119         elif field == "master_candidate":
2120           val = node.master_candidate
2121         elif field == "master":
2122           val = node.name == master_node
2123         elif field == "offline":
2124           val = node.offline
2125         elif field == "drained":
2126           val = node.drained
2127         elif self._FIELDS_DYNAMIC.Matches(field):
2128           val = live_data[node.name].get(field, None)
2129         elif field == "role":
2130           if node.name == master_node:
2131             val = "M"
2132           elif node.master_candidate:
2133             val = "C"
2134           elif node.drained:
2135             val = "D"
2136           elif node.offline:
2137             val = "O"
2138           else:
2139             val = "R"
2140         else:
2141           raise errors.ParameterError(field)
2142         node_output.append(val)
2143       output.append(node_output)
2144
2145     return output
2146
2147
2148 class LUQueryNodeVolumes(NoHooksLU):
2149   """Logical unit for getting volumes on node(s).
2150
2151   """
2152   _OP_REQP = ["nodes", "output_fields"]
2153   REQ_BGL = False
2154   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2155   _FIELDS_STATIC = utils.FieldSet("node")
2156
2157   def ExpandNames(self):
2158     _CheckOutputFields(static=self._FIELDS_STATIC,
2159                        dynamic=self._FIELDS_DYNAMIC,
2160                        selected=self.op.output_fields)
2161
2162     self.needed_locks = {}
2163     self.share_locks[locking.LEVEL_NODE] = 1
2164     if not self.op.nodes:
2165       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2166     else:
2167       self.needed_locks[locking.LEVEL_NODE] = \
2168         _GetWantedNodes(self, self.op.nodes)
2169
2170   def CheckPrereq(self):
2171     """Check prerequisites.
2172
2173     This checks that the fields required are valid output fields.
2174
2175     """
2176     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2177
2178   def Exec(self, feedback_fn):
2179     """Computes the list of nodes and their attributes.
2180
2181     """
2182     nodenames = self.nodes
2183     volumes = self.rpc.call_node_volumes(nodenames)
2184
2185     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2186              in self.cfg.GetInstanceList()]
2187
2188     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2189
2190     output = []
2191     for node in nodenames:
2192       if node not in volumes or volumes[node].failed or not volumes[node].data:
2193         continue
2194
2195       node_vols = volumes[node].data[:]
2196       node_vols.sort(key=lambda vol: vol['dev'])
2197
2198       for vol in node_vols:
2199         node_output = []
2200         for field in self.op.output_fields:
2201           if field == "node":
2202             val = node
2203           elif field == "phys":
2204             val = vol['dev']
2205           elif field == "vg":
2206             val = vol['vg']
2207           elif field == "name":
2208             val = vol['name']
2209           elif field == "size":
2210             val = int(float(vol['size']))
2211           elif field == "instance":
2212             for inst in ilist:
2213               if node not in lv_by_node[inst]:
2214                 continue
2215               if vol['name'] in lv_by_node[inst][node]:
2216                 val = inst.name
2217                 break
2218             else:
2219               val = '-'
2220           else:
2221             raise errors.ParameterError(field)
2222           node_output.append(str(val))
2223
2224         output.append(node_output)
2225
2226     return output
2227
2228
2229 class LUAddNode(LogicalUnit):
2230   """Logical unit for adding node to the cluster.
2231
2232   """
2233   HPATH = "node-add"
2234   HTYPE = constants.HTYPE_NODE
2235   _OP_REQP = ["node_name"]
2236
2237   def BuildHooksEnv(self):
2238     """Build hooks env.
2239
2240     This will run on all nodes before, and on all nodes + the new node after.
2241
2242     """
2243     env = {
2244       "OP_TARGET": self.op.node_name,
2245       "NODE_NAME": self.op.node_name,
2246       "NODE_PIP": self.op.primary_ip,
2247       "NODE_SIP": self.op.secondary_ip,
2248       }
2249     nodes_0 = self.cfg.GetNodeList()
2250     nodes_1 = nodes_0 + [self.op.node_name, ]
2251     return env, nodes_0, nodes_1
2252
2253   def CheckPrereq(self):
2254     """Check prerequisites.
2255
2256     This checks:
2257      - the new node is not already in the config
2258      - it is resolvable
2259      - its parameters (single/dual homed) matches the cluster
2260
2261     Any errors are signalled by raising errors.OpPrereqError.
2262
2263     """
2264     node_name = self.op.node_name
2265     cfg = self.cfg
2266
2267     dns_data = utils.HostInfo(node_name)
2268
2269     node = dns_data.name
2270     primary_ip = self.op.primary_ip = dns_data.ip
2271     secondary_ip = getattr(self.op, "secondary_ip", None)
2272     if secondary_ip is None:
2273       secondary_ip = primary_ip
2274     if not utils.IsValidIP(secondary_ip):
2275       raise errors.OpPrereqError("Invalid secondary IP given")
2276     self.op.secondary_ip = secondary_ip
2277
2278     node_list = cfg.GetNodeList()
2279     if not self.op.readd and node in node_list:
2280       raise errors.OpPrereqError("Node %s is already in the configuration" %
2281                                  node)
2282     elif self.op.readd and node not in node_list:
2283       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2284
2285     for existing_node_name in node_list:
2286       existing_node = cfg.GetNodeInfo(existing_node_name)
2287
2288       if self.op.readd and node == existing_node_name:
2289         if (existing_node.primary_ip != primary_ip or
2290             existing_node.secondary_ip != secondary_ip):
2291           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2292                                      " address configuration as before")
2293         continue
2294
2295       if (existing_node.primary_ip == primary_ip or
2296           existing_node.secondary_ip == primary_ip or
2297           existing_node.primary_ip == secondary_ip or
2298           existing_node.secondary_ip == secondary_ip):
2299         raise errors.OpPrereqError("New node ip address(es) conflict with"
2300                                    " existing node %s" % existing_node.name)
2301
2302     # check that the type of the node (single versus dual homed) is the
2303     # same as for the master
2304     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2305     master_singlehomed = myself.secondary_ip == myself.primary_ip
2306     newbie_singlehomed = secondary_ip == primary_ip
2307     if master_singlehomed != newbie_singlehomed:
2308       if master_singlehomed:
2309         raise errors.OpPrereqError("The master has no private ip but the"
2310                                    " new node has one")
2311       else:
2312         raise errors.OpPrereqError("The master has a private ip but the"
2313                                    " new node doesn't have one")
2314
2315     # checks reachablity
2316     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2317       raise errors.OpPrereqError("Node not reachable by ping")
2318
2319     if not newbie_singlehomed:
2320       # check reachability from my secondary ip to newbie's secondary ip
2321       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2322                            source=myself.secondary_ip):
2323         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2324                                    " based ping to noded port")
2325
2326     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2327     if self.op.readd:
2328       exceptions = [node]
2329     else:
2330       exceptions = []
2331     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2332     # the new node will increase mc_max with one, so:
2333     mc_max = min(mc_max + 1, cp_size)
2334     self.master_candidate = mc_now < mc_max
2335
2336     if self.op.readd:
2337       self.new_node = self.cfg.GetNodeInfo(node)
2338       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2339     else:
2340       self.new_node = objects.Node(name=node,
2341                                    primary_ip=primary_ip,
2342                                    secondary_ip=secondary_ip,
2343                                    master_candidate=self.master_candidate,
2344                                    offline=False, drained=False)
2345
2346   def Exec(self, feedback_fn):
2347     """Adds the new node to the cluster.
2348
2349     """
2350     new_node = self.new_node
2351     node = new_node.name
2352
2353     # for re-adds, reset the offline/drained/master-candidate flags;
2354     # we need to reset here, otherwise offline would prevent RPC calls
2355     # later in the procedure; this also means that if the re-add
2356     # fails, we are left with a non-offlined, broken node
2357     if self.op.readd:
2358       new_node.drained = new_node.offline = False
2359       self.LogInfo("Readding a node, the offline/drained flags were reset")
2360       # if we demote the node, we do cleanup later in the procedure
2361       new_node.master_candidate = self.master_candidate
2362
2363     # notify the user about any possible mc promotion
2364     if new_node.master_candidate:
2365       self.LogInfo("Node will be a master candidate")
2366
2367     # check connectivity
2368     result = self.rpc.call_version([node])[node]
2369     result.Raise()
2370     if result.data:
2371       if constants.PROTOCOL_VERSION == result.data:
2372         logging.info("Communication to node %s fine, sw version %s match",
2373                      node, result.data)
2374       else:
2375         raise errors.OpExecError("Version mismatch master version %s,"
2376                                  " node version %s" %
2377                                  (constants.PROTOCOL_VERSION, result.data))
2378     else:
2379       raise errors.OpExecError("Cannot get version from the new node")
2380
2381     # setup ssh on node
2382     logging.info("Copy ssh key to node %s", node)
2383     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2384     keyarray = []
2385     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2386                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2387                 priv_key, pub_key]
2388
2389     for i in keyfiles:
2390       f = open(i, 'r')
2391       try:
2392         keyarray.append(f.read())
2393       finally:
2394         f.close()
2395
2396     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2397                                     keyarray[2],
2398                                     keyarray[3], keyarray[4], keyarray[5])
2399
2400     msg = result.RemoteFailMsg()
2401     if msg:
2402       raise errors.OpExecError("Cannot transfer ssh keys to the"
2403                                " new node: %s" % msg)
2404
2405     # Add node to our /etc/hosts, and add key to known_hosts
2406     utils.AddHostToEtcHosts(new_node.name)
2407
2408     if new_node.secondary_ip != new_node.primary_ip:
2409       result = self.rpc.call_node_has_ip_address(new_node.name,
2410                                                  new_node.secondary_ip)
2411       if result.failed or not result.data:
2412         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2413                                  " you gave (%s). Please fix and re-run this"
2414                                  " command." % new_node.secondary_ip)
2415
2416     node_verify_list = [self.cfg.GetMasterNode()]
2417     node_verify_param = {
2418       'nodelist': [node],
2419       # TODO: do a node-net-test as well?
2420     }
2421
2422     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2423                                        self.cfg.GetClusterName())
2424     for verifier in node_verify_list:
2425       if result[verifier].failed or not result[verifier].data:
2426         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2427                                  " for remote verification" % verifier)
2428       if result[verifier].data['nodelist']:
2429         for failed in result[verifier].data['nodelist']:
2430           feedback_fn("ssh/hostname verification failed %s -> %s" %
2431                       (verifier, result[verifier].data['nodelist'][failed]))
2432         raise errors.OpExecError("ssh/hostname verification failed.")
2433
2434     # Distribute updated /etc/hosts and known_hosts to all nodes,
2435     # including the node just added
2436     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2437     dist_nodes = self.cfg.GetNodeList()
2438     if not self.op.readd:
2439       dist_nodes.append(node)
2440     if myself.name in dist_nodes:
2441       dist_nodes.remove(myself.name)
2442
2443     logging.debug("Copying hosts and known_hosts to all nodes")
2444     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2445       result = self.rpc.call_upload_file(dist_nodes, fname)
2446       for to_node, to_result in result.iteritems():
2447         if to_result.failed or not to_result.data:
2448           logging.error("Copy of file %s to node %s failed", fname, to_node)
2449
2450     to_copy = []
2451     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2452     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2453       to_copy.append(constants.VNC_PASSWORD_FILE)
2454
2455     for fname in to_copy:
2456       result = self.rpc.call_upload_file([node], fname)
2457       if result[node].failed or not result[node]:
2458         logging.error("Could not copy file %s to node %s", fname, node)
2459
2460     if self.op.readd:
2461       self.context.ReaddNode(new_node)
2462       # make sure we redistribute the config
2463       self.cfg.Update(new_node)
2464       # and make sure the new node will not have old files around
2465       if not new_node.master_candidate:
2466         result = self.rpc.call_node_demote_from_mc(new_node.name)
2467         msg = result.RemoteFailMsg()
2468         if msg:
2469           self.LogWarning("Node failed to demote itself from master"
2470                           " candidate status: %s" % msg)
2471     else:
2472       self.context.AddNode(new_node)
2473
2474
2475 class LUSetNodeParams(LogicalUnit):
2476   """Modifies the parameters of a node.
2477
2478   """
2479   HPATH = "node-modify"
2480   HTYPE = constants.HTYPE_NODE
2481   _OP_REQP = ["node_name"]
2482   REQ_BGL = False
2483
2484   def CheckArguments(self):
2485     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2486     if node_name is None:
2487       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2488     self.op.node_name = node_name
2489     _CheckBooleanOpField(self.op, 'master_candidate')
2490     _CheckBooleanOpField(self.op, 'offline')
2491     _CheckBooleanOpField(self.op, 'drained')
2492     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2493     if all_mods.count(None) == 3:
2494       raise errors.OpPrereqError("Please pass at least one modification")
2495     if all_mods.count(True) > 1:
2496       raise errors.OpPrereqError("Can't set the node into more than one"
2497                                  " state at the same time")
2498
2499   def ExpandNames(self):
2500     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2501
2502   def BuildHooksEnv(self):
2503     """Build hooks env.
2504
2505     This runs on the master node.
2506
2507     """
2508     env = {
2509       "OP_TARGET": self.op.node_name,
2510       "MASTER_CANDIDATE": str(self.op.master_candidate),
2511       "OFFLINE": str(self.op.offline),
2512       "DRAINED": str(self.op.drained),
2513       }
2514     nl = [self.cfg.GetMasterNode(),
2515           self.op.node_name]
2516     return env, nl, nl
2517
2518   def CheckPrereq(self):
2519     """Check prerequisites.
2520
2521     This only checks the instance list against the existing names.
2522
2523     """
2524     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2525
2526     if ((self.op.master_candidate == False or self.op.offline == True or
2527          self.op.drained == True) and node.master_candidate):
2528       # we will demote the node from master_candidate
2529       if self.op.node_name == self.cfg.GetMasterNode():
2530         raise errors.OpPrereqError("The master node has to be a"
2531                                    " master candidate, online and not drained")
2532       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2533       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2534       if num_candidates <= cp_size:
2535         msg = ("Not enough master candidates (desired"
2536                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2537         if self.op.force:
2538           self.LogWarning(msg)
2539         else:
2540           raise errors.OpPrereqError(msg)
2541
2542     if (self.op.master_candidate == True and
2543         ((node.offline and not self.op.offline == False) or
2544          (node.drained and not self.op.drained == False))):
2545       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2546                                  " to master_candidate" % node.name)
2547
2548     return
2549
2550   def Exec(self, feedback_fn):
2551     """Modifies a node.
2552
2553     """
2554     node = self.node
2555
2556     result = []
2557     changed_mc = False
2558
2559     if self.op.offline is not None:
2560       node.offline = self.op.offline
2561       result.append(("offline", str(self.op.offline)))
2562       if self.op.offline == True:
2563         if node.master_candidate:
2564           node.master_candidate = False
2565           changed_mc = True
2566           result.append(("master_candidate", "auto-demotion due to offline"))
2567         if node.drained:
2568           node.drained = False
2569           result.append(("drained", "clear drained status due to offline"))
2570
2571     if self.op.master_candidate is not None:
2572       node.master_candidate = self.op.master_candidate
2573       changed_mc = True
2574       result.append(("master_candidate", str(self.op.master_candidate)))
2575       if self.op.master_candidate == False:
2576         rrc = self.rpc.call_node_demote_from_mc(node.name)
2577         msg = rrc.RemoteFailMsg()
2578         if msg:
2579           self.LogWarning("Node failed to demote itself: %s" % msg)
2580
2581     if self.op.drained is not None:
2582       node.drained = self.op.drained
2583       result.append(("drained", str(self.op.drained)))
2584       if self.op.drained == True:
2585         if node.master_candidate:
2586           node.master_candidate = False
2587           changed_mc = True
2588           result.append(("master_candidate", "auto-demotion due to drain"))
2589           rrc = self.rpc.call_node_demote_from_mc(node.name)
2590           msg = rrc.RemoteFailMsg()
2591           if msg:
2592             self.LogWarning("Node failed to demote itself: %s" % msg)
2593         if node.offline:
2594           node.offline = False
2595           result.append(("offline", "clear offline status due to drain"))
2596
2597     # this will trigger configuration file update, if needed
2598     self.cfg.Update(node)
2599     # this will trigger job queue propagation or cleanup
2600     if changed_mc:
2601       self.context.ReaddNode(node)
2602
2603     return result
2604
2605
2606 class LUQueryClusterInfo(NoHooksLU):
2607   """Query cluster configuration.
2608
2609   """
2610   _OP_REQP = []
2611   REQ_BGL = False
2612
2613   def ExpandNames(self):
2614     self.needed_locks = {}
2615
2616   def CheckPrereq(self):
2617     """No prerequsites needed for this LU.
2618
2619     """
2620     pass
2621
2622   def Exec(self, feedback_fn):
2623     """Return cluster config.
2624
2625     """
2626     cluster = self.cfg.GetClusterInfo()
2627     result = {
2628       "software_version": constants.RELEASE_VERSION,
2629       "protocol_version": constants.PROTOCOL_VERSION,
2630       "config_version": constants.CONFIG_VERSION,
2631       "os_api_version": constants.OS_API_VERSION,
2632       "export_version": constants.EXPORT_VERSION,
2633       "architecture": (platform.architecture()[0], platform.machine()),
2634       "name": cluster.cluster_name,
2635       "master": cluster.master_node,
2636       "default_hypervisor": cluster.default_hypervisor,
2637       "enabled_hypervisors": cluster.enabled_hypervisors,
2638       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2639                         for hypervisor in cluster.enabled_hypervisors]),
2640       "beparams": cluster.beparams,
2641       "candidate_pool_size": cluster.candidate_pool_size,
2642       "default_bridge": cluster.default_bridge,
2643       "master_netdev": cluster.master_netdev,
2644       "volume_group_name": cluster.volume_group_name,
2645       "file_storage_dir": cluster.file_storage_dir,
2646       }
2647
2648     return result
2649
2650
2651 class LUQueryConfigValues(NoHooksLU):
2652   """Return configuration values.
2653
2654   """
2655   _OP_REQP = []
2656   REQ_BGL = False
2657   _FIELDS_DYNAMIC = utils.FieldSet()
2658   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2659
2660   def ExpandNames(self):
2661     self.needed_locks = {}
2662
2663     _CheckOutputFields(static=self._FIELDS_STATIC,
2664                        dynamic=self._FIELDS_DYNAMIC,
2665                        selected=self.op.output_fields)
2666
2667   def CheckPrereq(self):
2668     """No prerequisites.
2669
2670     """
2671     pass
2672
2673   def Exec(self, feedback_fn):
2674     """Dump a representation of the cluster config to the standard output.
2675
2676     """
2677     values = []
2678     for field in self.op.output_fields:
2679       if field == "cluster_name":
2680         entry = self.cfg.GetClusterName()
2681       elif field == "master_node":
2682         entry = self.cfg.GetMasterNode()
2683       elif field == "drain_flag":
2684         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2685       else:
2686         raise errors.ParameterError(field)
2687       values.append(entry)
2688     return values
2689
2690
2691 class LUActivateInstanceDisks(NoHooksLU):
2692   """Bring up an instance's disks.
2693
2694   """
2695   _OP_REQP = ["instance_name"]
2696   REQ_BGL = False
2697
2698   def ExpandNames(self):
2699     self._ExpandAndLockInstance()
2700     self.needed_locks[locking.LEVEL_NODE] = []
2701     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2702
2703   def DeclareLocks(self, level):
2704     if level == locking.LEVEL_NODE:
2705       self._LockInstancesNodes()
2706
2707   def CheckPrereq(self):
2708     """Check prerequisites.
2709
2710     This checks that the instance is in the cluster.
2711
2712     """
2713     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2714     assert self.instance is not None, \
2715       "Cannot retrieve locked instance %s" % self.op.instance_name
2716     _CheckNodeOnline(self, self.instance.primary_node)
2717     if not hasattr(self.op, "ignore_size"):
2718       self.op.ignore_size = False
2719
2720   def Exec(self, feedback_fn):
2721     """Activate the disks.
2722
2723     """
2724     disks_ok, disks_info = \
2725               _AssembleInstanceDisks(self, self.instance,
2726                                      ignore_size=self.op.ignore_size)
2727     if not disks_ok:
2728       raise errors.OpExecError("Cannot activate block devices")
2729
2730     return disks_info
2731
2732
2733 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2734                            ignore_size=False):
2735   """Prepare the block devices for an instance.
2736
2737   This sets up the block devices on all nodes.
2738
2739   @type lu: L{LogicalUnit}
2740   @param lu: the logical unit on whose behalf we execute
2741   @type instance: L{objects.Instance}
2742   @param instance: the instance for whose disks we assemble
2743   @type ignore_secondaries: boolean
2744   @param ignore_secondaries: if true, errors on secondary nodes
2745       won't result in an error return from the function
2746   @type ignore_size: boolean
2747   @param ignore_size: if true, the current known size of the disk
2748       will not be used during the disk activation, useful for cases
2749       when the size is wrong
2750   @return: False if the operation failed, otherwise a list of
2751       (host, instance_visible_name, node_visible_name)
2752       with the mapping from node devices to instance devices
2753
2754   """
2755   device_info = []
2756   disks_ok = True
2757   iname = instance.name
2758   # With the two passes mechanism we try to reduce the window of
2759   # opportunity for the race condition of switching DRBD to primary
2760   # before handshaking occured, but we do not eliminate it
2761
2762   # The proper fix would be to wait (with some limits) until the
2763   # connection has been made and drbd transitions from WFConnection
2764   # into any other network-connected state (Connected, SyncTarget,
2765   # SyncSource, etc.)
2766
2767   # 1st pass, assemble on all nodes in secondary mode
2768   for inst_disk in instance.disks:
2769     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2770       if ignore_size:
2771         node_disk = node_disk.Copy()
2772         node_disk.UnsetSize()
2773       lu.cfg.SetDiskID(node_disk, node)
2774       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2775       msg = result.RemoteFailMsg()
2776       if msg:
2777         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2778                            " (is_primary=False, pass=1): %s",
2779                            inst_disk.iv_name, node, msg)
2780         if not ignore_secondaries:
2781           disks_ok = False
2782
2783   # FIXME: race condition on drbd migration to primary
2784
2785   # 2nd pass, do only the primary node
2786   for inst_disk in instance.disks:
2787     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2788       if node != instance.primary_node:
2789         continue
2790       if ignore_size:
2791         node_disk = node_disk.Copy()
2792         node_disk.UnsetSize()
2793       lu.cfg.SetDiskID(node_disk, node)
2794       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2795       msg = result.RemoteFailMsg()
2796       if msg:
2797         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2798                            " (is_primary=True, pass=2): %s",
2799                            inst_disk.iv_name, node, msg)
2800         disks_ok = False
2801     device_info.append((instance.primary_node, inst_disk.iv_name,
2802                         result.payload))
2803
2804   # leave the disks configured for the primary node
2805   # this is a workaround that would be fixed better by
2806   # improving the logical/physical id handling
2807   for disk in instance.disks:
2808     lu.cfg.SetDiskID(disk, instance.primary_node)
2809
2810   return disks_ok, device_info
2811
2812
2813 def _StartInstanceDisks(lu, instance, force):
2814   """Start the disks of an instance.
2815
2816   """
2817   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2818                                            ignore_secondaries=force)
2819   if not disks_ok:
2820     _ShutdownInstanceDisks(lu, instance)
2821     if force is not None and not force:
2822       lu.proc.LogWarning("", hint="If the message above refers to a"
2823                          " secondary node,"
2824                          " you can retry the operation using '--force'.")
2825     raise errors.OpExecError("Disk consistency error")
2826
2827
2828 class LUDeactivateInstanceDisks(NoHooksLU):
2829   """Shutdown an instance's disks.
2830
2831   """
2832   _OP_REQP = ["instance_name"]
2833   REQ_BGL = False
2834
2835   def ExpandNames(self):
2836     self._ExpandAndLockInstance()
2837     self.needed_locks[locking.LEVEL_NODE] = []
2838     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2839
2840   def DeclareLocks(self, level):
2841     if level == locking.LEVEL_NODE:
2842       self._LockInstancesNodes()
2843
2844   def CheckPrereq(self):
2845     """Check prerequisites.
2846
2847     This checks that the instance is in the cluster.
2848
2849     """
2850     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2851     assert self.instance is not None, \
2852       "Cannot retrieve locked instance %s" % self.op.instance_name
2853
2854   def Exec(self, feedback_fn):
2855     """Deactivate the disks
2856
2857     """
2858     instance = self.instance
2859     _SafeShutdownInstanceDisks(self, instance)
2860
2861
2862 def _SafeShutdownInstanceDisks(lu, instance):
2863   """Shutdown block devices of an instance.
2864
2865   This function checks if an instance is running, before calling
2866   _ShutdownInstanceDisks.
2867
2868   """
2869   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2870                                       [instance.hypervisor])
2871   ins_l = ins_l[instance.primary_node]
2872   if ins_l.failed or not isinstance(ins_l.data, list):
2873     raise errors.OpExecError("Can't contact node '%s'" %
2874                              instance.primary_node)
2875
2876   if instance.name in ins_l.data:
2877     raise errors.OpExecError("Instance is running, can't shutdown"
2878                              " block devices.")
2879
2880   _ShutdownInstanceDisks(lu, instance)
2881
2882
2883 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2884   """Shutdown block devices of an instance.
2885
2886   This does the shutdown on all nodes of the instance.
2887
2888   If the ignore_primary is false, errors on the primary node are
2889   ignored.
2890
2891   """
2892   all_result = True
2893   for disk in instance.disks:
2894     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2895       lu.cfg.SetDiskID(top_disk, node)
2896       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2897       msg = result.RemoteFailMsg()
2898       if msg:
2899         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2900                       disk.iv_name, node, msg)
2901         if not ignore_primary or node != instance.primary_node:
2902           all_result = False
2903   return all_result
2904
2905
2906 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2907   """Checks if a node has enough free memory.
2908
2909   This function check if a given node has the needed amount of free
2910   memory. In case the node has less memory or we cannot get the
2911   information from the node, this function raise an OpPrereqError
2912   exception.
2913
2914   @type lu: C{LogicalUnit}
2915   @param lu: a logical unit from which we get configuration data
2916   @type node: C{str}
2917   @param node: the node to check
2918   @type reason: C{str}
2919   @param reason: string to use in the error message
2920   @type requested: C{int}
2921   @param requested: the amount of memory in MiB to check for
2922   @type hypervisor_name: C{str}
2923   @param hypervisor_name: the hypervisor to ask for memory stats
2924   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2925       we cannot check the node
2926
2927   """
2928   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2929   nodeinfo[node].Raise()
2930   free_mem = nodeinfo[node].data.get('memory_free')
2931   if not isinstance(free_mem, int):
2932     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2933                              " was '%s'" % (node, free_mem))
2934   if requested > free_mem:
2935     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2936                              " needed %s MiB, available %s MiB" %
2937                              (node, reason, requested, free_mem))
2938
2939
2940 class LUStartupInstance(LogicalUnit):
2941   """Starts an instance.
2942
2943   """
2944   HPATH = "instance-start"
2945   HTYPE = constants.HTYPE_INSTANCE
2946   _OP_REQP = ["instance_name", "force"]
2947   REQ_BGL = False
2948
2949   def ExpandNames(self):
2950     self._ExpandAndLockInstance()
2951
2952   def BuildHooksEnv(self):
2953     """Build hooks env.
2954
2955     This runs on master, primary and secondary nodes of the instance.
2956
2957     """
2958     env = {
2959       "FORCE": self.op.force,
2960       }
2961     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2962     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2963     return env, nl, nl
2964
2965   def CheckPrereq(self):
2966     """Check prerequisites.
2967
2968     This checks that the instance is in the cluster.
2969
2970     """
2971     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2972     assert self.instance is not None, \
2973       "Cannot retrieve locked instance %s" % self.op.instance_name
2974
2975     # extra beparams
2976     self.beparams = getattr(self.op, "beparams", {})
2977     if self.beparams:
2978       if not isinstance(self.beparams, dict):
2979         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2980                                    " dict" % (type(self.beparams), ))
2981       # fill the beparams dict
2982       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2983       self.op.beparams = self.beparams
2984
2985     # extra hvparams
2986     self.hvparams = getattr(self.op, "hvparams", {})
2987     if self.hvparams:
2988       if not isinstance(self.hvparams, dict):
2989         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2990                                    " dict" % (type(self.hvparams), ))
2991
2992       # check hypervisor parameter syntax (locally)
2993       cluster = self.cfg.GetClusterInfo()
2994       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2995       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2996                                     instance.hvparams)
2997       filled_hvp.update(self.hvparams)
2998       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2999       hv_type.CheckParameterSyntax(filled_hvp)
3000       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3001       self.op.hvparams = self.hvparams
3002
3003     _CheckNodeOnline(self, instance.primary_node)
3004
3005     bep = self.cfg.GetClusterInfo().FillBE(instance)
3006     # check bridges existance
3007     _CheckInstanceBridgesExist(self, instance)
3008
3009     remote_info = self.rpc.call_instance_info(instance.primary_node,
3010                                               instance.name,
3011                                               instance.hypervisor)
3012     remote_info.Raise()
3013     if not remote_info.data:
3014       _CheckNodeFreeMemory(self, instance.primary_node,
3015                            "starting instance %s" % instance.name,
3016                            bep[constants.BE_MEMORY], instance.hypervisor)
3017
3018   def Exec(self, feedback_fn):
3019     """Start the instance.
3020
3021     """
3022     instance = self.instance
3023     force = self.op.force
3024
3025     self.cfg.MarkInstanceUp(instance.name)
3026
3027     node_current = instance.primary_node
3028
3029     _StartInstanceDisks(self, instance, force)
3030
3031     result = self.rpc.call_instance_start(node_current, instance,
3032                                           self.hvparams, self.beparams)
3033     msg = result.RemoteFailMsg()
3034     if msg:
3035       _ShutdownInstanceDisks(self, instance)
3036       raise errors.OpExecError("Could not start instance: %s" % msg)
3037
3038
3039 class LURebootInstance(LogicalUnit):
3040   """Reboot an instance.
3041
3042   """
3043   HPATH = "instance-reboot"
3044   HTYPE = constants.HTYPE_INSTANCE
3045   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3046   REQ_BGL = False
3047
3048   def ExpandNames(self):
3049     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3050                                    constants.INSTANCE_REBOOT_HARD,
3051                                    constants.INSTANCE_REBOOT_FULL]:
3052       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3053                                   (constants.INSTANCE_REBOOT_SOFT,
3054                                    constants.INSTANCE_REBOOT_HARD,
3055                                    constants.INSTANCE_REBOOT_FULL))
3056     self._ExpandAndLockInstance()
3057
3058   def BuildHooksEnv(self):
3059     """Build hooks env.
3060
3061     This runs on master, primary and secondary nodes of the instance.
3062
3063     """
3064     env = {
3065       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3066       "REBOOT_TYPE": self.op.reboot_type,
3067       }
3068     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3069     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3070     return env, nl, nl
3071
3072   def CheckPrereq(self):
3073     """Check prerequisites.
3074
3075     This checks that the instance is in the cluster.
3076
3077     """
3078     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3079     assert self.instance is not None, \
3080       "Cannot retrieve locked instance %s" % self.op.instance_name
3081
3082     _CheckNodeOnline(self, instance.primary_node)
3083
3084     # check bridges existance
3085     _CheckInstanceBridgesExist(self, instance)
3086
3087   def Exec(self, feedback_fn):
3088     """Reboot the instance.
3089
3090     """
3091     instance = self.instance
3092     ignore_secondaries = self.op.ignore_secondaries
3093     reboot_type = self.op.reboot_type
3094
3095     node_current = instance.primary_node
3096
3097     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3098                        constants.INSTANCE_REBOOT_HARD]:
3099       for disk in instance.disks:
3100         self.cfg.SetDiskID(disk, node_current)
3101       result = self.rpc.call_instance_reboot(node_current, instance,
3102                                              reboot_type)
3103       msg = result.RemoteFailMsg()
3104       if msg:
3105         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3106     else:
3107       result = self.rpc.call_instance_shutdown(node_current, instance)
3108       msg = result.RemoteFailMsg()
3109       if msg:
3110         raise errors.OpExecError("Could not shutdown instance for"
3111                                  " full reboot: %s" % msg)
3112       _ShutdownInstanceDisks(self, instance)
3113       _StartInstanceDisks(self, instance, ignore_secondaries)
3114       result = self.rpc.call_instance_start(node_current, instance, None, None)
3115       msg = result.RemoteFailMsg()
3116       if msg:
3117         _ShutdownInstanceDisks(self, instance)
3118         raise errors.OpExecError("Could not start instance for"
3119                                  " full reboot: %s" % msg)
3120
3121     self.cfg.MarkInstanceUp(instance.name)
3122
3123
3124 class LUShutdownInstance(LogicalUnit):
3125   """Shutdown an instance.
3126
3127   """
3128   HPATH = "instance-stop"
3129   HTYPE = constants.HTYPE_INSTANCE
3130   _OP_REQP = ["instance_name"]
3131   REQ_BGL = False
3132
3133   def ExpandNames(self):
3134     self._ExpandAndLockInstance()
3135
3136   def BuildHooksEnv(self):
3137     """Build hooks env.
3138
3139     This runs on master, primary and secondary nodes of the instance.
3140
3141     """
3142     env = _BuildInstanceHookEnvByObject(self, self.instance)
3143     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3144     return env, nl, nl
3145
3146   def CheckPrereq(self):
3147     """Check prerequisites.
3148
3149     This checks that the instance is in the cluster.
3150
3151     """
3152     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3153     assert self.instance is not None, \
3154       "Cannot retrieve locked instance %s" % self.op.instance_name
3155     _CheckNodeOnline(self, self.instance.primary_node)
3156
3157   def Exec(self, feedback_fn):
3158     """Shutdown the instance.
3159
3160     """
3161     instance = self.instance
3162     node_current = instance.primary_node
3163     self.cfg.MarkInstanceDown(instance.name)
3164     result = self.rpc.call_instance_shutdown(node_current, instance)
3165     msg = result.RemoteFailMsg()
3166     if msg:
3167       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3168
3169     _ShutdownInstanceDisks(self, instance)
3170
3171
3172 class LUReinstallInstance(LogicalUnit):
3173   """Reinstall an instance.
3174
3175   """
3176   HPATH = "instance-reinstall"
3177   HTYPE = constants.HTYPE_INSTANCE
3178   _OP_REQP = ["instance_name"]
3179   REQ_BGL = False
3180
3181   def ExpandNames(self):
3182     self._ExpandAndLockInstance()
3183
3184   def BuildHooksEnv(self):
3185     """Build hooks env.
3186
3187     This runs on master, primary and secondary nodes of the instance.
3188
3189     """
3190     env = _BuildInstanceHookEnvByObject(self, self.instance)
3191     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3192     return env, nl, nl
3193
3194   def CheckPrereq(self):
3195     """Check prerequisites.
3196
3197     This checks that the instance is in the cluster and is not running.
3198
3199     """
3200     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3201     assert instance is not None, \
3202       "Cannot retrieve locked instance %s" % self.op.instance_name
3203     _CheckNodeOnline(self, instance.primary_node)
3204
3205     if instance.disk_template == constants.DT_DISKLESS:
3206       raise errors.OpPrereqError("Instance '%s' has no disks" %
3207                                  self.op.instance_name)
3208     if instance.admin_up:
3209       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3210                                  self.op.instance_name)
3211     remote_info = self.rpc.call_instance_info(instance.primary_node,
3212                                               instance.name,
3213                                               instance.hypervisor)
3214     remote_info.Raise()
3215     if remote_info.data:
3216       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3217                                  (self.op.instance_name,
3218                                   instance.primary_node))
3219
3220     self.op.os_type = getattr(self.op, "os_type", None)
3221     if self.op.os_type is not None:
3222       # OS verification
3223       pnode = self.cfg.GetNodeInfo(
3224         self.cfg.ExpandNodeName(instance.primary_node))
3225       if pnode is None:
3226         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3227                                    self.op.pnode)
3228       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3229       result.Raise()
3230       if not isinstance(result.data, objects.OS):
3231         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3232                                    " primary node"  % self.op.os_type)
3233
3234     self.instance = instance
3235
3236   def Exec(self, feedback_fn):
3237     """Reinstall the instance.
3238
3239     """
3240     inst = self.instance
3241
3242     if self.op.os_type is not None:
3243       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3244       inst.os = self.op.os_type
3245       self.cfg.Update(inst)
3246
3247     _StartInstanceDisks(self, inst, None)
3248     try:
3249       feedback_fn("Running the instance OS create scripts...")
3250       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3251       msg = result.RemoteFailMsg()
3252       if msg:
3253         raise errors.OpExecError("Could not install OS for instance %s"
3254                                  " on node %s: %s" %
3255                                  (inst.name, inst.primary_node, msg))
3256     finally:
3257       _ShutdownInstanceDisks(self, inst)
3258
3259
3260 class LURenameInstance(LogicalUnit):
3261   """Rename an instance.
3262
3263   """
3264   HPATH = "instance-rename"
3265   HTYPE = constants.HTYPE_INSTANCE
3266   _OP_REQP = ["instance_name", "new_name"]
3267
3268   def BuildHooksEnv(self):
3269     """Build hooks env.
3270
3271     This runs on master, primary and secondary nodes of the instance.
3272
3273     """
3274     env = _BuildInstanceHookEnvByObject(self, self.instance)
3275     env["INSTANCE_NEW_NAME"] = self.op.new_name
3276     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3277     return env, nl, nl
3278
3279   def CheckPrereq(self):
3280     """Check prerequisites.
3281
3282     This checks that the instance is in the cluster and is not running.
3283
3284     """
3285     instance = self.cfg.GetInstanceInfo(
3286       self.cfg.ExpandInstanceName(self.op.instance_name))
3287     if instance is None:
3288       raise errors.OpPrereqError("Instance '%s' not known" %
3289                                  self.op.instance_name)
3290     _CheckNodeOnline(self, instance.primary_node)
3291
3292     if instance.admin_up:
3293       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3294                                  self.op.instance_name)
3295     remote_info = self.rpc.call_instance_info(instance.primary_node,
3296                                               instance.name,
3297                                               instance.hypervisor)
3298     remote_info.Raise()
3299     if remote_info.data:
3300       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3301                                  (self.op.instance_name,
3302                                   instance.primary_node))
3303     self.instance = instance
3304
3305     # new name verification
3306     name_info = utils.HostInfo(self.op.new_name)
3307
3308     self.op.new_name = new_name = name_info.name
3309     instance_list = self.cfg.GetInstanceList()
3310     if new_name in instance_list:
3311       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3312                                  new_name)
3313
3314     if not getattr(self.op, "ignore_ip", False):
3315       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3316         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3317                                    (name_info.ip, new_name))
3318
3319
3320   def Exec(self, feedback_fn):
3321     """Reinstall the instance.
3322
3323     """
3324     inst = self.instance
3325     old_name = inst.name
3326
3327     if inst.disk_template == constants.DT_FILE:
3328       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3329
3330     self.cfg.RenameInstance(inst.name, self.op.new_name)
3331     # Change the instance lock. This is definitely safe while we hold the BGL
3332     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3333     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3334
3335     # re-read the instance from the configuration after rename
3336     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3337
3338     if inst.disk_template == constants.DT_FILE:
3339       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3340       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3341                                                      old_file_storage_dir,
3342                                                      new_file_storage_dir)
3343       result.Raise()
3344       if not result.data:
3345         raise errors.OpExecError("Could not connect to node '%s' to rename"
3346                                  " directory '%s' to '%s' (but the instance"
3347                                  " has been renamed in Ganeti)" % (
3348                                  inst.primary_node, old_file_storage_dir,
3349                                  new_file_storage_dir))
3350
3351       if not result.data[0]:
3352         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3353                                  " (but the instance has been renamed in"
3354                                  " Ganeti)" % (old_file_storage_dir,
3355                                                new_file_storage_dir))
3356
3357     _StartInstanceDisks(self, inst, None)
3358     try:
3359       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3360                                                  old_name)
3361       msg = result.RemoteFailMsg()
3362       if msg:
3363         msg = ("Could not run OS rename script for instance %s on node %s"
3364                " (but the instance has been renamed in Ganeti): %s" %
3365                (inst.name, inst.primary_node, msg))
3366         self.proc.LogWarning(msg)
3367     finally:
3368       _ShutdownInstanceDisks(self, inst)
3369
3370
3371 class LURemoveInstance(LogicalUnit):
3372   """Remove an instance.
3373
3374   """
3375   HPATH = "instance-remove"
3376   HTYPE = constants.HTYPE_INSTANCE
3377   _OP_REQP = ["instance_name", "ignore_failures"]
3378   REQ_BGL = False
3379
3380   def ExpandNames(self):
3381     self._ExpandAndLockInstance()
3382     self.needed_locks[locking.LEVEL_NODE] = []
3383     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3384
3385   def DeclareLocks(self, level):
3386     if level == locking.LEVEL_NODE:
3387       self._LockInstancesNodes()
3388
3389   def BuildHooksEnv(self):
3390     """Build hooks env.
3391
3392     This runs on master, primary and secondary nodes of the instance.
3393
3394     """
3395     env = _BuildInstanceHookEnvByObject(self, self.instance)
3396     nl = [self.cfg.GetMasterNode()]
3397     return env, nl, nl
3398
3399   def CheckPrereq(self):
3400     """Check prerequisites.
3401
3402     This checks that the instance is in the cluster.
3403
3404     """
3405     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3406     assert self.instance is not None, \
3407       "Cannot retrieve locked instance %s" % self.op.instance_name
3408
3409   def Exec(self, feedback_fn):
3410     """Remove the instance.
3411
3412     """
3413     instance = self.instance
3414     logging.info("Shutting down instance %s on node %s",
3415                  instance.name, instance.primary_node)
3416
3417     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3418     msg = result.RemoteFailMsg()
3419     if msg:
3420       if self.op.ignore_failures:
3421         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3422       else:
3423         raise errors.OpExecError("Could not shutdown instance %s on"
3424                                  " node %s: %s" %
3425                                  (instance.name, instance.primary_node, msg))
3426
3427     logging.info("Removing block devices for instance %s", instance.name)
3428
3429     if not _RemoveDisks(self, instance):
3430       if self.op.ignore_failures:
3431         feedback_fn("Warning: can't remove instance's disks")
3432       else:
3433         raise errors.OpExecError("Can't remove instance's disks")
3434
3435     logging.info("Removing instance %s out of cluster config", instance.name)
3436
3437     self.cfg.RemoveInstance(instance.name)
3438     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3439
3440
3441 class LUQueryInstances(NoHooksLU):
3442   """Logical unit for querying instances.
3443
3444   """
3445   _OP_REQP = ["output_fields", "names", "use_locking"]
3446   REQ_BGL = False
3447   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3448                                     "admin_state",
3449                                     "disk_template", "ip", "mac", "bridge",
3450                                     "sda_size", "sdb_size", "vcpus", "tags",
3451                                     "network_port", "beparams",
3452                                     r"(disk)\.(size)/([0-9]+)",
3453                                     r"(disk)\.(sizes)", "disk_usage",
3454                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3455                                     r"(nic)\.(macs|ips|bridges)",
3456                                     r"(disk|nic)\.(count)",
3457                                     "serial_no", "hypervisor", "hvparams",] +
3458                                   ["hv/%s" % name
3459                                    for name in constants.HVS_PARAMETERS] +
3460                                   ["be/%s" % name
3461                                    for name in constants.BES_PARAMETERS])
3462   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3463
3464
3465   def ExpandNames(self):
3466     _CheckOutputFields(static=self._FIELDS_STATIC,
3467                        dynamic=self._FIELDS_DYNAMIC,
3468                        selected=self.op.output_fields)
3469
3470     self.needed_locks = {}
3471     self.share_locks[locking.LEVEL_INSTANCE] = 1
3472     self.share_locks[locking.LEVEL_NODE] = 1
3473
3474     if self.op.names:
3475       self.wanted = _GetWantedInstances(self, self.op.names)
3476     else:
3477       self.wanted = locking.ALL_SET
3478
3479     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3480     self.do_locking = self.do_node_query and self.op.use_locking
3481     if self.do_locking:
3482       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3483       self.needed_locks[locking.LEVEL_NODE] = []
3484       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3485
3486   def DeclareLocks(self, level):
3487     if level == locking.LEVEL_NODE and self.do_locking:
3488       self._LockInstancesNodes()
3489
3490   def CheckPrereq(self):
3491     """Check prerequisites.
3492
3493     """
3494     pass
3495
3496   def Exec(self, feedback_fn):
3497     """Computes the list of nodes and their attributes.
3498
3499     """
3500     all_info = self.cfg.GetAllInstancesInfo()
3501     if self.wanted == locking.ALL_SET:
3502       # caller didn't specify instance names, so ordering is not important
3503       if self.do_locking:
3504         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3505       else:
3506         instance_names = all_info.keys()
3507       instance_names = utils.NiceSort(instance_names)
3508     else:
3509       # caller did specify names, so we must keep the ordering
3510       if self.do_locking:
3511         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3512       else:
3513         tgt_set = all_info.keys()
3514       missing = set(self.wanted).difference(tgt_set)
3515       if missing:
3516         raise errors.OpExecError("Some instances were removed before"
3517                                  " retrieving their data: %s" % missing)
3518       instance_names = self.wanted
3519
3520     instance_list = [all_info[iname] for iname in instance_names]
3521
3522     # begin data gathering
3523
3524     nodes = frozenset([inst.primary_node for inst in instance_list])
3525     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3526
3527     bad_nodes = []
3528     off_nodes = []
3529     if self.do_node_query:
3530       live_data = {}
3531       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3532       for name in nodes:
3533         result = node_data[name]
3534         if result.offline:
3535           # offline nodes will be in both lists
3536           off_nodes.append(name)
3537         if result.failed:
3538           bad_nodes.append(name)
3539         else:
3540           if result.data:
3541             live_data.update(result.data)
3542             # else no instance is alive
3543     else:
3544       live_data = dict([(name, {}) for name in instance_names])
3545
3546     # end data gathering
3547
3548     HVPREFIX = "hv/"
3549     BEPREFIX = "be/"
3550     output = []
3551     for instance in instance_list:
3552       iout = []
3553       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3554       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3555       for field in self.op.output_fields:
3556         st_match = self._FIELDS_STATIC.Matches(field)
3557         if field == "name":
3558           val = instance.name
3559         elif field == "os":
3560           val = instance.os
3561         elif field == "pnode":
3562           val = instance.primary_node
3563         elif field == "snodes":
3564           val = list(instance.secondary_nodes)
3565         elif field == "admin_state":
3566           val = instance.admin_up
3567         elif field == "oper_state":
3568           if instance.primary_node in bad_nodes:
3569             val = None
3570           else:
3571             val = bool(live_data.get(instance.name))
3572         elif field == "status":
3573           if instance.primary_node in off_nodes:
3574             val = "ERROR_nodeoffline"
3575           elif instance.primary_node in bad_nodes:
3576             val = "ERROR_nodedown"
3577           else:
3578             running = bool(live_data.get(instance.name))
3579             if running:
3580               if instance.admin_up:
3581                 val = "running"
3582               else:
3583                 val = "ERROR_up"
3584             else:
3585               if instance.admin_up:
3586                 val = "ERROR_down"
3587               else:
3588                 val = "ADMIN_down"
3589         elif field == "oper_ram":
3590           if instance.primary_node in bad_nodes:
3591             val = None
3592           elif instance.name in live_data:
3593             val = live_data[instance.name].get("memory", "?")
3594           else:
3595             val = "-"
3596         elif field == "vcpus":
3597           val = i_be[constants.BE_VCPUS]
3598         elif field == "disk_template":
3599           val = instance.disk_template
3600         elif field == "ip":
3601           if instance.nics:
3602             val = instance.nics[0].ip
3603           else:
3604             val = None
3605         elif field == "bridge":
3606           if instance.nics:
3607             val = instance.nics[0].bridge
3608           else:
3609             val = None
3610         elif field == "mac":
3611           if instance.nics:
3612             val = instance.nics[0].mac
3613           else:
3614             val = None
3615         elif field == "sda_size" or field == "sdb_size":
3616           idx = ord(field[2]) - ord('a')
3617           try:
3618             val = instance.FindDisk(idx).size
3619           except errors.OpPrereqError:
3620             val = None
3621         elif field == "disk_usage": # total disk usage per node
3622           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3623           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3624         elif field == "tags":
3625           val = list(instance.GetTags())
3626         elif field == "serial_no":
3627           val = instance.serial_no
3628         elif field == "network_port":
3629           val = instance.network_port
3630         elif field == "hypervisor":
3631           val = instance.hypervisor
3632         elif field == "hvparams":
3633           val = i_hv
3634         elif (field.startswith(HVPREFIX) and
3635               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3636           val = i_hv.get(field[len(HVPREFIX):], None)
3637         elif field == "beparams":
3638           val = i_be
3639         elif (field.startswith(BEPREFIX) and
3640               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3641           val = i_be.get(field[len(BEPREFIX):], None)
3642         elif st_match and st_match.groups():
3643           # matches a variable list
3644           st_groups = st_match.groups()
3645           if st_groups and st_groups[0] == "disk":
3646             if st_groups[1] == "count":
3647               val = len(instance.disks)
3648             elif st_groups[1] == "sizes":
3649               val = [disk.size for disk in instance.disks]
3650             elif st_groups[1] == "size":
3651               try:
3652                 val = instance.FindDisk(st_groups[2]).size
3653               except errors.OpPrereqError:
3654                 val = None
3655             else:
3656               assert False, "Unhandled disk parameter"
3657           elif st_groups[0] == "nic":
3658             if st_groups[1] == "count":
3659               val = len(instance.nics)
3660             elif st_groups[1] == "macs":
3661               val = [nic.mac for nic in instance.nics]
3662             elif st_groups[1] == "ips":
3663               val = [nic.ip for nic in instance.nics]
3664             elif st_groups[1] == "bridges":
3665               val = [nic.bridge for nic in instance.nics]
3666             else:
3667               # index-based item
3668               nic_idx = int(st_groups[2])
3669               if nic_idx >= len(instance.nics):
3670                 val = None
3671               else:
3672                 if st_groups[1] == "mac":
3673                   val = instance.nics[nic_idx].mac
3674                 elif st_groups[1] == "ip":
3675                   val = instance.nics[nic_idx].ip
3676                 elif st_groups[1] == "bridge":
3677                   val = instance.nics[nic_idx].bridge
3678                 else:
3679                   assert False, "Unhandled NIC parameter"
3680           else:
3681             assert False, ("Declared but unhandled variable parameter '%s'" %
3682                            field)
3683         else:
3684           assert False, "Declared but unhandled parameter '%s'" % field
3685         iout.append(val)
3686       output.append(iout)
3687
3688     return output
3689
3690
3691 class LUFailoverInstance(LogicalUnit):
3692   """Failover an instance.
3693
3694   """
3695   HPATH = "instance-failover"
3696   HTYPE = constants.HTYPE_INSTANCE
3697   _OP_REQP = ["instance_name", "ignore_consistency"]
3698   REQ_BGL = False
3699
3700   def ExpandNames(self):
3701     self._ExpandAndLockInstance()
3702     self.needed_locks[locking.LEVEL_NODE] = []
3703     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3704
3705   def DeclareLocks(self, level):
3706     if level == locking.LEVEL_NODE:
3707       self._LockInstancesNodes()
3708
3709   def BuildHooksEnv(self):
3710     """Build hooks env.
3711
3712     This runs on master, primary and secondary nodes of the instance.
3713
3714     """
3715     env = {
3716       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3717       }
3718     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3719     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3720     return env, nl, nl
3721
3722   def CheckPrereq(self):
3723     """Check prerequisites.
3724
3725     This checks that the instance is in the cluster.
3726
3727     """
3728     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3729     assert self.instance is not None, \
3730       "Cannot retrieve locked instance %s" % self.op.instance_name
3731
3732     bep = self.cfg.GetClusterInfo().FillBE(instance)
3733     if instance.disk_template not in constants.DTS_NET_MIRROR:
3734       raise errors.OpPrereqError("Instance's disk layout is not"
3735                                  " network mirrored, cannot failover.")
3736
3737     secondary_nodes = instance.secondary_nodes
3738     if not secondary_nodes:
3739       raise errors.ProgrammerError("no secondary node but using "
3740                                    "a mirrored disk template")
3741
3742     target_node = secondary_nodes[0]
3743     _CheckNodeOnline(self, target_node)
3744     _CheckNodeNotDrained(self, target_node)
3745
3746     if instance.admin_up:
3747       # check memory requirements on the secondary node
3748       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3749                            instance.name, bep[constants.BE_MEMORY],
3750                            instance.hypervisor)
3751     else:
3752       self.LogInfo("Not checking memory on the secondary node as"
3753                    " instance will not be started")
3754
3755     # check bridge existance
3756     brlist = [nic.bridge for nic in instance.nics]
3757     result = self.rpc.call_bridges_exist(target_node, brlist)
3758     result.Raise()
3759     if not result.data:
3760       raise errors.OpPrereqError("One or more target bridges %s does not"
3761                                  " exist on destination node '%s'" %
3762                                  (brlist, target_node))
3763
3764   def Exec(self, feedback_fn):
3765     """Failover an instance.
3766
3767     The failover is done by shutting it down on its present node and
3768     starting it on the secondary.
3769
3770     """
3771     instance = self.instance
3772
3773     source_node = instance.primary_node
3774     target_node = instance.secondary_nodes[0]
3775
3776     feedback_fn("* checking disk consistency between source and target")
3777     for dev in instance.disks:
3778       # for drbd, these are drbd over lvm
3779       if not _CheckDiskConsistency(self, dev, target_node, False):
3780         if instance.admin_up and not self.op.ignore_consistency:
3781           raise errors.OpExecError("Disk %s is degraded on target node,"
3782                                    " aborting failover." % dev.iv_name)
3783
3784     feedback_fn("* shutting down instance on source node")
3785     logging.info("Shutting down instance %s on node %s",
3786                  instance.name, source_node)
3787
3788     result = self.rpc.call_instance_shutdown(source_node, instance)
3789     msg = result.RemoteFailMsg()
3790     if msg:
3791       if self.op.ignore_consistency:
3792         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3793                              " Proceeding anyway. Please make sure node"
3794                              " %s is down. Error details: %s",
3795                              instance.name, source_node, source_node, msg)
3796       else:
3797         raise errors.OpExecError("Could not shutdown instance %s on"
3798                                  " node %s: %s" %
3799                                  (instance.name, source_node, msg))
3800
3801     feedback_fn("* deactivating the instance's disks on source node")
3802     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3803       raise errors.OpExecError("Can't shut down the instance's disks.")
3804
3805     instance.primary_node = target_node
3806     # distribute new instance config to the other nodes
3807     self.cfg.Update(instance)
3808
3809     # Only start the instance if it's marked as up
3810     if instance.admin_up:
3811       feedback_fn("* activating the instance's disks on target node")
3812       logging.info("Starting instance %s on node %s",
3813                    instance.name, target_node)
3814
3815       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3816                                                ignore_secondaries=True)
3817       if not disks_ok:
3818         _ShutdownInstanceDisks(self, instance)
3819         raise errors.OpExecError("Can't activate the instance's disks")
3820
3821       feedback_fn("* starting the instance on the target node")
3822       result = self.rpc.call_instance_start(target_node, instance, None, None)
3823       msg = result.RemoteFailMsg()
3824       if msg:
3825         _ShutdownInstanceDisks(self, instance)
3826         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3827                                  (instance.name, target_node, msg))
3828
3829
3830 class LUMigrateInstance(LogicalUnit):
3831   """Migrate an instance.
3832
3833   This is migration without shutting down, compared to the failover,
3834   which is done with shutdown.
3835
3836   """
3837   HPATH = "instance-migrate"
3838   HTYPE = constants.HTYPE_INSTANCE
3839   _OP_REQP = ["instance_name", "live", "cleanup"]
3840
3841   REQ_BGL = False
3842
3843   def ExpandNames(self):
3844     self._ExpandAndLockInstance()
3845     self.needed_locks[locking.LEVEL_NODE] = []
3846     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3847
3848   def DeclareLocks(self, level):
3849     if level == locking.LEVEL_NODE:
3850       self._LockInstancesNodes()
3851
3852   def BuildHooksEnv(self):
3853     """Build hooks env.
3854
3855     This runs on master, primary and secondary nodes of the instance.
3856
3857     """
3858     env = _BuildInstanceHookEnvByObject(self, self.instance)
3859     env["MIGRATE_LIVE"] = self.op.live
3860     env["MIGRATE_CLEANUP"] = self.op.cleanup
3861     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3862     return env, nl, nl
3863
3864   def CheckPrereq(self):
3865     """Check prerequisites.
3866
3867     This checks that the instance is in the cluster.
3868
3869     """
3870     instance = self.cfg.GetInstanceInfo(
3871       self.cfg.ExpandInstanceName(self.op.instance_name))
3872     if instance is None:
3873       raise errors.OpPrereqError("Instance '%s' not known" %
3874                                  self.op.instance_name)
3875
3876     if instance.disk_template != constants.DT_DRBD8:
3877       raise errors.OpPrereqError("Instance's disk layout is not"
3878                                  " drbd8, cannot migrate.")
3879
3880     secondary_nodes = instance.secondary_nodes
3881     if not secondary_nodes:
3882       raise errors.ConfigurationError("No secondary node but using"
3883                                       " drbd8 disk template")
3884
3885     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3886
3887     target_node = secondary_nodes[0]
3888     # check memory requirements on the secondary node
3889     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3890                          instance.name, i_be[constants.BE_MEMORY],
3891                          instance.hypervisor)
3892
3893     # check bridge existance
3894     brlist = [nic.bridge for nic in instance.nics]
3895     result = self.rpc.call_bridges_exist(target_node, brlist)
3896     if result.failed or not result.data:
3897       raise errors.OpPrereqError("One or more target bridges %s does not"
3898                                  " exist on destination node '%s'" %
3899                                  (brlist, target_node))
3900
3901     if not self.op.cleanup:
3902       _CheckNodeNotDrained(self, target_node)
3903       result = self.rpc.call_instance_migratable(instance.primary_node,
3904                                                  instance)
3905       msg = result.RemoteFailMsg()
3906       if msg:
3907         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3908                                    msg)
3909
3910     self.instance = instance
3911
3912   def _WaitUntilSync(self):
3913     """Poll with custom rpc for disk sync.
3914
3915     This uses our own step-based rpc call.
3916
3917     """
3918     self.feedback_fn("* wait until resync is done")
3919     all_done = False
3920     while not all_done:
3921       all_done = True
3922       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3923                                             self.nodes_ip,
3924                                             self.instance.disks)
3925       min_percent = 100
3926       for node, nres in result.items():
3927         msg = nres.RemoteFailMsg()
3928         if msg:
3929           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3930                                    (node, msg))
3931         node_done, node_percent = nres.payload
3932         all_done = all_done and node_done
3933         if node_percent is not None:
3934           min_percent = min(min_percent, node_percent)
3935       if not all_done:
3936         if min_percent < 100:
3937           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3938         time.sleep(2)
3939
3940   def _EnsureSecondary(self, node):
3941     """Demote a node to secondary.
3942
3943     """
3944     self.feedback_fn("* switching node %s to secondary mode" % node)
3945
3946     for dev in self.instance.disks:
3947       self.cfg.SetDiskID(dev, node)
3948
3949     result = self.rpc.call_blockdev_close(node, self.instance.name,
3950                                           self.instance.disks)
3951     msg = result.RemoteFailMsg()
3952     if msg:
3953       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3954                                " error %s" % (node, msg))
3955
3956   def _GoStandalone(self):
3957     """Disconnect from the network.
3958
3959     """
3960     self.feedback_fn("* changing into standalone mode")
3961     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3962                                                self.instance.disks)
3963     for node, nres in result.items():
3964       msg = nres.RemoteFailMsg()
3965       if msg:
3966         raise errors.OpExecError("Cannot disconnect disks node %s,"
3967                                  " error %s" % (node, msg))
3968
3969   def _GoReconnect(self, multimaster):
3970     """Reconnect to the network.
3971
3972     """
3973     if multimaster:
3974       msg = "dual-master"
3975     else:
3976       msg = "single-master"
3977     self.feedback_fn("* changing disks into %s mode" % msg)
3978     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3979                                            self.instance.disks,
3980                                            self.instance.name, multimaster)
3981     for node, nres in result.items():
3982       msg = nres.RemoteFailMsg()
3983       if msg:
3984         raise errors.OpExecError("Cannot change disks config on node %s,"
3985                                  " error: %s" % (node, msg))
3986
3987   def _ExecCleanup(self):
3988     """Try to cleanup after a failed migration.
3989
3990     The cleanup is done by:
3991       - check that the instance is running only on one node
3992         (and update the config if needed)
3993       - change disks on its secondary node to secondary
3994       - wait until disks are fully synchronized
3995       - disconnect from the network
3996       - change disks into single-master mode
3997       - wait again until disks are fully synchronized
3998
3999     """
4000     instance = self.instance
4001     target_node = self.target_node
4002     source_node = self.source_node
4003
4004     # check running on only one node
4005     self.feedback_fn("* checking where the instance actually runs"
4006                      " (if this hangs, the hypervisor might be in"
4007                      " a bad state)")
4008     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4009     for node, result in ins_l.items():
4010       result.Raise()
4011       if not isinstance(result.data, list):
4012         raise errors.OpExecError("Can't contact node '%s'" % node)
4013
4014     runningon_source = instance.name in ins_l[source_node].data
4015     runningon_target = instance.name in ins_l[target_node].data
4016
4017     if runningon_source and runningon_target:
4018       raise errors.OpExecError("Instance seems to be running on two nodes,"
4019                                " or the hypervisor is confused. You will have"
4020                                " to ensure manually that it runs only on one"
4021                                " and restart this operation.")
4022
4023     if not (runningon_source or runningon_target):
4024       raise errors.OpExecError("Instance does not seem to be running at all."
4025                                " In this case, it's safer to repair by"
4026                                " running 'gnt-instance stop' to ensure disk"
4027                                " shutdown, and then restarting it.")
4028
4029     if runningon_target:
4030       # the migration has actually succeeded, we need to update the config
4031       self.feedback_fn("* instance running on secondary node (%s),"
4032                        " updating config" % target_node)
4033       instance.primary_node = target_node
4034       self.cfg.Update(instance)
4035       demoted_node = source_node
4036     else:
4037       self.feedback_fn("* instance confirmed to be running on its"
4038                        " primary node (%s)" % source_node)
4039       demoted_node = target_node
4040
4041     self._EnsureSecondary(demoted_node)
4042     try:
4043       self._WaitUntilSync()
4044     except errors.OpExecError:
4045       # we ignore here errors, since if the device is standalone, it
4046       # won't be able to sync
4047       pass
4048     self._GoStandalone()
4049     self._GoReconnect(False)
4050     self._WaitUntilSync()
4051
4052     self.feedback_fn("* done")
4053
4054   def _RevertDiskStatus(self):
4055     """Try to revert the disk status after a failed migration.
4056
4057     """
4058     target_node = self.target_node
4059     try:
4060       self._EnsureSecondary(target_node)
4061       self._GoStandalone()
4062       self._GoReconnect(False)
4063       self._WaitUntilSync()
4064     except errors.OpExecError, err:
4065       self.LogWarning("Migration failed and I can't reconnect the"
4066                       " drives: error '%s'\n"
4067                       "Please look and recover the instance status" %
4068                       str(err))
4069
4070   def _AbortMigration(self):
4071     """Call the hypervisor code to abort a started migration.
4072
4073     """
4074     instance = self.instance
4075     target_node = self.target_node
4076     migration_info = self.migration_info
4077
4078     abort_result = self.rpc.call_finalize_migration(target_node,
4079                                                     instance,
4080                                                     migration_info,
4081                                                     False)
4082     abort_msg = abort_result.RemoteFailMsg()
4083     if abort_msg:
4084       logging.error("Aborting migration failed on target node %s: %s" %
4085                     (target_node, abort_msg))
4086       # Don't raise an exception here, as we stil have to try to revert the
4087       # disk status, even if this step failed.
4088
4089   def _ExecMigration(self):
4090     """Migrate an instance.
4091
4092     The migrate is done by:
4093       - change the disks into dual-master mode
4094       - wait until disks are fully synchronized again
4095       - migrate the instance
4096       - change disks on the new secondary node (the old primary) to secondary
4097       - wait until disks are fully synchronized
4098       - change disks into single-master mode
4099
4100     """
4101     instance = self.instance
4102     target_node = self.target_node
4103     source_node = self.source_node
4104
4105     self.feedback_fn("* checking disk consistency between source and target")
4106     for dev in instance.disks:
4107       if not _CheckDiskConsistency(self, dev, target_node, False):
4108         raise errors.OpExecError("Disk %s is degraded or not fully"
4109                                  " synchronized on target node,"
4110                                  " aborting migrate." % dev.iv_name)
4111
4112     # First get the migration information from the remote node
4113     result = self.rpc.call_migration_info(source_node, instance)
4114     msg = result.RemoteFailMsg()
4115     if msg:
4116       log_err = ("Failed fetching source migration information from %s: %s" %
4117                  (source_node, msg))
4118       logging.error(log_err)
4119       raise errors.OpExecError(log_err)
4120
4121     self.migration_info = migration_info = result.payload
4122
4123     # Then switch the disks to master/master mode
4124     self._EnsureSecondary(target_node)
4125     self._GoStandalone()
4126     self._GoReconnect(True)
4127     self._WaitUntilSync()
4128
4129     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4130     result = self.rpc.call_accept_instance(target_node,
4131                                            instance,
4132                                            migration_info,
4133                                            self.nodes_ip[target_node])
4134
4135     msg = result.RemoteFailMsg()
4136     if msg:
4137       logging.error("Instance pre-migration failed, trying to revert"
4138                     " disk status: %s", msg)
4139       self._AbortMigration()
4140       self._RevertDiskStatus()
4141       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4142                                (instance.name, msg))
4143
4144     self.feedback_fn("* migrating instance to %s" % target_node)
4145     time.sleep(10)
4146     result = self.rpc.call_instance_migrate(source_node, instance,
4147                                             self.nodes_ip[target_node],
4148                                             self.op.live)
4149     msg = result.RemoteFailMsg()
4150     if msg:
4151       logging.error("Instance migration failed, trying to revert"
4152                     " disk status: %s", msg)
4153       self._AbortMigration()
4154       self._RevertDiskStatus()
4155       raise errors.OpExecError("Could not migrate instance %s: %s" %
4156                                (instance.name, msg))
4157     time.sleep(10)
4158
4159     instance.primary_node = target_node
4160     # distribute new instance config to the other nodes
4161     self.cfg.Update(instance)
4162
4163     result = self.rpc.call_finalize_migration(target_node,
4164                                               instance,
4165                                               migration_info,
4166                                               True)
4167     msg = result.RemoteFailMsg()
4168     if msg:
4169       logging.error("Instance migration succeeded, but finalization failed:"
4170                     " %s" % msg)
4171       raise errors.OpExecError("Could not finalize instance migration: %s" %
4172                                msg)
4173
4174     self._EnsureSecondary(source_node)
4175     self._WaitUntilSync()
4176     self._GoStandalone()
4177     self._GoReconnect(False)
4178     self._WaitUntilSync()
4179
4180     self.feedback_fn("* done")
4181
4182   def Exec(self, feedback_fn):
4183     """Perform the migration.
4184
4185     """
4186     self.feedback_fn = feedback_fn
4187
4188     self.source_node = self.instance.primary_node
4189     self.target_node = self.instance.secondary_nodes[0]
4190     self.all_nodes = [self.source_node, self.target_node]
4191     self.nodes_ip = {
4192       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4193       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4194       }
4195     if self.op.cleanup:
4196       return self._ExecCleanup()
4197     else:
4198       return self._ExecMigration()
4199
4200
4201 def _CreateBlockDev(lu, node, instance, device, force_create,
4202                     info, force_open):
4203   """Create a tree of block devices on a given node.
4204
4205   If this device type has to be created on secondaries, create it and
4206   all its children.
4207
4208   If not, just recurse to children keeping the same 'force' value.
4209
4210   @param lu: the lu on whose behalf we execute
4211   @param node: the node on which to create the device
4212   @type instance: L{objects.Instance}
4213   @param instance: the instance which owns the device
4214   @type device: L{objects.Disk}
4215   @param device: the device to create
4216   @type force_create: boolean
4217   @param force_create: whether to force creation of this device; this
4218       will be change to True whenever we find a device which has
4219       CreateOnSecondary() attribute
4220   @param info: the extra 'metadata' we should attach to the device
4221       (this will be represented as a LVM tag)
4222   @type force_open: boolean
4223   @param force_open: this parameter will be passes to the
4224       L{backend.BlockdevCreate} function where it specifies
4225       whether we run on primary or not, and it affects both
4226       the child assembly and the device own Open() execution
4227
4228   """
4229   if device.CreateOnSecondary():
4230     force_create = True
4231
4232   if device.children:
4233     for child in device.children:
4234       _CreateBlockDev(lu, node, instance, child, force_create,
4235                       info, force_open)
4236
4237   if not force_create:
4238     return
4239
4240   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4241
4242
4243 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4244   """Create a single block device on a given node.
4245
4246   This will not recurse over children of the device, so they must be
4247   created in advance.
4248
4249   @param lu: the lu on whose behalf we execute
4250   @param node: the node on which to create the device
4251   @type instance: L{objects.Instance}
4252   @param instance: the instance which owns the device
4253   @type device: L{objects.Disk}
4254   @param device: the device to create
4255   @param info: the extra 'metadata' we should attach to the device
4256       (this will be represented as a LVM tag)
4257   @type force_open: boolean
4258   @param force_open: this parameter will be passes to the
4259       L{backend.BlockdevCreate} function where it specifies
4260       whether we run on primary or not, and it affects both
4261       the child assembly and the device own Open() execution
4262
4263   """
4264   lu.cfg.SetDiskID(device, node)
4265   result = lu.rpc.call_blockdev_create(node, device, device.size,
4266                                        instance.name, force_open, info)
4267   msg = result.RemoteFailMsg()
4268   if msg:
4269     raise errors.OpExecError("Can't create block device %s on"
4270                              " node %s for instance %s: %s" %
4271                              (device, node, instance.name, msg))
4272   if device.physical_id is None:
4273     device.physical_id = result.payload
4274
4275
4276 def _GenerateUniqueNames(lu, exts):
4277   """Generate a suitable LV name.
4278
4279   This will generate a logical volume name for the given instance.
4280
4281   """
4282   results = []
4283   for val in exts:
4284     new_id = lu.cfg.GenerateUniqueID()
4285     results.append("%s%s" % (new_id, val))
4286   return results
4287
4288
4289 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4290                          p_minor, s_minor):
4291   """Generate a drbd8 device complete with its children.
4292
4293   """
4294   port = lu.cfg.AllocatePort()
4295   vgname = lu.cfg.GetVGName()
4296   shared_secret = lu.cfg.GenerateDRBDSecret()
4297   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4298                           logical_id=(vgname, names[0]))
4299   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4300                           logical_id=(vgname, names[1]))
4301   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4302                           logical_id=(primary, secondary, port,
4303                                       p_minor, s_minor,
4304                                       shared_secret),
4305                           children=[dev_data, dev_meta],
4306                           iv_name=iv_name)
4307   return drbd_dev
4308
4309
4310 def _GenerateDiskTemplate(lu, template_name,
4311                           instance_name, primary_node,
4312                           secondary_nodes, disk_info,
4313                           file_storage_dir, file_driver,
4314                           base_index):
4315   """Generate the entire disk layout for a given template type.
4316
4317   """
4318   #TODO: compute space requirements
4319
4320   vgname = lu.cfg.GetVGName()
4321   disk_count = len(disk_info)
4322   disks = []
4323   if template_name == constants.DT_DISKLESS:
4324     pass
4325   elif template_name == constants.DT_PLAIN:
4326     if len(secondary_nodes) != 0:
4327       raise errors.ProgrammerError("Wrong template configuration")
4328
4329     names = _GenerateUniqueNames(lu, [".disk%d" % i
4330                                       for i in range(disk_count)])
4331     for idx, disk in enumerate(disk_info):
4332       disk_index = idx + base_index
4333       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4334                               logical_id=(vgname, names[idx]),
4335                               iv_name="disk/%d" % disk_index,
4336                               mode=disk["mode"])
4337       disks.append(disk_dev)
4338   elif template_name == constants.DT_DRBD8:
4339     if len(secondary_nodes) != 1:
4340       raise errors.ProgrammerError("Wrong template configuration")
4341     remote_node = secondary_nodes[0]
4342     minors = lu.cfg.AllocateDRBDMinor(
4343       [primary_node, remote_node] * len(disk_info), instance_name)
4344
4345     names = []
4346     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4347                                                for i in range(disk_count)]):
4348       names.append(lv_prefix + "_data")
4349       names.append(lv_prefix + "_meta")
4350     for idx, disk in enumerate(disk_info):
4351       disk_index = idx + base_index
4352       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4353                                       disk["size"], names[idx*2:idx*2+2],
4354                                       "disk/%d" % disk_index,
4355                                       minors[idx*2], minors[idx*2+1])
4356       disk_dev.mode = disk["mode"]
4357       disks.append(disk_dev)
4358   elif template_name == constants.DT_FILE:
4359     if len(secondary_nodes) != 0:
4360       raise errors.ProgrammerError("Wrong template configuration")
4361
4362     for idx, disk in enumerate(disk_info):
4363       disk_index = idx + base_index
4364       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4365                               iv_name="disk/%d" % disk_index,
4366                               logical_id=(file_driver,
4367                                           "%s/disk%d" % (file_storage_dir,
4368                                                          disk_index)),
4369                               mode=disk["mode"])
4370       disks.append(disk_dev)
4371   else:
4372     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4373   return disks
4374
4375
4376 def _GetInstanceInfoText(instance):
4377   """Compute that text that should be added to the disk's metadata.
4378
4379   """
4380   return "originstname+%s" % instance.name
4381
4382
4383 def _CreateDisks(lu, instance):
4384   """Create all disks for an instance.
4385
4386   This abstracts away some work from AddInstance.
4387
4388   @type lu: L{LogicalUnit}
4389   @param lu: the logical unit on whose behalf we execute
4390   @type instance: L{objects.Instance}
4391   @param instance: the instance whose disks we should create
4392   @rtype: boolean
4393   @return: the success of the creation
4394
4395   """
4396   info = _GetInstanceInfoText(instance)
4397   pnode = instance.primary_node
4398
4399   if instance.disk_template == constants.DT_FILE:
4400     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4401     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4402
4403     if result.failed or not result.data:
4404       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4405
4406     if not result.data[0]:
4407       raise errors.OpExecError("Failed to create directory '%s'" %
4408                                file_storage_dir)
4409
4410   # Note: this needs to be kept in sync with adding of disks in
4411   # LUSetInstanceParams
4412   for device in instance.disks:
4413     logging.info("Creating volume %s for instance %s",
4414                  device.iv_name, instance.name)
4415     #HARDCODE
4416     for node in instance.all_nodes:
4417       f_create = node == pnode
4418       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4419
4420
4421 def _RemoveDisks(lu, instance):
4422   """Remove all disks for an instance.
4423
4424   This abstracts away some work from `AddInstance()` and
4425   `RemoveInstance()`. Note that in case some of the devices couldn't
4426   be removed, the removal will continue with the other ones (compare
4427   with `_CreateDisks()`).
4428
4429   @type lu: L{LogicalUnit}
4430   @param lu: the logical unit on whose behalf we execute
4431   @type instance: L{objects.Instance}
4432   @param instance: the instance whose disks we should remove
4433   @rtype: boolean
4434   @return: the success of the removal
4435
4436   """
4437   logging.info("Removing block devices for instance %s", instance.name)
4438
4439   all_result = True
4440   for device in instance.disks:
4441     for node, disk in device.ComputeNodeTree(instance.primary_node):
4442       lu.cfg.SetDiskID(disk, node)
4443       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4444       if msg:
4445         lu.LogWarning("Could not remove block device %s on node %s,"
4446                       " continuing anyway: %s", device.iv_name, node, msg)
4447         all_result = False
4448
4449   if instance.disk_template == constants.DT_FILE:
4450     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4451     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4452                                                  file_storage_dir)
4453     if result.failed or not result.data:
4454       logging.error("Could not remove directory '%s'", file_storage_dir)
4455       all_result = False
4456
4457   return all_result
4458
4459
4460 def _ComputeDiskSize(disk_template, disks):
4461   """Compute disk size requirements in the volume group
4462
4463   """
4464   # Required free disk space as a function of disk and swap space
4465   req_size_dict = {
4466     constants.DT_DISKLESS: None,
4467     constants.DT_PLAIN: sum(d["size"] for d in disks),
4468     # 128 MB are added for drbd metadata for each disk
4469     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4470     constants.DT_FILE: None,
4471   }
4472
4473   if disk_template not in req_size_dict:
4474     raise errors.ProgrammerError("Disk template '%s' size requirement"
4475                                  " is unknown" %  disk_template)
4476
4477   return req_size_dict[disk_template]
4478
4479
4480 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4481   """Hypervisor parameter validation.
4482
4483   This function abstract the hypervisor parameter validation to be
4484   used in both instance create and instance modify.
4485
4486   @type lu: L{LogicalUnit}
4487   @param lu: the logical unit for which we check
4488   @type nodenames: list
4489   @param nodenames: the list of nodes on which we should check
4490   @type hvname: string
4491   @param hvname: the name of the hypervisor we should use
4492   @type hvparams: dict
4493   @param hvparams: the parameters which we need to check
4494   @raise errors.OpPrereqError: if the parameters are not valid
4495
4496   """
4497   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4498                                                   hvname,
4499                                                   hvparams)
4500   for node in nodenames:
4501     info = hvinfo[node]
4502     if info.offline:
4503       continue
4504     msg = info.RemoteFailMsg()
4505     if msg:
4506       raise errors.OpPrereqError("Hypervisor parameter validation"
4507                                  " failed on node %s: %s" % (node, msg))
4508
4509
4510 class LUCreateInstance(LogicalUnit):
4511   """Create an instance.
4512
4513   """
4514   HPATH = "instance-add"
4515   HTYPE = constants.HTYPE_INSTANCE
4516   _OP_REQP = ["instance_name", "disks", "disk_template",
4517               "mode", "start",
4518               "wait_for_sync", "ip_check", "nics",
4519               "hvparams", "beparams"]
4520   REQ_BGL = False
4521
4522   def _ExpandNode(self, node):
4523     """Expands and checks one node name.
4524
4525     """
4526     node_full = self.cfg.ExpandNodeName(node)
4527     if node_full is None:
4528       raise errors.OpPrereqError("Unknown node %s" % node)
4529     return node_full
4530
4531   def ExpandNames(self):
4532     """ExpandNames for CreateInstance.
4533
4534     Figure out the right locks for instance creation.
4535
4536     """
4537     self.needed_locks = {}
4538
4539     # set optional parameters to none if they don't exist
4540     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4541       if not hasattr(self.op, attr):
4542         setattr(self.op, attr, None)
4543
4544     # cheap checks, mostly valid constants given
4545
4546     # verify creation mode
4547     if self.op.mode not in (constants.INSTANCE_CREATE,
4548                             constants.INSTANCE_IMPORT):
4549       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4550                                  self.op.mode)
4551
4552     # disk template and mirror node verification
4553     if self.op.disk_template not in constants.DISK_TEMPLATES:
4554       raise errors.OpPrereqError("Invalid disk template name")
4555
4556     if self.op.hypervisor is None:
4557       self.op.hypervisor = self.cfg.GetHypervisorType()
4558
4559     cluster = self.cfg.GetClusterInfo()
4560     enabled_hvs = cluster.enabled_hypervisors
4561     if self.op.hypervisor not in enabled_hvs:
4562       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4563                                  " cluster (%s)" % (self.op.hypervisor,
4564                                   ",".join(enabled_hvs)))
4565
4566     # check hypervisor parameter syntax (locally)
4567     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4568     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4569                                   self.op.hvparams)
4570     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4571     hv_type.CheckParameterSyntax(filled_hvp)
4572     self.hv_full = filled_hvp
4573
4574     # fill and remember the beparams dict
4575     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4576     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4577                                     self.op.beparams)
4578
4579     #### instance parameters check
4580
4581     # instance name verification
4582     hostname1 = utils.HostInfo(self.op.instance_name)
4583     self.op.instance_name = instance_name = hostname1.name
4584
4585     # this is just a preventive check, but someone might still add this
4586     # instance in the meantime, and creation will fail at lock-add time
4587     if instance_name in self.cfg.GetInstanceList():
4588       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4589                                  instance_name)
4590
4591     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4592
4593     # NIC buildup
4594     self.nics = []
4595     for nic in self.op.nics:
4596       # ip validity checks
4597       ip = nic.get("ip", None)
4598       if ip is None or ip.lower() == "none":
4599         nic_ip = None
4600       elif ip.lower() == constants.VALUE_AUTO:
4601         nic_ip = hostname1.ip
4602       else:
4603         if not utils.IsValidIP(ip):
4604           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4605                                      " like a valid IP" % ip)
4606         nic_ip = ip
4607
4608       # MAC address verification
4609       mac = nic.get("mac", constants.VALUE_AUTO)
4610       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4611         if not utils.IsValidMac(mac.lower()):
4612           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4613                                      mac)
4614       # bridge verification
4615       bridge = nic.get("bridge", None)
4616       if bridge is None:
4617         bridge = self.cfg.GetDefBridge()
4618       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4619
4620     # disk checks/pre-build
4621     self.disks = []
4622     for disk in self.op.disks:
4623       mode = disk.get("mode", constants.DISK_RDWR)
4624       if mode not in constants.DISK_ACCESS_SET:
4625         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4626                                    mode)
4627       size = disk.get("size", None)
4628       if size is None:
4629         raise errors.OpPrereqError("Missing disk size")
4630       try:
4631         size = int(size)
4632       except ValueError:
4633         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4634       self.disks.append({"size": size, "mode": mode})
4635
4636     # used in CheckPrereq for ip ping check
4637     self.check_ip = hostname1.ip
4638
4639     # file storage checks
4640     if (self.op.file_driver and
4641         not self.op.file_driver in constants.FILE_DRIVER):
4642       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4643                                  self.op.file_driver)
4644
4645     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4646       raise errors.OpPrereqError("File storage directory path not absolute")
4647
4648     ### Node/iallocator related checks
4649     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4650       raise errors.OpPrereqError("One and only one of iallocator and primary"
4651                                  " node must be given")
4652
4653     if self.op.iallocator:
4654       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4655     else:
4656       self.op.pnode = self._ExpandNode(self.op.pnode)
4657       nodelist = [self.op.pnode]
4658       if self.op.snode is not None:
4659         self.op.snode = self._ExpandNode(self.op.snode)
4660         nodelist.append(self.op.snode)
4661       self.needed_locks[locking.LEVEL_NODE] = nodelist
4662
4663     # in case of import lock the source node too
4664     if self.op.mode == constants.INSTANCE_IMPORT:
4665       src_node = getattr(self.op, "src_node", None)
4666       src_path = getattr(self.op, "src_path", None)
4667
4668       if src_path is None:
4669         self.op.src_path = src_path = self.op.instance_name
4670
4671       if src_node is None:
4672         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4673         self.op.src_node = None
4674         if os.path.isabs(src_path):
4675           raise errors.OpPrereqError("Importing an instance from an absolute"
4676                                      " path requires a source node option.")
4677       else:
4678         self.op.src_node = src_node = self._ExpandNode(src_node)
4679         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4680           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4681         if not os.path.isabs(src_path):
4682           self.op.src_path = src_path = \
4683             os.path.join(constants.EXPORT_DIR, src_path)
4684
4685     else: # INSTANCE_CREATE
4686       if getattr(self.op, "os_type", None) is None:
4687         raise errors.OpPrereqError("No guest OS specified")
4688
4689   def _RunAllocator(self):
4690     """Run the allocator based on input opcode.
4691
4692     """
4693     nics = [n.ToDict() for n in self.nics]
4694     ial = IAllocator(self,
4695                      mode=constants.IALLOCATOR_MODE_ALLOC,
4696                      name=self.op.instance_name,
4697                      disk_template=self.op.disk_template,
4698                      tags=[],
4699                      os=self.op.os_type,
4700                      vcpus=self.be_full[constants.BE_VCPUS],
4701                      mem_size=self.be_full[constants.BE_MEMORY],
4702                      disks=self.disks,
4703                      nics=nics,
4704                      hypervisor=self.op.hypervisor,
4705                      )
4706
4707     ial.Run(self.op.iallocator)
4708
4709     if not ial.success:
4710       raise errors.OpPrereqError("Can't compute nodes using"
4711                                  " iallocator '%s': %s" % (self.op.iallocator,
4712                                                            ial.info))
4713     if len(ial.nodes) != ial.required_nodes:
4714       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4715                                  " of nodes (%s), required %s" %
4716                                  (self.op.iallocator, len(ial.nodes),
4717                                   ial.required_nodes))
4718     self.op.pnode = ial.nodes[0]
4719     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4720                  self.op.instance_name, self.op.iallocator,
4721                  ", ".join(ial.nodes))
4722     if ial.required_nodes == 2:
4723       self.op.snode = ial.nodes[1]
4724
4725   def BuildHooksEnv(self):
4726     """Build hooks env.
4727
4728     This runs on master, primary and secondary nodes of the instance.
4729
4730     """
4731     env = {
4732       "ADD_MODE": self.op.mode,
4733       }
4734     if self.op.mode == constants.INSTANCE_IMPORT:
4735       env["SRC_NODE"] = self.op.src_node
4736       env["SRC_PATH"] = self.op.src_path
4737       env["SRC_IMAGES"] = self.src_images
4738
4739     env.update(_BuildInstanceHookEnv(
4740       name=self.op.instance_name,
4741       primary_node=self.op.pnode,
4742       secondary_nodes=self.secondaries,
4743       status=self.op.start,
4744       os_type=self.op.os_type,
4745       memory=self.be_full[constants.BE_MEMORY],
4746       vcpus=self.be_full[constants.BE_VCPUS],
4747       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4748       disk_template=self.op.disk_template,
4749       disks=[(d["size"], d["mode"]) for d in self.disks],
4750       bep=self.be_full,
4751       hvp=self.hv_full,
4752       hypervisor=self.op.hypervisor,
4753     ))
4754
4755     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4756           self.secondaries)
4757     return env, nl, nl
4758
4759
4760   def CheckPrereq(self):
4761     """Check prerequisites.
4762
4763     """
4764     if (not self.cfg.GetVGName() and
4765         self.op.disk_template not in constants.DTS_NOT_LVM):
4766       raise errors.OpPrereqError("Cluster does not support lvm-based"
4767                                  " instances")
4768
4769     if self.op.mode == constants.INSTANCE_IMPORT:
4770       src_node = self.op.src_node
4771       src_path = self.op.src_path
4772
4773       if src_node is None:
4774         exp_list = self.rpc.call_export_list(
4775           self.acquired_locks[locking.LEVEL_NODE])
4776         found = False
4777         for node in exp_list:
4778           if not exp_list[node].failed and src_path in exp_list[node].data:
4779             found = True
4780             self.op.src_node = src_node = node
4781             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4782                                                        src_path)
4783             break
4784         if not found:
4785           raise errors.OpPrereqError("No export found for relative path %s" %
4786                                       src_path)
4787
4788       _CheckNodeOnline(self, src_node)
4789       result = self.rpc.call_export_info(src_node, src_path)
4790       result.Raise()
4791       if not result.data:
4792         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4793
4794       export_info = result.data
4795       if not export_info.has_section(constants.INISECT_EXP):
4796         raise errors.ProgrammerError("Corrupted export config")
4797
4798       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4799       if (int(ei_version) != constants.EXPORT_VERSION):
4800         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4801                                    (ei_version, constants.EXPORT_VERSION))
4802
4803       # Check that the new instance doesn't have less disks than the export
4804       instance_disks = len(self.disks)
4805       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4806       if instance_disks < export_disks:
4807         raise errors.OpPrereqError("Not enough disks to import."
4808                                    " (instance: %d, export: %d)" %
4809                                    (instance_disks, export_disks))
4810
4811       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4812       disk_images = []
4813       for idx in range(export_disks):
4814         option = 'disk%d_dump' % idx
4815         if export_info.has_option(constants.INISECT_INS, option):
4816           # FIXME: are the old os-es, disk sizes, etc. useful?
4817           export_name = export_info.get(constants.INISECT_INS, option)
4818           image = os.path.join(src_path, export_name)
4819           disk_images.append(image)
4820         else:
4821           disk_images.append(False)
4822
4823       self.src_images = disk_images
4824
4825       old_name = export_info.get(constants.INISECT_INS, 'name')
4826       # FIXME: int() here could throw a ValueError on broken exports
4827       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4828       if self.op.instance_name == old_name:
4829         for idx, nic in enumerate(self.nics):
4830           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4831             nic_mac_ini = 'nic%d_mac' % idx
4832             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4833
4834     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4835     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4836     if self.op.start and not self.op.ip_check:
4837       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4838                                  " adding an instance in start mode")
4839
4840     if self.op.ip_check:
4841       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4842         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4843                                    (self.check_ip, self.op.instance_name))
4844
4845     #### mac address generation
4846     # By generating here the mac address both the allocator and the hooks get
4847     # the real final mac address rather than the 'auto' or 'generate' value.
4848     # There is a race condition between the generation and the instance object
4849     # creation, which means that we know the mac is valid now, but we're not
4850     # sure it will be when we actually add the instance. If things go bad
4851     # adding the instance will abort because of a duplicate mac, and the
4852     # creation job will fail.
4853     for nic in self.nics:
4854       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4855         nic.mac = self.cfg.GenerateMAC()
4856
4857     #### allocator run
4858
4859     if self.op.iallocator is not None:
4860       self._RunAllocator()
4861
4862     #### node related checks
4863
4864     # check primary node
4865     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4866     assert self.pnode is not None, \
4867       "Cannot retrieve locked node %s" % self.op.pnode
4868     if pnode.offline:
4869       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4870                                  pnode.name)
4871     if pnode.drained:
4872       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4873                                  pnode.name)
4874
4875     self.secondaries = []
4876
4877     # mirror node verification
4878     if self.op.disk_template in constants.DTS_NET_MIRROR:
4879       if self.op.snode is None:
4880         raise errors.OpPrereqError("The networked disk templates need"
4881                                    " a mirror node")
4882       if self.op.snode == pnode.name:
4883         raise errors.OpPrereqError("The secondary node cannot be"
4884                                    " the primary node.")
4885       _CheckNodeOnline(self, self.op.snode)
4886       _CheckNodeNotDrained(self, self.op.snode)
4887       self.secondaries.append(self.op.snode)
4888
4889     nodenames = [pnode.name] + self.secondaries
4890
4891     req_size = _ComputeDiskSize(self.op.disk_template,
4892                                 self.disks)
4893
4894     # Check lv size requirements
4895     if req_size is not None:
4896       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4897                                          self.op.hypervisor)
4898       for node in nodenames:
4899         info = nodeinfo[node]
4900         info.Raise()
4901         info = info.data
4902         if not info:
4903           raise errors.OpPrereqError("Cannot get current information"
4904                                      " from node '%s'" % node)
4905         vg_free = info.get('vg_free', None)
4906         if not isinstance(vg_free, int):
4907           raise errors.OpPrereqError("Can't compute free disk space on"
4908                                      " node %s" % node)
4909         if req_size > info['vg_free']:
4910           raise errors.OpPrereqError("Not enough disk space on target node %s."
4911                                      " %d MB available, %d MB required" %
4912                                      (node, info['vg_free'], req_size))
4913
4914     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4915
4916     # os verification
4917     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4918     result.Raise()
4919     if not isinstance(result.data, objects.OS) or not result.data:
4920       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4921                                  " primary node"  % self.op.os_type)
4922
4923     # bridge check on primary node
4924     bridges = [n.bridge for n in self.nics]
4925     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4926     result.Raise()
4927     if not result.data:
4928       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4929                                  " exist on destination node '%s'" %
4930                                  (",".join(bridges), pnode.name))
4931
4932     # memory check on primary node
4933     if self.op.start:
4934       _CheckNodeFreeMemory(self, self.pnode.name,
4935                            "creating instance %s" % self.op.instance_name,
4936                            self.be_full[constants.BE_MEMORY],
4937                            self.op.hypervisor)
4938
4939   def Exec(self, feedback_fn):
4940     """Create and add the instance to the cluster.
4941
4942     """
4943     instance = self.op.instance_name
4944     pnode_name = self.pnode.name
4945
4946     ht_kind = self.op.hypervisor
4947     if ht_kind in constants.HTS_REQ_PORT:
4948       network_port = self.cfg.AllocatePort()
4949     else:
4950       network_port = None
4951
4952     ##if self.op.vnc_bind_address is None:
4953     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4954
4955     # this is needed because os.path.join does not accept None arguments
4956     if self.op.file_storage_dir is None:
4957       string_file_storage_dir = ""
4958     else:
4959       string_file_storage_dir = self.op.file_storage_dir
4960
4961     # build the full file storage dir path
4962     file_storage_dir = os.path.normpath(os.path.join(
4963                                         self.cfg.GetFileStorageDir(),
4964                                         string_file_storage_dir, instance))
4965
4966
4967     disks = _GenerateDiskTemplate(self,
4968                                   self.op.disk_template,
4969                                   instance, pnode_name,
4970                                   self.secondaries,
4971                                   self.disks,
4972                                   file_storage_dir,
4973                                   self.op.file_driver,
4974                                   0)
4975
4976     iobj = objects.Instance(name=instance, os=self.op.os_type,
4977                             primary_node=pnode_name,
4978                             nics=self.nics, disks=disks,
4979                             disk_template=self.op.disk_template,
4980                             admin_up=False,
4981                             network_port=network_port,
4982                             beparams=self.op.beparams,
4983                             hvparams=self.op.hvparams,
4984                             hypervisor=self.op.hypervisor,
4985                             )
4986
4987     feedback_fn("* creating instance disks...")
4988     try:
4989       _CreateDisks(self, iobj)
4990     except errors.OpExecError:
4991       self.LogWarning("Device creation failed, reverting...")
4992       try:
4993         _RemoveDisks(self, iobj)
4994       finally:
4995         self.cfg.ReleaseDRBDMinors(instance)
4996         raise
4997
4998     feedback_fn("adding instance %s to cluster config" % instance)
4999
5000     self.cfg.AddInstance(iobj)
5001     # Declare that we don't want to remove the instance lock anymore, as we've
5002     # added the instance to the config
5003     del self.remove_locks[locking.LEVEL_INSTANCE]
5004     # Unlock all the nodes
5005     if self.op.mode == constants.INSTANCE_IMPORT:
5006       nodes_keep = [self.op.src_node]
5007       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5008                        if node != self.op.src_node]
5009       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5010       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5011     else:
5012       self.context.glm.release(locking.LEVEL_NODE)
5013       del self.acquired_locks[locking.LEVEL_NODE]
5014
5015     if self.op.wait_for_sync:
5016       disk_abort = not _WaitForSync(self, iobj)
5017     elif iobj.disk_template in constants.DTS_NET_MIRROR:
5018       # make sure the disks are not degraded (still sync-ing is ok)
5019       time.sleep(15)
5020       feedback_fn("* checking mirrors status")
5021       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5022     else:
5023       disk_abort = False
5024
5025     if disk_abort:
5026       _RemoveDisks(self, iobj)
5027       self.cfg.RemoveInstance(iobj.name)
5028       # Make sure the instance lock gets removed
5029       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5030       raise errors.OpExecError("There are some degraded disks for"
5031                                " this instance")
5032
5033     feedback_fn("creating os for instance %s on node %s" %
5034                 (instance, pnode_name))
5035
5036     if iobj.disk_template != constants.DT_DISKLESS:
5037       if self.op.mode == constants.INSTANCE_CREATE:
5038         feedback_fn("* running the instance OS create scripts...")
5039         result = self.rpc.call_instance_os_add(pnode_name, iobj)
5040         msg = result.RemoteFailMsg()
5041         if msg:
5042           raise errors.OpExecError("Could not add os for instance %s"
5043                                    " on node %s: %s" %
5044                                    (instance, pnode_name, msg))
5045
5046       elif self.op.mode == constants.INSTANCE_IMPORT:
5047         feedback_fn("* running the instance OS import scripts...")
5048         src_node = self.op.src_node
5049         src_images = self.src_images
5050         cluster_name = self.cfg.GetClusterName()
5051         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5052                                                          src_node, src_images,
5053                                                          cluster_name)
5054         import_result.Raise()
5055         for idx, result in enumerate(import_result.data):
5056           if not result:
5057             self.LogWarning("Could not import the image %s for instance"
5058                             " %s, disk %d, on node %s" %
5059                             (src_images[idx], instance, idx, pnode_name))
5060       else:
5061         # also checked in the prereq part
5062         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5063                                      % self.op.mode)
5064
5065     if self.op.start:
5066       iobj.admin_up = True
5067       self.cfg.Update(iobj)
5068       logging.info("Starting instance %s on node %s", instance, pnode_name)
5069       feedback_fn("* starting instance...")
5070       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5071       msg = result.RemoteFailMsg()
5072       if msg:
5073         raise errors.OpExecError("Could not start instance: %s" % msg)
5074
5075
5076 class LUConnectConsole(NoHooksLU):
5077   """Connect to an instance's console.
5078
5079   This is somewhat special in that it returns the command line that
5080   you need to run on the master node in order to connect to the
5081   console.
5082
5083   """
5084   _OP_REQP = ["instance_name"]
5085   REQ_BGL = False
5086
5087   def ExpandNames(self):
5088     self._ExpandAndLockInstance()
5089
5090   def CheckPrereq(self):
5091     """Check prerequisites.
5092
5093     This checks that the instance is in the cluster.
5094
5095     """
5096     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5097     assert self.instance is not None, \
5098       "Cannot retrieve locked instance %s" % self.op.instance_name
5099     _CheckNodeOnline(self, self.instance.primary_node)
5100
5101   def Exec(self, feedback_fn):
5102     """Connect to the console of an instance
5103
5104     """
5105     instance = self.instance
5106     node = instance.primary_node
5107
5108     node_insts = self.rpc.call_instance_list([node],
5109                                              [instance.hypervisor])[node]
5110     node_insts.Raise()
5111
5112     if instance.name not in node_insts.data:
5113       raise errors.OpExecError("Instance %s is not running." % instance.name)
5114
5115     logging.debug("Connecting to console of %s on %s", instance.name, node)
5116
5117     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5118     cluster = self.cfg.GetClusterInfo()
5119     # beparams and hvparams are passed separately, to avoid editing the
5120     # instance and then saving the defaults in the instance itself.
5121     hvparams = cluster.FillHV(instance)
5122     beparams = cluster.FillBE(instance)
5123     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5124
5125     # build ssh cmdline
5126     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5127
5128
5129 class LUReplaceDisks(LogicalUnit):
5130   """Replace the disks of an instance.
5131
5132   """
5133   HPATH = "mirrors-replace"
5134   HTYPE = constants.HTYPE_INSTANCE
5135   _OP_REQP = ["instance_name", "mode", "disks"]
5136   REQ_BGL = False
5137
5138   def CheckArguments(self):
5139     if not hasattr(self.op, "remote_node"):
5140       self.op.remote_node = None
5141     if not hasattr(self.op, "iallocator"):
5142       self.op.iallocator = None
5143
5144     # check for valid parameter combination
5145     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5146     if self.op.mode == constants.REPLACE_DISK_CHG:
5147       if cnt == 2:
5148         raise errors.OpPrereqError("When changing the secondary either an"
5149                                    " iallocator script must be used or the"
5150                                    " new node given")
5151       elif cnt == 0:
5152         raise errors.OpPrereqError("Give either the iallocator or the new"
5153                                    " secondary, not both")
5154     else: # not replacing the secondary
5155       if cnt != 2:
5156         raise errors.OpPrereqError("The iallocator and new node options can"
5157                                    " be used only when changing the"
5158                                    " secondary node")
5159
5160   def ExpandNames(self):
5161     self._ExpandAndLockInstance()
5162
5163     if self.op.iallocator is not None:
5164       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5165     elif self.op.remote_node is not None:
5166       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5167       if remote_node is None:
5168         raise errors.OpPrereqError("Node '%s' not known" %
5169                                    self.op.remote_node)
5170       self.op.remote_node = remote_node
5171       # Warning: do not remove the locking of the new secondary here
5172       # unless DRBD8.AddChildren is changed to work in parallel;
5173       # currently it doesn't since parallel invocations of
5174       # FindUnusedMinor will conflict
5175       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5176       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5177     else:
5178       self.needed_locks[locking.LEVEL_NODE] = []
5179       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5180
5181   def DeclareLocks(self, level):
5182     # If we're not already locking all nodes in the set we have to declare the
5183     # instance's primary/secondary nodes.
5184     if (level == locking.LEVEL_NODE and
5185         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5186       self._LockInstancesNodes()
5187
5188   def _RunAllocator(self):
5189     """Compute a new secondary node using an IAllocator.
5190
5191     """
5192     ial = IAllocator(self,
5193                      mode=constants.IALLOCATOR_MODE_RELOC,
5194                      name=self.op.instance_name,
5195                      relocate_from=[self.sec_node])
5196
5197     ial.Run(self.op.iallocator)
5198
5199     if not ial.success:
5200       raise errors.OpPrereqError("Can't compute nodes using"
5201                                  " iallocator '%s': %s" % (self.op.iallocator,
5202                                                            ial.info))
5203     if len(ial.nodes) != ial.required_nodes:
5204       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5205                                  " of nodes (%s), required %s" %
5206                                  (len(ial.nodes), ial.required_nodes))
5207     self.op.remote_node = ial.nodes[0]
5208     self.LogInfo("Selected new secondary for the instance: %s",
5209                  self.op.remote_node)
5210
5211   def BuildHooksEnv(self):
5212     """Build hooks env.
5213
5214     This runs on the master, the primary and all the secondaries.
5215
5216     """
5217     env = {
5218       "MODE": self.op.mode,
5219       "NEW_SECONDARY": self.op.remote_node,
5220       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5221       }
5222     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5223     nl = [
5224       self.cfg.GetMasterNode(),
5225       self.instance.primary_node,
5226       ]
5227     if self.op.remote_node is not None:
5228       nl.append(self.op.remote_node)
5229     return env, nl, nl
5230
5231   def CheckPrereq(self):
5232     """Check prerequisites.
5233
5234     This checks that the instance is in the cluster.
5235
5236     """
5237     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5238     assert instance is not None, \
5239       "Cannot retrieve locked instance %s" % self.op.instance_name
5240     self.instance = instance
5241
5242     if instance.disk_template != constants.DT_DRBD8:
5243       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5244                                  " instances")
5245
5246     if len(instance.secondary_nodes) != 1:
5247       raise errors.OpPrereqError("The instance has a strange layout,"
5248                                  " expected one secondary but found %d" %
5249                                  len(instance.secondary_nodes))
5250
5251     self.sec_node = instance.secondary_nodes[0]
5252
5253     if self.op.iallocator is not None:
5254       self._RunAllocator()
5255
5256     remote_node = self.op.remote_node
5257     if remote_node is not None:
5258       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5259       assert self.remote_node_info is not None, \
5260         "Cannot retrieve locked node %s" % remote_node
5261     else:
5262       self.remote_node_info = None
5263     if remote_node == instance.primary_node:
5264       raise errors.OpPrereqError("The specified node is the primary node of"
5265                                  " the instance.")
5266     elif remote_node == self.sec_node:
5267       raise errors.OpPrereqError("The specified node is already the"
5268                                  " secondary node of the instance.")
5269
5270     if self.op.mode == constants.REPLACE_DISK_PRI:
5271       n1 = self.tgt_node = instance.primary_node
5272       n2 = self.oth_node = self.sec_node
5273     elif self.op.mode == constants.REPLACE_DISK_SEC:
5274       n1 = self.tgt_node = self.sec_node
5275       n2 = self.oth_node = instance.primary_node
5276     elif self.op.mode == constants.REPLACE_DISK_CHG:
5277       n1 = self.new_node = remote_node
5278       n2 = self.oth_node = instance.primary_node
5279       self.tgt_node = self.sec_node
5280       _CheckNodeNotDrained(self, remote_node)
5281     else:
5282       raise errors.ProgrammerError("Unhandled disk replace mode")
5283
5284     _CheckNodeOnline(self, n1)
5285     _CheckNodeOnline(self, n2)
5286
5287     if not self.op.disks:
5288       self.op.disks = range(len(instance.disks))
5289
5290     for disk_idx in self.op.disks:
5291       instance.FindDisk(disk_idx)
5292
5293   def _ExecD8DiskOnly(self, feedback_fn):
5294     """Replace a disk on the primary or secondary for dbrd8.
5295
5296     The algorithm for replace is quite complicated:
5297
5298       1. for each disk to be replaced:
5299
5300         1. create new LVs on the target node with unique names
5301         1. detach old LVs from the drbd device
5302         1. rename old LVs to name_replaced.<time_t>
5303         1. rename new LVs to old LVs
5304         1. attach the new LVs (with the old names now) to the drbd device
5305
5306       1. wait for sync across all devices
5307
5308       1. for each modified disk:
5309
5310         1. remove old LVs (which have the name name_replaces.<time_t>)
5311
5312     Failures are not very well handled.
5313
5314     """
5315     steps_total = 6
5316     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5317     instance = self.instance
5318     iv_names = {}
5319     vgname = self.cfg.GetVGName()
5320     # start of work
5321     cfg = self.cfg
5322     tgt_node = self.tgt_node
5323     oth_node = self.oth_node
5324
5325     # Step: check device activation
5326     self.proc.LogStep(1, steps_total, "check device existence")
5327     info("checking volume groups")
5328     my_vg = cfg.GetVGName()
5329     results = self.rpc.call_vg_list([oth_node, tgt_node])
5330     if not results:
5331       raise errors.OpExecError("Can't list volume groups on the nodes")
5332     for node in oth_node, tgt_node:
5333       res = results[node]
5334       if res.failed or not res.data or my_vg not in res.data:
5335         raise errors.OpExecError("Volume group '%s' not found on %s" %
5336                                  (my_vg, node))
5337     for idx, dev in enumerate(instance.disks):
5338       if idx not in self.op.disks:
5339         continue
5340       for node in tgt_node, oth_node:
5341         info("checking disk/%d on %s" % (idx, node))
5342         cfg.SetDiskID(dev, node)
5343         result = self.rpc.call_blockdev_find(node, dev)
5344         msg = result.RemoteFailMsg()
5345         if not msg and not result.payload:
5346           msg = "disk not found"
5347         if msg:
5348           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5349                                    (idx, node, msg))
5350
5351     # Step: check other node consistency
5352     self.proc.LogStep(2, steps_total, "check peer consistency")
5353     for idx, dev in enumerate(instance.disks):
5354       if idx not in self.op.disks:
5355         continue
5356       info("checking disk/%d consistency on %s" % (idx, oth_node))
5357       if not _CheckDiskConsistency(self, dev, oth_node,
5358                                    oth_node==instance.primary_node):
5359         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5360                                  " to replace disks on this node (%s)" %
5361                                  (oth_node, tgt_node))
5362
5363     # Step: create new storage
5364     self.proc.LogStep(3, steps_total, "allocate new storage")
5365     for idx, dev in enumerate(instance.disks):
5366       if idx not in self.op.disks:
5367         continue
5368       size = dev.size
5369       cfg.SetDiskID(dev, tgt_node)
5370       lv_names = [".disk%d_%s" % (idx, suf)
5371                   for suf in ["data", "meta"]]
5372       names = _GenerateUniqueNames(self, lv_names)
5373       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5374                              logical_id=(vgname, names[0]))
5375       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5376                              logical_id=(vgname, names[1]))
5377       new_lvs = [lv_data, lv_meta]
5378       old_lvs = dev.children
5379       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5380       info("creating new local storage on %s for %s" %
5381            (tgt_node, dev.iv_name))
5382       # we pass force_create=True to force the LVM creation
5383       for new_lv in new_lvs:
5384         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5385                         _GetInstanceInfoText(instance), False)
5386
5387     # Step: for each lv, detach+rename*2+attach
5388     self.proc.LogStep(4, steps_total, "change drbd configuration")
5389     for dev, old_lvs, new_lvs in iv_names.itervalues():
5390       info("detaching %s drbd from local storage" % dev.iv_name)
5391       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5392       result.Raise()
5393       if not result.data:
5394         raise errors.OpExecError("Can't detach drbd from local storage on node"
5395                                  " %s for device %s" % (tgt_node, dev.iv_name))
5396       #dev.children = []
5397       #cfg.Update(instance)
5398
5399       # ok, we created the new LVs, so now we know we have the needed
5400       # storage; as such, we proceed on the target node to rename
5401       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5402       # using the assumption that logical_id == physical_id (which in
5403       # turn is the unique_id on that node)
5404
5405       # FIXME(iustin): use a better name for the replaced LVs
5406       temp_suffix = int(time.time())
5407       ren_fn = lambda d, suff: (d.physical_id[0],
5408                                 d.physical_id[1] + "_replaced-%s" % suff)
5409       # build the rename list based on what LVs exist on the node
5410       rlist = []
5411       for to_ren in old_lvs:
5412         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5413         if not result.RemoteFailMsg() and result.payload:
5414           # device exists
5415           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5416
5417       info("renaming the old LVs on the target node")
5418       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5419       result.Raise()
5420       if not result.data:
5421         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5422       # now we rename the new LVs to the old LVs
5423       info("renaming the new LVs on the target node")
5424       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5425       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5426       result.Raise()
5427       if not result.data:
5428         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5429
5430       for old, new in zip(old_lvs, new_lvs):
5431         new.logical_id = old.logical_id
5432         cfg.SetDiskID(new, tgt_node)
5433
5434       for disk in old_lvs:
5435         disk.logical_id = ren_fn(disk, temp_suffix)
5436         cfg.SetDiskID(disk, tgt_node)
5437
5438       # now that the new lvs have the old name, we can add them to the device
5439       info("adding new mirror component on %s" % tgt_node)
5440       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5441       if result.failed or not result.data:
5442         for new_lv in new_lvs:
5443           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5444           if msg:
5445             warning("Can't rollback device %s: %s", dev, msg,
5446                     hint="cleanup manually the unused logical volumes")
5447         raise errors.OpExecError("Can't add local storage to drbd")
5448
5449       dev.children = new_lvs
5450       cfg.Update(instance)
5451
5452     # Step: wait for sync
5453
5454     # this can fail as the old devices are degraded and _WaitForSync
5455     # does a combined result over all disks, so we don't check its
5456     # return value
5457     self.proc.LogStep(5, steps_total, "sync devices")
5458     _WaitForSync(self, instance, unlock=True)
5459
5460     # so check manually all the devices
5461     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5462       cfg.SetDiskID(dev, instance.primary_node)
5463       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5464       msg = result.RemoteFailMsg()
5465       if not msg and not result.payload:
5466         msg = "disk not found"
5467       if msg:
5468         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5469                                  (name, msg))
5470       if result.payload[5]:
5471         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5472
5473     # Step: remove old storage
5474     self.proc.LogStep(6, steps_total, "removing old storage")
5475     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5476       info("remove logical volumes for %s" % name)
5477       for lv in old_lvs:
5478         cfg.SetDiskID(lv, tgt_node)
5479         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5480         if msg:
5481           warning("Can't remove old LV: %s" % msg,
5482                   hint="manually remove unused LVs")
5483           continue
5484
5485   def _ExecD8Secondary(self, feedback_fn):
5486     """Replace the secondary node for drbd8.
5487
5488     The algorithm for replace is quite complicated:
5489       - for all disks of the instance:
5490         - create new LVs on the new node with same names
5491         - shutdown the drbd device on the old secondary
5492         - disconnect the drbd network on the primary
5493         - create the drbd device on the new secondary
5494         - network attach the drbd on the primary, using an artifice:
5495           the drbd code for Attach() will connect to the network if it
5496           finds a device which is connected to the good local disks but
5497           not network enabled
5498       - wait for sync across all devices
5499       - remove all disks from the old secondary
5500
5501     Failures are not very well handled.
5502
5503     """
5504     steps_total = 6
5505     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5506     instance = self.instance
5507     iv_names = {}
5508     # start of work
5509     cfg = self.cfg
5510     old_node = self.tgt_node
5511     new_node = self.new_node
5512     pri_node = instance.primary_node
5513     nodes_ip = {
5514       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5515       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5516       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5517       }
5518
5519     # Step: check device activation
5520     self.proc.LogStep(1, steps_total, "check device existence")
5521     info("checking volume groups")
5522     my_vg = cfg.GetVGName()
5523     results = self.rpc.call_vg_list([pri_node, new_node])
5524     for node in pri_node, new_node:
5525       res = results[node]
5526       if res.failed or not res.data or my_vg not in res.data:
5527         raise errors.OpExecError("Volume group '%s' not found on %s" %
5528                                  (my_vg, node))
5529     for idx, dev in enumerate(instance.disks):
5530       if idx not in self.op.disks:
5531         continue
5532       info("checking disk/%d on %s" % (idx, pri_node))
5533       cfg.SetDiskID(dev, pri_node)
5534       result = self.rpc.call_blockdev_find(pri_node, dev)
5535       msg = result.RemoteFailMsg()
5536       if not msg and not result.payload:
5537         msg = "disk not found"
5538       if msg:
5539         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5540                                  (idx, pri_node, msg))
5541
5542     # Step: check other node consistency
5543     self.proc.LogStep(2, steps_total, "check peer consistency")
5544     for idx, dev in enumerate(instance.disks):
5545       if idx not in self.op.disks:
5546         continue
5547       info("checking disk/%d consistency on %s" % (idx, pri_node))
5548       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5549         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5550                                  " unsafe to replace the secondary" %
5551                                  pri_node)
5552
5553     # Step: create new storage
5554     self.proc.LogStep(3, steps_total, "allocate new storage")
5555     for idx, dev in enumerate(instance.disks):
5556       info("adding new local storage on %s for disk/%d" %
5557            (new_node, idx))
5558       # we pass force_create=True to force LVM creation
5559       for new_lv in dev.children:
5560         _CreateBlockDev(self, new_node, instance, new_lv, True,
5561                         _GetInstanceInfoText(instance), False)
5562
5563     # Step 4: dbrd minors and drbd setups changes
5564     # after this, we must manually remove the drbd minors on both the
5565     # error and the success paths
5566     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5567                                    instance.name)
5568     logging.debug("Allocated minors %s" % (minors,))
5569     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5570     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5571       size = dev.size
5572       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5573       # create new devices on new_node; note that we create two IDs:
5574       # one without port, so the drbd will be activated without
5575       # networking information on the new node at this stage, and one
5576       # with network, for the latter activation in step 4
5577       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5578       if pri_node == o_node1:
5579         p_minor = o_minor1
5580       else:
5581         p_minor = o_minor2
5582
5583       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5584       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5585
5586       iv_names[idx] = (dev, dev.children, new_net_id)
5587       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5588                     new_net_id)
5589       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5590                               logical_id=new_alone_id,
5591                               children=dev.children,
5592                               size=dev.size)
5593       try:
5594         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5595                               _GetInstanceInfoText(instance), False)
5596       except errors.GenericError:
5597         self.cfg.ReleaseDRBDMinors(instance.name)
5598         raise
5599
5600     for idx, dev in enumerate(instance.disks):
5601       # we have new devices, shutdown the drbd on the old secondary
5602       info("shutting down drbd for disk/%d on old node" % idx)
5603       cfg.SetDiskID(dev, old_node)
5604       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5605       if msg:
5606         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5607                 (idx, msg),
5608                 hint="Please cleanup this device manually as soon as possible")
5609
5610     info("detaching primary drbds from the network (=> standalone)")
5611     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5612                                                instance.disks)[pri_node]
5613
5614     msg = result.RemoteFailMsg()
5615     if msg:
5616       # detaches didn't succeed (unlikely)
5617       self.cfg.ReleaseDRBDMinors(instance.name)
5618       raise errors.OpExecError("Can't detach the disks from the network on"
5619                                " old node: %s" % (msg,))
5620
5621     # if we managed to detach at least one, we update all the disks of
5622     # the instance to point to the new secondary
5623     info("updating instance configuration")
5624     for dev, _, new_logical_id in iv_names.itervalues():
5625       dev.logical_id = new_logical_id
5626       cfg.SetDiskID(dev, pri_node)
5627     cfg.Update(instance)
5628
5629     # and now perform the drbd attach
5630     info("attaching primary drbds to new secondary (standalone => connected)")
5631     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5632                                            instance.disks, instance.name,
5633                                            False)
5634     for to_node, to_result in result.items():
5635       msg = to_result.RemoteFailMsg()
5636       if msg:
5637         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5638                 hint="please do a gnt-instance info to see the"
5639                 " status of disks")
5640
5641     # this can fail as the old devices are degraded and _WaitForSync
5642     # does a combined result over all disks, so we don't check its
5643     # return value
5644     self.proc.LogStep(5, steps_total, "sync devices")
5645     _WaitForSync(self, instance, unlock=True)
5646
5647     # so check manually all the devices
5648     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5649       cfg.SetDiskID(dev, pri_node)
5650       result = self.rpc.call_blockdev_find(pri_node, dev)
5651       msg = result.RemoteFailMsg()
5652       if not msg and not result.payload:
5653         msg = "disk not found"
5654       if msg:
5655         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5656                                  (idx, msg))
5657       if result.payload[5]:
5658         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5659
5660     self.proc.LogStep(6, steps_total, "removing old storage")
5661     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5662       info("remove logical volumes for disk/%d" % idx)
5663       for lv in old_lvs:
5664         cfg.SetDiskID(lv, old_node)
5665         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5666         if msg:
5667           warning("Can't remove LV on old secondary: %s", msg,
5668                   hint="Cleanup stale volumes by hand")
5669
5670   def Exec(self, feedback_fn):
5671     """Execute disk replacement.
5672
5673     This dispatches the disk replacement to the appropriate handler.
5674
5675     """
5676     instance = self.instance
5677
5678     # Activate the instance disks if we're replacing them on a down instance
5679     if not instance.admin_up:
5680       _StartInstanceDisks(self, instance, True)
5681
5682     if self.op.mode == constants.REPLACE_DISK_CHG:
5683       fn = self._ExecD8Secondary
5684     else:
5685       fn = self._ExecD8DiskOnly
5686
5687     ret = fn(feedback_fn)
5688
5689     # Deactivate the instance disks if we're replacing them on a down instance
5690     if not instance.admin_up:
5691       _SafeShutdownInstanceDisks(self, instance)
5692
5693     return ret
5694
5695
5696 class LUGrowDisk(LogicalUnit):
5697   """Grow a disk of an instance.
5698
5699   """
5700   HPATH = "disk-grow"
5701   HTYPE = constants.HTYPE_INSTANCE
5702   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5703   REQ_BGL = False
5704
5705   def ExpandNames(self):
5706     self._ExpandAndLockInstance()
5707     self.needed_locks[locking.LEVEL_NODE] = []
5708     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5709
5710   def DeclareLocks(self, level):
5711     if level == locking.LEVEL_NODE:
5712       self._LockInstancesNodes()
5713
5714   def BuildHooksEnv(self):
5715     """Build hooks env.
5716
5717     This runs on the master, the primary and all the secondaries.
5718
5719     """
5720     env = {
5721       "DISK": self.op.disk,
5722       "AMOUNT": self.op.amount,
5723       }
5724     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5725     nl = [
5726       self.cfg.GetMasterNode(),
5727       self.instance.primary_node,
5728       ]
5729     return env, nl, nl
5730
5731   def CheckPrereq(self):
5732     """Check prerequisites.
5733
5734     This checks that the instance is in the cluster.
5735
5736     """
5737     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5738     assert instance is not None, \
5739       "Cannot retrieve locked instance %s" % self.op.instance_name
5740     nodenames = list(instance.all_nodes)
5741     for node in nodenames:
5742       _CheckNodeOnline(self, node)
5743
5744
5745     self.instance = instance
5746
5747     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5748       raise errors.OpPrereqError("Instance's disk layout does not support"
5749                                  " growing.")
5750
5751     self.disk = instance.FindDisk(self.op.disk)
5752
5753     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5754                                        instance.hypervisor)
5755     for node in nodenames:
5756       info = nodeinfo[node]
5757       if info.failed or not info.data:
5758         raise errors.OpPrereqError("Cannot get current information"
5759                                    " from node '%s'" % node)
5760       vg_free = info.data.get('vg_free', None)
5761       if not isinstance(vg_free, int):
5762         raise errors.OpPrereqError("Can't compute free disk space on"
5763                                    " node %s" % node)
5764       if self.op.amount > vg_free:
5765         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5766                                    " %d MiB available, %d MiB required" %
5767                                    (node, vg_free, self.op.amount))
5768
5769   def Exec(self, feedback_fn):
5770     """Execute disk grow.
5771
5772     """
5773     instance = self.instance
5774     disk = self.disk
5775     for node in instance.all_nodes:
5776       self.cfg.SetDiskID(disk, node)
5777       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5778       msg = result.RemoteFailMsg()
5779       if msg:
5780         raise errors.OpExecError("Grow request failed to node %s: %s" %
5781                                  (node, msg))
5782     disk.RecordGrow(self.op.amount)
5783     self.cfg.Update(instance)
5784     if self.op.wait_for_sync:
5785       disk_abort = not _WaitForSync(self, instance)
5786       if disk_abort:
5787         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5788                              " status.\nPlease check the instance.")
5789
5790
5791 class LUQueryInstanceData(NoHooksLU):
5792   """Query runtime instance data.
5793
5794   """
5795   _OP_REQP = ["instances", "static"]
5796   REQ_BGL = False
5797
5798   def ExpandNames(self):
5799     self.needed_locks = {}
5800     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5801
5802     if not isinstance(self.op.instances, list):
5803       raise errors.OpPrereqError("Invalid argument type 'instances'")
5804
5805     if self.op.instances:
5806       self.wanted_names = []
5807       for name in self.op.instances:
5808         full_name = self.cfg.ExpandInstanceName(name)
5809         if full_name is None:
5810           raise errors.OpPrereqError("Instance '%s' not known" % name)
5811         self.wanted_names.append(full_name)
5812       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5813     else:
5814       self.wanted_names = None
5815       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5816
5817     self.needed_locks[locking.LEVEL_NODE] = []
5818     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5819
5820   def DeclareLocks(self, level):
5821     if level == locking.LEVEL_NODE:
5822       self._LockInstancesNodes()
5823
5824   def CheckPrereq(self):
5825     """Check prerequisites.
5826
5827     This only checks the optional instance list against the existing names.
5828
5829     """
5830     if self.wanted_names is None:
5831       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5832
5833     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5834                              in self.wanted_names]
5835     return
5836
5837   def _ComputeDiskStatus(self, instance, snode, dev):
5838     """Compute block device status.
5839
5840     """
5841     static = self.op.static
5842     if not static:
5843       self.cfg.SetDiskID(dev, instance.primary_node)
5844       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5845       if dev_pstatus.offline:
5846         dev_pstatus = None
5847       else:
5848         msg = dev_pstatus.RemoteFailMsg()
5849         if msg:
5850           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5851                                    (instance.name, msg))
5852         dev_pstatus = dev_pstatus.payload
5853     else:
5854       dev_pstatus = None
5855
5856     if dev.dev_type in constants.LDS_DRBD:
5857       # we change the snode then (otherwise we use the one passed in)
5858       if dev.logical_id[0] == instance.primary_node:
5859         snode = dev.logical_id[1]
5860       else:
5861         snode = dev.logical_id[0]
5862
5863     if snode and not static:
5864       self.cfg.SetDiskID(dev, snode)
5865       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5866       if dev_sstatus.offline:
5867         dev_sstatus = None
5868       else:
5869         msg = dev_sstatus.RemoteFailMsg()
5870         if msg:
5871           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5872                                    (instance.name, msg))
5873         dev_sstatus = dev_sstatus.payload
5874     else:
5875       dev_sstatus = None
5876
5877     if dev.children:
5878       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5879                       for child in dev.children]
5880     else:
5881       dev_children = []
5882
5883     data = {
5884       "iv_name": dev.iv_name,
5885       "dev_type": dev.dev_type,
5886       "logical_id": dev.logical_id,
5887       "physical_id": dev.physical_id,
5888       "pstatus": dev_pstatus,
5889       "sstatus": dev_sstatus,
5890       "children": dev_children,
5891       "mode": dev.mode,
5892       "size": dev.size,
5893       }
5894
5895     return data
5896
5897   def Exec(self, feedback_fn):
5898     """Gather and return data"""
5899     result = {}
5900
5901     cluster = self.cfg.GetClusterInfo()
5902
5903     for instance in self.wanted_instances:
5904       if not self.op.static:
5905         remote_info = self.rpc.call_instance_info(instance.primary_node,
5906                                                   instance.name,
5907                                                   instance.hypervisor)
5908         remote_info.Raise()
5909         remote_info = remote_info.data
5910         if remote_info and "state" in remote_info:
5911           remote_state = "up"
5912         else:
5913           remote_state = "down"
5914       else:
5915         remote_state = None
5916       if instance.admin_up:
5917         config_state = "up"
5918       else:
5919         config_state = "down"
5920
5921       disks = [self._ComputeDiskStatus(instance, None, device)
5922                for device in instance.disks]
5923
5924       idict = {
5925         "name": instance.name,
5926         "config_state": config_state,
5927         "run_state": remote_state,
5928         "pnode": instance.primary_node,
5929         "snodes": instance.secondary_nodes,
5930         "os": instance.os,
5931         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5932         "disks": disks,
5933         "hypervisor": instance.hypervisor,
5934         "network_port": instance.network_port,
5935         "hv_instance": instance.hvparams,
5936         "hv_actual": cluster.FillHV(instance),
5937         "be_instance": instance.beparams,
5938         "be_actual": cluster.FillBE(instance),
5939         }
5940
5941       result[instance.name] = idict
5942
5943     return result
5944
5945
5946 class LUSetInstanceParams(LogicalUnit):
5947   """Modifies an instances's parameters.
5948
5949   """
5950   HPATH = "instance-modify"
5951   HTYPE = constants.HTYPE_INSTANCE
5952   _OP_REQP = ["instance_name"]
5953   REQ_BGL = False
5954
5955   def CheckArguments(self):
5956     if not hasattr(self.op, 'nics'):
5957       self.op.nics = []
5958     if not hasattr(self.op, 'disks'):
5959       self.op.disks = []
5960     if not hasattr(self.op, 'beparams'):
5961       self.op.beparams = {}
5962     if not hasattr(self.op, 'hvparams'):
5963       self.op.hvparams = {}
5964     self.op.force = getattr(self.op, "force", False)
5965     if not (self.op.nics or self.op.disks or
5966             self.op.hvparams or self.op.beparams):
5967       raise errors.OpPrereqError("No changes submitted")
5968
5969     # Disk validation
5970     disk_addremove = 0
5971     for disk_op, disk_dict in self.op.disks:
5972       if disk_op == constants.DDM_REMOVE:
5973         disk_addremove += 1
5974         continue
5975       elif disk_op == constants.DDM_ADD:
5976         disk_addremove += 1
5977       else:
5978         if not isinstance(disk_op, int):
5979           raise errors.OpPrereqError("Invalid disk index")
5980       if disk_op == constants.DDM_ADD:
5981         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5982         if mode not in constants.DISK_ACCESS_SET:
5983           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5984         size = disk_dict.get('size', None)
5985         if size is None:
5986           raise errors.OpPrereqError("Required disk parameter size missing")
5987         try:
5988           size = int(size)
5989         except ValueError, err:
5990           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5991                                      str(err))
5992         disk_dict['size'] = size
5993       else:
5994         # modification of disk
5995         if 'size' in disk_dict:
5996           raise errors.OpPrereqError("Disk size change not possible, use"
5997                                      " grow-disk")
5998
5999     if disk_addremove > 1:
6000       raise errors.OpPrereqError("Only one disk add or remove operation"
6001                                  " supported at a time")
6002
6003     # NIC validation
6004     nic_addremove = 0
6005     for nic_op, nic_dict in self.op.nics:
6006       if nic_op == constants.DDM_REMOVE:
6007         nic_addremove += 1
6008         continue
6009       elif nic_op == constants.DDM_ADD:
6010         nic_addremove += 1
6011       else:
6012         if not isinstance(nic_op, int):
6013           raise errors.OpPrereqError("Invalid nic index")
6014
6015       # nic_dict should be a dict
6016       nic_ip = nic_dict.get('ip', None)
6017       if nic_ip is not None:
6018         if nic_ip.lower() == constants.VALUE_NONE:
6019           nic_dict['ip'] = None
6020         else:
6021           if not utils.IsValidIP(nic_ip):
6022             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6023
6024       if nic_op == constants.DDM_ADD:
6025         nic_bridge = nic_dict.get('bridge', None)
6026         if nic_bridge is None:
6027           nic_dict['bridge'] = self.cfg.GetDefBridge()
6028         nic_mac = nic_dict.get('mac', None)
6029         if nic_mac is None:
6030           nic_dict['mac'] = constants.VALUE_AUTO
6031
6032       if 'mac' in nic_dict:
6033         nic_mac = nic_dict['mac']
6034         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6035           if not utils.IsValidMac(nic_mac):
6036             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6037         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6038           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6039                                      " modifying an existing nic")
6040
6041     if nic_addremove > 1:
6042       raise errors.OpPrereqError("Only one NIC add or remove operation"
6043                                  " supported at a time")
6044
6045   def ExpandNames(self):
6046     self._ExpandAndLockInstance()
6047     self.needed_locks[locking.LEVEL_NODE] = []
6048     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6049
6050   def DeclareLocks(self, level):
6051     if level == locking.LEVEL_NODE:
6052       self._LockInstancesNodes()
6053
6054   def BuildHooksEnv(self):
6055     """Build hooks env.
6056
6057     This runs on the master, primary and secondaries.
6058
6059     """
6060     args = dict()
6061     if constants.BE_MEMORY in self.be_new:
6062       args['memory'] = self.be_new[constants.BE_MEMORY]
6063     if constants.BE_VCPUS in self.be_new:
6064       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6065     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6066     # information at all.
6067     if self.op.nics:
6068       args['nics'] = []
6069       nic_override = dict(self.op.nics)
6070       for idx, nic in enumerate(self.instance.nics):
6071         if idx in nic_override:
6072           this_nic_override = nic_override[idx]
6073         else:
6074           this_nic_override = {}
6075         if 'ip' in this_nic_override:
6076           ip = this_nic_override['ip']
6077         else:
6078           ip = nic.ip
6079         if 'bridge' in this_nic_override:
6080           bridge = this_nic_override['bridge']
6081         else:
6082           bridge = nic.bridge
6083         if 'mac' in this_nic_override:
6084           mac = this_nic_override['mac']
6085         else:
6086           mac = nic.mac
6087         args['nics'].append((ip, bridge, mac))
6088       if constants.DDM_ADD in nic_override:
6089         ip = nic_override[constants.DDM_ADD].get('ip', None)
6090         bridge = nic_override[constants.DDM_ADD]['bridge']
6091         mac = nic_override[constants.DDM_ADD]['mac']
6092         args['nics'].append((ip, bridge, mac))
6093       elif constants.DDM_REMOVE in nic_override:
6094         del args['nics'][-1]
6095
6096     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6097     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6098     return env, nl, nl
6099
6100   def CheckPrereq(self):
6101     """Check prerequisites.
6102
6103     This only checks the instance list against the existing names.
6104
6105     """
6106     force = self.force = self.op.force
6107
6108     # checking the new params on the primary/secondary nodes
6109
6110     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6111     assert self.instance is not None, \
6112       "Cannot retrieve locked instance %s" % self.op.instance_name
6113     pnode = instance.primary_node
6114     nodelist = list(instance.all_nodes)
6115
6116     # hvparams processing
6117     if self.op.hvparams:
6118       i_hvdict = copy.deepcopy(instance.hvparams)
6119       for key, val in self.op.hvparams.iteritems():
6120         if val == constants.VALUE_DEFAULT:
6121           try:
6122             del i_hvdict[key]
6123           except KeyError:
6124             pass
6125         else:
6126           i_hvdict[key] = val
6127       cluster = self.cfg.GetClusterInfo()
6128       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
6129       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
6130                                 i_hvdict)
6131       # local check
6132       hypervisor.GetHypervisor(
6133         instance.hypervisor).CheckParameterSyntax(hv_new)
6134       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6135       self.hv_new = hv_new # the new actual values
6136       self.hv_inst = i_hvdict # the new dict (without defaults)
6137     else:
6138       self.hv_new = self.hv_inst = {}
6139
6140     # beparams processing
6141     if self.op.beparams:
6142       i_bedict = copy.deepcopy(instance.beparams)
6143       for key, val in self.op.beparams.iteritems():
6144         if val == constants.VALUE_DEFAULT:
6145           try:
6146             del i_bedict[key]
6147           except KeyError:
6148             pass
6149         else:
6150           i_bedict[key] = val
6151       cluster = self.cfg.GetClusterInfo()
6152       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6153       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6154                                 i_bedict)
6155       self.be_new = be_new # the new actual values
6156       self.be_inst = i_bedict # the new dict (without defaults)
6157     else:
6158       self.be_new = self.be_inst = {}
6159
6160     self.warn = []
6161
6162     if constants.BE_MEMORY in self.op.beparams and not self.force:
6163       mem_check_list = [pnode]
6164       if be_new[constants.BE_AUTO_BALANCE]:
6165         # either we changed auto_balance to yes or it was from before
6166         mem_check_list.extend(instance.secondary_nodes)
6167       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6168                                                   instance.hypervisor)
6169       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6170                                          instance.hypervisor)
6171       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6172         # Assume the primary node is unreachable and go ahead
6173         self.warn.append("Can't get info from primary node %s" % pnode)
6174       else:
6175         if not instance_info.failed and instance_info.data:
6176           current_mem = int(instance_info.data['memory'])
6177         else:
6178           # Assume instance not running
6179           # (there is a slight race condition here, but it's not very probable,
6180           # and we have no other way to check)
6181           current_mem = 0
6182         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6183                     nodeinfo[pnode].data['memory_free'])
6184         if miss_mem > 0:
6185           raise errors.OpPrereqError("This change will prevent the instance"
6186                                      " from starting, due to %d MB of memory"
6187                                      " missing on its primary node" % miss_mem)
6188
6189       if be_new[constants.BE_AUTO_BALANCE]:
6190         for node, nres in nodeinfo.iteritems():
6191           if node not in instance.secondary_nodes:
6192             continue
6193           if nres.failed or not isinstance(nres.data, dict):
6194             self.warn.append("Can't get info from secondary node %s" % node)
6195           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6196             self.warn.append("Not enough memory to failover instance to"
6197                              " secondary node %s" % node)
6198
6199     # NIC processing
6200     for nic_op, nic_dict in self.op.nics:
6201       if nic_op == constants.DDM_REMOVE:
6202         if not instance.nics:
6203           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6204         continue
6205       if nic_op != constants.DDM_ADD:
6206         # an existing nic
6207         if nic_op < 0 or nic_op >= len(instance.nics):
6208           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6209                                      " are 0 to %d" %
6210                                      (nic_op, len(instance.nics)))
6211       if 'bridge' in nic_dict:
6212         nic_bridge = nic_dict['bridge']
6213         if nic_bridge is None:
6214           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6215         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6216           msg = ("Bridge '%s' doesn't exist on one of"
6217                  " the instance nodes" % nic_bridge)
6218           if self.force:
6219             self.warn.append(msg)
6220           else:
6221             raise errors.OpPrereqError(msg)
6222       if 'mac' in nic_dict:
6223         nic_mac = nic_dict['mac']
6224         if nic_mac is None:
6225           raise errors.OpPrereqError('Cannot set the nic mac to None')
6226         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6227           # otherwise generate the mac
6228           nic_dict['mac'] = self.cfg.GenerateMAC()
6229         else:
6230           # or validate/reserve the current one
6231           if self.cfg.IsMacInUse(nic_mac):
6232             raise errors.OpPrereqError("MAC address %s already in use"
6233                                        " in cluster" % nic_mac)
6234
6235     # DISK processing
6236     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6237       raise errors.OpPrereqError("Disk operations not supported for"
6238                                  " diskless instances")
6239     for disk_op, disk_dict in self.op.disks:
6240       if disk_op == constants.DDM_REMOVE:
6241         if len(instance.disks) == 1:
6242           raise errors.OpPrereqError("Cannot remove the last disk of"
6243                                      " an instance")
6244         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6245         ins_l = ins_l[pnode]
6246         if ins_l.failed or not isinstance(ins_l.data, list):
6247           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6248         if instance.name in ins_l.data:
6249           raise errors.OpPrereqError("Instance is running, can't remove"
6250                                      " disks.")
6251
6252       if (disk_op == constants.DDM_ADD and
6253           len(instance.nics) >= constants.MAX_DISKS):
6254         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6255                                    " add more" % constants.MAX_DISKS)
6256       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6257         # an existing disk
6258         if disk_op < 0 or disk_op >= len(instance.disks):
6259           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6260                                      " are 0 to %d" %
6261                                      (disk_op, len(instance.disks)))
6262
6263     return
6264
6265   def Exec(self, feedback_fn):
6266     """Modifies an instance.
6267
6268     All parameters take effect only at the next restart of the instance.
6269
6270     """
6271     # Process here the warnings from CheckPrereq, as we don't have a
6272     # feedback_fn there.
6273     for warn in self.warn:
6274       feedback_fn("WARNING: %s" % warn)
6275
6276     result = []
6277     instance = self.instance
6278     # disk changes
6279     for disk_op, disk_dict in self.op.disks:
6280       if disk_op == constants.DDM_REMOVE:
6281         # remove the last disk
6282         device = instance.disks.pop()
6283         device_idx = len(instance.disks)
6284         for node, disk in device.ComputeNodeTree(instance.primary_node):
6285           self.cfg.SetDiskID(disk, node)
6286           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6287           if msg:
6288             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6289                             " continuing anyway", device_idx, node, msg)
6290         result.append(("disk/%d" % device_idx, "remove"))
6291       elif disk_op == constants.DDM_ADD:
6292         # add a new disk
6293         if instance.disk_template == constants.DT_FILE:
6294           file_driver, file_path = instance.disks[0].logical_id
6295           file_path = os.path.dirname(file_path)
6296         else:
6297           file_driver = file_path = None
6298         disk_idx_base = len(instance.disks)
6299         new_disk = _GenerateDiskTemplate(self,
6300                                          instance.disk_template,
6301                                          instance.name, instance.primary_node,
6302                                          instance.secondary_nodes,
6303                                          [disk_dict],
6304                                          file_path,
6305                                          file_driver,
6306                                          disk_idx_base)[0]
6307         instance.disks.append(new_disk)
6308         info = _GetInstanceInfoText(instance)
6309
6310         logging.info("Creating volume %s for instance %s",
6311                      new_disk.iv_name, instance.name)
6312         # Note: this needs to be kept in sync with _CreateDisks
6313         #HARDCODE
6314         for node in instance.all_nodes:
6315           f_create = node == instance.primary_node
6316           try:
6317             _CreateBlockDev(self, node, instance, new_disk,
6318                             f_create, info, f_create)
6319           except errors.OpExecError, err:
6320             self.LogWarning("Failed to create volume %s (%s) on"
6321                             " node %s: %s",
6322                             new_disk.iv_name, new_disk, node, err)
6323         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6324                        (new_disk.size, new_disk.mode)))
6325       else:
6326         # change a given disk
6327         instance.disks[disk_op].mode = disk_dict['mode']
6328         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6329     # NIC changes
6330     for nic_op, nic_dict in self.op.nics:
6331       if nic_op == constants.DDM_REMOVE:
6332         # remove the last nic
6333         del instance.nics[-1]
6334         result.append(("nic.%d" % len(instance.nics), "remove"))
6335       elif nic_op == constants.DDM_ADD:
6336         # mac and bridge should be set, by now
6337         mac = nic_dict['mac']
6338         bridge = nic_dict['bridge']
6339         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6340                               bridge=bridge)
6341         instance.nics.append(new_nic)
6342         result.append(("nic.%d" % (len(instance.nics) - 1),
6343                        "add:mac=%s,ip=%s,bridge=%s" %
6344                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6345       else:
6346         # change a given nic
6347         for key in 'mac', 'ip', 'bridge':
6348           if key in nic_dict:
6349             setattr(instance.nics[nic_op], key, nic_dict[key])
6350             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6351
6352     # hvparams changes
6353     if self.op.hvparams:
6354       instance.hvparams = self.hv_inst
6355       for key, val in self.op.hvparams.iteritems():
6356         result.append(("hv/%s" % key, val))
6357
6358     # beparams changes
6359     if self.op.beparams:
6360       instance.beparams = self.be_inst
6361       for key, val in self.op.beparams.iteritems():
6362         result.append(("be/%s" % key, val))
6363
6364     self.cfg.Update(instance)
6365
6366     return result
6367
6368
6369 class LUQueryExports(NoHooksLU):
6370   """Query the exports list
6371
6372   """
6373   _OP_REQP = ['nodes']
6374   REQ_BGL = False
6375
6376   def ExpandNames(self):
6377     self.needed_locks = {}
6378     self.share_locks[locking.LEVEL_NODE] = 1
6379     if not self.op.nodes:
6380       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6381     else:
6382       self.needed_locks[locking.LEVEL_NODE] = \
6383         _GetWantedNodes(self, self.op.nodes)
6384
6385   def CheckPrereq(self):
6386     """Check prerequisites.
6387
6388     """
6389     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6390
6391   def Exec(self, feedback_fn):
6392     """Compute the list of all the exported system images.
6393
6394     @rtype: dict
6395     @return: a dictionary with the structure node->(export-list)
6396         where export-list is a list of the instances exported on
6397         that node.
6398
6399     """
6400     rpcresult = self.rpc.call_export_list(self.nodes)
6401     result = {}
6402     for node in rpcresult:
6403       if rpcresult[node].failed:
6404         result[node] = False
6405       else:
6406         result[node] = rpcresult[node].data
6407
6408     return result
6409
6410
6411 class LUExportInstance(LogicalUnit):
6412   """Export an instance to an image in the cluster.
6413
6414   """
6415   HPATH = "instance-export"
6416   HTYPE = constants.HTYPE_INSTANCE
6417   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6418   REQ_BGL = False
6419
6420   def ExpandNames(self):
6421     self._ExpandAndLockInstance()
6422     # FIXME: lock only instance primary and destination node
6423     #
6424     # Sad but true, for now we have do lock all nodes, as we don't know where
6425     # the previous export might be, and and in this LU we search for it and
6426     # remove it from its current node. In the future we could fix this by:
6427     #  - making a tasklet to search (share-lock all), then create the new one,
6428     #    then one to remove, after
6429     #  - removing the removal operation altoghether
6430     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6431
6432   def DeclareLocks(self, level):
6433     """Last minute lock declaration."""
6434     # All nodes are locked anyway, so nothing to do here.
6435
6436   def BuildHooksEnv(self):
6437     """Build hooks env.
6438
6439     This will run on the master, primary node and target node.
6440
6441     """
6442     env = {
6443       "EXPORT_NODE": self.op.target_node,
6444       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6445       }
6446     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6447     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6448           self.op.target_node]
6449     return env, nl, nl
6450
6451   def CheckPrereq(self):
6452     """Check prerequisites.
6453
6454     This checks that the instance and node names are valid.
6455
6456     """
6457     instance_name = self.op.instance_name
6458     self.instance = self.cfg.GetInstanceInfo(instance_name)
6459     assert self.instance is not None, \
6460           "Cannot retrieve locked instance %s" % self.op.instance_name
6461     _CheckNodeOnline(self, self.instance.primary_node)
6462
6463     self.dst_node = self.cfg.GetNodeInfo(
6464       self.cfg.ExpandNodeName(self.op.target_node))
6465
6466     if self.dst_node is None:
6467       # This is wrong node name, not a non-locked node
6468       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6469     _CheckNodeOnline(self, self.dst_node.name)
6470     _CheckNodeNotDrained(self, self.dst_node.name)
6471
6472     # instance disk type verification
6473     for disk in self.instance.disks:
6474       if disk.dev_type == constants.LD_FILE:
6475         raise errors.OpPrereqError("Export not supported for instances with"
6476                                    " file-based disks")
6477
6478   def Exec(self, feedback_fn):
6479     """Export an instance to an image in the cluster.
6480
6481     """
6482     instance = self.instance
6483     dst_node = self.dst_node
6484     src_node = instance.primary_node
6485     if self.op.shutdown:
6486       # shutdown the instance, but not the disks
6487       result = self.rpc.call_instance_shutdown(src_node, instance)
6488       msg = result.RemoteFailMsg()
6489       if msg:
6490         raise errors.OpExecError("Could not shutdown instance %s on"
6491                                  " node %s: %s" %
6492                                  (instance.name, src_node, msg))
6493
6494     vgname = self.cfg.GetVGName()
6495
6496     snap_disks = []
6497
6498     # set the disks ID correctly since call_instance_start needs the
6499     # correct drbd minor to create the symlinks
6500     for disk in instance.disks:
6501       self.cfg.SetDiskID(disk, src_node)
6502
6503     # per-disk results
6504     dresults = []
6505     try:
6506       for idx, disk in enumerate(instance.disks):
6507         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6508         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6509         if new_dev_name.failed or not new_dev_name.data:
6510           self.LogWarning("Could not snapshot disk/%d on node %s",
6511                           idx, src_node)
6512           snap_disks.append(False)
6513         else:
6514           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6515                                  logical_id=(vgname, new_dev_name.data),
6516                                  physical_id=(vgname, new_dev_name.data),
6517                                  iv_name=disk.iv_name)
6518           snap_disks.append(new_dev)
6519
6520     finally:
6521       if self.op.shutdown and instance.admin_up:
6522         result = self.rpc.call_instance_start(src_node, instance, None, None)
6523         msg = result.RemoteFailMsg()
6524         if msg:
6525           _ShutdownInstanceDisks(self, instance)
6526           raise errors.OpExecError("Could not start instance: %s" % msg)
6527
6528     # TODO: check for size
6529
6530     cluster_name = self.cfg.GetClusterName()
6531     for idx, dev in enumerate(snap_disks):
6532       if dev:
6533         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6534                                                instance, cluster_name, idx)
6535         if result.failed or not result.data:
6536           self.LogWarning("Could not export disk/%d from node %s to"
6537                           " node %s", idx, src_node, dst_node.name)
6538           dresults.append(False)
6539         else:
6540           dresults.append(True)
6541         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6542         if msg:
6543           self.LogWarning("Could not remove snapshot for disk/%d from node"
6544                           " %s: %s", idx, src_node, msg)
6545       else:
6546         dresults.append(False)
6547
6548     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6549     fin_resu = True
6550     if result.failed or not result.data:
6551       self.LogWarning("Could not finalize export for instance %s on node %s",
6552                       instance.name, dst_node.name)
6553       fin_resu = False
6554
6555     nodelist = self.cfg.GetNodeList()
6556     nodelist.remove(dst_node.name)
6557
6558     # on one-node clusters nodelist will be empty after the removal
6559     # if we proceed the backup would be removed because OpQueryExports
6560     # substitutes an empty list with the full cluster node list.
6561     if nodelist:
6562       exportlist = self.rpc.call_export_list(nodelist)
6563       for node in exportlist:
6564         if exportlist[node].failed:
6565           continue
6566         if instance.name in exportlist[node].data:
6567           if not self.rpc.call_export_remove(node, instance.name):
6568             self.LogWarning("Could not remove older export for instance %s"
6569                             " on node %s", instance.name, node)
6570     return fin_resu, dresults
6571
6572
6573 class LURemoveExport(NoHooksLU):
6574   """Remove exports related to the named instance.
6575
6576   """
6577   _OP_REQP = ["instance_name"]
6578   REQ_BGL = False
6579
6580   def ExpandNames(self):
6581     self.needed_locks = {}
6582     # We need all nodes to be locked in order for RemoveExport to work, but we
6583     # don't need to lock the instance itself, as nothing will happen to it (and
6584     # we can remove exports also for a removed instance)
6585     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6586
6587   def CheckPrereq(self):
6588     """Check prerequisites.
6589     """
6590     pass
6591
6592   def Exec(self, feedback_fn):
6593     """Remove any export.
6594
6595     """
6596     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6597     # If the instance was not found we'll try with the name that was passed in.
6598     # This will only work if it was an FQDN, though.
6599     fqdn_warn = False
6600     if not instance_name:
6601       fqdn_warn = True
6602       instance_name = self.op.instance_name
6603
6604     exportlist = self.rpc.call_export_list(self.acquired_locks[
6605       locking.LEVEL_NODE])
6606     found = False
6607     for node in exportlist:
6608       if exportlist[node].failed:
6609         self.LogWarning("Failed to query node %s, continuing" % node)
6610         continue
6611       if instance_name in exportlist[node].data:
6612         found = True
6613         result = self.rpc.call_export_remove(node, instance_name)
6614         if result.failed or not result.data:
6615           logging.error("Could not remove export for instance %s"
6616                         " on node %s", instance_name, node)
6617
6618     if fqdn_warn and not found:
6619       feedback_fn("Export not found. If trying to remove an export belonging"
6620                   " to a deleted instance please use its Fully Qualified"
6621                   " Domain Name.")
6622
6623
6624 class TagsLU(NoHooksLU):
6625   """Generic tags LU.
6626
6627   This is an abstract class which is the parent of all the other tags LUs.
6628
6629   """
6630
6631   def ExpandNames(self):
6632     self.needed_locks = {}
6633     if self.op.kind == constants.TAG_NODE:
6634       name = self.cfg.ExpandNodeName(self.op.name)
6635       if name is None:
6636         raise errors.OpPrereqError("Invalid node name (%s)" %
6637                                    (self.op.name,))
6638       self.op.name = name
6639       self.needed_locks[locking.LEVEL_NODE] = name
6640     elif self.op.kind == constants.TAG_INSTANCE:
6641       name = self.cfg.ExpandInstanceName(self.op.name)
6642       if name is None:
6643         raise errors.OpPrereqError("Invalid instance name (%s)" %
6644                                    (self.op.name,))
6645       self.op.name = name
6646       self.needed_locks[locking.LEVEL_INSTANCE] = name
6647
6648   def CheckPrereq(self):
6649     """Check prerequisites.
6650
6651     """
6652     if self.op.kind == constants.TAG_CLUSTER:
6653       self.target = self.cfg.GetClusterInfo()
6654     elif self.op.kind == constants.TAG_NODE:
6655       self.target = self.cfg.GetNodeInfo(self.op.name)
6656     elif self.op.kind == constants.TAG_INSTANCE:
6657       self.target = self.cfg.GetInstanceInfo(self.op.name)
6658     else:
6659       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6660                                  str(self.op.kind))
6661
6662
6663 class LUGetTags(TagsLU):
6664   """Returns the tags of a given object.
6665
6666   """
6667   _OP_REQP = ["kind", "name"]
6668   REQ_BGL = False
6669
6670   def Exec(self, feedback_fn):
6671     """Returns the tag list.
6672
6673     """
6674     return list(self.target.GetTags())
6675
6676
6677 class LUSearchTags(NoHooksLU):
6678   """Searches the tags for a given pattern.
6679
6680   """
6681   _OP_REQP = ["pattern"]
6682   REQ_BGL = False
6683
6684   def ExpandNames(self):
6685     self.needed_locks = {}
6686
6687   def CheckPrereq(self):
6688     """Check prerequisites.
6689
6690     This checks the pattern passed for validity by compiling it.
6691
6692     """
6693     try:
6694       self.re = re.compile(self.op.pattern)
6695     except re.error, err:
6696       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6697                                  (self.op.pattern, err))
6698
6699   def Exec(self, feedback_fn):
6700     """Returns the tag list.
6701
6702     """
6703     cfg = self.cfg
6704     tgts = [("/cluster", cfg.GetClusterInfo())]
6705     ilist = cfg.GetAllInstancesInfo().values()
6706     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6707     nlist = cfg.GetAllNodesInfo().values()
6708     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6709     results = []
6710     for path, target in tgts:
6711       for tag in target.GetTags():
6712         if self.re.search(tag):
6713           results.append((path, tag))
6714     return results
6715
6716
6717 class LUAddTags(TagsLU):
6718   """Sets a tag on a given object.
6719
6720   """
6721   _OP_REQP = ["kind", "name", "tags"]
6722   REQ_BGL = False
6723
6724   def CheckPrereq(self):
6725     """Check prerequisites.
6726
6727     This checks the type and length of the tag name and value.
6728
6729     """
6730     TagsLU.CheckPrereq(self)
6731     for tag in self.op.tags:
6732       objects.TaggableObject.ValidateTag(tag)
6733
6734   def Exec(self, feedback_fn):
6735     """Sets the tag.
6736
6737     """
6738     try:
6739       for tag in self.op.tags:
6740         self.target.AddTag(tag)
6741     except errors.TagError, err:
6742       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6743     try:
6744       self.cfg.Update(self.target)
6745     except errors.ConfigurationError:
6746       raise errors.OpRetryError("There has been a modification to the"
6747                                 " config file and the operation has been"
6748                                 " aborted. Please retry.")
6749
6750
6751 class LUDelTags(TagsLU):
6752   """Delete a list of tags from a given object.
6753
6754   """
6755   _OP_REQP = ["kind", "name", "tags"]
6756   REQ_BGL = False
6757
6758   def CheckPrereq(self):
6759     """Check prerequisites.
6760
6761     This checks that we have the given tag.
6762
6763     """
6764     TagsLU.CheckPrereq(self)
6765     for tag in self.op.tags:
6766       objects.TaggableObject.ValidateTag(tag)
6767     del_tags = frozenset(self.op.tags)
6768     cur_tags = self.target.GetTags()
6769     if not del_tags <= cur_tags:
6770       diff_tags = del_tags - cur_tags
6771       diff_names = ["'%s'" % tag for tag in diff_tags]
6772       diff_names.sort()
6773       raise errors.OpPrereqError("Tag(s) %s not found" %
6774                                  (",".join(diff_names)))
6775
6776   def Exec(self, feedback_fn):
6777     """Remove the tag from the object.
6778
6779     """
6780     for tag in self.op.tags:
6781       self.target.RemoveTag(tag)
6782     try:
6783       self.cfg.Update(self.target)
6784     except errors.ConfigurationError:
6785       raise errors.OpRetryError("There has been a modification to the"
6786                                 " config file and the operation has been"
6787                                 " aborted. Please retry.")
6788
6789
6790 class LUTestDelay(NoHooksLU):
6791   """Sleep for a specified amount of time.
6792
6793   This LU sleeps on the master and/or nodes for a specified amount of
6794   time.
6795
6796   """
6797   _OP_REQP = ["duration", "on_master", "on_nodes"]
6798   REQ_BGL = False
6799
6800   def ExpandNames(self):
6801     """Expand names and set required locks.
6802
6803     This expands the node list, if any.
6804
6805     """
6806     self.needed_locks = {}
6807     if self.op.on_nodes:
6808       # _GetWantedNodes can be used here, but is not always appropriate to use
6809       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6810       # more information.
6811       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6812       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6813
6814   def CheckPrereq(self):
6815     """Check prerequisites.
6816
6817     """
6818
6819   def Exec(self, feedback_fn):
6820     """Do the actual sleep.
6821
6822     """
6823     if self.op.on_master:
6824       if not utils.TestDelay(self.op.duration):
6825         raise errors.OpExecError("Error during master delay test")
6826     if self.op.on_nodes:
6827       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6828       if not result:
6829         raise errors.OpExecError("Complete failure from rpc call")
6830       for node, node_result in result.items():
6831         node_result.Raise()
6832         if not node_result.data:
6833           raise errors.OpExecError("Failure during rpc call to node %s,"
6834                                    " result: %s" % (node, node_result.data))
6835
6836
6837 class IAllocator(object):
6838   """IAllocator framework.
6839
6840   An IAllocator instance has three sets of attributes:
6841     - cfg that is needed to query the cluster
6842     - input data (all members of the _KEYS class attribute are required)
6843     - four buffer attributes (in|out_data|text), that represent the
6844       input (to the external script) in text and data structure format,
6845       and the output from it, again in two formats
6846     - the result variables from the script (success, info, nodes) for
6847       easy usage
6848
6849   """
6850   _ALLO_KEYS = [
6851     "mem_size", "disks", "disk_template",
6852     "os", "tags", "nics", "vcpus", "hypervisor",
6853     ]
6854   _RELO_KEYS = [
6855     "relocate_from",
6856     ]
6857
6858   def __init__(self, lu, mode, name, **kwargs):
6859     self.lu = lu
6860     # init buffer variables
6861     self.in_text = self.out_text = self.in_data = self.out_data = None
6862     # init all input fields so that pylint is happy
6863     self.mode = mode
6864     self.name = name
6865     self.mem_size = self.disks = self.disk_template = None
6866     self.os = self.tags = self.nics = self.vcpus = None
6867     self.hypervisor = None
6868     self.relocate_from = None
6869     # computed fields
6870     self.required_nodes = None
6871     # init result fields
6872     self.success = self.info = self.nodes = None
6873     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6874       keyset = self._ALLO_KEYS
6875     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6876       keyset = self._RELO_KEYS
6877     else:
6878       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6879                                    " IAllocator" % self.mode)
6880     for key in kwargs:
6881       if key not in keyset:
6882         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6883                                      " IAllocator" % key)
6884       setattr(self, key, kwargs[key])
6885     for key in keyset:
6886       if key not in kwargs:
6887         raise errors.ProgrammerError("Missing input parameter '%s' to"
6888                                      " IAllocator" % key)
6889     self._BuildInputData()
6890
6891   def _ComputeClusterData(self):
6892     """Compute the generic allocator input data.
6893
6894     This is the data that is independent of the actual operation.
6895
6896     """
6897     cfg = self.lu.cfg
6898     cluster_info = cfg.GetClusterInfo()
6899     # cluster data
6900     data = {
6901       "version": constants.IALLOCATOR_VERSION,
6902       "cluster_name": cfg.GetClusterName(),
6903       "cluster_tags": list(cluster_info.GetTags()),
6904       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6905       # we don't have job IDs
6906       }
6907     iinfo = cfg.GetAllInstancesInfo().values()
6908     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6909
6910     # node data
6911     node_results = {}
6912     node_list = cfg.GetNodeList()
6913
6914     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6915       hypervisor_name = self.hypervisor
6916     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6917       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6918
6919     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6920                                            hypervisor_name)
6921     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6922                        cluster_info.enabled_hypervisors)
6923     for nname, nresult in node_data.items():
6924       # first fill in static (config-based) values
6925       ninfo = cfg.GetNodeInfo(nname)
6926       pnr = {
6927         "tags": list(ninfo.GetTags()),
6928         "primary_ip": ninfo.primary_ip,
6929         "secondary_ip": ninfo.secondary_ip,
6930         "offline": ninfo.offline,
6931         "drained": ninfo.drained,
6932         "master_candidate": ninfo.master_candidate,
6933         }
6934
6935       if not ninfo.offline:
6936         nresult.Raise()
6937         if not isinstance(nresult.data, dict):
6938           raise errors.OpExecError("Can't get data for node %s" % nname)
6939         remote_info = nresult.data
6940         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6941                      'vg_size', 'vg_free', 'cpu_total']:
6942           if attr not in remote_info:
6943             raise errors.OpExecError("Node '%s' didn't return attribute"
6944                                      " '%s'" % (nname, attr))
6945           try:
6946             remote_info[attr] = int(remote_info[attr])
6947           except ValueError, err:
6948             raise errors.OpExecError("Node '%s' returned invalid value"
6949                                      " for '%s': %s" % (nname, attr, err))
6950         # compute memory used by primary instances
6951         i_p_mem = i_p_up_mem = 0
6952         for iinfo, beinfo in i_list:
6953           if iinfo.primary_node == nname:
6954             i_p_mem += beinfo[constants.BE_MEMORY]
6955             if iinfo.name not in node_iinfo[nname].data:
6956               i_used_mem = 0
6957             else:
6958               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6959             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6960             remote_info['memory_free'] -= max(0, i_mem_diff)
6961
6962             if iinfo.admin_up:
6963               i_p_up_mem += beinfo[constants.BE_MEMORY]
6964
6965         # compute memory used by instances
6966         pnr_dyn = {
6967           "total_memory": remote_info['memory_total'],
6968           "reserved_memory": remote_info['memory_dom0'],
6969           "free_memory": remote_info['memory_free'],
6970           "total_disk": remote_info['vg_size'],
6971           "free_disk": remote_info['vg_free'],
6972           "total_cpus": remote_info['cpu_total'],
6973           "i_pri_memory": i_p_mem,
6974           "i_pri_up_memory": i_p_up_mem,
6975           }
6976         pnr.update(pnr_dyn)
6977
6978       node_results[nname] = pnr
6979     data["nodes"] = node_results
6980
6981     # instance data
6982     instance_data = {}
6983     for iinfo, beinfo in i_list:
6984       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6985                   for n in iinfo.nics]
6986       pir = {
6987         "tags": list(iinfo.GetTags()),
6988         "admin_up": iinfo.admin_up,
6989         "vcpus": beinfo[constants.BE_VCPUS],
6990         "memory": beinfo[constants.BE_MEMORY],
6991         "os": iinfo.os,
6992         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6993         "nics": nic_data,
6994         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6995         "disk_template": iinfo.disk_template,
6996         "hypervisor": iinfo.hypervisor,
6997         }
6998       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6999                                                  pir["disks"])
7000       instance_data[iinfo.name] = pir
7001
7002     data["instances"] = instance_data
7003
7004     self.in_data = data
7005
7006   def _AddNewInstance(self):
7007     """Add new instance data to allocator structure.
7008
7009     This in combination with _AllocatorGetClusterData will create the
7010     correct structure needed as input for the allocator.
7011
7012     The checks for the completeness of the opcode must have already been
7013     done.
7014
7015     """
7016     data = self.in_data
7017
7018     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7019
7020     if self.disk_template in constants.DTS_NET_MIRROR:
7021       self.required_nodes = 2
7022     else:
7023       self.required_nodes = 1
7024     request = {
7025       "type": "allocate",
7026       "name": self.name,
7027       "disk_template": self.disk_template,
7028       "tags": self.tags,
7029       "os": self.os,
7030       "vcpus": self.vcpus,
7031       "memory": self.mem_size,
7032       "disks": self.disks,
7033       "disk_space_total": disk_space,
7034       "nics": self.nics,
7035       "required_nodes": self.required_nodes,
7036       }
7037     data["request"] = request
7038
7039   def _AddRelocateInstance(self):
7040     """Add relocate instance data to allocator structure.
7041
7042     This in combination with _IAllocatorGetClusterData will create the
7043     correct structure needed as input for the allocator.
7044
7045     The checks for the completeness of the opcode must have already been
7046     done.
7047
7048     """
7049     instance = self.lu.cfg.GetInstanceInfo(self.name)
7050     if instance is None:
7051       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7052                                    " IAllocator" % self.name)
7053
7054     if instance.disk_template not in constants.DTS_NET_MIRROR:
7055       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7056
7057     if len(instance.secondary_nodes) != 1:
7058       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7059
7060     self.required_nodes = 1
7061     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7062     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7063
7064     request = {
7065       "type": "relocate",
7066       "name": self.name,
7067       "disk_space_total": disk_space,
7068       "required_nodes": self.required_nodes,
7069       "relocate_from": self.relocate_from,
7070       }
7071     self.in_data["request"] = request
7072
7073   def _BuildInputData(self):
7074     """Build input data structures.
7075
7076     """
7077     self._ComputeClusterData()
7078
7079     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7080       self._AddNewInstance()
7081     else:
7082       self._AddRelocateInstance()
7083
7084     self.in_text = serializer.Dump(self.in_data)
7085
7086   def Run(self, name, validate=True, call_fn=None):
7087     """Run an instance allocator and return the results.
7088
7089     """
7090     if call_fn is None:
7091       call_fn = self.lu.rpc.call_iallocator_runner
7092     data = self.in_text
7093
7094     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7095     result.Raise()
7096
7097     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7098       raise errors.OpExecError("Invalid result from master iallocator runner")
7099
7100     rcode, stdout, stderr, fail = result.data
7101
7102     if rcode == constants.IARUN_NOTFOUND:
7103       raise errors.OpExecError("Can't find allocator '%s'" % name)
7104     elif rcode == constants.IARUN_FAILURE:
7105       raise errors.OpExecError("Instance allocator call failed: %s,"
7106                                " output: %s" % (fail, stdout+stderr))
7107     self.out_text = stdout
7108     if validate:
7109       self._ValidateResult()
7110
7111   def _ValidateResult(self):
7112     """Process the allocator results.
7113
7114     This will process and if successful save the result in
7115     self.out_data and the other parameters.
7116
7117     """
7118     try:
7119       rdict = serializer.Load(self.out_text)
7120     except Exception, err:
7121       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7122
7123     if not isinstance(rdict, dict):
7124       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7125
7126     for key in "success", "info", "nodes":
7127       if key not in rdict:
7128         raise errors.OpExecError("Can't parse iallocator results:"
7129                                  " missing key '%s'" % key)
7130       setattr(self, key, rdict[key])
7131
7132     if not isinstance(rdict["nodes"], list):
7133       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7134                                " is not a list")
7135     self.out_data = rdict
7136
7137
7138 class LUTestAllocator(NoHooksLU):
7139   """Run allocator tests.
7140
7141   This LU runs the allocator tests
7142
7143   """
7144   _OP_REQP = ["direction", "mode", "name"]
7145
7146   def CheckPrereq(self):
7147     """Check prerequisites.
7148
7149     This checks the opcode parameters depending on the director and mode test.
7150
7151     """
7152     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7153       for attr in ["name", "mem_size", "disks", "disk_template",
7154                    "os", "tags", "nics", "vcpus"]:
7155         if not hasattr(self.op, attr):
7156           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7157                                      attr)
7158       iname = self.cfg.ExpandInstanceName(self.op.name)
7159       if iname is not None:
7160         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7161                                    iname)
7162       if not isinstance(self.op.nics, list):
7163         raise errors.OpPrereqError("Invalid parameter 'nics'")
7164       for row in self.op.nics:
7165         if (not isinstance(row, dict) or
7166             "mac" not in row or
7167             "ip" not in row or
7168             "bridge" not in row):
7169           raise errors.OpPrereqError("Invalid contents of the"
7170                                      " 'nics' parameter")
7171       if not isinstance(self.op.disks, list):
7172         raise errors.OpPrereqError("Invalid parameter 'disks'")
7173       for row in self.op.disks:
7174         if (not isinstance(row, dict) or
7175             "size" not in row or
7176             not isinstance(row["size"], int) or
7177             "mode" not in row or
7178             row["mode"] not in ['r', 'w']):
7179           raise errors.OpPrereqError("Invalid contents of the"
7180                                      " 'disks' parameter")
7181       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7182         self.op.hypervisor = self.cfg.GetHypervisorType()
7183     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7184       if not hasattr(self.op, "name"):
7185         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7186       fname = self.cfg.ExpandInstanceName(self.op.name)
7187       if fname is None:
7188         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7189                                    self.op.name)
7190       self.op.name = fname
7191       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7192     else:
7193       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7194                                  self.op.mode)
7195
7196     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7197       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7198         raise errors.OpPrereqError("Missing allocator name")
7199     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7200       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7201                                  self.op.direction)
7202
7203   def Exec(self, feedback_fn):
7204     """Run the allocator test.
7205
7206     """
7207     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7208       ial = IAllocator(self,
7209                        mode=self.op.mode,
7210                        name=self.op.name,
7211                        mem_size=self.op.mem_size,
7212                        disks=self.op.disks,
7213                        disk_template=self.op.disk_template,
7214                        os=self.op.os,
7215                        tags=self.op.tags,
7216                        nics=self.op.nics,
7217                        vcpus=self.op.vcpus,
7218                        hypervisor=self.op.hypervisor,
7219                        )
7220     else:
7221       ial = IAllocator(self,
7222                        mode=self.op.mode,
7223                        name=self.op.name,
7224                        relocate_from=list(self.relocate_from),
7225                        )
7226
7227     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7228       result = ial.in_text
7229     else:
7230       ial.Run(self.op.allocator, validate=False)
7231       result = ial.out_text
7232     return result