Fix yet another bug in LURepairDiskSizes
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, bridge, mac) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517       env["INSTANCE_NIC%d_MAC" % idx] = mac
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541   """Builds instance related env variables for hooks from an object.
542
543   @type lu: L{LogicalUnit}
544   @param lu: the logical unit on whose behalf we execute
545   @type instance: L{objects.Instance}
546   @param instance: the instance for which we should build the
547       environment
548   @type override: dict
549   @param override: dictionary with key/values that will override
550       our values
551   @rtype: dict
552   @return: the hook environment dictionary
553
554   """
555   cluster = lu.cfg.GetClusterInfo()
556   bep = cluster.FillBE(instance)
557   hvp = cluster.FillHV(instance)
558   args = {
559     'name': instance.name,
560     'primary_node': instance.primary_node,
561     'secondary_nodes': instance.secondary_nodes,
562     'os_type': instance.os,
563     'status': instance.admin_up,
564     'memory': bep[constants.BE_MEMORY],
565     'vcpus': bep[constants.BE_VCPUS],
566     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567     'disk_template': instance.disk_template,
568     'disks': [(disk.size, disk.mode) for disk in instance.disks],
569     'bep': bep,
570     'hvp': hvp,
571     'hypervisor': instance.hypervisor,
572   }
573   if override:
574     args.update(override)
575   return _BuildInstanceHookEnv(**args)
576
577
578 def _AdjustCandidatePool(lu):
579   """Adjust the candidate pool after node operations.
580
581   """
582   mod_list = lu.cfg.MaintainCandidatePool()
583   if mod_list:
584     lu.LogInfo("Promoted nodes to master candidate role: %s",
585                ", ".join(node.name for node in mod_list))
586     for name in mod_list:
587       lu.context.ReaddNode(name)
588   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589   if mc_now > mc_max:
590     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591                (mc_now, mc_max))
592
593
594 def _CheckInstanceBridgesExist(lu, instance):
595   """Check that the brigdes needed by an instance exist.
596
597   """
598   # check bridges existance
599   brlist = [nic.bridge for nic in instance.nics]
600   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601   result.Raise()
602   if not result.data:
603     raise errors.OpPrereqError("One or more target bridges %s does not"
604                                " exist on destination node '%s'" %
605                                (brlist, instance.primary_node))
606
607
608 class LUDestroyCluster(NoHooksLU):
609   """Logical unit for destroying the cluster.
610
611   """
612   _OP_REQP = []
613
614   def CheckPrereq(self):
615     """Check prerequisites.
616
617     This checks whether the cluster is empty.
618
619     Any errors are signalled by raising errors.OpPrereqError.
620
621     """
622     master = self.cfg.GetMasterNode()
623
624     nodelist = self.cfg.GetNodeList()
625     if len(nodelist) != 1 or nodelist[0] != master:
626       raise errors.OpPrereqError("There are still %d node(s) in"
627                                  " this cluster." % (len(nodelist) - 1))
628     instancelist = self.cfg.GetInstanceList()
629     if instancelist:
630       raise errors.OpPrereqError("There are still %d instance(s) in"
631                                  " this cluster." % len(instancelist))
632
633   def Exec(self, feedback_fn):
634     """Destroys the cluster.
635
636     """
637     master = self.cfg.GetMasterNode()
638     result = self.rpc.call_node_stop_master(master, False)
639     result.Raise()
640     if not result.data:
641       raise errors.OpExecError("Could not disable the master role")
642     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643     utils.CreateBackup(priv_key)
644     utils.CreateBackup(pub_key)
645     return master
646
647
648 class LUVerifyCluster(LogicalUnit):
649   """Verifies the cluster status.
650
651   """
652   HPATH = "cluster-verify"
653   HTYPE = constants.HTYPE_CLUSTER
654   _OP_REQP = ["skip_checks"]
655   REQ_BGL = False
656
657   def ExpandNames(self):
658     self.needed_locks = {
659       locking.LEVEL_NODE: locking.ALL_SET,
660       locking.LEVEL_INSTANCE: locking.ALL_SET,
661     }
662     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663
664   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665                   node_result, feedback_fn, master_files,
666                   drbd_map, vg_name):
667     """Run multiple tests against a node.
668
669     Test list:
670
671       - compares ganeti version
672       - checks vg existance and size > 20G
673       - checks config file checksum
674       - checks ssh to other nodes
675
676     @type nodeinfo: L{objects.Node}
677     @param nodeinfo: the node to check
678     @param file_list: required list of files
679     @param local_cksum: dictionary of local files and their checksums
680     @param node_result: the results from the node
681     @param feedback_fn: function used to accumulate results
682     @param master_files: list of files that only masters should have
683     @param drbd_map: the useddrbd minors for this node, in
684         form of minor: (instance, must_exist) which correspond to instances
685         and their running status
686     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687
688     """
689     node = nodeinfo.name
690
691     # main result, node_result should be a non-empty dict
692     if not node_result or not isinstance(node_result, dict):
693       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694       return True
695
696     # compares ganeti version
697     local_version = constants.PROTOCOL_VERSION
698     remote_version = node_result.get('version', None)
699     if not (remote_version and isinstance(remote_version, (list, tuple)) and
700             len(remote_version) == 2):
701       feedback_fn("  - ERROR: connection to %s failed" % (node))
702       return True
703
704     if local_version != remote_version[0]:
705       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706                   " node %s %s" % (local_version, node, remote_version[0]))
707       return True
708
709     # node seems compatible, we can actually try to look into its results
710
711     bad = False
712
713     # full package version
714     if constants.RELEASE_VERSION != remote_version[1]:
715       feedback_fn("  - WARNING: software version mismatch: master %s,"
716                   " node %s %s" %
717                   (constants.RELEASE_VERSION, node, remote_version[1]))
718
719     # checks vg existence and size > 20G
720     if vg_name is not None:
721       vglist = node_result.get(constants.NV_VGLIST, None)
722       if not vglist:
723         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724                         (node,))
725         bad = True
726       else:
727         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728                                               constants.MIN_VG_SIZE)
729         if vgstatus:
730           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731           bad = True
732
733     # checks config file checksum
734
735     remote_cksum = node_result.get(constants.NV_FILELIST, None)
736     if not isinstance(remote_cksum, dict):
737       bad = True
738       feedback_fn("  - ERROR: node hasn't returned file checksum data")
739     else:
740       for file_name in file_list:
741         node_is_mc = nodeinfo.master_candidate
742         must_have_file = file_name not in master_files
743         if file_name not in remote_cksum:
744           if node_is_mc or must_have_file:
745             bad = True
746             feedback_fn("  - ERROR: file '%s' missing" % file_name)
747         elif remote_cksum[file_name] != local_cksum[file_name]:
748           if node_is_mc or must_have_file:
749             bad = True
750             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751           else:
752             # not candidate and this is not a must-have file
753             bad = True
754             feedback_fn("  - ERROR: file '%s' should not exist on non master"
755                         " candidates (and the file is outdated)" % file_name)
756         else:
757           # all good, except non-master/non-must have combination
758           if not node_is_mc and not must_have_file:
759             feedback_fn("  - ERROR: file '%s' should not exist on non master"
760                         " candidates" % file_name)
761
762     # checks ssh to any
763
764     if constants.NV_NODELIST not in node_result:
765       bad = True
766       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767     else:
768       if node_result[constants.NV_NODELIST]:
769         bad = True
770         for node in node_result[constants.NV_NODELIST]:
771           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772                           (node, node_result[constants.NV_NODELIST][node]))
773
774     if constants.NV_NODENETTEST not in node_result:
775       bad = True
776       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777     else:
778       if node_result[constants.NV_NODENETTEST]:
779         bad = True
780         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781         for node in nlist:
782           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783                           (node, node_result[constants.NV_NODENETTEST][node]))
784
785     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786     if isinstance(hyp_result, dict):
787       for hv_name, hv_result in hyp_result.iteritems():
788         if hv_result is not None:
789           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790                       (hv_name, hv_result))
791
792     # check used drbd list
793     if vg_name is not None:
794       used_minors = node_result.get(constants.NV_DRBDLIST, [])
795       if not isinstance(used_minors, (tuple, list)):
796         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797                     str(used_minors))
798       else:
799         for minor, (iname, must_exist) in drbd_map.items():
800           if minor not in used_minors and must_exist:
801             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802                         " not active" % (minor, iname))
803             bad = True
804         for minor in used_minors:
805           if minor not in drbd_map:
806             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807                         minor)
808             bad = True
809
810     return bad
811
812   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813                       node_instance, feedback_fn, n_offline):
814     """Verify an instance.
815
816     This function checks to see if the required block devices are
817     available on the instance's node.
818
819     """
820     bad = False
821
822     node_current = instanceconfig.primary_node
823
824     node_vol_should = {}
825     instanceconfig.MapLVsByNode(node_vol_should)
826
827     for node in node_vol_should:
828       if node in n_offline:
829         # ignore missing volumes on offline nodes
830         continue
831       for volume in node_vol_should[node]:
832         if node not in node_vol_is or volume not in node_vol_is[node]:
833           feedback_fn("  - ERROR: volume %s missing on node %s" %
834                           (volume, node))
835           bad = True
836
837     if instanceconfig.admin_up:
838       if ((node_current not in node_instance or
839           not instance in node_instance[node_current]) and
840           node_current not in n_offline):
841         feedback_fn("  - ERROR: instance %s not running on node %s" %
842                         (instance, node_current))
843         bad = True
844
845     for node in node_instance:
846       if (not node == node_current):
847         if instance in node_instance[node]:
848           feedback_fn("  - ERROR: instance %s should not run on node %s" %
849                           (instance, node))
850           bad = True
851
852     return bad
853
854   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855     """Verify if there are any unknown volumes in the cluster.
856
857     The .os, .swap and backup volumes are ignored. All other volumes are
858     reported as unknown.
859
860     """
861     bad = False
862
863     for node in node_vol_is:
864       for volume in node_vol_is[node]:
865         if node not in node_vol_should or volume not in node_vol_should[node]:
866           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867                       (volume, node))
868           bad = True
869     return bad
870
871   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872     """Verify the list of running instances.
873
874     This checks what instances are running but unknown to the cluster.
875
876     """
877     bad = False
878     for node in node_instance:
879       for runninginstance in node_instance[node]:
880         if runninginstance not in instancelist:
881           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882                           (runninginstance, node))
883           bad = True
884     return bad
885
886   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887     """Verify N+1 Memory Resilience.
888
889     Check that if one single node dies we can still start all the instances it
890     was primary for.
891
892     """
893     bad = False
894
895     for node, nodeinfo in node_info.iteritems():
896       # This code checks that every node which is now listed as secondary has
897       # enough memory to host all instances it is supposed to should a single
898       # other node in the cluster fail.
899       # FIXME: not ready for failover to an arbitrary node
900       # FIXME: does not support file-backed instances
901       # WARNING: we currently take into account down instances as well as up
902       # ones, considering that even if they're down someone might want to start
903       # them even in the event of a node failure.
904       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905         needed_mem = 0
906         for instance in instances:
907           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908           if bep[constants.BE_AUTO_BALANCE]:
909             needed_mem += bep[constants.BE_MEMORY]
910         if nodeinfo['mfree'] < needed_mem:
911           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912                       " failovers should node %s fail" % (node, prinode))
913           bad = True
914     return bad
915
916   def CheckPrereq(self):
917     """Check prerequisites.
918
919     Transform the list of checks we're going to skip into a set and check that
920     all its members are valid.
921
922     """
923     self.skip_set = frozenset(self.op.skip_checks)
924     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925       raise errors.OpPrereqError("Invalid checks to be skipped specified")
926
927   def BuildHooksEnv(self):
928     """Build hooks env.
929
930     Cluster-Verify hooks just rone in the post phase and their failure makes
931     the output be logged in the verify output and the verification to fail.
932
933     """
934     all_nodes = self.cfg.GetNodeList()
935     env = {
936       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937       }
938     for node in self.cfg.GetAllNodesInfo().values():
939       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940
941     return env, [], all_nodes
942
943   def Exec(self, feedback_fn):
944     """Verify integrity of cluster, performing various test on nodes.
945
946     """
947     bad = False
948     feedback_fn("* Verifying global settings")
949     for msg in self.cfg.VerifyConfig():
950       feedback_fn("  - ERROR: %s" % msg)
951
952     vg_name = self.cfg.GetVGName()
953     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954     nodelist = utils.NiceSort(self.cfg.GetNodeList())
955     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958                         for iname in instancelist)
959     i_non_redundant = [] # Non redundant instances
960     i_non_a_balanced = [] # Non auto-balanced instances
961     n_offline = [] # List of offline nodes
962     n_drained = [] # List of nodes being drained
963     node_volume = {}
964     node_instance = {}
965     node_info = {}
966     instance_cfg = {}
967
968     # FIXME: verify OS list
969     # do local checksums
970     master_files = [constants.CLUSTER_CONF_FILE]
971
972     file_names = ssconf.SimpleStore().GetFileList()
973     file_names.append(constants.SSL_CERT_FILE)
974     file_names.append(constants.RAPI_CERT_FILE)
975     file_names.extend(master_files)
976
977     local_checksums = utils.FingerprintFiles(file_names)
978
979     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980     node_verify_param = {
981       constants.NV_FILELIST: file_names,
982       constants.NV_NODELIST: [node.name for node in nodeinfo
983                               if not node.offline],
984       constants.NV_HYPERVISOR: hypervisors,
985       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986                                   node.secondary_ip) for node in nodeinfo
987                                  if not node.offline],
988       constants.NV_INSTANCELIST: hypervisors,
989       constants.NV_VERSION: None,
990       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991       }
992     if vg_name is not None:
993       node_verify_param[constants.NV_VGLIST] = None
994       node_verify_param[constants.NV_LVLIST] = vg_name
995       node_verify_param[constants.NV_DRBDLIST] = None
996     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997                                            self.cfg.GetClusterName())
998
999     cluster = self.cfg.GetClusterInfo()
1000     master_node = self.cfg.GetMasterNode()
1001     all_drbd_map = self.cfg.ComputeDRBDMap()
1002
1003     for node_i in nodeinfo:
1004       node = node_i.name
1005       nresult = all_nvinfo[node].data
1006
1007       if node_i.offline:
1008         feedback_fn("* Skipping offline node %s" % (node,))
1009         n_offline.append(node)
1010         continue
1011
1012       if node == master_node:
1013         ntype = "master"
1014       elif node_i.master_candidate:
1015         ntype = "master candidate"
1016       elif node_i.drained:
1017         ntype = "drained"
1018         n_drained.append(node)
1019       else:
1020         ntype = "regular"
1021       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022
1023       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025         bad = True
1026         continue
1027
1028       node_drbd = {}
1029       for minor, instance in all_drbd_map[node].items():
1030         if instance not in instanceinfo:
1031           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032                       instance)
1033           # ghost instance should not be running, but otherwise we
1034           # don't give double warnings (both ghost instance and
1035           # unallocated minor in use)
1036           node_drbd[minor] = (instance, False)
1037         else:
1038           instance = instanceinfo[instance]
1039           node_drbd[minor] = (instance.name, instance.admin_up)
1040       result = self._VerifyNode(node_i, file_names, local_checksums,
1041                                 nresult, feedback_fn, master_files,
1042                                 node_drbd, vg_name)
1043       bad = bad or result
1044
1045       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046       if vg_name is None:
1047         node_volume[node] = {}
1048       elif isinstance(lvdata, basestring):
1049         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050                     (node, utils.SafeEncode(lvdata)))
1051         bad = True
1052         node_volume[node] = {}
1053       elif not isinstance(lvdata, dict):
1054         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055         bad = True
1056         continue
1057       else:
1058         node_volume[node] = lvdata
1059
1060       # node_instance
1061       idata = nresult.get(constants.NV_INSTANCELIST, None)
1062       if not isinstance(idata, list):
1063         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064                     (node,))
1065         bad = True
1066         continue
1067
1068       node_instance[node] = idata
1069
1070       # node_info
1071       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072       if not isinstance(nodeinfo, dict):
1073         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074         bad = True
1075         continue
1076
1077       try:
1078         node_info[node] = {
1079           "mfree": int(nodeinfo['memory_free']),
1080           "pinst": [],
1081           "sinst": [],
1082           # dictionary holding all instances this node is secondary for,
1083           # grouped by their primary node. Each key is a cluster node, and each
1084           # value is a list of instances which have the key as primary and the
1085           # current node as secondary.  this is handy to calculate N+1 memory
1086           # availability if you can only failover from a primary to its
1087           # secondary.
1088           "sinst-by-pnode": {},
1089         }
1090         # FIXME: devise a free space model for file based instances as well
1091         if vg_name is not None:
1092           if (constants.NV_VGLIST not in nresult or
1093               vg_name not in nresult[constants.NV_VGLIST]):
1094             feedback_fn("  - ERROR: node %s didn't return data for the"
1095                         " volume group '%s' - it is either missing or broken" %
1096                         (node, vg_name))
1097             bad = True
1098             continue
1099           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100       except (ValueError, KeyError):
1101         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102                     " from node %s" % (node,))
1103         bad = True
1104         continue
1105
1106     node_vol_should = {}
1107
1108     for instance in instancelist:
1109       feedback_fn("* Verifying instance %s" % instance)
1110       inst_config = instanceinfo[instance]
1111       result =  self._VerifyInstance(instance, inst_config, node_volume,
1112                                      node_instance, feedback_fn, n_offline)
1113       bad = bad or result
1114       inst_nodes_offline = []
1115
1116       inst_config.MapLVsByNode(node_vol_should)
1117
1118       instance_cfg[instance] = inst_config
1119
1120       pnode = inst_config.primary_node
1121       if pnode in node_info:
1122         node_info[pnode]['pinst'].append(instance)
1123       elif pnode not in n_offline:
1124         feedback_fn("  - ERROR: instance %s, connection to primary node"
1125                     " %s failed" % (instance, pnode))
1126         bad = True
1127
1128       if pnode in n_offline:
1129         inst_nodes_offline.append(pnode)
1130
1131       # If the instance is non-redundant we cannot survive losing its primary
1132       # node, so we are not N+1 compliant. On the other hand we have no disk
1133       # templates with more than one secondary so that situation is not well
1134       # supported either.
1135       # FIXME: does not support file-backed instances
1136       if len(inst_config.secondary_nodes) == 0:
1137         i_non_redundant.append(instance)
1138       elif len(inst_config.secondary_nodes) > 1:
1139         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140                     % instance)
1141
1142       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143         i_non_a_balanced.append(instance)
1144
1145       for snode in inst_config.secondary_nodes:
1146         if snode in node_info:
1147           node_info[snode]['sinst'].append(instance)
1148           if pnode not in node_info[snode]['sinst-by-pnode']:
1149             node_info[snode]['sinst-by-pnode'][pnode] = []
1150           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151         elif snode not in n_offline:
1152           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153                       " %s failed" % (instance, snode))
1154           bad = True
1155         if snode in n_offline:
1156           inst_nodes_offline.append(snode)
1157
1158       if inst_nodes_offline:
1159         # warn that the instance lives on offline nodes, and set bad=True
1160         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161                     ", ".join(inst_nodes_offline))
1162         bad = True
1163
1164     feedback_fn("* Verifying orphan volumes")
1165     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166                                        feedback_fn)
1167     bad = bad or result
1168
1169     feedback_fn("* Verifying remaining instances")
1170     result = self._VerifyOrphanInstances(instancelist, node_instance,
1171                                          feedback_fn)
1172     bad = bad or result
1173
1174     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175       feedback_fn("* Verifying N+1 Memory redundancy")
1176       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177       bad = bad or result
1178
1179     feedback_fn("* Other Notes")
1180     if i_non_redundant:
1181       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182                   % len(i_non_redundant))
1183
1184     if i_non_a_balanced:
1185       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186                   % len(i_non_a_balanced))
1187
1188     if n_offline:
1189       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190
1191     if n_drained:
1192       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193
1194     return not bad
1195
1196   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197     """Analize the post-hooks' result
1198
1199     This method analyses the hook result, handles it, and sends some
1200     nicely-formatted feedback back to the user.
1201
1202     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204     @param hooks_results: the results of the multi-node hooks rpc call
1205     @param feedback_fn: function used send feedback back to the caller
1206     @param lu_result: previous Exec result
1207     @return: the new Exec result, based on the previous result
1208         and hook results
1209
1210     """
1211     # We only really run POST phase hooks, and are only interested in
1212     # their results
1213     if phase == constants.HOOKS_PHASE_POST:
1214       # Used to change hooks' output to proper indentation
1215       indent_re = re.compile('^', re.M)
1216       feedback_fn("* Hooks Results")
1217       if not hooks_results:
1218         feedback_fn("  - ERROR: general communication failure")
1219         lu_result = 1
1220       else:
1221         for node_name in hooks_results:
1222           show_node_header = True
1223           res = hooks_results[node_name]
1224           if res.failed or res.data is False or not isinstance(res.data, list):
1225             if res.offline:
1226               # no need to warn or set fail return value
1227               continue
1228             feedback_fn("    Communication failure in hooks execution")
1229             lu_result = 1
1230             continue
1231           for script, hkr, output in res.data:
1232             if hkr == constants.HKR_FAIL:
1233               # The node header is only shown once, if there are
1234               # failing hooks on that node
1235               if show_node_header:
1236                 feedback_fn("  Node %s:" % node_name)
1237                 show_node_header = False
1238               feedback_fn("    ERROR: Script %s failed, output:" % script)
1239               output = indent_re.sub('      ', output)
1240               feedback_fn("%s" % output)
1241               lu_result = 1
1242
1243       return lu_result
1244
1245
1246 class LUVerifyDisks(NoHooksLU):
1247   """Verifies the cluster disks status.
1248
1249   """
1250   _OP_REQP = []
1251   REQ_BGL = False
1252
1253   def ExpandNames(self):
1254     self.needed_locks = {
1255       locking.LEVEL_NODE: locking.ALL_SET,
1256       locking.LEVEL_INSTANCE: locking.ALL_SET,
1257     }
1258     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This has no prerequisites.
1264
1265     """
1266     pass
1267
1268   def Exec(self, feedback_fn):
1269     """Verify integrity of cluster disks.
1270
1271     """
1272     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1273
1274     vg_name = self.cfg.GetVGName()
1275     nodes = utils.NiceSort(self.cfg.GetNodeList())
1276     instances = [self.cfg.GetInstanceInfo(name)
1277                  for name in self.cfg.GetInstanceList()]
1278
1279     nv_dict = {}
1280     for inst in instances:
1281       inst_lvs = {}
1282       if (not inst.admin_up or
1283           inst.disk_template not in constants.DTS_NET_MIRROR):
1284         continue
1285       inst.MapLVsByNode(inst_lvs)
1286       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287       for node, vol_list in inst_lvs.iteritems():
1288         for vol in vol_list:
1289           nv_dict[(node, vol)] = inst
1290
1291     if not nv_dict:
1292       return result
1293
1294     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295
1296     to_act = set()
1297     for node in nodes:
1298       # node_volume
1299       lvs = node_lvs[node]
1300       if lvs.failed:
1301         if not lvs.offline:
1302           self.LogWarning("Connection to node %s failed: %s" %
1303                           (node, lvs.data))
1304         continue
1305       lvs = lvs.data
1306       if isinstance(lvs, basestring):
1307         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308         res_nlvm[node] = lvs
1309         continue
1310       elif not isinstance(lvs, dict):
1311         logging.warning("Connection to node %s failed or invalid data"
1312                         " returned", node)
1313         res_nodes.append(node)
1314         continue
1315
1316       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317         inst = nv_dict.pop((node, lv_name), None)
1318         if (not lv_online and inst is not None
1319             and inst.name not in res_instances):
1320           res_instances.append(inst.name)
1321
1322     # any leftover items in nv_dict are missing LVs, let's arrange the
1323     # data better
1324     for key, inst in nv_dict.iteritems():
1325       if inst.name not in res_missing:
1326         res_missing[inst.name] = []
1327       res_missing[inst.name].append(key)
1328
1329     return result
1330
1331
1332 class LURepairDiskSizes(NoHooksLU):
1333   """Verifies the cluster disks sizes.
1334
1335   """
1336   _OP_REQP = ["instances"]
1337   REQ_BGL = False
1338
1339   def ExpandNames(self):
1340
1341     if not isinstance(self.op.instances, list):
1342       raise errors.OpPrereqError("Invalid argument type 'instances'")
1343
1344     if self.op.instances:
1345       self.wanted_names = []
1346       for name in self.op.instances:
1347         full_name = self.cfg.ExpandInstanceName(name)
1348         if full_name is None:
1349           raise errors.OpPrereqError("Instance '%s' not known" % name)
1350         self.wanted_names.append(full_name)
1351       self.needed_locks = {
1352         locking.LEVEL_NODE: [],
1353         locking.LEVEL_INSTANCE: self.wanted_names,
1354         }
1355       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1356     else:
1357       self.wanted_names = None
1358       self.needed_locks = {
1359         locking.LEVEL_NODE: locking.ALL_SET,
1360         locking.LEVEL_INSTANCE: locking.ALL_SET,
1361         }
1362     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1363
1364   def DeclareLocks(self, level):
1365     if level == locking.LEVEL_NODE and self.wanted_names is not None:
1366       self._LockInstancesNodes(primary_only=True)
1367
1368   def CheckPrereq(self):
1369     """Check prerequisites.
1370
1371     This only checks the optional instance list against the existing names.
1372
1373     """
1374     if self.wanted_names is None:
1375       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1376
1377     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1378                              in self.wanted_names]
1379
1380   def Exec(self, feedback_fn):
1381     """Verify the size of cluster disks.
1382
1383     """
1384     # TODO: check child disks too
1385     # TODO: check differences in size between primary/secondary nodes
1386     per_node_disks = {}
1387     for instance in self.wanted_instances:
1388       pnode = instance.primary_node
1389       if pnode not in per_node_disks:
1390         per_node_disks[pnode] = []
1391       for idx, disk in enumerate(instance.disks):
1392         per_node_disks[pnode].append((instance, idx, disk))
1393
1394     changed = []
1395     for node, dskl in per_node_disks.items():
1396       newl = [v[2].Copy() for v in dskl]
1397       for dsk in newl:
1398         self.cfg.SetDiskID(dsk, node)
1399       result = self.rpc.call_blockdev_getsizes(node, newl)
1400       if result.failed:
1401         self.LogWarning("Failure in blockdev_getsizes call to node"
1402                         " %s, ignoring", node)
1403         continue
1404       if len(result.data) != len(dskl):
1405         self.LogWarning("Invalid result from node %s, ignoring node results",
1406                         node)
1407         continue
1408       for ((instance, idx, disk), size) in zip(dskl, result.data):
1409         if size is None:
1410           self.LogWarning("Disk %d of instance %s did not return size"
1411                           " information, ignoring", idx, instance.name)
1412           continue
1413         if not isinstance(size, (int, long)):
1414           self.LogWarning("Disk %d of instance %s did not return valid"
1415                           " size information, ignoring", idx, instance.name)
1416           continue
1417         size = size >> 20
1418         if size != disk.size:
1419           self.LogInfo("Disk %d of instance %s has mismatched size,"
1420                        " correcting: recorded %d, actual %d", idx,
1421                        instance.name, disk.size, size)
1422           disk.size = size
1423           self.cfg.Update(instance)
1424           changed.append((instance.name, idx, size))
1425     return changed
1426
1427
1428 class LURenameCluster(LogicalUnit):
1429   """Rename the cluster.
1430
1431   """
1432   HPATH = "cluster-rename"
1433   HTYPE = constants.HTYPE_CLUSTER
1434   _OP_REQP = ["name"]
1435
1436   def BuildHooksEnv(self):
1437     """Build hooks env.
1438
1439     """
1440     env = {
1441       "OP_TARGET": self.cfg.GetClusterName(),
1442       "NEW_NAME": self.op.name,
1443       }
1444     mn = self.cfg.GetMasterNode()
1445     return env, [mn], [mn]
1446
1447   def CheckPrereq(self):
1448     """Verify that the passed name is a valid one.
1449
1450     """
1451     hostname = utils.HostInfo(self.op.name)
1452
1453     new_name = hostname.name
1454     self.ip = new_ip = hostname.ip
1455     old_name = self.cfg.GetClusterName()
1456     old_ip = self.cfg.GetMasterIP()
1457     if new_name == old_name and new_ip == old_ip:
1458       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1459                                  " cluster has changed")
1460     if new_ip != old_ip:
1461       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1462         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1463                                    " reachable on the network. Aborting." %
1464                                    new_ip)
1465
1466     self.op.name = new_name
1467
1468   def Exec(self, feedback_fn):
1469     """Rename the cluster.
1470
1471     """
1472     clustername = self.op.name
1473     ip = self.ip
1474
1475     # shutdown the master IP
1476     master = self.cfg.GetMasterNode()
1477     result = self.rpc.call_node_stop_master(master, False)
1478     if result.failed or not result.data:
1479       raise errors.OpExecError("Could not disable the master role")
1480
1481     try:
1482       cluster = self.cfg.GetClusterInfo()
1483       cluster.cluster_name = clustername
1484       cluster.master_ip = ip
1485       self.cfg.Update(cluster)
1486
1487       # update the known hosts file
1488       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1489       node_list = self.cfg.GetNodeList()
1490       try:
1491         node_list.remove(master)
1492       except ValueError:
1493         pass
1494       result = self.rpc.call_upload_file(node_list,
1495                                          constants.SSH_KNOWN_HOSTS_FILE)
1496       for to_node, to_result in result.iteritems():
1497         if to_result.failed or not to_result.data:
1498           logging.error("Copy of file %s to node %s failed",
1499                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1500
1501     finally:
1502       result = self.rpc.call_node_start_master(master, False, False)
1503       if result.failed or not result.data:
1504         self.LogWarning("Could not re-enable the master role on"
1505                         " the master, please restart manually.")
1506
1507
1508 def _RecursiveCheckIfLVMBased(disk):
1509   """Check if the given disk or its children are lvm-based.
1510
1511   @type disk: L{objects.Disk}
1512   @param disk: the disk to check
1513   @rtype: booleean
1514   @return: boolean indicating whether a LD_LV dev_type was found or not
1515
1516   """
1517   if disk.children:
1518     for chdisk in disk.children:
1519       if _RecursiveCheckIfLVMBased(chdisk):
1520         return True
1521   return disk.dev_type == constants.LD_LV
1522
1523
1524 class LUSetClusterParams(LogicalUnit):
1525   """Change the parameters of the cluster.
1526
1527   """
1528   HPATH = "cluster-modify"
1529   HTYPE = constants.HTYPE_CLUSTER
1530   _OP_REQP = []
1531   REQ_BGL = False
1532
1533   def CheckArguments(self):
1534     """Check parameters
1535
1536     """
1537     if not hasattr(self.op, "candidate_pool_size"):
1538       self.op.candidate_pool_size = None
1539     if self.op.candidate_pool_size is not None:
1540       try:
1541         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1542       except (ValueError, TypeError), err:
1543         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1544                                    str(err))
1545       if self.op.candidate_pool_size < 1:
1546         raise errors.OpPrereqError("At least one master candidate needed")
1547
1548   def ExpandNames(self):
1549     # FIXME: in the future maybe other cluster params won't require checking on
1550     # all nodes to be modified.
1551     self.needed_locks = {
1552       locking.LEVEL_NODE: locking.ALL_SET,
1553     }
1554     self.share_locks[locking.LEVEL_NODE] = 1
1555
1556   def BuildHooksEnv(self):
1557     """Build hooks env.
1558
1559     """
1560     env = {
1561       "OP_TARGET": self.cfg.GetClusterName(),
1562       "NEW_VG_NAME": self.op.vg_name,
1563       }
1564     mn = self.cfg.GetMasterNode()
1565     return env, [mn], [mn]
1566
1567   def CheckPrereq(self):
1568     """Check prerequisites.
1569
1570     This checks whether the given params don't conflict and
1571     if the given volume group is valid.
1572
1573     """
1574     if self.op.vg_name is not None and not self.op.vg_name:
1575       instances = self.cfg.GetAllInstancesInfo().values()
1576       for inst in instances:
1577         for disk in inst.disks:
1578           if _RecursiveCheckIfLVMBased(disk):
1579             raise errors.OpPrereqError("Cannot disable lvm storage while"
1580                                        " lvm-based instances exist")
1581
1582     node_list = self.acquired_locks[locking.LEVEL_NODE]
1583
1584     # if vg_name not None, checks given volume group on all nodes
1585     if self.op.vg_name:
1586       vglist = self.rpc.call_vg_list(node_list)
1587       for node in node_list:
1588         if vglist[node].failed:
1589           # ignoring down node
1590           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1591           continue
1592         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1593                                               self.op.vg_name,
1594                                               constants.MIN_VG_SIZE)
1595         if vgstatus:
1596           raise errors.OpPrereqError("Error on node '%s': %s" %
1597                                      (node, vgstatus))
1598
1599     self.cluster = cluster = self.cfg.GetClusterInfo()
1600     # validate beparams changes
1601     if self.op.beparams:
1602       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1603       self.new_beparams = cluster.FillDict(
1604         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1605
1606     # hypervisor list/parameters
1607     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1608     if self.op.hvparams:
1609       if not isinstance(self.op.hvparams, dict):
1610         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1611       for hv_name, hv_dict in self.op.hvparams.items():
1612         if hv_name not in self.new_hvparams:
1613           self.new_hvparams[hv_name] = hv_dict
1614         else:
1615           self.new_hvparams[hv_name].update(hv_dict)
1616
1617     if self.op.enabled_hypervisors is not None:
1618       self.hv_list = self.op.enabled_hypervisors
1619     else:
1620       self.hv_list = cluster.enabled_hypervisors
1621
1622     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1623       # either the enabled list has changed, or the parameters have, validate
1624       for hv_name, hv_params in self.new_hvparams.items():
1625         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1626             (self.op.enabled_hypervisors and
1627              hv_name in self.op.enabled_hypervisors)):
1628           # either this is a new hypervisor, or its parameters have changed
1629           hv_class = hypervisor.GetHypervisor(hv_name)
1630           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1631           hv_class.CheckParameterSyntax(hv_params)
1632           _CheckHVParams(self, node_list, hv_name, hv_params)
1633
1634   def Exec(self, feedback_fn):
1635     """Change the parameters of the cluster.
1636
1637     """
1638     if self.op.vg_name is not None:
1639       new_volume = self.op.vg_name
1640       if not new_volume:
1641         new_volume = None
1642       if new_volume != self.cfg.GetVGName():
1643         self.cfg.SetVGName(new_volume)
1644       else:
1645         feedback_fn("Cluster LVM configuration already in desired"
1646                     " state, not changing")
1647     if self.op.hvparams:
1648       self.cluster.hvparams = self.new_hvparams
1649     if self.op.enabled_hypervisors is not None:
1650       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1651     if self.op.beparams:
1652       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1653     if self.op.candidate_pool_size is not None:
1654       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1655       # we need to update the pool size here, otherwise the save will fail
1656       _AdjustCandidatePool(self)
1657
1658     self.cfg.Update(self.cluster)
1659
1660
1661 class LURedistributeConfig(NoHooksLU):
1662   """Force the redistribution of cluster configuration.
1663
1664   This is a very simple LU.
1665
1666   """
1667   _OP_REQP = []
1668   REQ_BGL = False
1669
1670   def ExpandNames(self):
1671     self.needed_locks = {
1672       locking.LEVEL_NODE: locking.ALL_SET,
1673     }
1674     self.share_locks[locking.LEVEL_NODE] = 1
1675
1676   def CheckPrereq(self):
1677     """Check prerequisites.
1678
1679     """
1680
1681   def Exec(self, feedback_fn):
1682     """Redistribute the configuration.
1683
1684     """
1685     self.cfg.Update(self.cfg.GetClusterInfo())
1686
1687
1688 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1689   """Sleep and poll for an instance's disk to sync.
1690
1691   """
1692   if not instance.disks:
1693     return True
1694
1695   if not oneshot:
1696     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1697
1698   node = instance.primary_node
1699
1700   for dev in instance.disks:
1701     lu.cfg.SetDiskID(dev, node)
1702
1703   retries = 0
1704   degr_retries = 10 # in seconds, as we sleep 1 second each time
1705   while True:
1706     max_time = 0
1707     done = True
1708     cumul_degraded = False
1709     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1710     if rstats.failed or not rstats.data:
1711       lu.LogWarning("Can't get any data from node %s", node)
1712       retries += 1
1713       if retries >= 10:
1714         raise errors.RemoteError("Can't contact node %s for mirror data,"
1715                                  " aborting." % node)
1716       time.sleep(6)
1717       continue
1718     rstats = rstats.data
1719     retries = 0
1720     for i, mstat in enumerate(rstats):
1721       if mstat is None:
1722         lu.LogWarning("Can't compute data for node %s/%s",
1723                            node, instance.disks[i].iv_name)
1724         continue
1725       # we ignore the ldisk parameter
1726       perc_done, est_time, is_degraded, _ = mstat
1727       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1728       if perc_done is not None:
1729         done = False
1730         if est_time is not None:
1731           rem_time = "%d estimated seconds remaining" % est_time
1732           max_time = est_time
1733         else:
1734           rem_time = "no time estimate"
1735         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1736                         (instance.disks[i].iv_name, perc_done, rem_time))
1737
1738     # if we're done but degraded, let's do a few small retries, to
1739     # make sure we see a stable and not transient situation; therefore
1740     # we force restart of the loop
1741     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1742       logging.info("Degraded disks found, %d retries left", degr_retries)
1743       degr_retries -= 1
1744       time.sleep(1)
1745       continue
1746
1747     if done or oneshot:
1748       break
1749
1750     time.sleep(min(60, max_time))
1751
1752   if done:
1753     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1754   return not cumul_degraded
1755
1756
1757 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1758   """Check that mirrors are not degraded.
1759
1760   The ldisk parameter, if True, will change the test from the
1761   is_degraded attribute (which represents overall non-ok status for
1762   the device(s)) to the ldisk (representing the local storage status).
1763
1764   """
1765   lu.cfg.SetDiskID(dev, node)
1766   if ldisk:
1767     idx = 6
1768   else:
1769     idx = 5
1770
1771   result = True
1772   if on_primary or dev.AssembleOnSecondary():
1773     rstats = lu.rpc.call_blockdev_find(node, dev)
1774     msg = rstats.RemoteFailMsg()
1775     if msg:
1776       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1777       result = False
1778     elif not rstats.payload:
1779       lu.LogWarning("Can't find disk on node %s", node)
1780       result = False
1781     else:
1782       result = result and (not rstats.payload[idx])
1783   if dev.children:
1784     for child in dev.children:
1785       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1786
1787   return result
1788
1789
1790 class LUDiagnoseOS(NoHooksLU):
1791   """Logical unit for OS diagnose/query.
1792
1793   """
1794   _OP_REQP = ["output_fields", "names"]
1795   REQ_BGL = False
1796   _FIELDS_STATIC = utils.FieldSet()
1797   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1798
1799   def ExpandNames(self):
1800     if self.op.names:
1801       raise errors.OpPrereqError("Selective OS query not supported")
1802
1803     _CheckOutputFields(static=self._FIELDS_STATIC,
1804                        dynamic=self._FIELDS_DYNAMIC,
1805                        selected=self.op.output_fields)
1806
1807     # Lock all nodes, in shared mode
1808     # Temporary removal of locks, should be reverted later
1809     # TODO: reintroduce locks when they are lighter-weight
1810     self.needed_locks = {}
1811     #self.share_locks[locking.LEVEL_NODE] = 1
1812     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1813
1814   def CheckPrereq(self):
1815     """Check prerequisites.
1816
1817     """
1818
1819   @staticmethod
1820   def _DiagnoseByOS(node_list, rlist):
1821     """Remaps a per-node return list into an a per-os per-node dictionary
1822
1823     @param node_list: a list with the names of all nodes
1824     @param rlist: a map with node names as keys and OS objects as values
1825
1826     @rtype: dict
1827     @return: a dictionary with osnames as keys and as value another map, with
1828         nodes as keys and list of OS objects as values, eg::
1829
1830           {"debian-etch": {"node1": [<object>,...],
1831                            "node2": [<object>,]}
1832           }
1833
1834     """
1835     all_os = {}
1836     # we build here the list of nodes that didn't fail the RPC (at RPC
1837     # level), so that nodes with a non-responding node daemon don't
1838     # make all OSes invalid
1839     good_nodes = [node_name for node_name in rlist
1840                   if not rlist[node_name].failed]
1841     for node_name, nr in rlist.iteritems():
1842       if nr.failed or not nr.data:
1843         continue
1844       for os_obj in nr.data:
1845         if os_obj.name not in all_os:
1846           # build a list of nodes for this os containing empty lists
1847           # for each node in node_list
1848           all_os[os_obj.name] = {}
1849           for nname in good_nodes:
1850             all_os[os_obj.name][nname] = []
1851         all_os[os_obj.name][node_name].append(os_obj)
1852     return all_os
1853
1854   def Exec(self, feedback_fn):
1855     """Compute the list of OSes.
1856
1857     """
1858     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1859     node_data = self.rpc.call_os_diagnose(valid_nodes)
1860     if node_data == False:
1861       raise errors.OpExecError("Can't gather the list of OSes")
1862     pol = self._DiagnoseByOS(valid_nodes, node_data)
1863     output = []
1864     for os_name, os_data in pol.iteritems():
1865       row = []
1866       for field in self.op.output_fields:
1867         if field == "name":
1868           val = os_name
1869         elif field == "valid":
1870           val = utils.all([osl and osl[0] for osl in os_data.values()])
1871         elif field == "node_status":
1872           val = {}
1873           for node_name, nos_list in os_data.iteritems():
1874             val[node_name] = [(v.status, v.path) for v in nos_list]
1875         else:
1876           raise errors.ParameterError(field)
1877         row.append(val)
1878       output.append(row)
1879
1880     return output
1881
1882
1883 class LURemoveNode(LogicalUnit):
1884   """Logical unit for removing a node.
1885
1886   """
1887   HPATH = "node-remove"
1888   HTYPE = constants.HTYPE_NODE
1889   _OP_REQP = ["node_name"]
1890
1891   def BuildHooksEnv(self):
1892     """Build hooks env.
1893
1894     This doesn't run on the target node in the pre phase as a failed
1895     node would then be impossible to remove.
1896
1897     """
1898     env = {
1899       "OP_TARGET": self.op.node_name,
1900       "NODE_NAME": self.op.node_name,
1901       }
1902     all_nodes = self.cfg.GetNodeList()
1903     all_nodes.remove(self.op.node_name)
1904     return env, all_nodes, all_nodes
1905
1906   def CheckPrereq(self):
1907     """Check prerequisites.
1908
1909     This checks:
1910      - the node exists in the configuration
1911      - it does not have primary or secondary instances
1912      - it's not the master
1913
1914     Any errors are signalled by raising errors.OpPrereqError.
1915
1916     """
1917     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1918     if node is None:
1919       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1920
1921     instance_list = self.cfg.GetInstanceList()
1922
1923     masternode = self.cfg.GetMasterNode()
1924     if node.name == masternode:
1925       raise errors.OpPrereqError("Node is the master node,"
1926                                  " you need to failover first.")
1927
1928     for instance_name in instance_list:
1929       instance = self.cfg.GetInstanceInfo(instance_name)
1930       if node.name in instance.all_nodes:
1931         raise errors.OpPrereqError("Instance %s is still running on the node,"
1932                                    " please remove first." % instance_name)
1933     self.op.node_name = node.name
1934     self.node = node
1935
1936   def Exec(self, feedback_fn):
1937     """Removes the node from the cluster.
1938
1939     """
1940     node = self.node
1941     logging.info("Stopping the node daemon and removing configs from node %s",
1942                  node.name)
1943
1944     self.context.RemoveNode(node.name)
1945
1946     self.rpc.call_node_leave_cluster(node.name)
1947
1948     # Promote nodes to master candidate as needed
1949     _AdjustCandidatePool(self)
1950
1951
1952 class LUQueryNodes(NoHooksLU):
1953   """Logical unit for querying nodes.
1954
1955   """
1956   _OP_REQP = ["output_fields", "names", "use_locking"]
1957   REQ_BGL = False
1958   _FIELDS_DYNAMIC = utils.FieldSet(
1959     "dtotal", "dfree",
1960     "mtotal", "mnode", "mfree",
1961     "bootid",
1962     "ctotal", "cnodes", "csockets",
1963     )
1964
1965   _FIELDS_STATIC = utils.FieldSet(
1966     "name", "pinst_cnt", "sinst_cnt",
1967     "pinst_list", "sinst_list",
1968     "pip", "sip", "tags",
1969     "serial_no",
1970     "master_candidate",
1971     "master",
1972     "offline",
1973     "drained",
1974     "role",
1975     )
1976
1977   def ExpandNames(self):
1978     _CheckOutputFields(static=self._FIELDS_STATIC,
1979                        dynamic=self._FIELDS_DYNAMIC,
1980                        selected=self.op.output_fields)
1981
1982     self.needed_locks = {}
1983     self.share_locks[locking.LEVEL_NODE] = 1
1984
1985     if self.op.names:
1986       self.wanted = _GetWantedNodes(self, self.op.names)
1987     else:
1988       self.wanted = locking.ALL_SET
1989
1990     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1991     self.do_locking = self.do_node_query and self.op.use_locking
1992     if self.do_locking:
1993       # if we don't request only static fields, we need to lock the nodes
1994       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1995
1996
1997   def CheckPrereq(self):
1998     """Check prerequisites.
1999
2000     """
2001     # The validation of the node list is done in the _GetWantedNodes,
2002     # if non empty, and if empty, there's no validation to do
2003     pass
2004
2005   def Exec(self, feedback_fn):
2006     """Computes the list of nodes and their attributes.
2007
2008     """
2009     all_info = self.cfg.GetAllNodesInfo()
2010     if self.do_locking:
2011       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2012     elif self.wanted != locking.ALL_SET:
2013       nodenames = self.wanted
2014       missing = set(nodenames).difference(all_info.keys())
2015       if missing:
2016         raise errors.OpExecError(
2017           "Some nodes were removed before retrieving their data: %s" % missing)
2018     else:
2019       nodenames = all_info.keys()
2020
2021     nodenames = utils.NiceSort(nodenames)
2022     nodelist = [all_info[name] for name in nodenames]
2023
2024     # begin data gathering
2025
2026     if self.do_node_query:
2027       live_data = {}
2028       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2029                                           self.cfg.GetHypervisorType())
2030       for name in nodenames:
2031         nodeinfo = node_data[name]
2032         if not nodeinfo.failed and nodeinfo.data:
2033           nodeinfo = nodeinfo.data
2034           fn = utils.TryConvert
2035           live_data[name] = {
2036             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2037             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2038             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2039             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2040             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2041             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2042             "bootid": nodeinfo.get('bootid', None),
2043             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2044             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2045             }
2046         else:
2047           live_data[name] = {}
2048     else:
2049       live_data = dict.fromkeys(nodenames, {})
2050
2051     node_to_primary = dict([(name, set()) for name in nodenames])
2052     node_to_secondary = dict([(name, set()) for name in nodenames])
2053
2054     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2055                              "sinst_cnt", "sinst_list"))
2056     if inst_fields & frozenset(self.op.output_fields):
2057       instancelist = self.cfg.GetInstanceList()
2058
2059       for instance_name in instancelist:
2060         inst = self.cfg.GetInstanceInfo(instance_name)
2061         if inst.primary_node in node_to_primary:
2062           node_to_primary[inst.primary_node].add(inst.name)
2063         for secnode in inst.secondary_nodes:
2064           if secnode in node_to_secondary:
2065             node_to_secondary[secnode].add(inst.name)
2066
2067     master_node = self.cfg.GetMasterNode()
2068
2069     # end data gathering
2070
2071     output = []
2072     for node in nodelist:
2073       node_output = []
2074       for field in self.op.output_fields:
2075         if field == "name":
2076           val = node.name
2077         elif field == "pinst_list":
2078           val = list(node_to_primary[node.name])
2079         elif field == "sinst_list":
2080           val = list(node_to_secondary[node.name])
2081         elif field == "pinst_cnt":
2082           val = len(node_to_primary[node.name])
2083         elif field == "sinst_cnt":
2084           val = len(node_to_secondary[node.name])
2085         elif field == "pip":
2086           val = node.primary_ip
2087         elif field == "sip":
2088           val = node.secondary_ip
2089         elif field == "tags":
2090           val = list(node.GetTags())
2091         elif field == "serial_no":
2092           val = node.serial_no
2093         elif field == "master_candidate":
2094           val = node.master_candidate
2095         elif field == "master":
2096           val = node.name == master_node
2097         elif field == "offline":
2098           val = node.offline
2099         elif field == "drained":
2100           val = node.drained
2101         elif self._FIELDS_DYNAMIC.Matches(field):
2102           val = live_data[node.name].get(field, None)
2103         elif field == "role":
2104           if node.name == master_node:
2105             val = "M"
2106           elif node.master_candidate:
2107             val = "C"
2108           elif node.drained:
2109             val = "D"
2110           elif node.offline:
2111             val = "O"
2112           else:
2113             val = "R"
2114         else:
2115           raise errors.ParameterError(field)
2116         node_output.append(val)
2117       output.append(node_output)
2118
2119     return output
2120
2121
2122 class LUQueryNodeVolumes(NoHooksLU):
2123   """Logical unit for getting volumes on node(s).
2124
2125   """
2126   _OP_REQP = ["nodes", "output_fields"]
2127   REQ_BGL = False
2128   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2129   _FIELDS_STATIC = utils.FieldSet("node")
2130
2131   def ExpandNames(self):
2132     _CheckOutputFields(static=self._FIELDS_STATIC,
2133                        dynamic=self._FIELDS_DYNAMIC,
2134                        selected=self.op.output_fields)
2135
2136     self.needed_locks = {}
2137     self.share_locks[locking.LEVEL_NODE] = 1
2138     if not self.op.nodes:
2139       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2140     else:
2141       self.needed_locks[locking.LEVEL_NODE] = \
2142         _GetWantedNodes(self, self.op.nodes)
2143
2144   def CheckPrereq(self):
2145     """Check prerequisites.
2146
2147     This checks that the fields required are valid output fields.
2148
2149     """
2150     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2151
2152   def Exec(self, feedback_fn):
2153     """Computes the list of nodes and their attributes.
2154
2155     """
2156     nodenames = self.nodes
2157     volumes = self.rpc.call_node_volumes(nodenames)
2158
2159     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2160              in self.cfg.GetInstanceList()]
2161
2162     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2163
2164     output = []
2165     for node in nodenames:
2166       if node not in volumes or volumes[node].failed or not volumes[node].data:
2167         continue
2168
2169       node_vols = volumes[node].data[:]
2170       node_vols.sort(key=lambda vol: vol['dev'])
2171
2172       for vol in node_vols:
2173         node_output = []
2174         for field in self.op.output_fields:
2175           if field == "node":
2176             val = node
2177           elif field == "phys":
2178             val = vol['dev']
2179           elif field == "vg":
2180             val = vol['vg']
2181           elif field == "name":
2182             val = vol['name']
2183           elif field == "size":
2184             val = int(float(vol['size']))
2185           elif field == "instance":
2186             for inst in ilist:
2187               if node not in lv_by_node[inst]:
2188                 continue
2189               if vol['name'] in lv_by_node[inst][node]:
2190                 val = inst.name
2191                 break
2192             else:
2193               val = '-'
2194           else:
2195             raise errors.ParameterError(field)
2196           node_output.append(str(val))
2197
2198         output.append(node_output)
2199
2200     return output
2201
2202
2203 class LUAddNode(LogicalUnit):
2204   """Logical unit for adding node to the cluster.
2205
2206   """
2207   HPATH = "node-add"
2208   HTYPE = constants.HTYPE_NODE
2209   _OP_REQP = ["node_name"]
2210
2211   def BuildHooksEnv(self):
2212     """Build hooks env.
2213
2214     This will run on all nodes before, and on all nodes + the new node after.
2215
2216     """
2217     env = {
2218       "OP_TARGET": self.op.node_name,
2219       "NODE_NAME": self.op.node_name,
2220       "NODE_PIP": self.op.primary_ip,
2221       "NODE_SIP": self.op.secondary_ip,
2222       }
2223     nodes_0 = self.cfg.GetNodeList()
2224     nodes_1 = nodes_0 + [self.op.node_name, ]
2225     return env, nodes_0, nodes_1
2226
2227   def CheckPrereq(self):
2228     """Check prerequisites.
2229
2230     This checks:
2231      - the new node is not already in the config
2232      - it is resolvable
2233      - its parameters (single/dual homed) matches the cluster
2234
2235     Any errors are signalled by raising errors.OpPrereqError.
2236
2237     """
2238     node_name = self.op.node_name
2239     cfg = self.cfg
2240
2241     dns_data = utils.HostInfo(node_name)
2242
2243     node = dns_data.name
2244     primary_ip = self.op.primary_ip = dns_data.ip
2245     secondary_ip = getattr(self.op, "secondary_ip", None)
2246     if secondary_ip is None:
2247       secondary_ip = primary_ip
2248     if not utils.IsValidIP(secondary_ip):
2249       raise errors.OpPrereqError("Invalid secondary IP given")
2250     self.op.secondary_ip = secondary_ip
2251
2252     node_list = cfg.GetNodeList()
2253     if not self.op.readd and node in node_list:
2254       raise errors.OpPrereqError("Node %s is already in the configuration" %
2255                                  node)
2256     elif self.op.readd and node not in node_list:
2257       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2258
2259     for existing_node_name in node_list:
2260       existing_node = cfg.GetNodeInfo(existing_node_name)
2261
2262       if self.op.readd and node == existing_node_name:
2263         if (existing_node.primary_ip != primary_ip or
2264             existing_node.secondary_ip != secondary_ip):
2265           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2266                                      " address configuration as before")
2267         continue
2268
2269       if (existing_node.primary_ip == primary_ip or
2270           existing_node.secondary_ip == primary_ip or
2271           existing_node.primary_ip == secondary_ip or
2272           existing_node.secondary_ip == secondary_ip):
2273         raise errors.OpPrereqError("New node ip address(es) conflict with"
2274                                    " existing node %s" % existing_node.name)
2275
2276     # check that the type of the node (single versus dual homed) is the
2277     # same as for the master
2278     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2279     master_singlehomed = myself.secondary_ip == myself.primary_ip
2280     newbie_singlehomed = secondary_ip == primary_ip
2281     if master_singlehomed != newbie_singlehomed:
2282       if master_singlehomed:
2283         raise errors.OpPrereqError("The master has no private ip but the"
2284                                    " new node has one")
2285       else:
2286         raise errors.OpPrereqError("The master has a private ip but the"
2287                                    " new node doesn't have one")
2288
2289     # checks reachablity
2290     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2291       raise errors.OpPrereqError("Node not reachable by ping")
2292
2293     if not newbie_singlehomed:
2294       # check reachability from my secondary ip to newbie's secondary ip
2295       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2296                            source=myself.secondary_ip):
2297         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2298                                    " based ping to noded port")
2299
2300     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2301     if self.op.readd:
2302       exceptions = [node]
2303     else:
2304       exceptions = []
2305     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2306     # the new node will increase mc_max with one, so:
2307     mc_max = min(mc_max + 1, cp_size)
2308     self.master_candidate = mc_now < mc_max
2309
2310     if self.op.readd:
2311       self.new_node = self.cfg.GetNodeInfo(node)
2312       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2313     else:
2314       self.new_node = objects.Node(name=node,
2315                                    primary_ip=primary_ip,
2316                                    secondary_ip=secondary_ip,
2317                                    master_candidate=self.master_candidate,
2318                                    offline=False, drained=False)
2319
2320   def Exec(self, feedback_fn):
2321     """Adds the new node to the cluster.
2322
2323     """
2324     new_node = self.new_node
2325     node = new_node.name
2326
2327     # for re-adds, reset the offline/drained/master-candidate flags;
2328     # we need to reset here, otherwise offline would prevent RPC calls
2329     # later in the procedure; this also means that if the re-add
2330     # fails, we are left with a non-offlined, broken node
2331     if self.op.readd:
2332       new_node.drained = new_node.offline = False
2333       self.LogInfo("Readding a node, the offline/drained flags were reset")
2334       # if we demote the node, we do cleanup later in the procedure
2335       new_node.master_candidate = self.master_candidate
2336
2337     # notify the user about any possible mc promotion
2338     if new_node.master_candidate:
2339       self.LogInfo("Node will be a master candidate")
2340
2341     # check connectivity
2342     result = self.rpc.call_version([node])[node]
2343     result.Raise()
2344     if result.data:
2345       if constants.PROTOCOL_VERSION == result.data:
2346         logging.info("Communication to node %s fine, sw version %s match",
2347                      node, result.data)
2348       else:
2349         raise errors.OpExecError("Version mismatch master version %s,"
2350                                  " node version %s" %
2351                                  (constants.PROTOCOL_VERSION, result.data))
2352     else:
2353       raise errors.OpExecError("Cannot get version from the new node")
2354
2355     # setup ssh on node
2356     logging.info("Copy ssh key to node %s", node)
2357     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2358     keyarray = []
2359     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2360                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2361                 priv_key, pub_key]
2362
2363     for i in keyfiles:
2364       f = open(i, 'r')
2365       try:
2366         keyarray.append(f.read())
2367       finally:
2368         f.close()
2369
2370     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2371                                     keyarray[2],
2372                                     keyarray[3], keyarray[4], keyarray[5])
2373
2374     msg = result.RemoteFailMsg()
2375     if msg:
2376       raise errors.OpExecError("Cannot transfer ssh keys to the"
2377                                " new node: %s" % msg)
2378
2379     # Add node to our /etc/hosts, and add key to known_hosts
2380     utils.AddHostToEtcHosts(new_node.name)
2381
2382     if new_node.secondary_ip != new_node.primary_ip:
2383       result = self.rpc.call_node_has_ip_address(new_node.name,
2384                                                  new_node.secondary_ip)
2385       if result.failed or not result.data:
2386         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2387                                  " you gave (%s). Please fix and re-run this"
2388                                  " command." % new_node.secondary_ip)
2389
2390     node_verify_list = [self.cfg.GetMasterNode()]
2391     node_verify_param = {
2392       'nodelist': [node],
2393       # TODO: do a node-net-test as well?
2394     }
2395
2396     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2397                                        self.cfg.GetClusterName())
2398     for verifier in node_verify_list:
2399       if result[verifier].failed or not result[verifier].data:
2400         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2401                                  " for remote verification" % verifier)
2402       if result[verifier].data['nodelist']:
2403         for failed in result[verifier].data['nodelist']:
2404           feedback_fn("ssh/hostname verification failed %s -> %s" %
2405                       (verifier, result[verifier].data['nodelist'][failed]))
2406         raise errors.OpExecError("ssh/hostname verification failed.")
2407
2408     # Distribute updated /etc/hosts and known_hosts to all nodes,
2409     # including the node just added
2410     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2411     dist_nodes = self.cfg.GetNodeList()
2412     if not self.op.readd:
2413       dist_nodes.append(node)
2414     if myself.name in dist_nodes:
2415       dist_nodes.remove(myself.name)
2416
2417     logging.debug("Copying hosts and known_hosts to all nodes")
2418     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2419       result = self.rpc.call_upload_file(dist_nodes, fname)
2420       for to_node, to_result in result.iteritems():
2421         if to_result.failed or not to_result.data:
2422           logging.error("Copy of file %s to node %s failed", fname, to_node)
2423
2424     to_copy = []
2425     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2426     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2427       to_copy.append(constants.VNC_PASSWORD_FILE)
2428
2429     for fname in to_copy:
2430       result = self.rpc.call_upload_file([node], fname)
2431       if result[node].failed or not result[node]:
2432         logging.error("Could not copy file %s to node %s", fname, node)
2433
2434     if self.op.readd:
2435       self.context.ReaddNode(new_node)
2436       # make sure we redistribute the config
2437       self.cfg.Update(new_node)
2438       # and make sure the new node will not have old files around
2439       if not new_node.master_candidate:
2440         result = self.rpc.call_node_demote_from_mc(new_node.name)
2441         msg = result.RemoteFailMsg()
2442         if msg:
2443           self.LogWarning("Node failed to demote itself from master"
2444                           " candidate status: %s" % msg)
2445     else:
2446       self.context.AddNode(new_node)
2447
2448
2449 class LUSetNodeParams(LogicalUnit):
2450   """Modifies the parameters of a node.
2451
2452   """
2453   HPATH = "node-modify"
2454   HTYPE = constants.HTYPE_NODE
2455   _OP_REQP = ["node_name"]
2456   REQ_BGL = False
2457
2458   def CheckArguments(self):
2459     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2460     if node_name is None:
2461       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2462     self.op.node_name = node_name
2463     _CheckBooleanOpField(self.op, 'master_candidate')
2464     _CheckBooleanOpField(self.op, 'offline')
2465     _CheckBooleanOpField(self.op, 'drained')
2466     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2467     if all_mods.count(None) == 3:
2468       raise errors.OpPrereqError("Please pass at least one modification")
2469     if all_mods.count(True) > 1:
2470       raise errors.OpPrereqError("Can't set the node into more than one"
2471                                  " state at the same time")
2472
2473   def ExpandNames(self):
2474     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2475
2476   def BuildHooksEnv(self):
2477     """Build hooks env.
2478
2479     This runs on the master node.
2480
2481     """
2482     env = {
2483       "OP_TARGET": self.op.node_name,
2484       "MASTER_CANDIDATE": str(self.op.master_candidate),
2485       "OFFLINE": str(self.op.offline),
2486       "DRAINED": str(self.op.drained),
2487       }
2488     nl = [self.cfg.GetMasterNode(),
2489           self.op.node_name]
2490     return env, nl, nl
2491
2492   def CheckPrereq(self):
2493     """Check prerequisites.
2494
2495     This only checks the instance list against the existing names.
2496
2497     """
2498     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2499
2500     if ((self.op.master_candidate == False or self.op.offline == True or
2501          self.op.drained == True) and node.master_candidate):
2502       # we will demote the node from master_candidate
2503       if self.op.node_name == self.cfg.GetMasterNode():
2504         raise errors.OpPrereqError("The master node has to be a"
2505                                    " master candidate, online and not drained")
2506       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2507       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2508       if num_candidates <= cp_size:
2509         msg = ("Not enough master candidates (desired"
2510                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2511         if self.op.force:
2512           self.LogWarning(msg)
2513         else:
2514           raise errors.OpPrereqError(msg)
2515
2516     if (self.op.master_candidate == True and
2517         ((node.offline and not self.op.offline == False) or
2518          (node.drained and not self.op.drained == False))):
2519       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2520                                  " to master_candidate" % node.name)
2521
2522     return
2523
2524   def Exec(self, feedback_fn):
2525     """Modifies a node.
2526
2527     """
2528     node = self.node
2529
2530     result = []
2531     changed_mc = False
2532
2533     if self.op.offline is not None:
2534       node.offline = self.op.offline
2535       result.append(("offline", str(self.op.offline)))
2536       if self.op.offline == True:
2537         if node.master_candidate:
2538           node.master_candidate = False
2539           changed_mc = True
2540           result.append(("master_candidate", "auto-demotion due to offline"))
2541         if node.drained:
2542           node.drained = False
2543           result.append(("drained", "clear drained status due to offline"))
2544
2545     if self.op.master_candidate is not None:
2546       node.master_candidate = self.op.master_candidate
2547       changed_mc = True
2548       result.append(("master_candidate", str(self.op.master_candidate)))
2549       if self.op.master_candidate == False:
2550         rrc = self.rpc.call_node_demote_from_mc(node.name)
2551         msg = rrc.RemoteFailMsg()
2552         if msg:
2553           self.LogWarning("Node failed to demote itself: %s" % msg)
2554
2555     if self.op.drained is not None:
2556       node.drained = self.op.drained
2557       result.append(("drained", str(self.op.drained)))
2558       if self.op.drained == True:
2559         if node.master_candidate:
2560           node.master_candidate = False
2561           changed_mc = True
2562           result.append(("master_candidate", "auto-demotion due to drain"))
2563           rrc = self.rpc.call_node_demote_from_mc(node.name)
2564           msg = rrc.RemoteFailMsg()
2565           if msg:
2566             self.LogWarning("Node failed to demote itself: %s" % msg)
2567         if node.offline:
2568           node.offline = False
2569           result.append(("offline", "clear offline status due to drain"))
2570
2571     # this will trigger configuration file update, if needed
2572     self.cfg.Update(node)
2573     # this will trigger job queue propagation or cleanup
2574     if changed_mc:
2575       self.context.ReaddNode(node)
2576
2577     return result
2578
2579
2580 class LUQueryClusterInfo(NoHooksLU):
2581   """Query cluster configuration.
2582
2583   """
2584   _OP_REQP = []
2585   REQ_BGL = False
2586
2587   def ExpandNames(self):
2588     self.needed_locks = {}
2589
2590   def CheckPrereq(self):
2591     """No prerequsites needed for this LU.
2592
2593     """
2594     pass
2595
2596   def Exec(self, feedback_fn):
2597     """Return cluster config.
2598
2599     """
2600     cluster = self.cfg.GetClusterInfo()
2601     result = {
2602       "software_version": constants.RELEASE_VERSION,
2603       "protocol_version": constants.PROTOCOL_VERSION,
2604       "config_version": constants.CONFIG_VERSION,
2605       "os_api_version": constants.OS_API_VERSION,
2606       "export_version": constants.EXPORT_VERSION,
2607       "architecture": (platform.architecture()[0], platform.machine()),
2608       "name": cluster.cluster_name,
2609       "master": cluster.master_node,
2610       "default_hypervisor": cluster.default_hypervisor,
2611       "enabled_hypervisors": cluster.enabled_hypervisors,
2612       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2613                         for hypervisor in cluster.enabled_hypervisors]),
2614       "beparams": cluster.beparams,
2615       "candidate_pool_size": cluster.candidate_pool_size,
2616       "default_bridge": cluster.default_bridge,
2617       "master_netdev": cluster.master_netdev,
2618       "volume_group_name": cluster.volume_group_name,
2619       "file_storage_dir": cluster.file_storage_dir,
2620       }
2621
2622     return result
2623
2624
2625 class LUQueryConfigValues(NoHooksLU):
2626   """Return configuration values.
2627
2628   """
2629   _OP_REQP = []
2630   REQ_BGL = False
2631   _FIELDS_DYNAMIC = utils.FieldSet()
2632   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2633
2634   def ExpandNames(self):
2635     self.needed_locks = {}
2636
2637     _CheckOutputFields(static=self._FIELDS_STATIC,
2638                        dynamic=self._FIELDS_DYNAMIC,
2639                        selected=self.op.output_fields)
2640
2641   def CheckPrereq(self):
2642     """No prerequisites.
2643
2644     """
2645     pass
2646
2647   def Exec(self, feedback_fn):
2648     """Dump a representation of the cluster config to the standard output.
2649
2650     """
2651     values = []
2652     for field in self.op.output_fields:
2653       if field == "cluster_name":
2654         entry = self.cfg.GetClusterName()
2655       elif field == "master_node":
2656         entry = self.cfg.GetMasterNode()
2657       elif field == "drain_flag":
2658         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2659       else:
2660         raise errors.ParameterError(field)
2661       values.append(entry)
2662     return values
2663
2664
2665 class LUActivateInstanceDisks(NoHooksLU):
2666   """Bring up an instance's disks.
2667
2668   """
2669   _OP_REQP = ["instance_name"]
2670   REQ_BGL = False
2671
2672   def ExpandNames(self):
2673     self._ExpandAndLockInstance()
2674     self.needed_locks[locking.LEVEL_NODE] = []
2675     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2676
2677   def DeclareLocks(self, level):
2678     if level == locking.LEVEL_NODE:
2679       self._LockInstancesNodes()
2680
2681   def CheckPrereq(self):
2682     """Check prerequisites.
2683
2684     This checks that the instance is in the cluster.
2685
2686     """
2687     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2688     assert self.instance is not None, \
2689       "Cannot retrieve locked instance %s" % self.op.instance_name
2690     _CheckNodeOnline(self, self.instance.primary_node)
2691     if not hasattr(self.op, "ignore_size"):
2692       self.op.ignore_size = False
2693
2694   def Exec(self, feedback_fn):
2695     """Activate the disks.
2696
2697     """
2698     disks_ok, disks_info = \
2699               _AssembleInstanceDisks(self, self.instance,
2700                                      ignore_size=self.op.ignore_size)
2701     if not disks_ok:
2702       raise errors.OpExecError("Cannot activate block devices")
2703
2704     return disks_info
2705
2706
2707 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2708                            ignore_size=False):
2709   """Prepare the block devices for an instance.
2710
2711   This sets up the block devices on all nodes.
2712
2713   @type lu: L{LogicalUnit}
2714   @param lu: the logical unit on whose behalf we execute
2715   @type instance: L{objects.Instance}
2716   @param instance: the instance for whose disks we assemble
2717   @type ignore_secondaries: boolean
2718   @param ignore_secondaries: if true, errors on secondary nodes
2719       won't result in an error return from the function
2720   @type ignore_size: boolean
2721   @param ignore_size: if true, the current known size of the disk
2722       will not be used during the disk activation, useful for cases
2723       when the size is wrong
2724   @return: False if the operation failed, otherwise a list of
2725       (host, instance_visible_name, node_visible_name)
2726       with the mapping from node devices to instance devices
2727
2728   """
2729   device_info = []
2730   disks_ok = True
2731   iname = instance.name
2732   # With the two passes mechanism we try to reduce the window of
2733   # opportunity for the race condition of switching DRBD to primary
2734   # before handshaking occured, but we do not eliminate it
2735
2736   # The proper fix would be to wait (with some limits) until the
2737   # connection has been made and drbd transitions from WFConnection
2738   # into any other network-connected state (Connected, SyncTarget,
2739   # SyncSource, etc.)
2740
2741   # 1st pass, assemble on all nodes in secondary mode
2742   for inst_disk in instance.disks:
2743     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2744       if ignore_size:
2745         node_disk = node_disk.Copy()
2746         node_disk.UnsetSize()
2747       lu.cfg.SetDiskID(node_disk, node)
2748       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2749       msg = result.RemoteFailMsg()
2750       if msg:
2751         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2752                            " (is_primary=False, pass=1): %s",
2753                            inst_disk.iv_name, node, msg)
2754         if not ignore_secondaries:
2755           disks_ok = False
2756
2757   # FIXME: race condition on drbd migration to primary
2758
2759   # 2nd pass, do only the primary node
2760   for inst_disk in instance.disks:
2761     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2762       if node != instance.primary_node:
2763         continue
2764       if ignore_size:
2765         node_disk = node_disk.Copy()
2766         node_disk.UnsetSize()
2767       lu.cfg.SetDiskID(node_disk, node)
2768       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2769       msg = result.RemoteFailMsg()
2770       if msg:
2771         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2772                            " (is_primary=True, pass=2): %s",
2773                            inst_disk.iv_name, node, msg)
2774         disks_ok = False
2775     device_info.append((instance.primary_node, inst_disk.iv_name,
2776                         result.payload))
2777
2778   # leave the disks configured for the primary node
2779   # this is a workaround that would be fixed better by
2780   # improving the logical/physical id handling
2781   for disk in instance.disks:
2782     lu.cfg.SetDiskID(disk, instance.primary_node)
2783
2784   return disks_ok, device_info
2785
2786
2787 def _StartInstanceDisks(lu, instance, force):
2788   """Start the disks of an instance.
2789
2790   """
2791   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2792                                            ignore_secondaries=force)
2793   if not disks_ok:
2794     _ShutdownInstanceDisks(lu, instance)
2795     if force is not None and not force:
2796       lu.proc.LogWarning("", hint="If the message above refers to a"
2797                          " secondary node,"
2798                          " you can retry the operation using '--force'.")
2799     raise errors.OpExecError("Disk consistency error")
2800
2801
2802 class LUDeactivateInstanceDisks(NoHooksLU):
2803   """Shutdown an instance's disks.
2804
2805   """
2806   _OP_REQP = ["instance_name"]
2807   REQ_BGL = False
2808
2809   def ExpandNames(self):
2810     self._ExpandAndLockInstance()
2811     self.needed_locks[locking.LEVEL_NODE] = []
2812     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2813
2814   def DeclareLocks(self, level):
2815     if level == locking.LEVEL_NODE:
2816       self._LockInstancesNodes()
2817
2818   def CheckPrereq(self):
2819     """Check prerequisites.
2820
2821     This checks that the instance is in the cluster.
2822
2823     """
2824     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2825     assert self.instance is not None, \
2826       "Cannot retrieve locked instance %s" % self.op.instance_name
2827
2828   def Exec(self, feedback_fn):
2829     """Deactivate the disks
2830
2831     """
2832     instance = self.instance
2833     _SafeShutdownInstanceDisks(self, instance)
2834
2835
2836 def _SafeShutdownInstanceDisks(lu, instance):
2837   """Shutdown block devices of an instance.
2838
2839   This function checks if an instance is running, before calling
2840   _ShutdownInstanceDisks.
2841
2842   """
2843   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2844                                       [instance.hypervisor])
2845   ins_l = ins_l[instance.primary_node]
2846   if ins_l.failed or not isinstance(ins_l.data, list):
2847     raise errors.OpExecError("Can't contact node '%s'" %
2848                              instance.primary_node)
2849
2850   if instance.name in ins_l.data:
2851     raise errors.OpExecError("Instance is running, can't shutdown"
2852                              " block devices.")
2853
2854   _ShutdownInstanceDisks(lu, instance)
2855
2856
2857 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2858   """Shutdown block devices of an instance.
2859
2860   This does the shutdown on all nodes of the instance.
2861
2862   If the ignore_primary is false, errors on the primary node are
2863   ignored.
2864
2865   """
2866   all_result = True
2867   for disk in instance.disks:
2868     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2869       lu.cfg.SetDiskID(top_disk, node)
2870       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2871       msg = result.RemoteFailMsg()
2872       if msg:
2873         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2874                       disk.iv_name, node, msg)
2875         if not ignore_primary or node != instance.primary_node:
2876           all_result = False
2877   return all_result
2878
2879
2880 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2881   """Checks if a node has enough free memory.
2882
2883   This function check if a given node has the needed amount of free
2884   memory. In case the node has less memory or we cannot get the
2885   information from the node, this function raise an OpPrereqError
2886   exception.
2887
2888   @type lu: C{LogicalUnit}
2889   @param lu: a logical unit from which we get configuration data
2890   @type node: C{str}
2891   @param node: the node to check
2892   @type reason: C{str}
2893   @param reason: string to use in the error message
2894   @type requested: C{int}
2895   @param requested: the amount of memory in MiB to check for
2896   @type hypervisor_name: C{str}
2897   @param hypervisor_name: the hypervisor to ask for memory stats
2898   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2899       we cannot check the node
2900
2901   """
2902   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2903   nodeinfo[node].Raise()
2904   free_mem = nodeinfo[node].data.get('memory_free')
2905   if not isinstance(free_mem, int):
2906     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2907                              " was '%s'" % (node, free_mem))
2908   if requested > free_mem:
2909     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2910                              " needed %s MiB, available %s MiB" %
2911                              (node, reason, requested, free_mem))
2912
2913
2914 class LUStartupInstance(LogicalUnit):
2915   """Starts an instance.
2916
2917   """
2918   HPATH = "instance-start"
2919   HTYPE = constants.HTYPE_INSTANCE
2920   _OP_REQP = ["instance_name", "force"]
2921   REQ_BGL = False
2922
2923   def ExpandNames(self):
2924     self._ExpandAndLockInstance()
2925
2926   def BuildHooksEnv(self):
2927     """Build hooks env.
2928
2929     This runs on master, primary and secondary nodes of the instance.
2930
2931     """
2932     env = {
2933       "FORCE": self.op.force,
2934       }
2935     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2936     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2937     return env, nl, nl
2938
2939   def CheckPrereq(self):
2940     """Check prerequisites.
2941
2942     This checks that the instance is in the cluster.
2943
2944     """
2945     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2946     assert self.instance is not None, \
2947       "Cannot retrieve locked instance %s" % self.op.instance_name
2948
2949     # extra beparams
2950     self.beparams = getattr(self.op, "beparams", {})
2951     if self.beparams:
2952       if not isinstance(self.beparams, dict):
2953         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2954                                    " dict" % (type(self.beparams), ))
2955       # fill the beparams dict
2956       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2957       self.op.beparams = self.beparams
2958
2959     # extra hvparams
2960     self.hvparams = getattr(self.op, "hvparams", {})
2961     if self.hvparams:
2962       if not isinstance(self.hvparams, dict):
2963         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2964                                    " dict" % (type(self.hvparams), ))
2965
2966       # check hypervisor parameter syntax (locally)
2967       cluster = self.cfg.GetClusterInfo()
2968       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2969       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2970                                     instance.hvparams)
2971       filled_hvp.update(self.hvparams)
2972       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2973       hv_type.CheckParameterSyntax(filled_hvp)
2974       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2975       self.op.hvparams = self.hvparams
2976
2977     _CheckNodeOnline(self, instance.primary_node)
2978
2979     bep = self.cfg.GetClusterInfo().FillBE(instance)
2980     # check bridges existance
2981     _CheckInstanceBridgesExist(self, instance)
2982
2983     remote_info = self.rpc.call_instance_info(instance.primary_node,
2984                                               instance.name,
2985                                               instance.hypervisor)
2986     remote_info.Raise()
2987     if not remote_info.data:
2988       _CheckNodeFreeMemory(self, instance.primary_node,
2989                            "starting instance %s" % instance.name,
2990                            bep[constants.BE_MEMORY], instance.hypervisor)
2991
2992   def Exec(self, feedback_fn):
2993     """Start the instance.
2994
2995     """
2996     instance = self.instance
2997     force = self.op.force
2998
2999     self.cfg.MarkInstanceUp(instance.name)
3000
3001     node_current = instance.primary_node
3002
3003     _StartInstanceDisks(self, instance, force)
3004
3005     result = self.rpc.call_instance_start(node_current, instance,
3006                                           self.hvparams, self.beparams)
3007     msg = result.RemoteFailMsg()
3008     if msg:
3009       _ShutdownInstanceDisks(self, instance)
3010       raise errors.OpExecError("Could not start instance: %s" % msg)
3011
3012
3013 class LURebootInstance(LogicalUnit):
3014   """Reboot an instance.
3015
3016   """
3017   HPATH = "instance-reboot"
3018   HTYPE = constants.HTYPE_INSTANCE
3019   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3020   REQ_BGL = False
3021
3022   def ExpandNames(self):
3023     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3024                                    constants.INSTANCE_REBOOT_HARD,
3025                                    constants.INSTANCE_REBOOT_FULL]:
3026       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3027                                   (constants.INSTANCE_REBOOT_SOFT,
3028                                    constants.INSTANCE_REBOOT_HARD,
3029                                    constants.INSTANCE_REBOOT_FULL))
3030     self._ExpandAndLockInstance()
3031
3032   def BuildHooksEnv(self):
3033     """Build hooks env.
3034
3035     This runs on master, primary and secondary nodes of the instance.
3036
3037     """
3038     env = {
3039       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3040       "REBOOT_TYPE": self.op.reboot_type,
3041       }
3042     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3043     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3044     return env, nl, nl
3045
3046   def CheckPrereq(self):
3047     """Check prerequisites.
3048
3049     This checks that the instance is in the cluster.
3050
3051     """
3052     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3053     assert self.instance is not None, \
3054       "Cannot retrieve locked instance %s" % self.op.instance_name
3055
3056     _CheckNodeOnline(self, instance.primary_node)
3057
3058     # check bridges existance
3059     _CheckInstanceBridgesExist(self, instance)
3060
3061   def Exec(self, feedback_fn):
3062     """Reboot the instance.
3063
3064     """
3065     instance = self.instance
3066     ignore_secondaries = self.op.ignore_secondaries
3067     reboot_type = self.op.reboot_type
3068
3069     node_current = instance.primary_node
3070
3071     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3072                        constants.INSTANCE_REBOOT_HARD]:
3073       for disk in instance.disks:
3074         self.cfg.SetDiskID(disk, node_current)
3075       result = self.rpc.call_instance_reboot(node_current, instance,
3076                                              reboot_type)
3077       msg = result.RemoteFailMsg()
3078       if msg:
3079         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3080     else:
3081       result = self.rpc.call_instance_shutdown(node_current, instance)
3082       msg = result.RemoteFailMsg()
3083       if msg:
3084         raise errors.OpExecError("Could not shutdown instance for"
3085                                  " full reboot: %s" % msg)
3086       _ShutdownInstanceDisks(self, instance)
3087       _StartInstanceDisks(self, instance, ignore_secondaries)
3088       result = self.rpc.call_instance_start(node_current, instance, None, None)
3089       msg = result.RemoteFailMsg()
3090       if msg:
3091         _ShutdownInstanceDisks(self, instance)
3092         raise errors.OpExecError("Could not start instance for"
3093                                  " full reboot: %s" % msg)
3094
3095     self.cfg.MarkInstanceUp(instance.name)
3096
3097
3098 class LUShutdownInstance(LogicalUnit):
3099   """Shutdown an instance.
3100
3101   """
3102   HPATH = "instance-stop"
3103   HTYPE = constants.HTYPE_INSTANCE
3104   _OP_REQP = ["instance_name"]
3105   REQ_BGL = False
3106
3107   def ExpandNames(self):
3108     self._ExpandAndLockInstance()
3109
3110   def BuildHooksEnv(self):
3111     """Build hooks env.
3112
3113     This runs on master, primary and secondary nodes of the instance.
3114
3115     """
3116     env = _BuildInstanceHookEnvByObject(self, self.instance)
3117     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3118     return env, nl, nl
3119
3120   def CheckPrereq(self):
3121     """Check prerequisites.
3122
3123     This checks that the instance is in the cluster.
3124
3125     """
3126     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3127     assert self.instance is not None, \
3128       "Cannot retrieve locked instance %s" % self.op.instance_name
3129     _CheckNodeOnline(self, self.instance.primary_node)
3130
3131   def Exec(self, feedback_fn):
3132     """Shutdown the instance.
3133
3134     """
3135     instance = self.instance
3136     node_current = instance.primary_node
3137     self.cfg.MarkInstanceDown(instance.name)
3138     result = self.rpc.call_instance_shutdown(node_current, instance)
3139     msg = result.RemoteFailMsg()
3140     if msg:
3141       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3142
3143     _ShutdownInstanceDisks(self, instance)
3144
3145
3146 class LUReinstallInstance(LogicalUnit):
3147   """Reinstall an instance.
3148
3149   """
3150   HPATH = "instance-reinstall"
3151   HTYPE = constants.HTYPE_INSTANCE
3152   _OP_REQP = ["instance_name"]
3153   REQ_BGL = False
3154
3155   def ExpandNames(self):
3156     self._ExpandAndLockInstance()
3157
3158   def BuildHooksEnv(self):
3159     """Build hooks env.
3160
3161     This runs on master, primary and secondary nodes of the instance.
3162
3163     """
3164     env = _BuildInstanceHookEnvByObject(self, self.instance)
3165     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3166     return env, nl, nl
3167
3168   def CheckPrereq(self):
3169     """Check prerequisites.
3170
3171     This checks that the instance is in the cluster and is not running.
3172
3173     """
3174     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3175     assert instance is not None, \
3176       "Cannot retrieve locked instance %s" % self.op.instance_name
3177     _CheckNodeOnline(self, instance.primary_node)
3178
3179     if instance.disk_template == constants.DT_DISKLESS:
3180       raise errors.OpPrereqError("Instance '%s' has no disks" %
3181                                  self.op.instance_name)
3182     if instance.admin_up:
3183       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3184                                  self.op.instance_name)
3185     remote_info = self.rpc.call_instance_info(instance.primary_node,
3186                                               instance.name,
3187                                               instance.hypervisor)
3188     remote_info.Raise()
3189     if remote_info.data:
3190       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3191                                  (self.op.instance_name,
3192                                   instance.primary_node))
3193
3194     self.op.os_type = getattr(self.op, "os_type", None)
3195     if self.op.os_type is not None:
3196       # OS verification
3197       pnode = self.cfg.GetNodeInfo(
3198         self.cfg.ExpandNodeName(instance.primary_node))
3199       if pnode is None:
3200         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3201                                    self.op.pnode)
3202       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3203       result.Raise()
3204       if not isinstance(result.data, objects.OS):
3205         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3206                                    " primary node"  % self.op.os_type)
3207
3208     self.instance = instance
3209
3210   def Exec(self, feedback_fn):
3211     """Reinstall the instance.
3212
3213     """
3214     inst = self.instance
3215
3216     if self.op.os_type is not None:
3217       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3218       inst.os = self.op.os_type
3219       self.cfg.Update(inst)
3220
3221     _StartInstanceDisks(self, inst, None)
3222     try:
3223       feedback_fn("Running the instance OS create scripts...")
3224       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3225       msg = result.RemoteFailMsg()
3226       if msg:
3227         raise errors.OpExecError("Could not install OS for instance %s"
3228                                  " on node %s: %s" %
3229                                  (inst.name, inst.primary_node, msg))
3230     finally:
3231       _ShutdownInstanceDisks(self, inst)
3232
3233
3234 class LURenameInstance(LogicalUnit):
3235   """Rename an instance.
3236
3237   """
3238   HPATH = "instance-rename"
3239   HTYPE = constants.HTYPE_INSTANCE
3240   _OP_REQP = ["instance_name", "new_name"]
3241
3242   def BuildHooksEnv(self):
3243     """Build hooks env.
3244
3245     This runs on master, primary and secondary nodes of the instance.
3246
3247     """
3248     env = _BuildInstanceHookEnvByObject(self, self.instance)
3249     env["INSTANCE_NEW_NAME"] = self.op.new_name
3250     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3251     return env, nl, nl
3252
3253   def CheckPrereq(self):
3254     """Check prerequisites.
3255
3256     This checks that the instance is in the cluster and is not running.
3257
3258     """
3259     instance = self.cfg.GetInstanceInfo(
3260       self.cfg.ExpandInstanceName(self.op.instance_name))
3261     if instance is None:
3262       raise errors.OpPrereqError("Instance '%s' not known" %
3263                                  self.op.instance_name)
3264     _CheckNodeOnline(self, instance.primary_node)
3265
3266     if instance.admin_up:
3267       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3268                                  self.op.instance_name)
3269     remote_info = self.rpc.call_instance_info(instance.primary_node,
3270                                               instance.name,
3271                                               instance.hypervisor)
3272     remote_info.Raise()
3273     if remote_info.data:
3274       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3275                                  (self.op.instance_name,
3276                                   instance.primary_node))
3277     self.instance = instance
3278
3279     # new name verification
3280     name_info = utils.HostInfo(self.op.new_name)
3281
3282     self.op.new_name = new_name = name_info.name
3283     instance_list = self.cfg.GetInstanceList()
3284     if new_name in instance_list:
3285       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3286                                  new_name)
3287
3288     if not getattr(self.op, "ignore_ip", False):
3289       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3290         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3291                                    (name_info.ip, new_name))
3292
3293
3294   def Exec(self, feedback_fn):
3295     """Reinstall the instance.
3296
3297     """
3298     inst = self.instance
3299     old_name = inst.name
3300
3301     if inst.disk_template == constants.DT_FILE:
3302       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3303
3304     self.cfg.RenameInstance(inst.name, self.op.new_name)
3305     # Change the instance lock. This is definitely safe while we hold the BGL
3306     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3307     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3308
3309     # re-read the instance from the configuration after rename
3310     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3311
3312     if inst.disk_template == constants.DT_FILE:
3313       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3314       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3315                                                      old_file_storage_dir,
3316                                                      new_file_storage_dir)
3317       result.Raise()
3318       if not result.data:
3319         raise errors.OpExecError("Could not connect to node '%s' to rename"
3320                                  " directory '%s' to '%s' (but the instance"
3321                                  " has been renamed in Ganeti)" % (
3322                                  inst.primary_node, old_file_storage_dir,
3323                                  new_file_storage_dir))
3324
3325       if not result.data[0]:
3326         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3327                                  " (but the instance has been renamed in"
3328                                  " Ganeti)" % (old_file_storage_dir,
3329                                                new_file_storage_dir))
3330
3331     _StartInstanceDisks(self, inst, None)
3332     try:
3333       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3334                                                  old_name)
3335       msg = result.RemoteFailMsg()
3336       if msg:
3337         msg = ("Could not run OS rename script for instance %s on node %s"
3338                " (but the instance has been renamed in Ganeti): %s" %
3339                (inst.name, inst.primary_node, msg))
3340         self.proc.LogWarning(msg)
3341     finally:
3342       _ShutdownInstanceDisks(self, inst)
3343
3344
3345 class LURemoveInstance(LogicalUnit):
3346   """Remove an instance.
3347
3348   """
3349   HPATH = "instance-remove"
3350   HTYPE = constants.HTYPE_INSTANCE
3351   _OP_REQP = ["instance_name", "ignore_failures"]
3352   REQ_BGL = False
3353
3354   def ExpandNames(self):
3355     self._ExpandAndLockInstance()
3356     self.needed_locks[locking.LEVEL_NODE] = []
3357     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3358
3359   def DeclareLocks(self, level):
3360     if level == locking.LEVEL_NODE:
3361       self._LockInstancesNodes()
3362
3363   def BuildHooksEnv(self):
3364     """Build hooks env.
3365
3366     This runs on master, primary and secondary nodes of the instance.
3367
3368     """
3369     env = _BuildInstanceHookEnvByObject(self, self.instance)
3370     nl = [self.cfg.GetMasterNode()]
3371     return env, nl, nl
3372
3373   def CheckPrereq(self):
3374     """Check prerequisites.
3375
3376     This checks that the instance is in the cluster.
3377
3378     """
3379     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3380     assert self.instance is not None, \
3381       "Cannot retrieve locked instance %s" % self.op.instance_name
3382
3383   def Exec(self, feedback_fn):
3384     """Remove the instance.
3385
3386     """
3387     instance = self.instance
3388     logging.info("Shutting down instance %s on node %s",
3389                  instance.name, instance.primary_node)
3390
3391     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3392     msg = result.RemoteFailMsg()
3393     if msg:
3394       if self.op.ignore_failures:
3395         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3396       else:
3397         raise errors.OpExecError("Could not shutdown instance %s on"
3398                                  " node %s: %s" %
3399                                  (instance.name, instance.primary_node, msg))
3400
3401     logging.info("Removing block devices for instance %s", instance.name)
3402
3403     if not _RemoveDisks(self, instance):
3404       if self.op.ignore_failures:
3405         feedback_fn("Warning: can't remove instance's disks")
3406       else:
3407         raise errors.OpExecError("Can't remove instance's disks")
3408
3409     logging.info("Removing instance %s out of cluster config", instance.name)
3410
3411     self.cfg.RemoveInstance(instance.name)
3412     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3413
3414
3415 class LUQueryInstances(NoHooksLU):
3416   """Logical unit for querying instances.
3417
3418   """
3419   _OP_REQP = ["output_fields", "names", "use_locking"]
3420   REQ_BGL = False
3421   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3422                                     "admin_state",
3423                                     "disk_template", "ip", "mac", "bridge",
3424                                     "sda_size", "sdb_size", "vcpus", "tags",
3425                                     "network_port", "beparams",
3426                                     r"(disk)\.(size)/([0-9]+)",
3427                                     r"(disk)\.(sizes)", "disk_usage",
3428                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3429                                     r"(nic)\.(macs|ips|bridges)",
3430                                     r"(disk|nic)\.(count)",
3431                                     "serial_no", "hypervisor", "hvparams",] +
3432                                   ["hv/%s" % name
3433                                    for name in constants.HVS_PARAMETERS] +
3434                                   ["be/%s" % name
3435                                    for name in constants.BES_PARAMETERS])
3436   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3437
3438
3439   def ExpandNames(self):
3440     _CheckOutputFields(static=self._FIELDS_STATIC,
3441                        dynamic=self._FIELDS_DYNAMIC,
3442                        selected=self.op.output_fields)
3443
3444     self.needed_locks = {}
3445     self.share_locks[locking.LEVEL_INSTANCE] = 1
3446     self.share_locks[locking.LEVEL_NODE] = 1
3447
3448     if self.op.names:
3449       self.wanted = _GetWantedInstances(self, self.op.names)
3450     else:
3451       self.wanted = locking.ALL_SET
3452
3453     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3454     self.do_locking = self.do_node_query and self.op.use_locking
3455     if self.do_locking:
3456       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3457       self.needed_locks[locking.LEVEL_NODE] = []
3458       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3459
3460   def DeclareLocks(self, level):
3461     if level == locking.LEVEL_NODE and self.do_locking:
3462       self._LockInstancesNodes()
3463
3464   def CheckPrereq(self):
3465     """Check prerequisites.
3466
3467     """
3468     pass
3469
3470   def Exec(self, feedback_fn):
3471     """Computes the list of nodes and their attributes.
3472
3473     """
3474     all_info = self.cfg.GetAllInstancesInfo()
3475     if self.wanted == locking.ALL_SET:
3476       # caller didn't specify instance names, so ordering is not important
3477       if self.do_locking:
3478         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3479       else:
3480         instance_names = all_info.keys()
3481       instance_names = utils.NiceSort(instance_names)
3482     else:
3483       # caller did specify names, so we must keep the ordering
3484       if self.do_locking:
3485         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3486       else:
3487         tgt_set = all_info.keys()
3488       missing = set(self.wanted).difference(tgt_set)
3489       if missing:
3490         raise errors.OpExecError("Some instances were removed before"
3491                                  " retrieving their data: %s" % missing)
3492       instance_names = self.wanted
3493
3494     instance_list = [all_info[iname] for iname in instance_names]
3495
3496     # begin data gathering
3497
3498     nodes = frozenset([inst.primary_node for inst in instance_list])
3499     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3500
3501     bad_nodes = []
3502     off_nodes = []
3503     if self.do_node_query:
3504       live_data = {}
3505       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3506       for name in nodes:
3507         result = node_data[name]
3508         if result.offline:
3509           # offline nodes will be in both lists
3510           off_nodes.append(name)
3511         if result.failed:
3512           bad_nodes.append(name)
3513         else:
3514           if result.data:
3515             live_data.update(result.data)
3516             # else no instance is alive
3517     else:
3518       live_data = dict([(name, {}) for name in instance_names])
3519
3520     # end data gathering
3521
3522     HVPREFIX = "hv/"
3523     BEPREFIX = "be/"
3524     output = []
3525     for instance in instance_list:
3526       iout = []
3527       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3528       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3529       for field in self.op.output_fields:
3530         st_match = self._FIELDS_STATIC.Matches(field)
3531         if field == "name":
3532           val = instance.name
3533         elif field == "os":
3534           val = instance.os
3535         elif field == "pnode":
3536           val = instance.primary_node
3537         elif field == "snodes":
3538           val = list(instance.secondary_nodes)
3539         elif field == "admin_state":
3540           val = instance.admin_up
3541         elif field == "oper_state":
3542           if instance.primary_node in bad_nodes:
3543             val = None
3544           else:
3545             val = bool(live_data.get(instance.name))
3546         elif field == "status":
3547           if instance.primary_node in off_nodes:
3548             val = "ERROR_nodeoffline"
3549           elif instance.primary_node in bad_nodes:
3550             val = "ERROR_nodedown"
3551           else:
3552             running = bool(live_data.get(instance.name))
3553             if running:
3554               if instance.admin_up:
3555                 val = "running"
3556               else:
3557                 val = "ERROR_up"
3558             else:
3559               if instance.admin_up:
3560                 val = "ERROR_down"
3561               else:
3562                 val = "ADMIN_down"
3563         elif field == "oper_ram":
3564           if instance.primary_node in bad_nodes:
3565             val = None
3566           elif instance.name in live_data:
3567             val = live_data[instance.name].get("memory", "?")
3568           else:
3569             val = "-"
3570         elif field == "vcpus":
3571           val = i_be[constants.BE_VCPUS]
3572         elif field == "disk_template":
3573           val = instance.disk_template
3574         elif field == "ip":
3575           if instance.nics:
3576             val = instance.nics[0].ip
3577           else:
3578             val = None
3579         elif field == "bridge":
3580           if instance.nics:
3581             val = instance.nics[0].bridge
3582           else:
3583             val = None
3584         elif field == "mac":
3585           if instance.nics:
3586             val = instance.nics[0].mac
3587           else:
3588             val = None
3589         elif field == "sda_size" or field == "sdb_size":
3590           idx = ord(field[2]) - ord('a')
3591           try:
3592             val = instance.FindDisk(idx).size
3593           except errors.OpPrereqError:
3594             val = None
3595         elif field == "disk_usage": # total disk usage per node
3596           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3597           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3598         elif field == "tags":
3599           val = list(instance.GetTags())
3600         elif field == "serial_no":
3601           val = instance.serial_no
3602         elif field == "network_port":
3603           val = instance.network_port
3604         elif field == "hypervisor":
3605           val = instance.hypervisor
3606         elif field == "hvparams":
3607           val = i_hv
3608         elif (field.startswith(HVPREFIX) and
3609               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3610           val = i_hv.get(field[len(HVPREFIX):], None)
3611         elif field == "beparams":
3612           val = i_be
3613         elif (field.startswith(BEPREFIX) and
3614               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3615           val = i_be.get(field[len(BEPREFIX):], None)
3616         elif st_match and st_match.groups():
3617           # matches a variable list
3618           st_groups = st_match.groups()
3619           if st_groups and st_groups[0] == "disk":
3620             if st_groups[1] == "count":
3621               val = len(instance.disks)
3622             elif st_groups[1] == "sizes":
3623               val = [disk.size for disk in instance.disks]
3624             elif st_groups[1] == "size":
3625               try:
3626                 val = instance.FindDisk(st_groups[2]).size
3627               except errors.OpPrereqError:
3628                 val = None
3629             else:
3630               assert False, "Unhandled disk parameter"
3631           elif st_groups[0] == "nic":
3632             if st_groups[1] == "count":
3633               val = len(instance.nics)
3634             elif st_groups[1] == "macs":
3635               val = [nic.mac for nic in instance.nics]
3636             elif st_groups[1] == "ips":
3637               val = [nic.ip for nic in instance.nics]
3638             elif st_groups[1] == "bridges":
3639               val = [nic.bridge for nic in instance.nics]
3640             else:
3641               # index-based item
3642               nic_idx = int(st_groups[2])
3643               if nic_idx >= len(instance.nics):
3644                 val = None
3645               else:
3646                 if st_groups[1] == "mac":
3647                   val = instance.nics[nic_idx].mac
3648                 elif st_groups[1] == "ip":
3649                   val = instance.nics[nic_idx].ip
3650                 elif st_groups[1] == "bridge":
3651                   val = instance.nics[nic_idx].bridge
3652                 else:
3653                   assert False, "Unhandled NIC parameter"
3654           else:
3655             assert False, ("Declared but unhandled variable parameter '%s'" %
3656                            field)
3657         else:
3658           assert False, "Declared but unhandled parameter '%s'" % field
3659         iout.append(val)
3660       output.append(iout)
3661
3662     return output
3663
3664
3665 class LUFailoverInstance(LogicalUnit):
3666   """Failover an instance.
3667
3668   """
3669   HPATH = "instance-failover"
3670   HTYPE = constants.HTYPE_INSTANCE
3671   _OP_REQP = ["instance_name", "ignore_consistency"]
3672   REQ_BGL = False
3673
3674   def ExpandNames(self):
3675     self._ExpandAndLockInstance()
3676     self.needed_locks[locking.LEVEL_NODE] = []
3677     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3678
3679   def DeclareLocks(self, level):
3680     if level == locking.LEVEL_NODE:
3681       self._LockInstancesNodes()
3682
3683   def BuildHooksEnv(self):
3684     """Build hooks env.
3685
3686     This runs on master, primary and secondary nodes of the instance.
3687
3688     """
3689     env = {
3690       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3691       }
3692     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3693     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3694     return env, nl, nl
3695
3696   def CheckPrereq(self):
3697     """Check prerequisites.
3698
3699     This checks that the instance is in the cluster.
3700
3701     """
3702     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3703     assert self.instance is not None, \
3704       "Cannot retrieve locked instance %s" % self.op.instance_name
3705
3706     bep = self.cfg.GetClusterInfo().FillBE(instance)
3707     if instance.disk_template not in constants.DTS_NET_MIRROR:
3708       raise errors.OpPrereqError("Instance's disk layout is not"
3709                                  " network mirrored, cannot failover.")
3710
3711     secondary_nodes = instance.secondary_nodes
3712     if not secondary_nodes:
3713       raise errors.ProgrammerError("no secondary node but using "
3714                                    "a mirrored disk template")
3715
3716     target_node = secondary_nodes[0]
3717     _CheckNodeOnline(self, target_node)
3718     _CheckNodeNotDrained(self, target_node)
3719
3720     if instance.admin_up:
3721       # check memory requirements on the secondary node
3722       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3723                            instance.name, bep[constants.BE_MEMORY],
3724                            instance.hypervisor)
3725     else:
3726       self.LogInfo("Not checking memory on the secondary node as"
3727                    " instance will not be started")
3728
3729     # check bridge existance
3730     brlist = [nic.bridge for nic in instance.nics]
3731     result = self.rpc.call_bridges_exist(target_node, brlist)
3732     result.Raise()
3733     if not result.data:
3734       raise errors.OpPrereqError("One or more target bridges %s does not"
3735                                  " exist on destination node '%s'" %
3736                                  (brlist, target_node))
3737
3738   def Exec(self, feedback_fn):
3739     """Failover an instance.
3740
3741     The failover is done by shutting it down on its present node and
3742     starting it on the secondary.
3743
3744     """
3745     instance = self.instance
3746
3747     source_node = instance.primary_node
3748     target_node = instance.secondary_nodes[0]
3749
3750     feedback_fn("* checking disk consistency between source and target")
3751     for dev in instance.disks:
3752       # for drbd, these are drbd over lvm
3753       if not _CheckDiskConsistency(self, dev, target_node, False):
3754         if instance.admin_up and not self.op.ignore_consistency:
3755           raise errors.OpExecError("Disk %s is degraded on target node,"
3756                                    " aborting failover." % dev.iv_name)
3757
3758     feedback_fn("* shutting down instance on source node")
3759     logging.info("Shutting down instance %s on node %s",
3760                  instance.name, source_node)
3761
3762     result = self.rpc.call_instance_shutdown(source_node, instance)
3763     msg = result.RemoteFailMsg()
3764     if msg:
3765       if self.op.ignore_consistency:
3766         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3767                              " Proceeding anyway. Please make sure node"
3768                              " %s is down. Error details: %s",
3769                              instance.name, source_node, source_node, msg)
3770       else:
3771         raise errors.OpExecError("Could not shutdown instance %s on"
3772                                  " node %s: %s" %
3773                                  (instance.name, source_node, msg))
3774
3775     feedback_fn("* deactivating the instance's disks on source node")
3776     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3777       raise errors.OpExecError("Can't shut down the instance's disks.")
3778
3779     instance.primary_node = target_node
3780     # distribute new instance config to the other nodes
3781     self.cfg.Update(instance)
3782
3783     # Only start the instance if it's marked as up
3784     if instance.admin_up:
3785       feedback_fn("* activating the instance's disks on target node")
3786       logging.info("Starting instance %s on node %s",
3787                    instance.name, target_node)
3788
3789       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3790                                                ignore_secondaries=True)
3791       if not disks_ok:
3792         _ShutdownInstanceDisks(self, instance)
3793         raise errors.OpExecError("Can't activate the instance's disks")
3794
3795       feedback_fn("* starting the instance on the target node")
3796       result = self.rpc.call_instance_start(target_node, instance, None, None)
3797       msg = result.RemoteFailMsg()
3798       if msg:
3799         _ShutdownInstanceDisks(self, instance)
3800         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3801                                  (instance.name, target_node, msg))
3802
3803
3804 class LUMigrateInstance(LogicalUnit):
3805   """Migrate an instance.
3806
3807   This is migration without shutting down, compared to the failover,
3808   which is done with shutdown.
3809
3810   """
3811   HPATH = "instance-migrate"
3812   HTYPE = constants.HTYPE_INSTANCE
3813   _OP_REQP = ["instance_name", "live", "cleanup"]
3814
3815   REQ_BGL = False
3816
3817   def ExpandNames(self):
3818     self._ExpandAndLockInstance()
3819     self.needed_locks[locking.LEVEL_NODE] = []
3820     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3821
3822   def DeclareLocks(self, level):
3823     if level == locking.LEVEL_NODE:
3824       self._LockInstancesNodes()
3825
3826   def BuildHooksEnv(self):
3827     """Build hooks env.
3828
3829     This runs on master, primary and secondary nodes of the instance.
3830
3831     """
3832     env = _BuildInstanceHookEnvByObject(self, self.instance)
3833     env["MIGRATE_LIVE"] = self.op.live
3834     env["MIGRATE_CLEANUP"] = self.op.cleanup
3835     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3836     return env, nl, nl
3837
3838   def CheckPrereq(self):
3839     """Check prerequisites.
3840
3841     This checks that the instance is in the cluster.
3842
3843     """
3844     instance = self.cfg.GetInstanceInfo(
3845       self.cfg.ExpandInstanceName(self.op.instance_name))
3846     if instance is None:
3847       raise errors.OpPrereqError("Instance '%s' not known" %
3848                                  self.op.instance_name)
3849
3850     if instance.disk_template != constants.DT_DRBD8:
3851       raise errors.OpPrereqError("Instance's disk layout is not"
3852                                  " drbd8, cannot migrate.")
3853
3854     secondary_nodes = instance.secondary_nodes
3855     if not secondary_nodes:
3856       raise errors.ConfigurationError("No secondary node but using"
3857                                       " drbd8 disk template")
3858
3859     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3860
3861     target_node = secondary_nodes[0]
3862     # check memory requirements on the secondary node
3863     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3864                          instance.name, i_be[constants.BE_MEMORY],
3865                          instance.hypervisor)
3866
3867     # check bridge existance
3868     brlist = [nic.bridge for nic in instance.nics]
3869     result = self.rpc.call_bridges_exist(target_node, brlist)
3870     if result.failed or not result.data:
3871       raise errors.OpPrereqError("One or more target bridges %s does not"
3872                                  " exist on destination node '%s'" %
3873                                  (brlist, target_node))
3874
3875     if not self.op.cleanup:
3876       _CheckNodeNotDrained(self, target_node)
3877       result = self.rpc.call_instance_migratable(instance.primary_node,
3878                                                  instance)
3879       msg = result.RemoteFailMsg()
3880       if msg:
3881         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3882                                    msg)
3883
3884     self.instance = instance
3885
3886   def _WaitUntilSync(self):
3887     """Poll with custom rpc for disk sync.
3888
3889     This uses our own step-based rpc call.
3890
3891     """
3892     self.feedback_fn("* wait until resync is done")
3893     all_done = False
3894     while not all_done:
3895       all_done = True
3896       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3897                                             self.nodes_ip,
3898                                             self.instance.disks)
3899       min_percent = 100
3900       for node, nres in result.items():
3901         msg = nres.RemoteFailMsg()
3902         if msg:
3903           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3904                                    (node, msg))
3905         node_done, node_percent = nres.payload
3906         all_done = all_done and node_done
3907         if node_percent is not None:
3908           min_percent = min(min_percent, node_percent)
3909       if not all_done:
3910         if min_percent < 100:
3911           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3912         time.sleep(2)
3913
3914   def _EnsureSecondary(self, node):
3915     """Demote a node to secondary.
3916
3917     """
3918     self.feedback_fn("* switching node %s to secondary mode" % node)
3919
3920     for dev in self.instance.disks:
3921       self.cfg.SetDiskID(dev, node)
3922
3923     result = self.rpc.call_blockdev_close(node, self.instance.name,
3924                                           self.instance.disks)
3925     msg = result.RemoteFailMsg()
3926     if msg:
3927       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3928                                " error %s" % (node, msg))
3929
3930   def _GoStandalone(self):
3931     """Disconnect from the network.
3932
3933     """
3934     self.feedback_fn("* changing into standalone mode")
3935     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3936                                                self.instance.disks)
3937     for node, nres in result.items():
3938       msg = nres.RemoteFailMsg()
3939       if msg:
3940         raise errors.OpExecError("Cannot disconnect disks node %s,"
3941                                  " error %s" % (node, msg))
3942
3943   def _GoReconnect(self, multimaster):
3944     """Reconnect to the network.
3945
3946     """
3947     if multimaster:
3948       msg = "dual-master"
3949     else:
3950       msg = "single-master"
3951     self.feedback_fn("* changing disks into %s mode" % msg)
3952     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3953                                            self.instance.disks,
3954                                            self.instance.name, multimaster)
3955     for node, nres in result.items():
3956       msg = nres.RemoteFailMsg()
3957       if msg:
3958         raise errors.OpExecError("Cannot change disks config on node %s,"
3959                                  " error: %s" % (node, msg))
3960
3961   def _ExecCleanup(self):
3962     """Try to cleanup after a failed migration.
3963
3964     The cleanup is done by:
3965       - check that the instance is running only on one node
3966         (and update the config if needed)
3967       - change disks on its secondary node to secondary
3968       - wait until disks are fully synchronized
3969       - disconnect from the network
3970       - change disks into single-master mode
3971       - wait again until disks are fully synchronized
3972
3973     """
3974     instance = self.instance
3975     target_node = self.target_node
3976     source_node = self.source_node
3977
3978     # check running on only one node
3979     self.feedback_fn("* checking where the instance actually runs"
3980                      " (if this hangs, the hypervisor might be in"
3981                      " a bad state)")
3982     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3983     for node, result in ins_l.items():
3984       result.Raise()
3985       if not isinstance(result.data, list):
3986         raise errors.OpExecError("Can't contact node '%s'" % node)
3987
3988     runningon_source = instance.name in ins_l[source_node].data
3989     runningon_target = instance.name in ins_l[target_node].data
3990
3991     if runningon_source and runningon_target:
3992       raise errors.OpExecError("Instance seems to be running on two nodes,"
3993                                " or the hypervisor is confused. You will have"
3994                                " to ensure manually that it runs only on one"
3995                                " and restart this operation.")
3996
3997     if not (runningon_source or runningon_target):
3998       raise errors.OpExecError("Instance does not seem to be running at all."
3999                                " In this case, it's safer to repair by"
4000                                " running 'gnt-instance stop' to ensure disk"
4001                                " shutdown, and then restarting it.")
4002
4003     if runningon_target:
4004       # the migration has actually succeeded, we need to update the config
4005       self.feedback_fn("* instance running on secondary node (%s),"
4006                        " updating config" % target_node)
4007       instance.primary_node = target_node
4008       self.cfg.Update(instance)
4009       demoted_node = source_node
4010     else:
4011       self.feedback_fn("* instance confirmed to be running on its"
4012                        " primary node (%s)" % source_node)
4013       demoted_node = target_node
4014
4015     self._EnsureSecondary(demoted_node)
4016     try:
4017       self._WaitUntilSync()
4018     except errors.OpExecError:
4019       # we ignore here errors, since if the device is standalone, it
4020       # won't be able to sync
4021       pass
4022     self._GoStandalone()
4023     self._GoReconnect(False)
4024     self._WaitUntilSync()
4025
4026     self.feedback_fn("* done")
4027
4028   def _RevertDiskStatus(self):
4029     """Try to revert the disk status after a failed migration.
4030
4031     """
4032     target_node = self.target_node
4033     try:
4034       self._EnsureSecondary(target_node)
4035       self._GoStandalone()
4036       self._GoReconnect(False)
4037       self._WaitUntilSync()
4038     except errors.OpExecError, err:
4039       self.LogWarning("Migration failed and I can't reconnect the"
4040                       " drives: error '%s'\n"
4041                       "Please look and recover the instance status" %
4042                       str(err))
4043
4044   def _AbortMigration(self):
4045     """Call the hypervisor code to abort a started migration.
4046
4047     """
4048     instance = self.instance
4049     target_node = self.target_node
4050     migration_info = self.migration_info
4051
4052     abort_result = self.rpc.call_finalize_migration(target_node,
4053                                                     instance,
4054                                                     migration_info,
4055                                                     False)
4056     abort_msg = abort_result.RemoteFailMsg()
4057     if abort_msg:
4058       logging.error("Aborting migration failed on target node %s: %s" %
4059                     (target_node, abort_msg))
4060       # Don't raise an exception here, as we stil have to try to revert the
4061       # disk status, even if this step failed.
4062
4063   def _ExecMigration(self):
4064     """Migrate an instance.
4065
4066     The migrate is done by:
4067       - change the disks into dual-master mode
4068       - wait until disks are fully synchronized again
4069       - migrate the instance
4070       - change disks on the new secondary node (the old primary) to secondary
4071       - wait until disks are fully synchronized
4072       - change disks into single-master mode
4073
4074     """
4075     instance = self.instance
4076     target_node = self.target_node
4077     source_node = self.source_node
4078
4079     self.feedback_fn("* checking disk consistency between source and target")
4080     for dev in instance.disks:
4081       if not _CheckDiskConsistency(self, dev, target_node, False):
4082         raise errors.OpExecError("Disk %s is degraded or not fully"
4083                                  " synchronized on target node,"
4084                                  " aborting migrate." % dev.iv_name)
4085
4086     # First get the migration information from the remote node
4087     result = self.rpc.call_migration_info(source_node, instance)
4088     msg = result.RemoteFailMsg()
4089     if msg:
4090       log_err = ("Failed fetching source migration information from %s: %s" %
4091                  (source_node, msg))
4092       logging.error(log_err)
4093       raise errors.OpExecError(log_err)
4094
4095     self.migration_info = migration_info = result.payload
4096
4097     # Then switch the disks to master/master mode
4098     self._EnsureSecondary(target_node)
4099     self._GoStandalone()
4100     self._GoReconnect(True)
4101     self._WaitUntilSync()
4102
4103     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4104     result = self.rpc.call_accept_instance(target_node,
4105                                            instance,
4106                                            migration_info,
4107                                            self.nodes_ip[target_node])
4108
4109     msg = result.RemoteFailMsg()
4110     if msg:
4111       logging.error("Instance pre-migration failed, trying to revert"
4112                     " disk status: %s", msg)
4113       self._AbortMigration()
4114       self._RevertDiskStatus()
4115       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4116                                (instance.name, msg))
4117
4118     self.feedback_fn("* migrating instance to %s" % target_node)
4119     time.sleep(10)
4120     result = self.rpc.call_instance_migrate(source_node, instance,
4121                                             self.nodes_ip[target_node],
4122                                             self.op.live)
4123     msg = result.RemoteFailMsg()
4124     if msg:
4125       logging.error("Instance migration failed, trying to revert"
4126                     " disk status: %s", msg)
4127       self._AbortMigration()
4128       self._RevertDiskStatus()
4129       raise errors.OpExecError("Could not migrate instance %s: %s" %
4130                                (instance.name, msg))
4131     time.sleep(10)
4132
4133     instance.primary_node = target_node
4134     # distribute new instance config to the other nodes
4135     self.cfg.Update(instance)
4136
4137     result = self.rpc.call_finalize_migration(target_node,
4138                                               instance,
4139                                               migration_info,
4140                                               True)
4141     msg = result.RemoteFailMsg()
4142     if msg:
4143       logging.error("Instance migration succeeded, but finalization failed:"
4144                     " %s" % msg)
4145       raise errors.OpExecError("Could not finalize instance migration: %s" %
4146                                msg)
4147
4148     self._EnsureSecondary(source_node)
4149     self._WaitUntilSync()
4150     self._GoStandalone()
4151     self._GoReconnect(False)
4152     self._WaitUntilSync()
4153
4154     self.feedback_fn("* done")
4155
4156   def Exec(self, feedback_fn):
4157     """Perform the migration.
4158
4159     """
4160     self.feedback_fn = feedback_fn
4161
4162     self.source_node = self.instance.primary_node
4163     self.target_node = self.instance.secondary_nodes[0]
4164     self.all_nodes = [self.source_node, self.target_node]
4165     self.nodes_ip = {
4166       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4167       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4168       }
4169     if self.op.cleanup:
4170       return self._ExecCleanup()
4171     else:
4172       return self._ExecMigration()
4173
4174
4175 def _CreateBlockDev(lu, node, instance, device, force_create,
4176                     info, force_open):
4177   """Create a tree of block devices on a given node.
4178
4179   If this device type has to be created on secondaries, create it and
4180   all its children.
4181
4182   If not, just recurse to children keeping the same 'force' value.
4183
4184   @param lu: the lu on whose behalf we execute
4185   @param node: the node on which to create the device
4186   @type instance: L{objects.Instance}
4187   @param instance: the instance which owns the device
4188   @type device: L{objects.Disk}
4189   @param device: the device to create
4190   @type force_create: boolean
4191   @param force_create: whether to force creation of this device; this
4192       will be change to True whenever we find a device which has
4193       CreateOnSecondary() attribute
4194   @param info: the extra 'metadata' we should attach to the device
4195       (this will be represented as a LVM tag)
4196   @type force_open: boolean
4197   @param force_open: this parameter will be passes to the
4198       L{backend.BlockdevCreate} function where it specifies
4199       whether we run on primary or not, and it affects both
4200       the child assembly and the device own Open() execution
4201
4202   """
4203   if device.CreateOnSecondary():
4204     force_create = True
4205
4206   if device.children:
4207     for child in device.children:
4208       _CreateBlockDev(lu, node, instance, child, force_create,
4209                       info, force_open)
4210
4211   if not force_create:
4212     return
4213
4214   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4215
4216
4217 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4218   """Create a single block device on a given node.
4219
4220   This will not recurse over children of the device, so they must be
4221   created in advance.
4222
4223   @param lu: the lu on whose behalf we execute
4224   @param node: the node on which to create the device
4225   @type instance: L{objects.Instance}
4226   @param instance: the instance which owns the device
4227   @type device: L{objects.Disk}
4228   @param device: the device to create
4229   @param info: the extra 'metadata' we should attach to the device
4230       (this will be represented as a LVM tag)
4231   @type force_open: boolean
4232   @param force_open: this parameter will be passes to the
4233       L{backend.BlockdevCreate} function where it specifies
4234       whether we run on primary or not, and it affects both
4235       the child assembly and the device own Open() execution
4236
4237   """
4238   lu.cfg.SetDiskID(device, node)
4239   result = lu.rpc.call_blockdev_create(node, device, device.size,
4240                                        instance.name, force_open, info)
4241   msg = result.RemoteFailMsg()
4242   if msg:
4243     raise errors.OpExecError("Can't create block device %s on"
4244                              " node %s for instance %s: %s" %
4245                              (device, node, instance.name, msg))
4246   if device.physical_id is None:
4247     device.physical_id = result.payload
4248
4249
4250 def _GenerateUniqueNames(lu, exts):
4251   """Generate a suitable LV name.
4252
4253   This will generate a logical volume name for the given instance.
4254
4255   """
4256   results = []
4257   for val in exts:
4258     new_id = lu.cfg.GenerateUniqueID()
4259     results.append("%s%s" % (new_id, val))
4260   return results
4261
4262
4263 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4264                          p_minor, s_minor):
4265   """Generate a drbd8 device complete with its children.
4266
4267   """
4268   port = lu.cfg.AllocatePort()
4269   vgname = lu.cfg.GetVGName()
4270   shared_secret = lu.cfg.GenerateDRBDSecret()
4271   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4272                           logical_id=(vgname, names[0]))
4273   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4274                           logical_id=(vgname, names[1]))
4275   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4276                           logical_id=(primary, secondary, port,
4277                                       p_minor, s_minor,
4278                                       shared_secret),
4279                           children=[dev_data, dev_meta],
4280                           iv_name=iv_name)
4281   return drbd_dev
4282
4283
4284 def _GenerateDiskTemplate(lu, template_name,
4285                           instance_name, primary_node,
4286                           secondary_nodes, disk_info,
4287                           file_storage_dir, file_driver,
4288                           base_index):
4289   """Generate the entire disk layout for a given template type.
4290
4291   """
4292   #TODO: compute space requirements
4293
4294   vgname = lu.cfg.GetVGName()
4295   disk_count = len(disk_info)
4296   disks = []
4297   if template_name == constants.DT_DISKLESS:
4298     pass
4299   elif template_name == constants.DT_PLAIN:
4300     if len(secondary_nodes) != 0:
4301       raise errors.ProgrammerError("Wrong template configuration")
4302
4303     names = _GenerateUniqueNames(lu, [".disk%d" % i
4304                                       for i in range(disk_count)])
4305     for idx, disk in enumerate(disk_info):
4306       disk_index = idx + base_index
4307       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4308                               logical_id=(vgname, names[idx]),
4309                               iv_name="disk/%d" % disk_index,
4310                               mode=disk["mode"])
4311       disks.append(disk_dev)
4312   elif template_name == constants.DT_DRBD8:
4313     if len(secondary_nodes) != 1:
4314       raise errors.ProgrammerError("Wrong template configuration")
4315     remote_node = secondary_nodes[0]
4316     minors = lu.cfg.AllocateDRBDMinor(
4317       [primary_node, remote_node] * len(disk_info), instance_name)
4318
4319     names = []
4320     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4321                                                for i in range(disk_count)]):
4322       names.append(lv_prefix + "_data")
4323       names.append(lv_prefix + "_meta")
4324     for idx, disk in enumerate(disk_info):
4325       disk_index = idx + base_index
4326       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4327                                       disk["size"], names[idx*2:idx*2+2],
4328                                       "disk/%d" % disk_index,
4329                                       minors[idx*2], minors[idx*2+1])
4330       disk_dev.mode = disk["mode"]
4331       disks.append(disk_dev)
4332   elif template_name == constants.DT_FILE:
4333     if len(secondary_nodes) != 0:
4334       raise errors.ProgrammerError("Wrong template configuration")
4335
4336     for idx, disk in enumerate(disk_info):
4337       disk_index = idx + base_index
4338       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4339                               iv_name="disk/%d" % disk_index,
4340                               logical_id=(file_driver,
4341                                           "%s/disk%d" % (file_storage_dir,
4342                                                          disk_index)),
4343                               mode=disk["mode"])
4344       disks.append(disk_dev)
4345   else:
4346     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4347   return disks
4348
4349
4350 def _GetInstanceInfoText(instance):
4351   """Compute that text that should be added to the disk's metadata.
4352
4353   """
4354   return "originstname+%s" % instance.name
4355
4356
4357 def _CreateDisks(lu, instance):
4358   """Create all disks for an instance.
4359
4360   This abstracts away some work from AddInstance.
4361
4362   @type lu: L{LogicalUnit}
4363   @param lu: the logical unit on whose behalf we execute
4364   @type instance: L{objects.Instance}
4365   @param instance: the instance whose disks we should create
4366   @rtype: boolean
4367   @return: the success of the creation
4368
4369   """
4370   info = _GetInstanceInfoText(instance)
4371   pnode = instance.primary_node
4372
4373   if instance.disk_template == constants.DT_FILE:
4374     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4375     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4376
4377     if result.failed or not result.data:
4378       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4379
4380     if not result.data[0]:
4381       raise errors.OpExecError("Failed to create directory '%s'" %
4382                                file_storage_dir)
4383
4384   # Note: this needs to be kept in sync with adding of disks in
4385   # LUSetInstanceParams
4386   for device in instance.disks:
4387     logging.info("Creating volume %s for instance %s",
4388                  device.iv_name, instance.name)
4389     #HARDCODE
4390     for node in instance.all_nodes:
4391       f_create = node == pnode
4392       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4393
4394
4395 def _RemoveDisks(lu, instance):
4396   """Remove all disks for an instance.
4397
4398   This abstracts away some work from `AddInstance()` and
4399   `RemoveInstance()`. Note that in case some of the devices couldn't
4400   be removed, the removal will continue with the other ones (compare
4401   with `_CreateDisks()`).
4402
4403   @type lu: L{LogicalUnit}
4404   @param lu: the logical unit on whose behalf we execute
4405   @type instance: L{objects.Instance}
4406   @param instance: the instance whose disks we should remove
4407   @rtype: boolean
4408   @return: the success of the removal
4409
4410   """
4411   logging.info("Removing block devices for instance %s", instance.name)
4412
4413   all_result = True
4414   for device in instance.disks:
4415     for node, disk in device.ComputeNodeTree(instance.primary_node):
4416       lu.cfg.SetDiskID(disk, node)
4417       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4418       if msg:
4419         lu.LogWarning("Could not remove block device %s on node %s,"
4420                       " continuing anyway: %s", device.iv_name, node, msg)
4421         all_result = False
4422
4423   if instance.disk_template == constants.DT_FILE:
4424     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4425     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4426                                                  file_storage_dir)
4427     if result.failed or not result.data:
4428       logging.error("Could not remove directory '%s'", file_storage_dir)
4429       all_result = False
4430
4431   return all_result
4432
4433
4434 def _ComputeDiskSize(disk_template, disks):
4435   """Compute disk size requirements in the volume group
4436
4437   """
4438   # Required free disk space as a function of disk and swap space
4439   req_size_dict = {
4440     constants.DT_DISKLESS: None,
4441     constants.DT_PLAIN: sum(d["size"] for d in disks),
4442     # 128 MB are added for drbd metadata for each disk
4443     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4444     constants.DT_FILE: None,
4445   }
4446
4447   if disk_template not in req_size_dict:
4448     raise errors.ProgrammerError("Disk template '%s' size requirement"
4449                                  " is unknown" %  disk_template)
4450
4451   return req_size_dict[disk_template]
4452
4453
4454 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4455   """Hypervisor parameter validation.
4456
4457   This function abstract the hypervisor parameter validation to be
4458   used in both instance create and instance modify.
4459
4460   @type lu: L{LogicalUnit}
4461   @param lu: the logical unit for which we check
4462   @type nodenames: list
4463   @param nodenames: the list of nodes on which we should check
4464   @type hvname: string
4465   @param hvname: the name of the hypervisor we should use
4466   @type hvparams: dict
4467   @param hvparams: the parameters which we need to check
4468   @raise errors.OpPrereqError: if the parameters are not valid
4469
4470   """
4471   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4472                                                   hvname,
4473                                                   hvparams)
4474   for node in nodenames:
4475     info = hvinfo[node]
4476     if info.offline:
4477       continue
4478     msg = info.RemoteFailMsg()
4479     if msg:
4480       raise errors.OpPrereqError("Hypervisor parameter validation"
4481                                  " failed on node %s: %s" % (node, msg))
4482
4483
4484 class LUCreateInstance(LogicalUnit):
4485   """Create an instance.
4486
4487   """
4488   HPATH = "instance-add"
4489   HTYPE = constants.HTYPE_INSTANCE
4490   _OP_REQP = ["instance_name", "disks", "disk_template",
4491               "mode", "start",
4492               "wait_for_sync", "ip_check", "nics",
4493               "hvparams", "beparams"]
4494   REQ_BGL = False
4495
4496   def _ExpandNode(self, node):
4497     """Expands and checks one node name.
4498
4499     """
4500     node_full = self.cfg.ExpandNodeName(node)
4501     if node_full is None:
4502       raise errors.OpPrereqError("Unknown node %s" % node)
4503     return node_full
4504
4505   def ExpandNames(self):
4506     """ExpandNames for CreateInstance.
4507
4508     Figure out the right locks for instance creation.
4509
4510     """
4511     self.needed_locks = {}
4512
4513     # set optional parameters to none if they don't exist
4514     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4515       if not hasattr(self.op, attr):
4516         setattr(self.op, attr, None)
4517
4518     # cheap checks, mostly valid constants given
4519
4520     # verify creation mode
4521     if self.op.mode not in (constants.INSTANCE_CREATE,
4522                             constants.INSTANCE_IMPORT):
4523       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4524                                  self.op.mode)
4525
4526     # disk template and mirror node verification
4527     if self.op.disk_template not in constants.DISK_TEMPLATES:
4528       raise errors.OpPrereqError("Invalid disk template name")
4529
4530     if self.op.hypervisor is None:
4531       self.op.hypervisor = self.cfg.GetHypervisorType()
4532
4533     cluster = self.cfg.GetClusterInfo()
4534     enabled_hvs = cluster.enabled_hypervisors
4535     if self.op.hypervisor not in enabled_hvs:
4536       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4537                                  " cluster (%s)" % (self.op.hypervisor,
4538                                   ",".join(enabled_hvs)))
4539
4540     # check hypervisor parameter syntax (locally)
4541     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4542     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4543                                   self.op.hvparams)
4544     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4545     hv_type.CheckParameterSyntax(filled_hvp)
4546     self.hv_full = filled_hvp
4547
4548     # fill and remember the beparams dict
4549     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4550     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4551                                     self.op.beparams)
4552
4553     #### instance parameters check
4554
4555     # instance name verification
4556     hostname1 = utils.HostInfo(self.op.instance_name)
4557     self.op.instance_name = instance_name = hostname1.name
4558
4559     # this is just a preventive check, but someone might still add this
4560     # instance in the meantime, and creation will fail at lock-add time
4561     if instance_name in self.cfg.GetInstanceList():
4562       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4563                                  instance_name)
4564
4565     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4566
4567     # NIC buildup
4568     self.nics = []
4569     for nic in self.op.nics:
4570       # ip validity checks
4571       ip = nic.get("ip", None)
4572       if ip is None or ip.lower() == "none":
4573         nic_ip = None
4574       elif ip.lower() == constants.VALUE_AUTO:
4575         nic_ip = hostname1.ip
4576       else:
4577         if not utils.IsValidIP(ip):
4578           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4579                                      " like a valid IP" % ip)
4580         nic_ip = ip
4581
4582       # MAC address verification
4583       mac = nic.get("mac", constants.VALUE_AUTO)
4584       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4585         if not utils.IsValidMac(mac.lower()):
4586           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4587                                      mac)
4588       # bridge verification
4589       bridge = nic.get("bridge", None)
4590       if bridge is None:
4591         bridge = self.cfg.GetDefBridge()
4592       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4593
4594     # disk checks/pre-build
4595     self.disks = []
4596     for disk in self.op.disks:
4597       mode = disk.get("mode", constants.DISK_RDWR)
4598       if mode not in constants.DISK_ACCESS_SET:
4599         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4600                                    mode)
4601       size = disk.get("size", None)
4602       if size is None:
4603         raise errors.OpPrereqError("Missing disk size")
4604       try:
4605         size = int(size)
4606       except ValueError:
4607         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4608       self.disks.append({"size": size, "mode": mode})
4609
4610     # used in CheckPrereq for ip ping check
4611     self.check_ip = hostname1.ip
4612
4613     # file storage checks
4614     if (self.op.file_driver and
4615         not self.op.file_driver in constants.FILE_DRIVER):
4616       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4617                                  self.op.file_driver)
4618
4619     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4620       raise errors.OpPrereqError("File storage directory path not absolute")
4621
4622     ### Node/iallocator related checks
4623     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4624       raise errors.OpPrereqError("One and only one of iallocator and primary"
4625                                  " node must be given")
4626
4627     if self.op.iallocator:
4628       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4629     else:
4630       self.op.pnode = self._ExpandNode(self.op.pnode)
4631       nodelist = [self.op.pnode]
4632       if self.op.snode is not None:
4633         self.op.snode = self._ExpandNode(self.op.snode)
4634         nodelist.append(self.op.snode)
4635       self.needed_locks[locking.LEVEL_NODE] = nodelist
4636
4637     # in case of import lock the source node too
4638     if self.op.mode == constants.INSTANCE_IMPORT:
4639       src_node = getattr(self.op, "src_node", None)
4640       src_path = getattr(self.op, "src_path", None)
4641
4642       if src_path is None:
4643         self.op.src_path = src_path = self.op.instance_name
4644
4645       if src_node is None:
4646         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4647         self.op.src_node = None
4648         if os.path.isabs(src_path):
4649           raise errors.OpPrereqError("Importing an instance from an absolute"
4650                                      " path requires a source node option.")
4651       else:
4652         self.op.src_node = src_node = self._ExpandNode(src_node)
4653         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4654           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4655         if not os.path.isabs(src_path):
4656           self.op.src_path = src_path = \
4657             os.path.join(constants.EXPORT_DIR, src_path)
4658
4659     else: # INSTANCE_CREATE
4660       if getattr(self.op, "os_type", None) is None:
4661         raise errors.OpPrereqError("No guest OS specified")
4662
4663   def _RunAllocator(self):
4664     """Run the allocator based on input opcode.
4665
4666     """
4667     nics = [n.ToDict() for n in self.nics]
4668     ial = IAllocator(self,
4669                      mode=constants.IALLOCATOR_MODE_ALLOC,
4670                      name=self.op.instance_name,
4671                      disk_template=self.op.disk_template,
4672                      tags=[],
4673                      os=self.op.os_type,
4674                      vcpus=self.be_full[constants.BE_VCPUS],
4675                      mem_size=self.be_full[constants.BE_MEMORY],
4676                      disks=self.disks,
4677                      nics=nics,
4678                      hypervisor=self.op.hypervisor,
4679                      )
4680
4681     ial.Run(self.op.iallocator)
4682
4683     if not ial.success:
4684       raise errors.OpPrereqError("Can't compute nodes using"
4685                                  " iallocator '%s': %s" % (self.op.iallocator,
4686                                                            ial.info))
4687     if len(ial.nodes) != ial.required_nodes:
4688       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4689                                  " of nodes (%s), required %s" %
4690                                  (self.op.iallocator, len(ial.nodes),
4691                                   ial.required_nodes))
4692     self.op.pnode = ial.nodes[0]
4693     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4694                  self.op.instance_name, self.op.iallocator,
4695                  ", ".join(ial.nodes))
4696     if ial.required_nodes == 2:
4697       self.op.snode = ial.nodes[1]
4698
4699   def BuildHooksEnv(self):
4700     """Build hooks env.
4701
4702     This runs on master, primary and secondary nodes of the instance.
4703
4704     """
4705     env = {
4706       "ADD_MODE": self.op.mode,
4707       }
4708     if self.op.mode == constants.INSTANCE_IMPORT:
4709       env["SRC_NODE"] = self.op.src_node
4710       env["SRC_PATH"] = self.op.src_path
4711       env["SRC_IMAGES"] = self.src_images
4712
4713     env.update(_BuildInstanceHookEnv(
4714       name=self.op.instance_name,
4715       primary_node=self.op.pnode,
4716       secondary_nodes=self.secondaries,
4717       status=self.op.start,
4718       os_type=self.op.os_type,
4719       memory=self.be_full[constants.BE_MEMORY],
4720       vcpus=self.be_full[constants.BE_VCPUS],
4721       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4722       disk_template=self.op.disk_template,
4723       disks=[(d["size"], d["mode"]) for d in self.disks],
4724       bep=self.be_full,
4725       hvp=self.hv_full,
4726       hypervisor=self.op.hypervisor,
4727     ))
4728
4729     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4730           self.secondaries)
4731     return env, nl, nl
4732
4733
4734   def CheckPrereq(self):
4735     """Check prerequisites.
4736
4737     """
4738     if (not self.cfg.GetVGName() and
4739         self.op.disk_template not in constants.DTS_NOT_LVM):
4740       raise errors.OpPrereqError("Cluster does not support lvm-based"
4741                                  " instances")
4742
4743     if self.op.mode == constants.INSTANCE_IMPORT:
4744       src_node = self.op.src_node
4745       src_path = self.op.src_path
4746
4747       if src_node is None:
4748         exp_list = self.rpc.call_export_list(
4749           self.acquired_locks[locking.LEVEL_NODE])
4750         found = False
4751         for node in exp_list:
4752           if not exp_list[node].failed and src_path in exp_list[node].data:
4753             found = True
4754             self.op.src_node = src_node = node
4755             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4756                                                        src_path)
4757             break
4758         if not found:
4759           raise errors.OpPrereqError("No export found for relative path %s" %
4760                                       src_path)
4761
4762       _CheckNodeOnline(self, src_node)
4763       result = self.rpc.call_export_info(src_node, src_path)
4764       result.Raise()
4765       if not result.data:
4766         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4767
4768       export_info = result.data
4769       if not export_info.has_section(constants.INISECT_EXP):
4770         raise errors.ProgrammerError("Corrupted export config")
4771
4772       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4773       if (int(ei_version) != constants.EXPORT_VERSION):
4774         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4775                                    (ei_version, constants.EXPORT_VERSION))
4776
4777       # Check that the new instance doesn't have less disks than the export
4778       instance_disks = len(self.disks)
4779       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4780       if instance_disks < export_disks:
4781         raise errors.OpPrereqError("Not enough disks to import."
4782                                    " (instance: %d, export: %d)" %
4783                                    (instance_disks, export_disks))
4784
4785       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4786       disk_images = []
4787       for idx in range(export_disks):
4788         option = 'disk%d_dump' % idx
4789         if export_info.has_option(constants.INISECT_INS, option):
4790           # FIXME: are the old os-es, disk sizes, etc. useful?
4791           export_name = export_info.get(constants.INISECT_INS, option)
4792           image = os.path.join(src_path, export_name)
4793           disk_images.append(image)
4794         else:
4795           disk_images.append(False)
4796
4797       self.src_images = disk_images
4798
4799       old_name = export_info.get(constants.INISECT_INS, 'name')
4800       # FIXME: int() here could throw a ValueError on broken exports
4801       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4802       if self.op.instance_name == old_name:
4803         for idx, nic in enumerate(self.nics):
4804           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4805             nic_mac_ini = 'nic%d_mac' % idx
4806             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4807
4808     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4809     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4810     if self.op.start and not self.op.ip_check:
4811       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4812                                  " adding an instance in start mode")
4813
4814     if self.op.ip_check:
4815       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4816         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4817                                    (self.check_ip, self.op.instance_name))
4818
4819     #### mac address generation
4820     # By generating here the mac address both the allocator and the hooks get
4821     # the real final mac address rather than the 'auto' or 'generate' value.
4822     # There is a race condition between the generation and the instance object
4823     # creation, which means that we know the mac is valid now, but we're not
4824     # sure it will be when we actually add the instance. If things go bad
4825     # adding the instance will abort because of a duplicate mac, and the
4826     # creation job will fail.
4827     for nic in self.nics:
4828       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4829         nic.mac = self.cfg.GenerateMAC()
4830
4831     #### allocator run
4832
4833     if self.op.iallocator is not None:
4834       self._RunAllocator()
4835
4836     #### node related checks
4837
4838     # check primary node
4839     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4840     assert self.pnode is not None, \
4841       "Cannot retrieve locked node %s" % self.op.pnode
4842     if pnode.offline:
4843       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4844                                  pnode.name)
4845     if pnode.drained:
4846       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4847                                  pnode.name)
4848
4849     self.secondaries = []
4850
4851     # mirror node verification
4852     if self.op.disk_template in constants.DTS_NET_MIRROR:
4853       if self.op.snode is None:
4854         raise errors.OpPrereqError("The networked disk templates need"
4855                                    " a mirror node")
4856       if self.op.snode == pnode.name:
4857         raise errors.OpPrereqError("The secondary node cannot be"
4858                                    " the primary node.")
4859       _CheckNodeOnline(self, self.op.snode)
4860       _CheckNodeNotDrained(self, self.op.snode)
4861       self.secondaries.append(self.op.snode)
4862
4863     nodenames = [pnode.name] + self.secondaries
4864
4865     req_size = _ComputeDiskSize(self.op.disk_template,
4866                                 self.disks)
4867
4868     # Check lv size requirements
4869     if req_size is not None:
4870       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4871                                          self.op.hypervisor)
4872       for node in nodenames:
4873         info = nodeinfo[node]
4874         info.Raise()
4875         info = info.data
4876         if not info:
4877           raise errors.OpPrereqError("Cannot get current information"
4878                                      " from node '%s'" % node)
4879         vg_free = info.get('vg_free', None)
4880         if not isinstance(vg_free, int):
4881           raise errors.OpPrereqError("Can't compute free disk space on"
4882                                      " node %s" % node)
4883         if req_size > info['vg_free']:
4884           raise errors.OpPrereqError("Not enough disk space on target node %s."
4885                                      " %d MB available, %d MB required" %
4886                                      (node, info['vg_free'], req_size))
4887
4888     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4889
4890     # os verification
4891     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4892     result.Raise()
4893     if not isinstance(result.data, objects.OS) or not result.data:
4894       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4895                                  " primary node"  % self.op.os_type)
4896
4897     # bridge check on primary node
4898     bridges = [n.bridge for n in self.nics]
4899     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4900     result.Raise()
4901     if not result.data:
4902       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4903                                  " exist on destination node '%s'" %
4904                                  (",".join(bridges), pnode.name))
4905
4906     # memory check on primary node
4907     if self.op.start:
4908       _CheckNodeFreeMemory(self, self.pnode.name,
4909                            "creating instance %s" % self.op.instance_name,
4910                            self.be_full[constants.BE_MEMORY],
4911                            self.op.hypervisor)
4912
4913   def Exec(self, feedback_fn):
4914     """Create and add the instance to the cluster.
4915
4916     """
4917     instance = self.op.instance_name
4918     pnode_name = self.pnode.name
4919
4920     ht_kind = self.op.hypervisor
4921     if ht_kind in constants.HTS_REQ_PORT:
4922       network_port = self.cfg.AllocatePort()
4923     else:
4924       network_port = None
4925
4926     ##if self.op.vnc_bind_address is None:
4927     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4928
4929     # this is needed because os.path.join does not accept None arguments
4930     if self.op.file_storage_dir is None:
4931       string_file_storage_dir = ""
4932     else:
4933       string_file_storage_dir = self.op.file_storage_dir
4934
4935     # build the full file storage dir path
4936     file_storage_dir = os.path.normpath(os.path.join(
4937                                         self.cfg.GetFileStorageDir(),
4938                                         string_file_storage_dir, instance))
4939
4940
4941     disks = _GenerateDiskTemplate(self,
4942                                   self.op.disk_template,
4943                                   instance, pnode_name,
4944                                   self.secondaries,
4945                                   self.disks,
4946                                   file_storage_dir,
4947                                   self.op.file_driver,
4948                                   0)
4949
4950     iobj = objects.Instance(name=instance, os=self.op.os_type,
4951                             primary_node=pnode_name,
4952                             nics=self.nics, disks=disks,
4953                             disk_template=self.op.disk_template,
4954                             admin_up=False,
4955                             network_port=network_port,
4956                             beparams=self.op.beparams,
4957                             hvparams=self.op.hvparams,
4958                             hypervisor=self.op.hypervisor,
4959                             )
4960
4961     feedback_fn("* creating instance disks...")
4962     try:
4963       _CreateDisks(self, iobj)
4964     except errors.OpExecError:
4965       self.LogWarning("Device creation failed, reverting...")
4966       try:
4967         _RemoveDisks(self, iobj)
4968       finally:
4969         self.cfg.ReleaseDRBDMinors(instance)
4970         raise
4971
4972     feedback_fn("adding instance %s to cluster config" % instance)
4973
4974     self.cfg.AddInstance(iobj)
4975     # Declare that we don't want to remove the instance lock anymore, as we've
4976     # added the instance to the config
4977     del self.remove_locks[locking.LEVEL_INSTANCE]
4978     # Unlock all the nodes
4979     if self.op.mode == constants.INSTANCE_IMPORT:
4980       nodes_keep = [self.op.src_node]
4981       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4982                        if node != self.op.src_node]
4983       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4984       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4985     else:
4986       self.context.glm.release(locking.LEVEL_NODE)
4987       del self.acquired_locks[locking.LEVEL_NODE]
4988
4989     if self.op.wait_for_sync:
4990       disk_abort = not _WaitForSync(self, iobj)
4991     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4992       # make sure the disks are not degraded (still sync-ing is ok)
4993       time.sleep(15)
4994       feedback_fn("* checking mirrors status")
4995       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4996     else:
4997       disk_abort = False
4998
4999     if disk_abort:
5000       _RemoveDisks(self, iobj)
5001       self.cfg.RemoveInstance(iobj.name)
5002       # Make sure the instance lock gets removed
5003       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5004       raise errors.OpExecError("There are some degraded disks for"
5005                                " this instance")
5006
5007     feedback_fn("creating os for instance %s on node %s" %
5008                 (instance, pnode_name))
5009
5010     if iobj.disk_template != constants.DT_DISKLESS:
5011       if self.op.mode == constants.INSTANCE_CREATE:
5012         feedback_fn("* running the instance OS create scripts...")
5013         result = self.rpc.call_instance_os_add(pnode_name, iobj)
5014         msg = result.RemoteFailMsg()
5015         if msg:
5016           raise errors.OpExecError("Could not add os for instance %s"
5017                                    " on node %s: %s" %
5018                                    (instance, pnode_name, msg))
5019
5020       elif self.op.mode == constants.INSTANCE_IMPORT:
5021         feedback_fn("* running the instance OS import scripts...")
5022         src_node = self.op.src_node
5023         src_images = self.src_images
5024         cluster_name = self.cfg.GetClusterName()
5025         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5026                                                          src_node, src_images,
5027                                                          cluster_name)
5028         import_result.Raise()
5029         for idx, result in enumerate(import_result.data):
5030           if not result:
5031             self.LogWarning("Could not import the image %s for instance"
5032                             " %s, disk %d, on node %s" %
5033                             (src_images[idx], instance, idx, pnode_name))
5034       else:
5035         # also checked in the prereq part
5036         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5037                                      % self.op.mode)
5038
5039     if self.op.start:
5040       iobj.admin_up = True
5041       self.cfg.Update(iobj)
5042       logging.info("Starting instance %s on node %s", instance, pnode_name)
5043       feedback_fn("* starting instance...")
5044       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5045       msg = result.RemoteFailMsg()
5046       if msg:
5047         raise errors.OpExecError("Could not start instance: %s" % msg)
5048
5049
5050 class LUConnectConsole(NoHooksLU):
5051   """Connect to an instance's console.
5052
5053   This is somewhat special in that it returns the command line that
5054   you need to run on the master node in order to connect to the
5055   console.
5056
5057   """
5058   _OP_REQP = ["instance_name"]
5059   REQ_BGL = False
5060
5061   def ExpandNames(self):
5062     self._ExpandAndLockInstance()
5063
5064   def CheckPrereq(self):
5065     """Check prerequisites.
5066
5067     This checks that the instance is in the cluster.
5068
5069     """
5070     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5071     assert self.instance is not None, \
5072       "Cannot retrieve locked instance %s" % self.op.instance_name
5073     _CheckNodeOnline(self, self.instance.primary_node)
5074
5075   def Exec(self, feedback_fn):
5076     """Connect to the console of an instance
5077
5078     """
5079     instance = self.instance
5080     node = instance.primary_node
5081
5082     node_insts = self.rpc.call_instance_list([node],
5083                                              [instance.hypervisor])[node]
5084     node_insts.Raise()
5085
5086     if instance.name not in node_insts.data:
5087       raise errors.OpExecError("Instance %s is not running." % instance.name)
5088
5089     logging.debug("Connecting to console of %s on %s", instance.name, node)
5090
5091     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5092     cluster = self.cfg.GetClusterInfo()
5093     # beparams and hvparams are passed separately, to avoid editing the
5094     # instance and then saving the defaults in the instance itself.
5095     hvparams = cluster.FillHV(instance)
5096     beparams = cluster.FillBE(instance)
5097     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5098
5099     # build ssh cmdline
5100     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5101
5102
5103 class LUReplaceDisks(LogicalUnit):
5104   """Replace the disks of an instance.
5105
5106   """
5107   HPATH = "mirrors-replace"
5108   HTYPE = constants.HTYPE_INSTANCE
5109   _OP_REQP = ["instance_name", "mode", "disks"]
5110   REQ_BGL = False
5111
5112   def CheckArguments(self):
5113     if not hasattr(self.op, "remote_node"):
5114       self.op.remote_node = None
5115     if not hasattr(self.op, "iallocator"):
5116       self.op.iallocator = None
5117
5118     # check for valid parameter combination
5119     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5120     if self.op.mode == constants.REPLACE_DISK_CHG:
5121       if cnt == 2:
5122         raise errors.OpPrereqError("When changing the secondary either an"
5123                                    " iallocator script must be used or the"
5124                                    " new node given")
5125       elif cnt == 0:
5126         raise errors.OpPrereqError("Give either the iallocator or the new"
5127                                    " secondary, not both")
5128     else: # not replacing the secondary
5129       if cnt != 2:
5130         raise errors.OpPrereqError("The iallocator and new node options can"
5131                                    " be used only when changing the"
5132                                    " secondary node")
5133
5134   def ExpandNames(self):
5135     self._ExpandAndLockInstance()
5136
5137     if self.op.iallocator is not None:
5138       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5139     elif self.op.remote_node is not None:
5140       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5141       if remote_node is None:
5142         raise errors.OpPrereqError("Node '%s' not known" %
5143                                    self.op.remote_node)
5144       self.op.remote_node = remote_node
5145       # Warning: do not remove the locking of the new secondary here
5146       # unless DRBD8.AddChildren is changed to work in parallel;
5147       # currently it doesn't since parallel invocations of
5148       # FindUnusedMinor will conflict
5149       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5150       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5151     else:
5152       self.needed_locks[locking.LEVEL_NODE] = []
5153       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5154
5155   def DeclareLocks(self, level):
5156     # If we're not already locking all nodes in the set we have to declare the
5157     # instance's primary/secondary nodes.
5158     if (level == locking.LEVEL_NODE and
5159         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5160       self._LockInstancesNodes()
5161
5162   def _RunAllocator(self):
5163     """Compute a new secondary node using an IAllocator.
5164
5165     """
5166     ial = IAllocator(self,
5167                      mode=constants.IALLOCATOR_MODE_RELOC,
5168                      name=self.op.instance_name,
5169                      relocate_from=[self.sec_node])
5170
5171     ial.Run(self.op.iallocator)
5172
5173     if not ial.success:
5174       raise errors.OpPrereqError("Can't compute nodes using"
5175                                  " iallocator '%s': %s" % (self.op.iallocator,
5176                                                            ial.info))
5177     if len(ial.nodes) != ial.required_nodes:
5178       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5179                                  " of nodes (%s), required %s" %
5180                                  (len(ial.nodes), ial.required_nodes))
5181     self.op.remote_node = ial.nodes[0]
5182     self.LogInfo("Selected new secondary for the instance: %s",
5183                  self.op.remote_node)
5184
5185   def BuildHooksEnv(self):
5186     """Build hooks env.
5187
5188     This runs on the master, the primary and all the secondaries.
5189
5190     """
5191     env = {
5192       "MODE": self.op.mode,
5193       "NEW_SECONDARY": self.op.remote_node,
5194       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5195       }
5196     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5197     nl = [
5198       self.cfg.GetMasterNode(),
5199       self.instance.primary_node,
5200       ]
5201     if self.op.remote_node is not None:
5202       nl.append(self.op.remote_node)
5203     return env, nl, nl
5204
5205   def CheckPrereq(self):
5206     """Check prerequisites.
5207
5208     This checks that the instance is in the cluster.
5209
5210     """
5211     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5212     assert instance is not None, \
5213       "Cannot retrieve locked instance %s" % self.op.instance_name
5214     self.instance = instance
5215
5216     if instance.disk_template != constants.DT_DRBD8:
5217       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5218                                  " instances")
5219
5220     if len(instance.secondary_nodes) != 1:
5221       raise errors.OpPrereqError("The instance has a strange layout,"
5222                                  " expected one secondary but found %d" %
5223                                  len(instance.secondary_nodes))
5224
5225     self.sec_node = instance.secondary_nodes[0]
5226
5227     if self.op.iallocator is not None:
5228       self._RunAllocator()
5229
5230     remote_node = self.op.remote_node
5231     if remote_node is not None:
5232       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5233       assert self.remote_node_info is not None, \
5234         "Cannot retrieve locked node %s" % remote_node
5235     else:
5236       self.remote_node_info = None
5237     if remote_node == instance.primary_node:
5238       raise errors.OpPrereqError("The specified node is the primary node of"
5239                                  " the instance.")
5240     elif remote_node == self.sec_node:
5241       raise errors.OpPrereqError("The specified node is already the"
5242                                  " secondary node of the instance.")
5243
5244     if self.op.mode == constants.REPLACE_DISK_PRI:
5245       n1 = self.tgt_node = instance.primary_node
5246       n2 = self.oth_node = self.sec_node
5247     elif self.op.mode == constants.REPLACE_DISK_SEC:
5248       n1 = self.tgt_node = self.sec_node
5249       n2 = self.oth_node = instance.primary_node
5250     elif self.op.mode == constants.REPLACE_DISK_CHG:
5251       n1 = self.new_node = remote_node
5252       n2 = self.oth_node = instance.primary_node
5253       self.tgt_node = self.sec_node
5254       _CheckNodeNotDrained(self, remote_node)
5255     else:
5256       raise errors.ProgrammerError("Unhandled disk replace mode")
5257
5258     _CheckNodeOnline(self, n1)
5259     _CheckNodeOnline(self, n2)
5260
5261     if not self.op.disks:
5262       self.op.disks = range(len(instance.disks))
5263
5264     for disk_idx in self.op.disks:
5265       instance.FindDisk(disk_idx)
5266
5267   def _ExecD8DiskOnly(self, feedback_fn):
5268     """Replace a disk on the primary or secondary for dbrd8.
5269
5270     The algorithm for replace is quite complicated:
5271
5272       1. for each disk to be replaced:
5273
5274         1. create new LVs on the target node with unique names
5275         1. detach old LVs from the drbd device
5276         1. rename old LVs to name_replaced.<time_t>
5277         1. rename new LVs to old LVs
5278         1. attach the new LVs (with the old names now) to the drbd device
5279
5280       1. wait for sync across all devices
5281
5282       1. for each modified disk:
5283
5284         1. remove old LVs (which have the name name_replaces.<time_t>)
5285
5286     Failures are not very well handled.
5287
5288     """
5289     steps_total = 6
5290     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5291     instance = self.instance
5292     iv_names = {}
5293     vgname = self.cfg.GetVGName()
5294     # start of work
5295     cfg = self.cfg
5296     tgt_node = self.tgt_node
5297     oth_node = self.oth_node
5298
5299     # Step: check device activation
5300     self.proc.LogStep(1, steps_total, "check device existence")
5301     info("checking volume groups")
5302     my_vg = cfg.GetVGName()
5303     results = self.rpc.call_vg_list([oth_node, tgt_node])
5304     if not results:
5305       raise errors.OpExecError("Can't list volume groups on the nodes")
5306     for node in oth_node, tgt_node:
5307       res = results[node]
5308       if res.failed or not res.data or my_vg not in res.data:
5309         raise errors.OpExecError("Volume group '%s' not found on %s" %
5310                                  (my_vg, node))
5311     for idx, dev in enumerate(instance.disks):
5312       if idx not in self.op.disks:
5313         continue
5314       for node in tgt_node, oth_node:
5315         info("checking disk/%d on %s" % (idx, node))
5316         cfg.SetDiskID(dev, node)
5317         result = self.rpc.call_blockdev_find(node, dev)
5318         msg = result.RemoteFailMsg()
5319         if not msg and not result.payload:
5320           msg = "disk not found"
5321         if msg:
5322           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5323                                    (idx, node, msg))
5324
5325     # Step: check other node consistency
5326     self.proc.LogStep(2, steps_total, "check peer consistency")
5327     for idx, dev in enumerate(instance.disks):
5328       if idx not in self.op.disks:
5329         continue
5330       info("checking disk/%d consistency on %s" % (idx, oth_node))
5331       if not _CheckDiskConsistency(self, dev, oth_node,
5332                                    oth_node==instance.primary_node):
5333         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5334                                  " to replace disks on this node (%s)" %
5335                                  (oth_node, tgt_node))
5336
5337     # Step: create new storage
5338     self.proc.LogStep(3, steps_total, "allocate new storage")
5339     for idx, dev in enumerate(instance.disks):
5340       if idx not in self.op.disks:
5341         continue
5342       size = dev.size
5343       cfg.SetDiskID(dev, tgt_node)
5344       lv_names = [".disk%d_%s" % (idx, suf)
5345                   for suf in ["data", "meta"]]
5346       names = _GenerateUniqueNames(self, lv_names)
5347       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5348                              logical_id=(vgname, names[0]))
5349       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5350                              logical_id=(vgname, names[1]))
5351       new_lvs = [lv_data, lv_meta]
5352       old_lvs = dev.children
5353       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5354       info("creating new local storage on %s for %s" %
5355            (tgt_node, dev.iv_name))
5356       # we pass force_create=True to force the LVM creation
5357       for new_lv in new_lvs:
5358         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5359                         _GetInstanceInfoText(instance), False)
5360
5361     # Step: for each lv, detach+rename*2+attach
5362     self.proc.LogStep(4, steps_total, "change drbd configuration")
5363     for dev, old_lvs, new_lvs in iv_names.itervalues():
5364       info("detaching %s drbd from local storage" % dev.iv_name)
5365       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5366       result.Raise()
5367       if not result.data:
5368         raise errors.OpExecError("Can't detach drbd from local storage on node"
5369                                  " %s for device %s" % (tgt_node, dev.iv_name))
5370       #dev.children = []
5371       #cfg.Update(instance)
5372
5373       # ok, we created the new LVs, so now we know we have the needed
5374       # storage; as such, we proceed on the target node to rename
5375       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5376       # using the assumption that logical_id == physical_id (which in
5377       # turn is the unique_id on that node)
5378
5379       # FIXME(iustin): use a better name for the replaced LVs
5380       temp_suffix = int(time.time())
5381       ren_fn = lambda d, suff: (d.physical_id[0],
5382                                 d.physical_id[1] + "_replaced-%s" % suff)
5383       # build the rename list based on what LVs exist on the node
5384       rlist = []
5385       for to_ren in old_lvs:
5386         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5387         if not result.RemoteFailMsg() and result.payload:
5388           # device exists
5389           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5390
5391       info("renaming the old LVs on the target node")
5392       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5393       result.Raise()
5394       if not result.data:
5395         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5396       # now we rename the new LVs to the old LVs
5397       info("renaming the new LVs on the target node")
5398       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5399       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5400       result.Raise()
5401       if not result.data:
5402         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5403
5404       for old, new in zip(old_lvs, new_lvs):
5405         new.logical_id = old.logical_id
5406         cfg.SetDiskID(new, tgt_node)
5407
5408       for disk in old_lvs:
5409         disk.logical_id = ren_fn(disk, temp_suffix)
5410         cfg.SetDiskID(disk, tgt_node)
5411
5412       # now that the new lvs have the old name, we can add them to the device
5413       info("adding new mirror component on %s" % tgt_node)
5414       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5415       if result.failed or not result.data:
5416         for new_lv in new_lvs:
5417           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5418           if msg:
5419             warning("Can't rollback device %s: %s", dev, msg,
5420                     hint="cleanup manually the unused logical volumes")
5421         raise errors.OpExecError("Can't add local storage to drbd")
5422
5423       dev.children = new_lvs
5424       cfg.Update(instance)
5425
5426     # Step: wait for sync
5427
5428     # this can fail as the old devices are degraded and _WaitForSync
5429     # does a combined result over all disks, so we don't check its
5430     # return value
5431     self.proc.LogStep(5, steps_total, "sync devices")
5432     _WaitForSync(self, instance, unlock=True)
5433
5434     # so check manually all the devices
5435     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5436       cfg.SetDiskID(dev, instance.primary_node)
5437       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5438       msg = result.RemoteFailMsg()
5439       if not msg and not result.payload:
5440         msg = "disk not found"
5441       if msg:
5442         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5443                                  (name, msg))
5444       if result.payload[5]:
5445         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5446
5447     # Step: remove old storage
5448     self.proc.LogStep(6, steps_total, "removing old storage")
5449     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5450       info("remove logical volumes for %s" % name)
5451       for lv in old_lvs:
5452         cfg.SetDiskID(lv, tgt_node)
5453         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5454         if msg:
5455           warning("Can't remove old LV: %s" % msg,
5456                   hint="manually remove unused LVs")
5457           continue
5458
5459   def _ExecD8Secondary(self, feedback_fn):
5460     """Replace the secondary node for drbd8.
5461
5462     The algorithm for replace is quite complicated:
5463       - for all disks of the instance:
5464         - create new LVs on the new node with same names
5465         - shutdown the drbd device on the old secondary
5466         - disconnect the drbd network on the primary
5467         - create the drbd device on the new secondary
5468         - network attach the drbd on the primary, using an artifice:
5469           the drbd code for Attach() will connect to the network if it
5470           finds a device which is connected to the good local disks but
5471           not network enabled
5472       - wait for sync across all devices
5473       - remove all disks from the old secondary
5474
5475     Failures are not very well handled.
5476
5477     """
5478     steps_total = 6
5479     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5480     instance = self.instance
5481     iv_names = {}
5482     # start of work
5483     cfg = self.cfg
5484     old_node = self.tgt_node
5485     new_node = self.new_node
5486     pri_node = instance.primary_node
5487     nodes_ip = {
5488       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5489       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5490       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5491       }
5492
5493     # Step: check device activation
5494     self.proc.LogStep(1, steps_total, "check device existence")
5495     info("checking volume groups")
5496     my_vg = cfg.GetVGName()
5497     results = self.rpc.call_vg_list([pri_node, new_node])
5498     for node in pri_node, new_node:
5499       res = results[node]
5500       if res.failed or not res.data or my_vg not in res.data:
5501         raise errors.OpExecError("Volume group '%s' not found on %s" %
5502                                  (my_vg, node))
5503     for idx, dev in enumerate(instance.disks):
5504       if idx not in self.op.disks:
5505         continue
5506       info("checking disk/%d on %s" % (idx, pri_node))
5507       cfg.SetDiskID(dev, pri_node)
5508       result = self.rpc.call_blockdev_find(pri_node, dev)
5509       msg = result.RemoteFailMsg()
5510       if not msg and not result.payload:
5511         msg = "disk not found"
5512       if msg:
5513         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5514                                  (idx, pri_node, msg))
5515
5516     # Step: check other node consistency
5517     self.proc.LogStep(2, steps_total, "check peer consistency")
5518     for idx, dev in enumerate(instance.disks):
5519       if idx not in self.op.disks:
5520         continue
5521       info("checking disk/%d consistency on %s" % (idx, pri_node))
5522       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5523         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5524                                  " unsafe to replace the secondary" %
5525                                  pri_node)
5526
5527     # Step: create new storage
5528     self.proc.LogStep(3, steps_total, "allocate new storage")
5529     for idx, dev in enumerate(instance.disks):
5530       info("adding new local storage on %s for disk/%d" %
5531            (new_node, idx))
5532       # we pass force_create=True to force LVM creation
5533       for new_lv in dev.children:
5534         _CreateBlockDev(self, new_node, instance, new_lv, True,
5535                         _GetInstanceInfoText(instance), False)
5536
5537     # Step 4: dbrd minors and drbd setups changes
5538     # after this, we must manually remove the drbd minors on both the
5539     # error and the success paths
5540     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5541                                    instance.name)
5542     logging.debug("Allocated minors %s" % (minors,))
5543     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5544     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5545       size = dev.size
5546       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5547       # create new devices on new_node; note that we create two IDs:
5548       # one without port, so the drbd will be activated without
5549       # networking information on the new node at this stage, and one
5550       # with network, for the latter activation in step 4
5551       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5552       if pri_node == o_node1:
5553         p_minor = o_minor1
5554       else:
5555         p_minor = o_minor2
5556
5557       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5558       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5559
5560       iv_names[idx] = (dev, dev.children, new_net_id)
5561       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5562                     new_net_id)
5563       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5564                               logical_id=new_alone_id,
5565                               children=dev.children,
5566                               size=dev.size)
5567       try:
5568         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5569                               _GetInstanceInfoText(instance), False)
5570       except errors.GenericError:
5571         self.cfg.ReleaseDRBDMinors(instance.name)
5572         raise
5573
5574     for idx, dev in enumerate(instance.disks):
5575       # we have new devices, shutdown the drbd on the old secondary
5576       info("shutting down drbd for disk/%d on old node" % idx)
5577       cfg.SetDiskID(dev, old_node)
5578       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5579       if msg:
5580         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5581                 (idx, msg),
5582                 hint="Please cleanup this device manually as soon as possible")
5583
5584     info("detaching primary drbds from the network (=> standalone)")
5585     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5586                                                instance.disks)[pri_node]
5587
5588     msg = result.RemoteFailMsg()
5589     if msg:
5590       # detaches didn't succeed (unlikely)
5591       self.cfg.ReleaseDRBDMinors(instance.name)
5592       raise errors.OpExecError("Can't detach the disks from the network on"
5593                                " old node: %s" % (msg,))
5594
5595     # if we managed to detach at least one, we update all the disks of
5596     # the instance to point to the new secondary
5597     info("updating instance configuration")
5598     for dev, _, new_logical_id in iv_names.itervalues():
5599       dev.logical_id = new_logical_id
5600       cfg.SetDiskID(dev, pri_node)
5601     cfg.Update(instance)
5602
5603     # and now perform the drbd attach
5604     info("attaching primary drbds to new secondary (standalone => connected)")
5605     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5606                                            instance.disks, instance.name,
5607                                            False)
5608     for to_node, to_result in result.items():
5609       msg = to_result.RemoteFailMsg()
5610       if msg:
5611         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5612                 hint="please do a gnt-instance info to see the"
5613                 " status of disks")
5614
5615     # this can fail as the old devices are degraded and _WaitForSync
5616     # does a combined result over all disks, so we don't check its
5617     # return value
5618     self.proc.LogStep(5, steps_total, "sync devices")
5619     _WaitForSync(self, instance, unlock=True)
5620
5621     # so check manually all the devices
5622     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5623       cfg.SetDiskID(dev, pri_node)
5624       result = self.rpc.call_blockdev_find(pri_node, dev)
5625       msg = result.RemoteFailMsg()
5626       if not msg and not result.payload:
5627         msg = "disk not found"
5628       if msg:
5629         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5630                                  (idx, msg))
5631       if result.payload[5]:
5632         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5633
5634     self.proc.LogStep(6, steps_total, "removing old storage")
5635     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5636       info("remove logical volumes for disk/%d" % idx)
5637       for lv in old_lvs:
5638         cfg.SetDiskID(lv, old_node)
5639         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5640         if msg:
5641           warning("Can't remove LV on old secondary: %s", msg,
5642                   hint="Cleanup stale volumes by hand")
5643
5644   def Exec(self, feedback_fn):
5645     """Execute disk replacement.
5646
5647     This dispatches the disk replacement to the appropriate handler.
5648
5649     """
5650     instance = self.instance
5651
5652     # Activate the instance disks if we're replacing them on a down instance
5653     if not instance.admin_up:
5654       _StartInstanceDisks(self, instance, True)
5655
5656     if self.op.mode == constants.REPLACE_DISK_CHG:
5657       fn = self._ExecD8Secondary
5658     else:
5659       fn = self._ExecD8DiskOnly
5660
5661     ret = fn(feedback_fn)
5662
5663     # Deactivate the instance disks if we're replacing them on a down instance
5664     if not instance.admin_up:
5665       _SafeShutdownInstanceDisks(self, instance)
5666
5667     return ret
5668
5669
5670 class LUGrowDisk(LogicalUnit):
5671   """Grow a disk of an instance.
5672
5673   """
5674   HPATH = "disk-grow"
5675   HTYPE = constants.HTYPE_INSTANCE
5676   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5677   REQ_BGL = False
5678
5679   def ExpandNames(self):
5680     self._ExpandAndLockInstance()
5681     self.needed_locks[locking.LEVEL_NODE] = []
5682     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5683
5684   def DeclareLocks(self, level):
5685     if level == locking.LEVEL_NODE:
5686       self._LockInstancesNodes()
5687
5688   def BuildHooksEnv(self):
5689     """Build hooks env.
5690
5691     This runs on the master, the primary and all the secondaries.
5692
5693     """
5694     env = {
5695       "DISK": self.op.disk,
5696       "AMOUNT": self.op.amount,
5697       }
5698     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5699     nl = [
5700       self.cfg.GetMasterNode(),
5701       self.instance.primary_node,
5702       ]
5703     return env, nl, nl
5704
5705   def CheckPrereq(self):
5706     """Check prerequisites.
5707
5708     This checks that the instance is in the cluster.
5709
5710     """
5711     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5712     assert instance is not None, \
5713       "Cannot retrieve locked instance %s" % self.op.instance_name
5714     nodenames = list(instance.all_nodes)
5715     for node in nodenames:
5716       _CheckNodeOnline(self, node)
5717
5718
5719     self.instance = instance
5720
5721     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5722       raise errors.OpPrereqError("Instance's disk layout does not support"
5723                                  " growing.")
5724
5725     self.disk = instance.FindDisk(self.op.disk)
5726
5727     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5728                                        instance.hypervisor)
5729     for node in nodenames:
5730       info = nodeinfo[node]
5731       if info.failed or not info.data:
5732         raise errors.OpPrereqError("Cannot get current information"
5733                                    " from node '%s'" % node)
5734       vg_free = info.data.get('vg_free', None)
5735       if not isinstance(vg_free, int):
5736         raise errors.OpPrereqError("Can't compute free disk space on"
5737                                    " node %s" % node)
5738       if self.op.amount > vg_free:
5739         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5740                                    " %d MiB available, %d MiB required" %
5741                                    (node, vg_free, self.op.amount))
5742
5743   def Exec(self, feedback_fn):
5744     """Execute disk grow.
5745
5746     """
5747     instance = self.instance
5748     disk = self.disk
5749     for node in instance.all_nodes:
5750       self.cfg.SetDiskID(disk, node)
5751       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5752       msg = result.RemoteFailMsg()
5753       if msg:
5754         raise errors.OpExecError("Grow request failed to node %s: %s" %
5755                                  (node, msg))
5756     disk.RecordGrow(self.op.amount)
5757     self.cfg.Update(instance)
5758     if self.op.wait_for_sync:
5759       disk_abort = not _WaitForSync(self, instance)
5760       if disk_abort:
5761         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5762                              " status.\nPlease check the instance.")
5763
5764
5765 class LUQueryInstanceData(NoHooksLU):
5766   """Query runtime instance data.
5767
5768   """
5769   _OP_REQP = ["instances", "static"]
5770   REQ_BGL = False
5771
5772   def ExpandNames(self):
5773     self.needed_locks = {}
5774     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5775
5776     if not isinstance(self.op.instances, list):
5777       raise errors.OpPrereqError("Invalid argument type 'instances'")
5778
5779     if self.op.instances:
5780       self.wanted_names = []
5781       for name in self.op.instances:
5782         full_name = self.cfg.ExpandInstanceName(name)
5783         if full_name is None:
5784           raise errors.OpPrereqError("Instance '%s' not known" % name)
5785         self.wanted_names.append(full_name)
5786       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5787     else:
5788       self.wanted_names = None
5789       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5790
5791     self.needed_locks[locking.LEVEL_NODE] = []
5792     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5793
5794   def DeclareLocks(self, level):
5795     if level == locking.LEVEL_NODE:
5796       self._LockInstancesNodes()
5797
5798   def CheckPrereq(self):
5799     """Check prerequisites.
5800
5801     This only checks the optional instance list against the existing names.
5802
5803     """
5804     if self.wanted_names is None:
5805       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5806
5807     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5808                              in self.wanted_names]
5809     return
5810
5811   def _ComputeDiskStatus(self, instance, snode, dev):
5812     """Compute block device status.
5813
5814     """
5815     static = self.op.static
5816     if not static:
5817       self.cfg.SetDiskID(dev, instance.primary_node)
5818       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5819       if dev_pstatus.offline:
5820         dev_pstatus = None
5821       else:
5822         msg = dev_pstatus.RemoteFailMsg()
5823         if msg:
5824           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5825                                    (instance.name, msg))
5826         dev_pstatus = dev_pstatus.payload
5827     else:
5828       dev_pstatus = None
5829
5830     if dev.dev_type in constants.LDS_DRBD:
5831       # we change the snode then (otherwise we use the one passed in)
5832       if dev.logical_id[0] == instance.primary_node:
5833         snode = dev.logical_id[1]
5834       else:
5835         snode = dev.logical_id[0]
5836
5837     if snode and not static:
5838       self.cfg.SetDiskID(dev, snode)
5839       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5840       if dev_sstatus.offline:
5841         dev_sstatus = None
5842       else:
5843         msg = dev_sstatus.RemoteFailMsg()
5844         if msg:
5845           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5846                                    (instance.name, msg))
5847         dev_sstatus = dev_sstatus.payload
5848     else:
5849       dev_sstatus = None
5850
5851     if dev.children:
5852       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5853                       for child in dev.children]
5854     else:
5855       dev_children = []
5856
5857     data = {
5858       "iv_name": dev.iv_name,
5859       "dev_type": dev.dev_type,
5860       "logical_id": dev.logical_id,
5861       "physical_id": dev.physical_id,
5862       "pstatus": dev_pstatus,
5863       "sstatus": dev_sstatus,
5864       "children": dev_children,
5865       "mode": dev.mode,
5866       "size": dev.size,
5867       }
5868
5869     return data
5870
5871   def Exec(self, feedback_fn):
5872     """Gather and return data"""
5873     result = {}
5874
5875     cluster = self.cfg.GetClusterInfo()
5876
5877     for instance in self.wanted_instances:
5878       if not self.op.static:
5879         remote_info = self.rpc.call_instance_info(instance.primary_node,
5880                                                   instance.name,
5881                                                   instance.hypervisor)
5882         remote_info.Raise()
5883         remote_info = remote_info.data
5884         if remote_info and "state" in remote_info:
5885           remote_state = "up"
5886         else:
5887           remote_state = "down"
5888       else:
5889         remote_state = None
5890       if instance.admin_up:
5891         config_state = "up"
5892       else:
5893         config_state = "down"
5894
5895       disks = [self._ComputeDiskStatus(instance, None, device)
5896                for device in instance.disks]
5897
5898       idict = {
5899         "name": instance.name,
5900         "config_state": config_state,
5901         "run_state": remote_state,
5902         "pnode": instance.primary_node,
5903         "snodes": instance.secondary_nodes,
5904         "os": instance.os,
5905         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5906         "disks": disks,
5907         "hypervisor": instance.hypervisor,
5908         "network_port": instance.network_port,
5909         "hv_instance": instance.hvparams,
5910         "hv_actual": cluster.FillHV(instance),
5911         "be_instance": instance.beparams,
5912         "be_actual": cluster.FillBE(instance),
5913         }
5914
5915       result[instance.name] = idict
5916
5917     return result
5918
5919
5920 class LUSetInstanceParams(LogicalUnit):
5921   """Modifies an instances's parameters.
5922
5923   """
5924   HPATH = "instance-modify"
5925   HTYPE = constants.HTYPE_INSTANCE
5926   _OP_REQP = ["instance_name"]
5927   REQ_BGL = False
5928
5929   def CheckArguments(self):
5930     if not hasattr(self.op, 'nics'):
5931       self.op.nics = []
5932     if not hasattr(self.op, 'disks'):
5933       self.op.disks = []
5934     if not hasattr(self.op, 'beparams'):
5935       self.op.beparams = {}
5936     if not hasattr(self.op, 'hvparams'):
5937       self.op.hvparams = {}
5938     self.op.force = getattr(self.op, "force", False)
5939     if not (self.op.nics or self.op.disks or
5940             self.op.hvparams or self.op.beparams):
5941       raise errors.OpPrereqError("No changes submitted")
5942
5943     # Disk validation
5944     disk_addremove = 0
5945     for disk_op, disk_dict in self.op.disks:
5946       if disk_op == constants.DDM_REMOVE:
5947         disk_addremove += 1
5948         continue
5949       elif disk_op == constants.DDM_ADD:
5950         disk_addremove += 1
5951       else:
5952         if not isinstance(disk_op, int):
5953           raise errors.OpPrereqError("Invalid disk index")
5954       if disk_op == constants.DDM_ADD:
5955         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5956         if mode not in constants.DISK_ACCESS_SET:
5957           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5958         size = disk_dict.get('size', None)
5959         if size is None:
5960           raise errors.OpPrereqError("Required disk parameter size missing")
5961         try:
5962           size = int(size)
5963         except ValueError, err:
5964           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5965                                      str(err))
5966         disk_dict['size'] = size
5967       else:
5968         # modification of disk
5969         if 'size' in disk_dict:
5970           raise errors.OpPrereqError("Disk size change not possible, use"
5971                                      " grow-disk")
5972
5973     if disk_addremove > 1:
5974       raise errors.OpPrereqError("Only one disk add or remove operation"
5975                                  " supported at a time")
5976
5977     # NIC validation
5978     nic_addremove = 0
5979     for nic_op, nic_dict in self.op.nics:
5980       if nic_op == constants.DDM_REMOVE:
5981         nic_addremove += 1
5982         continue
5983       elif nic_op == constants.DDM_ADD:
5984         nic_addremove += 1
5985       else:
5986         if not isinstance(nic_op, int):
5987           raise errors.OpPrereqError("Invalid nic index")
5988
5989       # nic_dict should be a dict
5990       nic_ip = nic_dict.get('ip', None)
5991       if nic_ip is not None:
5992         if nic_ip.lower() == constants.VALUE_NONE:
5993           nic_dict['ip'] = None
5994         else:
5995           if not utils.IsValidIP(nic_ip):
5996             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5997
5998       if nic_op == constants.DDM_ADD:
5999         nic_bridge = nic_dict.get('bridge', None)
6000         if nic_bridge is None:
6001           nic_dict['bridge'] = self.cfg.GetDefBridge()
6002         nic_mac = nic_dict.get('mac', None)
6003         if nic_mac is None:
6004           nic_dict['mac'] = constants.VALUE_AUTO
6005
6006       if 'mac' in nic_dict:
6007         nic_mac = nic_dict['mac']
6008         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6009           if not utils.IsValidMac(nic_mac):
6010             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6011         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6012           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6013                                      " modifying an existing nic")
6014
6015     if nic_addremove > 1:
6016       raise errors.OpPrereqError("Only one NIC add or remove operation"
6017                                  " supported at a time")
6018
6019   def ExpandNames(self):
6020     self._ExpandAndLockInstance()
6021     self.needed_locks[locking.LEVEL_NODE] = []
6022     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6023
6024   def DeclareLocks(self, level):
6025     if level == locking.LEVEL_NODE:
6026       self._LockInstancesNodes()
6027
6028   def BuildHooksEnv(self):
6029     """Build hooks env.
6030
6031     This runs on the master, primary and secondaries.
6032
6033     """
6034     args = dict()
6035     if constants.BE_MEMORY in self.be_new:
6036       args['memory'] = self.be_new[constants.BE_MEMORY]
6037     if constants.BE_VCPUS in self.be_new:
6038       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6039     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6040     # information at all.
6041     if self.op.nics:
6042       args['nics'] = []
6043       nic_override = dict(self.op.nics)
6044       for idx, nic in enumerate(self.instance.nics):
6045         if idx in nic_override:
6046           this_nic_override = nic_override[idx]
6047         else:
6048           this_nic_override = {}
6049         if 'ip' in this_nic_override:
6050           ip = this_nic_override['ip']
6051         else:
6052           ip = nic.ip
6053         if 'bridge' in this_nic_override:
6054           bridge = this_nic_override['bridge']
6055         else:
6056           bridge = nic.bridge
6057         if 'mac' in this_nic_override:
6058           mac = this_nic_override['mac']
6059         else:
6060           mac = nic.mac
6061         args['nics'].append((ip, bridge, mac))
6062       if constants.DDM_ADD in nic_override:
6063         ip = nic_override[constants.DDM_ADD].get('ip', None)
6064         bridge = nic_override[constants.DDM_ADD]['bridge']
6065         mac = nic_override[constants.DDM_ADD]['mac']
6066         args['nics'].append((ip, bridge, mac))
6067       elif constants.DDM_REMOVE in nic_override:
6068         del args['nics'][-1]
6069
6070     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6071     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6072     return env, nl, nl
6073
6074   def CheckPrereq(self):
6075     """Check prerequisites.
6076
6077     This only checks the instance list against the existing names.
6078
6079     """
6080     force = self.force = self.op.force
6081
6082     # checking the new params on the primary/secondary nodes
6083
6084     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6085     assert self.instance is not None, \
6086       "Cannot retrieve locked instance %s" % self.op.instance_name
6087     pnode = instance.primary_node
6088     nodelist = list(instance.all_nodes)
6089
6090     # hvparams processing
6091     if self.op.hvparams:
6092       i_hvdict = copy.deepcopy(instance.hvparams)
6093       for key, val in self.op.hvparams.iteritems():
6094         if val == constants.VALUE_DEFAULT:
6095           try:
6096             del i_hvdict[key]
6097           except KeyError:
6098             pass
6099         else:
6100           i_hvdict[key] = val
6101       cluster = self.cfg.GetClusterInfo()
6102       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
6103       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
6104                                 i_hvdict)
6105       # local check
6106       hypervisor.GetHypervisor(
6107         instance.hypervisor).CheckParameterSyntax(hv_new)
6108       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6109       self.hv_new = hv_new # the new actual values
6110       self.hv_inst = i_hvdict # the new dict (without defaults)
6111     else:
6112       self.hv_new = self.hv_inst = {}
6113
6114     # beparams processing
6115     if self.op.beparams:
6116       i_bedict = copy.deepcopy(instance.beparams)
6117       for key, val in self.op.beparams.iteritems():
6118         if val == constants.VALUE_DEFAULT:
6119           try:
6120             del i_bedict[key]
6121           except KeyError:
6122             pass
6123         else:
6124           i_bedict[key] = val
6125       cluster = self.cfg.GetClusterInfo()
6126       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6127       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6128                                 i_bedict)
6129       self.be_new = be_new # the new actual values
6130       self.be_inst = i_bedict # the new dict (without defaults)
6131     else:
6132       self.be_new = self.be_inst = {}
6133
6134     self.warn = []
6135
6136     if constants.BE_MEMORY in self.op.beparams and not self.force:
6137       mem_check_list = [pnode]
6138       if be_new[constants.BE_AUTO_BALANCE]:
6139         # either we changed auto_balance to yes or it was from before
6140         mem_check_list.extend(instance.secondary_nodes)
6141       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6142                                                   instance.hypervisor)
6143       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6144                                          instance.hypervisor)
6145       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6146         # Assume the primary node is unreachable and go ahead
6147         self.warn.append("Can't get info from primary node %s" % pnode)
6148       else:
6149         if not instance_info.failed and instance_info.data:
6150           current_mem = int(instance_info.data['memory'])
6151         else:
6152           # Assume instance not running
6153           # (there is a slight race condition here, but it's not very probable,
6154           # and we have no other way to check)
6155           current_mem = 0
6156         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6157                     nodeinfo[pnode].data['memory_free'])
6158         if miss_mem > 0:
6159           raise errors.OpPrereqError("This change will prevent the instance"
6160                                      " from starting, due to %d MB of memory"
6161                                      " missing on its primary node" % miss_mem)
6162
6163       if be_new[constants.BE_AUTO_BALANCE]:
6164         for node, nres in nodeinfo.iteritems():
6165           if node not in instance.secondary_nodes:
6166             continue
6167           if nres.failed or not isinstance(nres.data, dict):
6168             self.warn.append("Can't get info from secondary node %s" % node)
6169           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6170             self.warn.append("Not enough memory to failover instance to"
6171                              " secondary node %s" % node)
6172
6173     # NIC processing
6174     for nic_op, nic_dict in self.op.nics:
6175       if nic_op == constants.DDM_REMOVE:
6176         if not instance.nics:
6177           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6178         continue
6179       if nic_op != constants.DDM_ADD:
6180         # an existing nic
6181         if nic_op < 0 or nic_op >= len(instance.nics):
6182           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6183                                      " are 0 to %d" %
6184                                      (nic_op, len(instance.nics)))
6185       if 'bridge' in nic_dict:
6186         nic_bridge = nic_dict['bridge']
6187         if nic_bridge is None:
6188           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6189         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6190           msg = ("Bridge '%s' doesn't exist on one of"
6191                  " the instance nodes" % nic_bridge)
6192           if self.force:
6193             self.warn.append(msg)
6194           else:
6195             raise errors.OpPrereqError(msg)
6196       if 'mac' in nic_dict:
6197         nic_mac = nic_dict['mac']
6198         if nic_mac is None:
6199           raise errors.OpPrereqError('Cannot set the nic mac to None')
6200         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6201           # otherwise generate the mac
6202           nic_dict['mac'] = self.cfg.GenerateMAC()
6203         else:
6204           # or validate/reserve the current one
6205           if self.cfg.IsMacInUse(nic_mac):
6206             raise errors.OpPrereqError("MAC address %s already in use"
6207                                        " in cluster" % nic_mac)
6208
6209     # DISK processing
6210     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6211       raise errors.OpPrereqError("Disk operations not supported for"
6212                                  " diskless instances")
6213     for disk_op, disk_dict in self.op.disks:
6214       if disk_op == constants.DDM_REMOVE:
6215         if len(instance.disks) == 1:
6216           raise errors.OpPrereqError("Cannot remove the last disk of"
6217                                      " an instance")
6218         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6219         ins_l = ins_l[pnode]
6220         if ins_l.failed or not isinstance(ins_l.data, list):
6221           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6222         if instance.name in ins_l.data:
6223           raise errors.OpPrereqError("Instance is running, can't remove"
6224                                      " disks.")
6225
6226       if (disk_op == constants.DDM_ADD and
6227           len(instance.nics) >= constants.MAX_DISKS):
6228         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6229                                    " add more" % constants.MAX_DISKS)
6230       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6231         # an existing disk
6232         if disk_op < 0 or disk_op >= len(instance.disks):
6233           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6234                                      " are 0 to %d" %
6235                                      (disk_op, len(instance.disks)))
6236
6237     return
6238
6239   def Exec(self, feedback_fn):
6240     """Modifies an instance.
6241
6242     All parameters take effect only at the next restart of the instance.
6243
6244     """
6245     # Process here the warnings from CheckPrereq, as we don't have a
6246     # feedback_fn there.
6247     for warn in self.warn:
6248       feedback_fn("WARNING: %s" % warn)
6249
6250     result = []
6251     instance = self.instance
6252     # disk changes
6253     for disk_op, disk_dict in self.op.disks:
6254       if disk_op == constants.DDM_REMOVE:
6255         # remove the last disk
6256         device = instance.disks.pop()
6257         device_idx = len(instance.disks)
6258         for node, disk in device.ComputeNodeTree(instance.primary_node):
6259           self.cfg.SetDiskID(disk, node)
6260           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6261           if msg:
6262             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6263                             " continuing anyway", device_idx, node, msg)
6264         result.append(("disk/%d" % device_idx, "remove"))
6265       elif disk_op == constants.DDM_ADD:
6266         # add a new disk
6267         if instance.disk_template == constants.DT_FILE:
6268           file_driver, file_path = instance.disks[0].logical_id
6269           file_path = os.path.dirname(file_path)
6270         else:
6271           file_driver = file_path = None
6272         disk_idx_base = len(instance.disks)
6273         new_disk = _GenerateDiskTemplate(self,
6274                                          instance.disk_template,
6275                                          instance.name, instance.primary_node,
6276                                          instance.secondary_nodes,
6277                                          [disk_dict],
6278                                          file_path,
6279                                          file_driver,
6280                                          disk_idx_base)[0]
6281         instance.disks.append(new_disk)
6282         info = _GetInstanceInfoText(instance)
6283
6284         logging.info("Creating volume %s for instance %s",
6285                      new_disk.iv_name, instance.name)
6286         # Note: this needs to be kept in sync with _CreateDisks
6287         #HARDCODE
6288         for node in instance.all_nodes:
6289           f_create = node == instance.primary_node
6290           try:
6291             _CreateBlockDev(self, node, instance, new_disk,
6292                             f_create, info, f_create)
6293           except errors.OpExecError, err:
6294             self.LogWarning("Failed to create volume %s (%s) on"
6295                             " node %s: %s",
6296                             new_disk.iv_name, new_disk, node, err)
6297         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6298                        (new_disk.size, new_disk.mode)))
6299       else:
6300         # change a given disk
6301         instance.disks[disk_op].mode = disk_dict['mode']
6302         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6303     # NIC changes
6304     for nic_op, nic_dict in self.op.nics:
6305       if nic_op == constants.DDM_REMOVE:
6306         # remove the last nic
6307         del instance.nics[-1]
6308         result.append(("nic.%d" % len(instance.nics), "remove"))
6309       elif nic_op == constants.DDM_ADD:
6310         # mac and bridge should be set, by now
6311         mac = nic_dict['mac']
6312         bridge = nic_dict['bridge']
6313         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6314                               bridge=bridge)
6315         instance.nics.append(new_nic)
6316         result.append(("nic.%d" % (len(instance.nics) - 1),
6317                        "add:mac=%s,ip=%s,bridge=%s" %
6318                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6319       else:
6320         # change a given nic
6321         for key in 'mac', 'ip', 'bridge':
6322           if key in nic_dict:
6323             setattr(instance.nics[nic_op], key, nic_dict[key])
6324             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6325
6326     # hvparams changes
6327     if self.op.hvparams:
6328       instance.hvparams = self.hv_inst
6329       for key, val in self.op.hvparams.iteritems():
6330         result.append(("hv/%s" % key, val))
6331
6332     # beparams changes
6333     if self.op.beparams:
6334       instance.beparams = self.be_inst
6335       for key, val in self.op.beparams.iteritems():
6336         result.append(("be/%s" % key, val))
6337
6338     self.cfg.Update(instance)
6339
6340     return result
6341
6342
6343 class LUQueryExports(NoHooksLU):
6344   """Query the exports list
6345
6346   """
6347   _OP_REQP = ['nodes']
6348   REQ_BGL = False
6349
6350   def ExpandNames(self):
6351     self.needed_locks = {}
6352     self.share_locks[locking.LEVEL_NODE] = 1
6353     if not self.op.nodes:
6354       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6355     else:
6356       self.needed_locks[locking.LEVEL_NODE] = \
6357         _GetWantedNodes(self, self.op.nodes)
6358
6359   def CheckPrereq(self):
6360     """Check prerequisites.
6361
6362     """
6363     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6364
6365   def Exec(self, feedback_fn):
6366     """Compute the list of all the exported system images.
6367
6368     @rtype: dict
6369     @return: a dictionary with the structure node->(export-list)
6370         where export-list is a list of the instances exported on
6371         that node.
6372
6373     """
6374     rpcresult = self.rpc.call_export_list(self.nodes)
6375     result = {}
6376     for node in rpcresult:
6377       if rpcresult[node].failed:
6378         result[node] = False
6379       else:
6380         result[node] = rpcresult[node].data
6381
6382     return result
6383
6384
6385 class LUExportInstance(LogicalUnit):
6386   """Export an instance to an image in the cluster.
6387
6388   """
6389   HPATH = "instance-export"
6390   HTYPE = constants.HTYPE_INSTANCE
6391   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6392   REQ_BGL = False
6393
6394   def ExpandNames(self):
6395     self._ExpandAndLockInstance()
6396     # FIXME: lock only instance primary and destination node
6397     #
6398     # Sad but true, for now we have do lock all nodes, as we don't know where
6399     # the previous export might be, and and in this LU we search for it and
6400     # remove it from its current node. In the future we could fix this by:
6401     #  - making a tasklet to search (share-lock all), then create the new one,
6402     #    then one to remove, after
6403     #  - removing the removal operation altoghether
6404     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6405
6406   def DeclareLocks(self, level):
6407     """Last minute lock declaration."""
6408     # All nodes are locked anyway, so nothing to do here.
6409
6410   def BuildHooksEnv(self):
6411     """Build hooks env.
6412
6413     This will run on the master, primary node and target node.
6414
6415     """
6416     env = {
6417       "EXPORT_NODE": self.op.target_node,
6418       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6419       }
6420     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6421     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6422           self.op.target_node]
6423     return env, nl, nl
6424
6425   def CheckPrereq(self):
6426     """Check prerequisites.
6427
6428     This checks that the instance and node names are valid.
6429
6430     """
6431     instance_name = self.op.instance_name
6432     self.instance = self.cfg.GetInstanceInfo(instance_name)
6433     assert self.instance is not None, \
6434           "Cannot retrieve locked instance %s" % self.op.instance_name
6435     _CheckNodeOnline(self, self.instance.primary_node)
6436
6437     self.dst_node = self.cfg.GetNodeInfo(
6438       self.cfg.ExpandNodeName(self.op.target_node))
6439
6440     if self.dst_node is None:
6441       # This is wrong node name, not a non-locked node
6442       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6443     _CheckNodeOnline(self, self.dst_node.name)
6444     _CheckNodeNotDrained(self, self.dst_node.name)
6445
6446     # instance disk type verification
6447     for disk in self.instance.disks:
6448       if disk.dev_type == constants.LD_FILE:
6449         raise errors.OpPrereqError("Export not supported for instances with"
6450                                    " file-based disks")
6451
6452   def Exec(self, feedback_fn):
6453     """Export an instance to an image in the cluster.
6454
6455     """
6456     instance = self.instance
6457     dst_node = self.dst_node
6458     src_node = instance.primary_node
6459     if self.op.shutdown:
6460       # shutdown the instance, but not the disks
6461       result = self.rpc.call_instance_shutdown(src_node, instance)
6462       msg = result.RemoteFailMsg()
6463       if msg:
6464         raise errors.OpExecError("Could not shutdown instance %s on"
6465                                  " node %s: %s" %
6466                                  (instance.name, src_node, msg))
6467
6468     vgname = self.cfg.GetVGName()
6469
6470     snap_disks = []
6471
6472     # set the disks ID correctly since call_instance_start needs the
6473     # correct drbd minor to create the symlinks
6474     for disk in instance.disks:
6475       self.cfg.SetDiskID(disk, src_node)
6476
6477     # per-disk results
6478     dresults = []
6479     try:
6480       for idx, disk in enumerate(instance.disks):
6481         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6482         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6483         if new_dev_name.failed or not new_dev_name.data:
6484           self.LogWarning("Could not snapshot disk/%d on node %s",
6485                           idx, src_node)
6486           snap_disks.append(False)
6487         else:
6488           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6489                                  logical_id=(vgname, new_dev_name.data),
6490                                  physical_id=(vgname, new_dev_name.data),
6491                                  iv_name=disk.iv_name)
6492           snap_disks.append(new_dev)
6493
6494     finally:
6495       if self.op.shutdown and instance.admin_up:
6496         result = self.rpc.call_instance_start(src_node, instance, None, None)
6497         msg = result.RemoteFailMsg()
6498         if msg:
6499           _ShutdownInstanceDisks(self, instance)
6500           raise errors.OpExecError("Could not start instance: %s" % msg)
6501
6502     # TODO: check for size
6503
6504     cluster_name = self.cfg.GetClusterName()
6505     for idx, dev in enumerate(snap_disks):
6506       if dev:
6507         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6508                                                instance, cluster_name, idx)
6509         if result.failed or not result.data:
6510           self.LogWarning("Could not export disk/%d from node %s to"
6511                           " node %s", idx, src_node, dst_node.name)
6512           dresults.append(False)
6513         else:
6514           dresults.append(True)
6515         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6516         if msg:
6517           self.LogWarning("Could not remove snapshot for disk/%d from node"
6518                           " %s: %s", idx, src_node, msg)
6519       else:
6520         dresults.append(False)
6521
6522     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6523     fin_resu = True
6524     if result.failed or not result.data:
6525       self.LogWarning("Could not finalize export for instance %s on node %s",
6526                       instance.name, dst_node.name)
6527       fin_resu = False
6528
6529     nodelist = self.cfg.GetNodeList()
6530     nodelist.remove(dst_node.name)
6531
6532     # on one-node clusters nodelist will be empty after the removal
6533     # if we proceed the backup would be removed because OpQueryExports
6534     # substitutes an empty list with the full cluster node list.
6535     if nodelist:
6536       exportlist = self.rpc.call_export_list(nodelist)
6537       for node in exportlist:
6538         if exportlist[node].failed:
6539           continue
6540         if instance.name in exportlist[node].data:
6541           if not self.rpc.call_export_remove(node, instance.name):
6542             self.LogWarning("Could not remove older export for instance %s"
6543                             " on node %s", instance.name, node)
6544     return fin_resu, dresults
6545
6546
6547 class LURemoveExport(NoHooksLU):
6548   """Remove exports related to the named instance.
6549
6550   """
6551   _OP_REQP = ["instance_name"]
6552   REQ_BGL = False
6553
6554   def ExpandNames(self):
6555     self.needed_locks = {}
6556     # We need all nodes to be locked in order for RemoveExport to work, but we
6557     # don't need to lock the instance itself, as nothing will happen to it (and
6558     # we can remove exports also for a removed instance)
6559     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6560
6561   def CheckPrereq(self):
6562     """Check prerequisites.
6563     """
6564     pass
6565
6566   def Exec(self, feedback_fn):
6567     """Remove any export.
6568
6569     """
6570     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6571     # If the instance was not found we'll try with the name that was passed in.
6572     # This will only work if it was an FQDN, though.
6573     fqdn_warn = False
6574     if not instance_name:
6575       fqdn_warn = True
6576       instance_name = self.op.instance_name
6577
6578     exportlist = self.rpc.call_export_list(self.acquired_locks[
6579       locking.LEVEL_NODE])
6580     found = False
6581     for node in exportlist:
6582       if exportlist[node].failed:
6583         self.LogWarning("Failed to query node %s, continuing" % node)
6584         continue
6585       if instance_name in exportlist[node].data:
6586         found = True
6587         result = self.rpc.call_export_remove(node, instance_name)
6588         if result.failed or not result.data:
6589           logging.error("Could not remove export for instance %s"
6590                         " on node %s", instance_name, node)
6591
6592     if fqdn_warn and not found:
6593       feedback_fn("Export not found. If trying to remove an export belonging"
6594                   " to a deleted instance please use its Fully Qualified"
6595                   " Domain Name.")
6596
6597
6598 class TagsLU(NoHooksLU):
6599   """Generic tags LU.
6600
6601   This is an abstract class which is the parent of all the other tags LUs.
6602
6603   """
6604
6605   def ExpandNames(self):
6606     self.needed_locks = {}
6607     if self.op.kind == constants.TAG_NODE:
6608       name = self.cfg.ExpandNodeName(self.op.name)
6609       if name is None:
6610         raise errors.OpPrereqError("Invalid node name (%s)" %
6611                                    (self.op.name,))
6612       self.op.name = name
6613       self.needed_locks[locking.LEVEL_NODE] = name
6614     elif self.op.kind == constants.TAG_INSTANCE:
6615       name = self.cfg.ExpandInstanceName(self.op.name)
6616       if name is None:
6617         raise errors.OpPrereqError("Invalid instance name (%s)" %
6618                                    (self.op.name,))
6619       self.op.name = name
6620       self.needed_locks[locking.LEVEL_INSTANCE] = name
6621
6622   def CheckPrereq(self):
6623     """Check prerequisites.
6624
6625     """
6626     if self.op.kind == constants.TAG_CLUSTER:
6627       self.target = self.cfg.GetClusterInfo()
6628     elif self.op.kind == constants.TAG_NODE:
6629       self.target = self.cfg.GetNodeInfo(self.op.name)
6630     elif self.op.kind == constants.TAG_INSTANCE:
6631       self.target = self.cfg.GetInstanceInfo(self.op.name)
6632     else:
6633       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6634                                  str(self.op.kind))
6635
6636
6637 class LUGetTags(TagsLU):
6638   """Returns the tags of a given object.
6639
6640   """
6641   _OP_REQP = ["kind", "name"]
6642   REQ_BGL = False
6643
6644   def Exec(self, feedback_fn):
6645     """Returns the tag list.
6646
6647     """
6648     return list(self.target.GetTags())
6649
6650
6651 class LUSearchTags(NoHooksLU):
6652   """Searches the tags for a given pattern.
6653
6654   """
6655   _OP_REQP = ["pattern"]
6656   REQ_BGL = False
6657
6658   def ExpandNames(self):
6659     self.needed_locks = {}
6660
6661   def CheckPrereq(self):
6662     """Check prerequisites.
6663
6664     This checks the pattern passed for validity by compiling it.
6665
6666     """
6667     try:
6668       self.re = re.compile(self.op.pattern)
6669     except re.error, err:
6670       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6671                                  (self.op.pattern, err))
6672
6673   def Exec(self, feedback_fn):
6674     """Returns the tag list.
6675
6676     """
6677     cfg = self.cfg
6678     tgts = [("/cluster", cfg.GetClusterInfo())]
6679     ilist = cfg.GetAllInstancesInfo().values()
6680     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6681     nlist = cfg.GetAllNodesInfo().values()
6682     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6683     results = []
6684     for path, target in tgts:
6685       for tag in target.GetTags():
6686         if self.re.search(tag):
6687           results.append((path, tag))
6688     return results
6689
6690
6691 class LUAddTags(TagsLU):
6692   """Sets a tag on a given object.
6693
6694   """
6695   _OP_REQP = ["kind", "name", "tags"]
6696   REQ_BGL = False
6697
6698   def CheckPrereq(self):
6699     """Check prerequisites.
6700
6701     This checks the type and length of the tag name and value.
6702
6703     """
6704     TagsLU.CheckPrereq(self)
6705     for tag in self.op.tags:
6706       objects.TaggableObject.ValidateTag(tag)
6707
6708   def Exec(self, feedback_fn):
6709     """Sets the tag.
6710
6711     """
6712     try:
6713       for tag in self.op.tags:
6714         self.target.AddTag(tag)
6715     except errors.TagError, err:
6716       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6717     try:
6718       self.cfg.Update(self.target)
6719     except errors.ConfigurationError:
6720       raise errors.OpRetryError("There has been a modification to the"
6721                                 " config file and the operation has been"
6722                                 " aborted. Please retry.")
6723
6724
6725 class LUDelTags(TagsLU):
6726   """Delete a list of tags from a given object.
6727
6728   """
6729   _OP_REQP = ["kind", "name", "tags"]
6730   REQ_BGL = False
6731
6732   def CheckPrereq(self):
6733     """Check prerequisites.
6734
6735     This checks that we have the given tag.
6736
6737     """
6738     TagsLU.CheckPrereq(self)
6739     for tag in self.op.tags:
6740       objects.TaggableObject.ValidateTag(tag)
6741     del_tags = frozenset(self.op.tags)
6742     cur_tags = self.target.GetTags()
6743     if not del_tags <= cur_tags:
6744       diff_tags = del_tags - cur_tags
6745       diff_names = ["'%s'" % tag for tag in diff_tags]
6746       diff_names.sort()
6747       raise errors.OpPrereqError("Tag(s) %s not found" %
6748                                  (",".join(diff_names)))
6749
6750   def Exec(self, feedback_fn):
6751     """Remove the tag from the object.
6752
6753     """
6754     for tag in self.op.tags:
6755       self.target.RemoveTag(tag)
6756     try:
6757       self.cfg.Update(self.target)
6758     except errors.ConfigurationError:
6759       raise errors.OpRetryError("There has been a modification to the"
6760                                 " config file and the operation has been"
6761                                 " aborted. Please retry.")
6762
6763
6764 class LUTestDelay(NoHooksLU):
6765   """Sleep for a specified amount of time.
6766
6767   This LU sleeps on the master and/or nodes for a specified amount of
6768   time.
6769
6770   """
6771   _OP_REQP = ["duration", "on_master", "on_nodes"]
6772   REQ_BGL = False
6773
6774   def ExpandNames(self):
6775     """Expand names and set required locks.
6776
6777     This expands the node list, if any.
6778
6779     """
6780     self.needed_locks = {}
6781     if self.op.on_nodes:
6782       # _GetWantedNodes can be used here, but is not always appropriate to use
6783       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6784       # more information.
6785       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6786       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6787
6788   def CheckPrereq(self):
6789     """Check prerequisites.
6790
6791     """
6792
6793   def Exec(self, feedback_fn):
6794     """Do the actual sleep.
6795
6796     """
6797     if self.op.on_master:
6798       if not utils.TestDelay(self.op.duration):
6799         raise errors.OpExecError("Error during master delay test")
6800     if self.op.on_nodes:
6801       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6802       if not result:
6803         raise errors.OpExecError("Complete failure from rpc call")
6804       for node, node_result in result.items():
6805         node_result.Raise()
6806         if not node_result.data:
6807           raise errors.OpExecError("Failure during rpc call to node %s,"
6808                                    " result: %s" % (node, node_result.data))
6809
6810
6811 class IAllocator(object):
6812   """IAllocator framework.
6813
6814   An IAllocator instance has three sets of attributes:
6815     - cfg that is needed to query the cluster
6816     - input data (all members of the _KEYS class attribute are required)
6817     - four buffer attributes (in|out_data|text), that represent the
6818       input (to the external script) in text and data structure format,
6819       and the output from it, again in two formats
6820     - the result variables from the script (success, info, nodes) for
6821       easy usage
6822
6823   """
6824   _ALLO_KEYS = [
6825     "mem_size", "disks", "disk_template",
6826     "os", "tags", "nics", "vcpus", "hypervisor",
6827     ]
6828   _RELO_KEYS = [
6829     "relocate_from",
6830     ]
6831
6832   def __init__(self, lu, mode, name, **kwargs):
6833     self.lu = lu
6834     # init buffer variables
6835     self.in_text = self.out_text = self.in_data = self.out_data = None
6836     # init all input fields so that pylint is happy
6837     self.mode = mode
6838     self.name = name
6839     self.mem_size = self.disks = self.disk_template = None
6840     self.os = self.tags = self.nics = self.vcpus = None
6841     self.hypervisor = None
6842     self.relocate_from = None
6843     # computed fields
6844     self.required_nodes = None
6845     # init result fields
6846     self.success = self.info = self.nodes = None
6847     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6848       keyset = self._ALLO_KEYS
6849     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6850       keyset = self._RELO_KEYS
6851     else:
6852       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6853                                    " IAllocator" % self.mode)
6854     for key in kwargs:
6855       if key not in keyset:
6856         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6857                                      " IAllocator" % key)
6858       setattr(self, key, kwargs[key])
6859     for key in keyset:
6860       if key not in kwargs:
6861         raise errors.ProgrammerError("Missing input parameter '%s' to"
6862                                      " IAllocator" % key)
6863     self._BuildInputData()
6864
6865   def _ComputeClusterData(self):
6866     """Compute the generic allocator input data.
6867
6868     This is the data that is independent of the actual operation.
6869
6870     """
6871     cfg = self.lu.cfg
6872     cluster_info = cfg.GetClusterInfo()
6873     # cluster data
6874     data = {
6875       "version": constants.IALLOCATOR_VERSION,
6876       "cluster_name": cfg.GetClusterName(),
6877       "cluster_tags": list(cluster_info.GetTags()),
6878       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6879       # we don't have job IDs
6880       }
6881     iinfo = cfg.GetAllInstancesInfo().values()
6882     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6883
6884     # node data
6885     node_results = {}
6886     node_list = cfg.GetNodeList()
6887
6888     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6889       hypervisor_name = self.hypervisor
6890     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6891       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6892
6893     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6894                                            hypervisor_name)
6895     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6896                        cluster_info.enabled_hypervisors)
6897     for nname, nresult in node_data.items():
6898       # first fill in static (config-based) values
6899       ninfo = cfg.GetNodeInfo(nname)
6900       pnr = {
6901         "tags": list(ninfo.GetTags()),
6902         "primary_ip": ninfo.primary_ip,
6903         "secondary_ip": ninfo.secondary_ip,
6904         "offline": ninfo.offline,
6905         "drained": ninfo.drained,
6906         "master_candidate": ninfo.master_candidate,
6907         }
6908
6909       if not ninfo.offline:
6910         nresult.Raise()
6911         if not isinstance(nresult.data, dict):
6912           raise errors.OpExecError("Can't get data for node %s" % nname)
6913         remote_info = nresult.data
6914         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6915                      'vg_size', 'vg_free', 'cpu_total']:
6916           if attr not in remote_info:
6917             raise errors.OpExecError("Node '%s' didn't return attribute"
6918                                      " '%s'" % (nname, attr))
6919           try:
6920             remote_info[attr] = int(remote_info[attr])
6921           except ValueError, err:
6922             raise errors.OpExecError("Node '%s' returned invalid value"
6923                                      " for '%s': %s" % (nname, attr, err))
6924         # compute memory used by primary instances
6925         i_p_mem = i_p_up_mem = 0
6926         for iinfo, beinfo in i_list:
6927           if iinfo.primary_node == nname:
6928             i_p_mem += beinfo[constants.BE_MEMORY]
6929             if iinfo.name not in node_iinfo[nname].data:
6930               i_used_mem = 0
6931             else:
6932               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6933             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6934             remote_info['memory_free'] -= max(0, i_mem_diff)
6935
6936             if iinfo.admin_up:
6937               i_p_up_mem += beinfo[constants.BE_MEMORY]
6938
6939         # compute memory used by instances
6940         pnr_dyn = {
6941           "total_memory": remote_info['memory_total'],
6942           "reserved_memory": remote_info['memory_dom0'],
6943           "free_memory": remote_info['memory_free'],
6944           "total_disk": remote_info['vg_size'],
6945           "free_disk": remote_info['vg_free'],
6946           "total_cpus": remote_info['cpu_total'],
6947           "i_pri_memory": i_p_mem,
6948           "i_pri_up_memory": i_p_up_mem,
6949           }
6950         pnr.update(pnr_dyn)
6951
6952       node_results[nname] = pnr
6953     data["nodes"] = node_results
6954
6955     # instance data
6956     instance_data = {}
6957     for iinfo, beinfo in i_list:
6958       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6959                   for n in iinfo.nics]
6960       pir = {
6961         "tags": list(iinfo.GetTags()),
6962         "admin_up": iinfo.admin_up,
6963         "vcpus": beinfo[constants.BE_VCPUS],
6964         "memory": beinfo[constants.BE_MEMORY],
6965         "os": iinfo.os,
6966         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6967         "nics": nic_data,
6968         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6969         "disk_template": iinfo.disk_template,
6970         "hypervisor": iinfo.hypervisor,
6971         }
6972       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6973                                                  pir["disks"])
6974       instance_data[iinfo.name] = pir
6975
6976     data["instances"] = instance_data
6977
6978     self.in_data = data
6979
6980   def _AddNewInstance(self):
6981     """Add new instance data to allocator structure.
6982
6983     This in combination with _AllocatorGetClusterData will create the
6984     correct structure needed as input for the allocator.
6985
6986     The checks for the completeness of the opcode must have already been
6987     done.
6988
6989     """
6990     data = self.in_data
6991
6992     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6993
6994     if self.disk_template in constants.DTS_NET_MIRROR:
6995       self.required_nodes = 2
6996     else:
6997       self.required_nodes = 1
6998     request = {
6999       "type": "allocate",
7000       "name": self.name,
7001       "disk_template": self.disk_template,
7002       "tags": self.tags,
7003       "os": self.os,
7004       "vcpus": self.vcpus,
7005       "memory": self.mem_size,
7006       "disks": self.disks,
7007       "disk_space_total": disk_space,
7008       "nics": self.nics,
7009       "required_nodes": self.required_nodes,
7010       }
7011     data["request"] = request
7012
7013   def _AddRelocateInstance(self):
7014     """Add relocate instance data to allocator structure.
7015
7016     This in combination with _IAllocatorGetClusterData will create the
7017     correct structure needed as input for the allocator.
7018
7019     The checks for the completeness of the opcode must have already been
7020     done.
7021
7022     """
7023     instance = self.lu.cfg.GetInstanceInfo(self.name)
7024     if instance is None:
7025       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7026                                    " IAllocator" % self.name)
7027
7028     if instance.disk_template not in constants.DTS_NET_MIRROR:
7029       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7030
7031     if len(instance.secondary_nodes) != 1:
7032       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7033
7034     self.required_nodes = 1
7035     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7036     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7037
7038     request = {
7039       "type": "relocate",
7040       "name": self.name,
7041       "disk_space_total": disk_space,
7042       "required_nodes": self.required_nodes,
7043       "relocate_from": self.relocate_from,
7044       }
7045     self.in_data["request"] = request
7046
7047   def _BuildInputData(self):
7048     """Build input data structures.
7049
7050     """
7051     self._ComputeClusterData()
7052
7053     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7054       self._AddNewInstance()
7055     else:
7056       self._AddRelocateInstance()
7057
7058     self.in_text = serializer.Dump(self.in_data)
7059
7060   def Run(self, name, validate=True, call_fn=None):
7061     """Run an instance allocator and return the results.
7062
7063     """
7064     if call_fn is None:
7065       call_fn = self.lu.rpc.call_iallocator_runner
7066     data = self.in_text
7067
7068     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7069     result.Raise()
7070
7071     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7072       raise errors.OpExecError("Invalid result from master iallocator runner")
7073
7074     rcode, stdout, stderr, fail = result.data
7075
7076     if rcode == constants.IARUN_NOTFOUND:
7077       raise errors.OpExecError("Can't find allocator '%s'" % name)
7078     elif rcode == constants.IARUN_FAILURE:
7079       raise errors.OpExecError("Instance allocator call failed: %s,"
7080                                " output: %s" % (fail, stdout+stderr))
7081     self.out_text = stdout
7082     if validate:
7083       self._ValidateResult()
7084
7085   def _ValidateResult(self):
7086     """Process the allocator results.
7087
7088     This will process and if successful save the result in
7089     self.out_data and the other parameters.
7090
7091     """
7092     try:
7093       rdict = serializer.Load(self.out_text)
7094     except Exception, err:
7095       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7096
7097     if not isinstance(rdict, dict):
7098       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7099
7100     for key in "success", "info", "nodes":
7101       if key not in rdict:
7102         raise errors.OpExecError("Can't parse iallocator results:"
7103                                  " missing key '%s'" % key)
7104       setattr(self, key, rdict[key])
7105
7106     if not isinstance(rdict["nodes"], list):
7107       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7108                                " is not a list")
7109     self.out_data = rdict
7110
7111
7112 class LUTestAllocator(NoHooksLU):
7113   """Run allocator tests.
7114
7115   This LU runs the allocator tests
7116
7117   """
7118   _OP_REQP = ["direction", "mode", "name"]
7119
7120   def CheckPrereq(self):
7121     """Check prerequisites.
7122
7123     This checks the opcode parameters depending on the director and mode test.
7124
7125     """
7126     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7127       for attr in ["name", "mem_size", "disks", "disk_template",
7128                    "os", "tags", "nics", "vcpus"]:
7129         if not hasattr(self.op, attr):
7130           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7131                                      attr)
7132       iname = self.cfg.ExpandInstanceName(self.op.name)
7133       if iname is not None:
7134         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7135                                    iname)
7136       if not isinstance(self.op.nics, list):
7137         raise errors.OpPrereqError("Invalid parameter 'nics'")
7138       for row in self.op.nics:
7139         if (not isinstance(row, dict) or
7140             "mac" not in row or
7141             "ip" not in row or
7142             "bridge" not in row):
7143           raise errors.OpPrereqError("Invalid contents of the"
7144                                      " 'nics' parameter")
7145       if not isinstance(self.op.disks, list):
7146         raise errors.OpPrereqError("Invalid parameter 'disks'")
7147       for row in self.op.disks:
7148         if (not isinstance(row, dict) or
7149             "size" not in row or
7150             not isinstance(row["size"], int) or
7151             "mode" not in row or
7152             row["mode"] not in ['r', 'w']):
7153           raise errors.OpPrereqError("Invalid contents of the"
7154                                      " 'disks' parameter")
7155       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7156         self.op.hypervisor = self.cfg.GetHypervisorType()
7157     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7158       if not hasattr(self.op, "name"):
7159         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7160       fname = self.cfg.ExpandInstanceName(self.op.name)
7161       if fname is None:
7162         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7163                                    self.op.name)
7164       self.op.name = fname
7165       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7166     else:
7167       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7168                                  self.op.mode)
7169
7170     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7171       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7172         raise errors.OpPrereqError("Missing allocator name")
7173     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7174       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7175                                  self.op.direction)
7176
7177   def Exec(self, feedback_fn):
7178     """Run the allocator test.
7179
7180     """
7181     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7182       ial = IAllocator(self,
7183                        mode=self.op.mode,
7184                        name=self.op.name,
7185                        mem_size=self.op.mem_size,
7186                        disks=self.op.disks,
7187                        disk_template=self.op.disk_template,
7188                        os=self.op.os,
7189                        tags=self.op.tags,
7190                        nics=self.op.nics,
7191                        vcpus=self.op.vcpus,
7192                        hypervisor=self.op.hypervisor,
7193                        )
7194     else:
7195       ial = IAllocator(self,
7196                        mode=self.op.mode,
7197                        name=self.op.name,
7198                        relocate_from=list(self.relocate_from),
7199                        )
7200
7201     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7202       result = ial.in_text
7203     else:
7204       ial.Run(self.op.allocator, validate=False)
7205       result = ial.out_text
7206     return result