Convert node_verify rpc to new result style
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     result.Raise()
660     if not result.data:
661       raise errors.OpExecError("Could not disable the master role")
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           if res.failed or res.data is False or not isinstance(res.data, list):
1246             if res.offline:
1247               # no need to warn or set fail return value
1248               continue
1249             feedback_fn("    Communication failure in hooks execution")
1250             lu_result = 1
1251             continue
1252           for script, hkr, output in res.data:
1253             if hkr == constants.HKR_FAIL:
1254               # The node header is only shown once, if there are
1255               # failing hooks on that node
1256               if show_node_header:
1257                 feedback_fn("  Node %s:" % node_name)
1258                 show_node_header = False
1259               feedback_fn("    ERROR: Script %s failed, output:" % script)
1260               output = indent_re.sub('      ', output)
1261               feedback_fn("%s" % output)
1262               lu_result = 1
1263
1264       return lu_result
1265
1266
1267 class LUVerifyDisks(NoHooksLU):
1268   """Verifies the cluster disks status.
1269
1270   """
1271   _OP_REQP = []
1272   REQ_BGL = False
1273
1274   def ExpandNames(self):
1275     self.needed_locks = {
1276       locking.LEVEL_NODE: locking.ALL_SET,
1277       locking.LEVEL_INSTANCE: locking.ALL_SET,
1278     }
1279     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280
1281   def CheckPrereq(self):
1282     """Check prerequisites.
1283
1284     This has no prerequisites.
1285
1286     """
1287     pass
1288
1289   def Exec(self, feedback_fn):
1290     """Verify integrity of cluster disks.
1291
1292     @rtype: tuple of three items
1293     @return: a tuple of (dict of node-to-node_error, list of instances
1294         which need activate-disks, dict of instance: (node, volume) for
1295         missing volumes
1296
1297     """
1298     result = res_nodes, res_instances, res_missing = {}, [], {}
1299
1300     vg_name = self.cfg.GetVGName()
1301     nodes = utils.NiceSort(self.cfg.GetNodeList())
1302     instances = [self.cfg.GetInstanceInfo(name)
1303                  for name in self.cfg.GetInstanceList()]
1304
1305     nv_dict = {}
1306     for inst in instances:
1307       inst_lvs = {}
1308       if (not inst.admin_up or
1309           inst.disk_template not in constants.DTS_NET_MIRROR):
1310         continue
1311       inst.MapLVsByNode(inst_lvs)
1312       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1313       for node, vol_list in inst_lvs.iteritems():
1314         for vol in vol_list:
1315           nv_dict[(node, vol)] = inst
1316
1317     if not nv_dict:
1318       return result
1319
1320     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1321
1322     to_act = set()
1323     for node in nodes:
1324       # node_volume
1325       node_res = node_lvs[node]
1326       if node_res.offline:
1327         continue
1328       msg = node_res.RemoteFailMsg()
1329       if msg:
1330         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1331         res_nodes[node] = msg
1332         continue
1333
1334       lvs = node_res.payload
1335       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1336         inst = nv_dict.pop((node, lv_name), None)
1337         if (not lv_online and inst is not None
1338             and inst.name not in res_instances):
1339           res_instances.append(inst.name)
1340
1341     # any leftover items in nv_dict are missing LVs, let's arrange the
1342     # data better
1343     for key, inst in nv_dict.iteritems():
1344       if inst.name not in res_missing:
1345         res_missing[inst.name] = []
1346       res_missing[inst.name].append(key)
1347
1348     return result
1349
1350
1351 class LURenameCluster(LogicalUnit):
1352   """Rename the cluster.
1353
1354   """
1355   HPATH = "cluster-rename"
1356   HTYPE = constants.HTYPE_CLUSTER
1357   _OP_REQP = ["name"]
1358
1359   def BuildHooksEnv(self):
1360     """Build hooks env.
1361
1362     """
1363     env = {
1364       "OP_TARGET": self.cfg.GetClusterName(),
1365       "NEW_NAME": self.op.name,
1366       }
1367     mn = self.cfg.GetMasterNode()
1368     return env, [mn], [mn]
1369
1370   def CheckPrereq(self):
1371     """Verify that the passed name is a valid one.
1372
1373     """
1374     hostname = utils.HostInfo(self.op.name)
1375
1376     new_name = hostname.name
1377     self.ip = new_ip = hostname.ip
1378     old_name = self.cfg.GetClusterName()
1379     old_ip = self.cfg.GetMasterIP()
1380     if new_name == old_name and new_ip == old_ip:
1381       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1382                                  " cluster has changed")
1383     if new_ip != old_ip:
1384       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1385         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1386                                    " reachable on the network. Aborting." %
1387                                    new_ip)
1388
1389     self.op.name = new_name
1390
1391   def Exec(self, feedback_fn):
1392     """Rename the cluster.
1393
1394     """
1395     clustername = self.op.name
1396     ip = self.ip
1397
1398     # shutdown the master IP
1399     master = self.cfg.GetMasterNode()
1400     result = self.rpc.call_node_stop_master(master, False)
1401     if result.failed or not result.data:
1402       raise errors.OpExecError("Could not disable the master role")
1403
1404     try:
1405       cluster = self.cfg.GetClusterInfo()
1406       cluster.cluster_name = clustername
1407       cluster.master_ip = ip
1408       self.cfg.Update(cluster)
1409
1410       # update the known hosts file
1411       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1412       node_list = self.cfg.GetNodeList()
1413       try:
1414         node_list.remove(master)
1415       except ValueError:
1416         pass
1417       result = self.rpc.call_upload_file(node_list,
1418                                          constants.SSH_KNOWN_HOSTS_FILE)
1419       for to_node, to_result in result.iteritems():
1420          msg = to_result.RemoteFailMsg()
1421          if msg:
1422            msg = ("Copy of file %s to node %s failed: %s" %
1423                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1424            self.proc.LogWarning(msg)
1425
1426     finally:
1427       result = self.rpc.call_node_start_master(master, False)
1428       if result.failed or not result.data:
1429         self.LogWarning("Could not re-enable the master role on"
1430                         " the master, please restart manually.")
1431
1432
1433 def _RecursiveCheckIfLVMBased(disk):
1434   """Check if the given disk or its children are lvm-based.
1435
1436   @type disk: L{objects.Disk}
1437   @param disk: the disk to check
1438   @rtype: booleean
1439   @return: boolean indicating whether a LD_LV dev_type was found or not
1440
1441   """
1442   if disk.children:
1443     for chdisk in disk.children:
1444       if _RecursiveCheckIfLVMBased(chdisk):
1445         return True
1446   return disk.dev_type == constants.LD_LV
1447
1448
1449 class LUSetClusterParams(LogicalUnit):
1450   """Change the parameters of the cluster.
1451
1452   """
1453   HPATH = "cluster-modify"
1454   HTYPE = constants.HTYPE_CLUSTER
1455   _OP_REQP = []
1456   REQ_BGL = False
1457
1458   def CheckArguments(self):
1459     """Check parameters
1460
1461     """
1462     if not hasattr(self.op, "candidate_pool_size"):
1463       self.op.candidate_pool_size = None
1464     if self.op.candidate_pool_size is not None:
1465       try:
1466         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1467       except (ValueError, TypeError), err:
1468         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1469                                    str(err))
1470       if self.op.candidate_pool_size < 1:
1471         raise errors.OpPrereqError("At least one master candidate needed")
1472
1473   def ExpandNames(self):
1474     # FIXME: in the future maybe other cluster params won't require checking on
1475     # all nodes to be modified.
1476     self.needed_locks = {
1477       locking.LEVEL_NODE: locking.ALL_SET,
1478     }
1479     self.share_locks[locking.LEVEL_NODE] = 1
1480
1481   def BuildHooksEnv(self):
1482     """Build hooks env.
1483
1484     """
1485     env = {
1486       "OP_TARGET": self.cfg.GetClusterName(),
1487       "NEW_VG_NAME": self.op.vg_name,
1488       }
1489     mn = self.cfg.GetMasterNode()
1490     return env, [mn], [mn]
1491
1492   def CheckPrereq(self):
1493     """Check prerequisites.
1494
1495     This checks whether the given params don't conflict and
1496     if the given volume group is valid.
1497
1498     """
1499     if self.op.vg_name is not None and not self.op.vg_name:
1500       instances = self.cfg.GetAllInstancesInfo().values()
1501       for inst in instances:
1502         for disk in inst.disks:
1503           if _RecursiveCheckIfLVMBased(disk):
1504             raise errors.OpPrereqError("Cannot disable lvm storage while"
1505                                        " lvm-based instances exist")
1506
1507     node_list = self.acquired_locks[locking.LEVEL_NODE]
1508
1509     # if vg_name not None, checks given volume group on all nodes
1510     if self.op.vg_name:
1511       vglist = self.rpc.call_vg_list(node_list)
1512       for node in node_list:
1513         msg = vglist[node].RemoteFailMsg()
1514         if msg:
1515           # ignoring down node
1516           self.LogWarning("Error while gathering data on node %s"
1517                           " (ignoring node): %s", node, msg)
1518           continue
1519         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1520                                               self.op.vg_name,
1521                                               constants.MIN_VG_SIZE)
1522         if vgstatus:
1523           raise errors.OpPrereqError("Error on node '%s': %s" %
1524                                      (node, vgstatus))
1525
1526     self.cluster = cluster = self.cfg.GetClusterInfo()
1527     # validate params changes
1528     if self.op.beparams:
1529       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1530       self.new_beparams = objects.FillDict(
1531         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1532
1533     if self.op.nicparams:
1534       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1535       self.new_nicparams = objects.FillDict(
1536         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1537       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1538
1539     # hypervisor list/parameters
1540     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1541     if self.op.hvparams:
1542       if not isinstance(self.op.hvparams, dict):
1543         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1544       for hv_name, hv_dict in self.op.hvparams.items():
1545         if hv_name not in self.new_hvparams:
1546           self.new_hvparams[hv_name] = hv_dict
1547         else:
1548           self.new_hvparams[hv_name].update(hv_dict)
1549
1550     if self.op.enabled_hypervisors is not None:
1551       self.hv_list = self.op.enabled_hypervisors
1552     else:
1553       self.hv_list = cluster.enabled_hypervisors
1554
1555     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1556       # either the enabled list has changed, or the parameters have, validate
1557       for hv_name, hv_params in self.new_hvparams.items():
1558         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1559             (self.op.enabled_hypervisors and
1560              hv_name in self.op.enabled_hypervisors)):
1561           # either this is a new hypervisor, or its parameters have changed
1562           hv_class = hypervisor.GetHypervisor(hv_name)
1563           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1564           hv_class.CheckParameterSyntax(hv_params)
1565           _CheckHVParams(self, node_list, hv_name, hv_params)
1566
1567   def Exec(self, feedback_fn):
1568     """Change the parameters of the cluster.
1569
1570     """
1571     if self.op.vg_name is not None:
1572       new_volume = self.op.vg_name
1573       if not new_volume:
1574         new_volume = None
1575       if new_volume != self.cfg.GetVGName():
1576         self.cfg.SetVGName(new_volume)
1577       else:
1578         feedback_fn("Cluster LVM configuration already in desired"
1579                     " state, not changing")
1580     if self.op.hvparams:
1581       self.cluster.hvparams = self.new_hvparams
1582     if self.op.enabled_hypervisors is not None:
1583       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1584     if self.op.beparams:
1585       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1586     if self.op.nicparams:
1587       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1588
1589     if self.op.candidate_pool_size is not None:
1590       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1591
1592     self.cfg.Update(self.cluster)
1593
1594     # we want to update nodes after the cluster so that if any errors
1595     # happen, we have recorded and saved the cluster info
1596     if self.op.candidate_pool_size is not None:
1597       _AdjustCandidatePool(self)
1598
1599
1600 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1601   """Distribute additional files which are part of the cluster configuration.
1602
1603   ConfigWriter takes care of distributing the config and ssconf files, but
1604   there are more files which should be distributed to all nodes. This function
1605   makes sure those are copied.
1606
1607   @param lu: calling logical unit
1608   @param additional_nodes: list of nodes not in the config to distribute to
1609
1610   """
1611   # 1. Gather target nodes
1612   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1613   dist_nodes = lu.cfg.GetNodeList()
1614   if additional_nodes is not None:
1615     dist_nodes.extend(additional_nodes)
1616   if myself.name in dist_nodes:
1617     dist_nodes.remove(myself.name)
1618   # 2. Gather files to distribute
1619   dist_files = set([constants.ETC_HOSTS,
1620                     constants.SSH_KNOWN_HOSTS_FILE,
1621                     constants.RAPI_CERT_FILE,
1622                     constants.RAPI_USERS_FILE,
1623                    ])
1624
1625   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1626   for hv_name in enabled_hypervisors:
1627     hv_class = hypervisor.GetHypervisor(hv_name)
1628     dist_files.update(hv_class.GetAncillaryFiles())
1629
1630   # 3. Perform the files upload
1631   for fname in dist_files:
1632     if os.path.exists(fname):
1633       result = lu.rpc.call_upload_file(dist_nodes, fname)
1634       for to_node, to_result in result.items():
1635          msg = to_result.RemoteFailMsg()
1636          if msg:
1637            msg = ("Copy of file %s to node %s failed: %s" %
1638                    (fname, to_node, msg))
1639            lu.proc.LogWarning(msg)
1640
1641
1642 class LURedistributeConfig(NoHooksLU):
1643   """Force the redistribution of cluster configuration.
1644
1645   This is a very simple LU.
1646
1647   """
1648   _OP_REQP = []
1649   REQ_BGL = False
1650
1651   def ExpandNames(self):
1652     self.needed_locks = {
1653       locking.LEVEL_NODE: locking.ALL_SET,
1654     }
1655     self.share_locks[locking.LEVEL_NODE] = 1
1656
1657   def CheckPrereq(self):
1658     """Check prerequisites.
1659
1660     """
1661
1662   def Exec(self, feedback_fn):
1663     """Redistribute the configuration.
1664
1665     """
1666     self.cfg.Update(self.cfg.GetClusterInfo())
1667     _RedistributeAncillaryFiles(self)
1668
1669
1670 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1671   """Sleep and poll for an instance's disk to sync.
1672
1673   """
1674   if not instance.disks:
1675     return True
1676
1677   if not oneshot:
1678     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1679
1680   node = instance.primary_node
1681
1682   for dev in instance.disks:
1683     lu.cfg.SetDiskID(dev, node)
1684
1685   retries = 0
1686   while True:
1687     max_time = 0
1688     done = True
1689     cumul_degraded = False
1690     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1691     msg = rstats.RemoteFailMsg()
1692     if msg:
1693       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1694       retries += 1
1695       if retries >= 10:
1696         raise errors.RemoteError("Can't contact node %s for mirror data,"
1697                                  " aborting." % node)
1698       time.sleep(6)
1699       continue
1700     rstats = rstats.payload
1701     retries = 0
1702     for i, mstat in enumerate(rstats):
1703       if mstat is None:
1704         lu.LogWarning("Can't compute data for node %s/%s",
1705                            node, instance.disks[i].iv_name)
1706         continue
1707       # we ignore the ldisk parameter
1708       perc_done, est_time, is_degraded, _ = mstat
1709       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1710       if perc_done is not None:
1711         done = False
1712         if est_time is not None:
1713           rem_time = "%d estimated seconds remaining" % est_time
1714           max_time = est_time
1715         else:
1716           rem_time = "no time estimate"
1717         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1718                         (instance.disks[i].iv_name, perc_done, rem_time))
1719     if done or oneshot:
1720       break
1721
1722     time.sleep(min(60, max_time))
1723
1724   if done:
1725     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1726   return not cumul_degraded
1727
1728
1729 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1730   """Check that mirrors are not degraded.
1731
1732   The ldisk parameter, if True, will change the test from the
1733   is_degraded attribute (which represents overall non-ok status for
1734   the device(s)) to the ldisk (representing the local storage status).
1735
1736   """
1737   lu.cfg.SetDiskID(dev, node)
1738   if ldisk:
1739     idx = 6
1740   else:
1741     idx = 5
1742
1743   result = True
1744   if on_primary or dev.AssembleOnSecondary():
1745     rstats = lu.rpc.call_blockdev_find(node, dev)
1746     msg = rstats.RemoteFailMsg()
1747     if msg:
1748       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1749       result = False
1750     elif not rstats.payload:
1751       lu.LogWarning("Can't find disk on node %s", node)
1752       result = False
1753     else:
1754       result = result and (not rstats.payload[idx])
1755   if dev.children:
1756     for child in dev.children:
1757       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1758
1759   return result
1760
1761
1762 class LUDiagnoseOS(NoHooksLU):
1763   """Logical unit for OS diagnose/query.
1764
1765   """
1766   _OP_REQP = ["output_fields", "names"]
1767   REQ_BGL = False
1768   _FIELDS_STATIC = utils.FieldSet()
1769   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1770
1771   def ExpandNames(self):
1772     if self.op.names:
1773       raise errors.OpPrereqError("Selective OS query not supported")
1774
1775     _CheckOutputFields(static=self._FIELDS_STATIC,
1776                        dynamic=self._FIELDS_DYNAMIC,
1777                        selected=self.op.output_fields)
1778
1779     # Lock all nodes, in shared mode
1780     # Temporary removal of locks, should be reverted later
1781     # TODO: reintroduce locks when they are lighter-weight
1782     self.needed_locks = {}
1783     #self.share_locks[locking.LEVEL_NODE] = 1
1784     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1785
1786   def CheckPrereq(self):
1787     """Check prerequisites.
1788
1789     """
1790
1791   @staticmethod
1792   def _DiagnoseByOS(node_list, rlist):
1793     """Remaps a per-node return list into an a per-os per-node dictionary
1794
1795     @param node_list: a list with the names of all nodes
1796     @param rlist: a map with node names as keys and OS objects as values
1797
1798     @rtype: dict
1799     @return: a dictionary with osnames as keys and as value another map, with
1800         nodes as keys and list of OS objects as values, eg::
1801
1802           {"debian-etch": {"node1": [<object>,...],
1803                            "node2": [<object>,]}
1804           }
1805
1806     """
1807     all_os = {}
1808     # we build here the list of nodes that didn't fail the RPC (at RPC
1809     # level), so that nodes with a non-responding node daemon don't
1810     # make all OSes invalid
1811     good_nodes = [node_name for node_name in rlist
1812                   if not rlist[node_name].failed]
1813     for node_name, nr in rlist.iteritems():
1814       if nr.failed or not nr.data:
1815         continue
1816       for os_obj in nr.data:
1817         if os_obj.name not in all_os:
1818           # build a list of nodes for this os containing empty lists
1819           # for each node in node_list
1820           all_os[os_obj.name] = {}
1821           for nname in good_nodes:
1822             all_os[os_obj.name][nname] = []
1823         all_os[os_obj.name][node_name].append(os_obj)
1824     return all_os
1825
1826   def Exec(self, feedback_fn):
1827     """Compute the list of OSes.
1828
1829     """
1830     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1831     node_data = self.rpc.call_os_diagnose(valid_nodes)
1832     if node_data == False:
1833       raise errors.OpExecError("Can't gather the list of OSes")
1834     pol = self._DiagnoseByOS(valid_nodes, node_data)
1835     output = []
1836     for os_name, os_data in pol.iteritems():
1837       row = []
1838       for field in self.op.output_fields:
1839         if field == "name":
1840           val = os_name
1841         elif field == "valid":
1842           val = utils.all([osl and osl[0] for osl in os_data.values()])
1843         elif field == "node_status":
1844           val = {}
1845           for node_name, nos_list in os_data.iteritems():
1846             val[node_name] = [(v.status, v.path) for v in nos_list]
1847         else:
1848           raise errors.ParameterError(field)
1849         row.append(val)
1850       output.append(row)
1851
1852     return output
1853
1854
1855 class LURemoveNode(LogicalUnit):
1856   """Logical unit for removing a node.
1857
1858   """
1859   HPATH = "node-remove"
1860   HTYPE = constants.HTYPE_NODE
1861   _OP_REQP = ["node_name"]
1862
1863   def BuildHooksEnv(self):
1864     """Build hooks env.
1865
1866     This doesn't run on the target node in the pre phase as a failed
1867     node would then be impossible to remove.
1868
1869     """
1870     env = {
1871       "OP_TARGET": self.op.node_name,
1872       "NODE_NAME": self.op.node_name,
1873       }
1874     all_nodes = self.cfg.GetNodeList()
1875     all_nodes.remove(self.op.node_name)
1876     return env, all_nodes, all_nodes
1877
1878   def CheckPrereq(self):
1879     """Check prerequisites.
1880
1881     This checks:
1882      - the node exists in the configuration
1883      - it does not have primary or secondary instances
1884      - it's not the master
1885
1886     Any errors are signalled by raising errors.OpPrereqError.
1887
1888     """
1889     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1890     if node is None:
1891       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1892
1893     instance_list = self.cfg.GetInstanceList()
1894
1895     masternode = self.cfg.GetMasterNode()
1896     if node.name == masternode:
1897       raise errors.OpPrereqError("Node is the master node,"
1898                                  " you need to failover first.")
1899
1900     for instance_name in instance_list:
1901       instance = self.cfg.GetInstanceInfo(instance_name)
1902       if node.name in instance.all_nodes:
1903         raise errors.OpPrereqError("Instance %s is still running on the node,"
1904                                    " please remove first." % instance_name)
1905     self.op.node_name = node.name
1906     self.node = node
1907
1908   def Exec(self, feedback_fn):
1909     """Removes the node from the cluster.
1910
1911     """
1912     node = self.node
1913     logging.info("Stopping the node daemon and removing configs from node %s",
1914                  node.name)
1915
1916     self.context.RemoveNode(node.name)
1917
1918     self.rpc.call_node_leave_cluster(node.name)
1919
1920     # Promote nodes to master candidate as needed
1921     _AdjustCandidatePool(self)
1922
1923
1924 class LUQueryNodes(NoHooksLU):
1925   """Logical unit for querying nodes.
1926
1927   """
1928   _OP_REQP = ["output_fields", "names", "use_locking"]
1929   REQ_BGL = False
1930   _FIELDS_DYNAMIC = utils.FieldSet(
1931     "dtotal", "dfree",
1932     "mtotal", "mnode", "mfree",
1933     "bootid",
1934     "ctotal", "cnodes", "csockets",
1935     )
1936
1937   _FIELDS_STATIC = utils.FieldSet(
1938     "name", "pinst_cnt", "sinst_cnt",
1939     "pinst_list", "sinst_list",
1940     "pip", "sip", "tags",
1941     "serial_no",
1942     "master_candidate",
1943     "master",
1944     "offline",
1945     "drained",
1946     )
1947
1948   def ExpandNames(self):
1949     _CheckOutputFields(static=self._FIELDS_STATIC,
1950                        dynamic=self._FIELDS_DYNAMIC,
1951                        selected=self.op.output_fields)
1952
1953     self.needed_locks = {}
1954     self.share_locks[locking.LEVEL_NODE] = 1
1955
1956     if self.op.names:
1957       self.wanted = _GetWantedNodes(self, self.op.names)
1958     else:
1959       self.wanted = locking.ALL_SET
1960
1961     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1962     self.do_locking = self.do_node_query and self.op.use_locking
1963     if self.do_locking:
1964       # if we don't request only static fields, we need to lock the nodes
1965       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1966
1967
1968   def CheckPrereq(self):
1969     """Check prerequisites.
1970
1971     """
1972     # The validation of the node list is done in the _GetWantedNodes,
1973     # if non empty, and if empty, there's no validation to do
1974     pass
1975
1976   def Exec(self, feedback_fn):
1977     """Computes the list of nodes and their attributes.
1978
1979     """
1980     all_info = self.cfg.GetAllNodesInfo()
1981     if self.do_locking:
1982       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1983     elif self.wanted != locking.ALL_SET:
1984       nodenames = self.wanted
1985       missing = set(nodenames).difference(all_info.keys())
1986       if missing:
1987         raise errors.OpExecError(
1988           "Some nodes were removed before retrieving their data: %s" % missing)
1989     else:
1990       nodenames = all_info.keys()
1991
1992     nodenames = utils.NiceSort(nodenames)
1993     nodelist = [all_info[name] for name in nodenames]
1994
1995     # begin data gathering
1996
1997     if self.do_node_query:
1998       live_data = {}
1999       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2000                                           self.cfg.GetHypervisorType())
2001       for name in nodenames:
2002         nodeinfo = node_data[name]
2003         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2004           nodeinfo = nodeinfo.payload
2005           fn = utils.TryConvert
2006           live_data[name] = {
2007             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2008             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2009             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2010             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2011             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2012             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2013             "bootid": nodeinfo.get('bootid', None),
2014             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2015             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2016             }
2017         else:
2018           live_data[name] = {}
2019     else:
2020       live_data = dict.fromkeys(nodenames, {})
2021
2022     node_to_primary = dict([(name, set()) for name in nodenames])
2023     node_to_secondary = dict([(name, set()) for name in nodenames])
2024
2025     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2026                              "sinst_cnt", "sinst_list"))
2027     if inst_fields & frozenset(self.op.output_fields):
2028       instancelist = self.cfg.GetInstanceList()
2029
2030       for instance_name in instancelist:
2031         inst = self.cfg.GetInstanceInfo(instance_name)
2032         if inst.primary_node in node_to_primary:
2033           node_to_primary[inst.primary_node].add(inst.name)
2034         for secnode in inst.secondary_nodes:
2035           if secnode in node_to_secondary:
2036             node_to_secondary[secnode].add(inst.name)
2037
2038     master_node = self.cfg.GetMasterNode()
2039
2040     # end data gathering
2041
2042     output = []
2043     for node in nodelist:
2044       node_output = []
2045       for field in self.op.output_fields:
2046         if field == "name":
2047           val = node.name
2048         elif field == "pinst_list":
2049           val = list(node_to_primary[node.name])
2050         elif field == "sinst_list":
2051           val = list(node_to_secondary[node.name])
2052         elif field == "pinst_cnt":
2053           val = len(node_to_primary[node.name])
2054         elif field == "sinst_cnt":
2055           val = len(node_to_secondary[node.name])
2056         elif field == "pip":
2057           val = node.primary_ip
2058         elif field == "sip":
2059           val = node.secondary_ip
2060         elif field == "tags":
2061           val = list(node.GetTags())
2062         elif field == "serial_no":
2063           val = node.serial_no
2064         elif field == "master_candidate":
2065           val = node.master_candidate
2066         elif field == "master":
2067           val = node.name == master_node
2068         elif field == "offline":
2069           val = node.offline
2070         elif field == "drained":
2071           val = node.drained
2072         elif self._FIELDS_DYNAMIC.Matches(field):
2073           val = live_data[node.name].get(field, None)
2074         else:
2075           raise errors.ParameterError(field)
2076         node_output.append(val)
2077       output.append(node_output)
2078
2079     return output
2080
2081
2082 class LUQueryNodeVolumes(NoHooksLU):
2083   """Logical unit for getting volumes on node(s).
2084
2085   """
2086   _OP_REQP = ["nodes", "output_fields"]
2087   REQ_BGL = False
2088   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2089   _FIELDS_STATIC = utils.FieldSet("node")
2090
2091   def ExpandNames(self):
2092     _CheckOutputFields(static=self._FIELDS_STATIC,
2093                        dynamic=self._FIELDS_DYNAMIC,
2094                        selected=self.op.output_fields)
2095
2096     self.needed_locks = {}
2097     self.share_locks[locking.LEVEL_NODE] = 1
2098     if not self.op.nodes:
2099       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2100     else:
2101       self.needed_locks[locking.LEVEL_NODE] = \
2102         _GetWantedNodes(self, self.op.nodes)
2103
2104   def CheckPrereq(self):
2105     """Check prerequisites.
2106
2107     This checks that the fields required are valid output fields.
2108
2109     """
2110     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2111
2112   def Exec(self, feedback_fn):
2113     """Computes the list of nodes and their attributes.
2114
2115     """
2116     nodenames = self.nodes
2117     volumes = self.rpc.call_node_volumes(nodenames)
2118
2119     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2120              in self.cfg.GetInstanceList()]
2121
2122     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2123
2124     output = []
2125     for node in nodenames:
2126       if node not in volumes or volumes[node].failed or not volumes[node].data:
2127         continue
2128
2129       node_vols = volumes[node].data[:]
2130       node_vols.sort(key=lambda vol: vol['dev'])
2131
2132       for vol in node_vols:
2133         node_output = []
2134         for field in self.op.output_fields:
2135           if field == "node":
2136             val = node
2137           elif field == "phys":
2138             val = vol['dev']
2139           elif field == "vg":
2140             val = vol['vg']
2141           elif field == "name":
2142             val = vol['name']
2143           elif field == "size":
2144             val = int(float(vol['size']))
2145           elif field == "instance":
2146             for inst in ilist:
2147               if node not in lv_by_node[inst]:
2148                 continue
2149               if vol['name'] in lv_by_node[inst][node]:
2150                 val = inst.name
2151                 break
2152             else:
2153               val = '-'
2154           else:
2155             raise errors.ParameterError(field)
2156           node_output.append(str(val))
2157
2158         output.append(node_output)
2159
2160     return output
2161
2162
2163 class LUAddNode(LogicalUnit):
2164   """Logical unit for adding node to the cluster.
2165
2166   """
2167   HPATH = "node-add"
2168   HTYPE = constants.HTYPE_NODE
2169   _OP_REQP = ["node_name"]
2170
2171   def BuildHooksEnv(self):
2172     """Build hooks env.
2173
2174     This will run on all nodes before, and on all nodes + the new node after.
2175
2176     """
2177     env = {
2178       "OP_TARGET": self.op.node_name,
2179       "NODE_NAME": self.op.node_name,
2180       "NODE_PIP": self.op.primary_ip,
2181       "NODE_SIP": self.op.secondary_ip,
2182       }
2183     nodes_0 = self.cfg.GetNodeList()
2184     nodes_1 = nodes_0 + [self.op.node_name, ]
2185     return env, nodes_0, nodes_1
2186
2187   def CheckPrereq(self):
2188     """Check prerequisites.
2189
2190     This checks:
2191      - the new node is not already in the config
2192      - it is resolvable
2193      - its parameters (single/dual homed) matches the cluster
2194
2195     Any errors are signalled by raising errors.OpPrereqError.
2196
2197     """
2198     node_name = self.op.node_name
2199     cfg = self.cfg
2200
2201     dns_data = utils.HostInfo(node_name)
2202
2203     node = dns_data.name
2204     primary_ip = self.op.primary_ip = dns_data.ip
2205     secondary_ip = getattr(self.op, "secondary_ip", None)
2206     if secondary_ip is None:
2207       secondary_ip = primary_ip
2208     if not utils.IsValidIP(secondary_ip):
2209       raise errors.OpPrereqError("Invalid secondary IP given")
2210     self.op.secondary_ip = secondary_ip
2211
2212     node_list = cfg.GetNodeList()
2213     if not self.op.readd and node in node_list:
2214       raise errors.OpPrereqError("Node %s is already in the configuration" %
2215                                  node)
2216     elif self.op.readd and node not in node_list:
2217       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2218
2219     for existing_node_name in node_list:
2220       existing_node = cfg.GetNodeInfo(existing_node_name)
2221
2222       if self.op.readd and node == existing_node_name:
2223         if (existing_node.primary_ip != primary_ip or
2224             existing_node.secondary_ip != secondary_ip):
2225           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2226                                      " address configuration as before")
2227         continue
2228
2229       if (existing_node.primary_ip == primary_ip or
2230           existing_node.secondary_ip == primary_ip or
2231           existing_node.primary_ip == secondary_ip or
2232           existing_node.secondary_ip == secondary_ip):
2233         raise errors.OpPrereqError("New node ip address(es) conflict with"
2234                                    " existing node %s" % existing_node.name)
2235
2236     # check that the type of the node (single versus dual homed) is the
2237     # same as for the master
2238     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2239     master_singlehomed = myself.secondary_ip == myself.primary_ip
2240     newbie_singlehomed = secondary_ip == primary_ip
2241     if master_singlehomed != newbie_singlehomed:
2242       if master_singlehomed:
2243         raise errors.OpPrereqError("The master has no private ip but the"
2244                                    " new node has one")
2245       else:
2246         raise errors.OpPrereqError("The master has a private ip but the"
2247                                    " new node doesn't have one")
2248
2249     # checks reachablity
2250     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2251       raise errors.OpPrereqError("Node not reachable by ping")
2252
2253     if not newbie_singlehomed:
2254       # check reachability from my secondary ip to newbie's secondary ip
2255       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2256                            source=myself.secondary_ip):
2257         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2258                                    " based ping to noded port")
2259
2260     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2261     mc_now, _ = self.cfg.GetMasterCandidateStats()
2262     master_candidate = mc_now < cp_size
2263
2264     self.new_node = objects.Node(name=node,
2265                                  primary_ip=primary_ip,
2266                                  secondary_ip=secondary_ip,
2267                                  master_candidate=master_candidate,
2268                                  offline=False, drained=False)
2269
2270   def Exec(self, feedback_fn):
2271     """Adds the new node to the cluster.
2272
2273     """
2274     new_node = self.new_node
2275     node = new_node.name
2276
2277     # check connectivity
2278     result = self.rpc.call_version([node])[node]
2279     result.Raise()
2280     if result.data:
2281       if constants.PROTOCOL_VERSION == result.data:
2282         logging.info("Communication to node %s fine, sw version %s match",
2283                      node, result.data)
2284       else:
2285         raise errors.OpExecError("Version mismatch master version %s,"
2286                                  " node version %s" %
2287                                  (constants.PROTOCOL_VERSION, result.data))
2288     else:
2289       raise errors.OpExecError("Cannot get version from the new node")
2290
2291     # setup ssh on node
2292     logging.info("Copy ssh key to node %s", node)
2293     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2294     keyarray = []
2295     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2296                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2297                 priv_key, pub_key]
2298
2299     for i in keyfiles:
2300       f = open(i, 'r')
2301       try:
2302         keyarray.append(f.read())
2303       finally:
2304         f.close()
2305
2306     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2307                                     keyarray[2],
2308                                     keyarray[3], keyarray[4], keyarray[5])
2309
2310     msg = result.RemoteFailMsg()
2311     if msg:
2312       raise errors.OpExecError("Cannot transfer ssh keys to the"
2313                                " new node: %s" % msg)
2314
2315     # Add node to our /etc/hosts, and add key to known_hosts
2316     if self.cfg.GetClusterInfo().modify_etc_hosts:
2317       utils.AddHostToEtcHosts(new_node.name)
2318
2319     if new_node.secondary_ip != new_node.primary_ip:
2320       result = self.rpc.call_node_has_ip_address(new_node.name,
2321                                                  new_node.secondary_ip)
2322       msg = result.RemoteFailMsg()
2323       if msg:
2324         raise errors.OpPrereqError("Failure checking secondary ip"
2325                                    " on node %s: %s" % (new_node.name, msg))
2326       if not result.payload:
2327         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2328                                  " you gave (%s). Please fix and re-run this"
2329                                  " command." % new_node.secondary_ip)
2330
2331     node_verify_list = [self.cfg.GetMasterNode()]
2332     node_verify_param = {
2333       'nodelist': [node],
2334       # TODO: do a node-net-test as well?
2335     }
2336
2337     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2338                                        self.cfg.GetClusterName())
2339     for verifier in node_verify_list:
2340       msg = result[verifier].RemoteFailMsg()
2341       if msg:
2342         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2343                                  (verifier, msg))
2344       nl_payload = result[verifier].payload['nodelist']
2345       if nl_payload:
2346         for failed in nl_payload:
2347           feedback_fn("ssh/hostname verification failed %s -> %s" %
2348                       (verifier, nl_payload[failed]))
2349         raise errors.OpExecError("ssh/hostname verification failed.")
2350
2351     if self.op.readd:
2352       _RedistributeAncillaryFiles(self)
2353       self.context.ReaddNode(new_node)
2354     else:
2355       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2356       self.context.AddNode(new_node)
2357
2358
2359 class LUSetNodeParams(LogicalUnit):
2360   """Modifies the parameters of a node.
2361
2362   """
2363   HPATH = "node-modify"
2364   HTYPE = constants.HTYPE_NODE
2365   _OP_REQP = ["node_name"]
2366   REQ_BGL = False
2367
2368   def CheckArguments(self):
2369     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2370     if node_name is None:
2371       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2372     self.op.node_name = node_name
2373     _CheckBooleanOpField(self.op, 'master_candidate')
2374     _CheckBooleanOpField(self.op, 'offline')
2375     _CheckBooleanOpField(self.op, 'drained')
2376     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2377     if all_mods.count(None) == 3:
2378       raise errors.OpPrereqError("Please pass at least one modification")
2379     if all_mods.count(True) > 1:
2380       raise errors.OpPrereqError("Can't set the node into more than one"
2381                                  " state at the same time")
2382
2383   def ExpandNames(self):
2384     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2385
2386   def BuildHooksEnv(self):
2387     """Build hooks env.
2388
2389     This runs on the master node.
2390
2391     """
2392     env = {
2393       "OP_TARGET": self.op.node_name,
2394       "MASTER_CANDIDATE": str(self.op.master_candidate),
2395       "OFFLINE": str(self.op.offline),
2396       "DRAINED": str(self.op.drained),
2397       }
2398     nl = [self.cfg.GetMasterNode(),
2399           self.op.node_name]
2400     return env, nl, nl
2401
2402   def CheckPrereq(self):
2403     """Check prerequisites.
2404
2405     This only checks the instance list against the existing names.
2406
2407     """
2408     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2409
2410     if ((self.op.master_candidate == False or self.op.offline == True or
2411          self.op.drained == True) and node.master_candidate):
2412       # we will demote the node from master_candidate
2413       if self.op.node_name == self.cfg.GetMasterNode():
2414         raise errors.OpPrereqError("The master node has to be a"
2415                                    " master candidate, online and not drained")
2416       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2417       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2418       if num_candidates <= cp_size:
2419         msg = ("Not enough master candidates (desired"
2420                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2421         if self.op.force:
2422           self.LogWarning(msg)
2423         else:
2424           raise errors.OpPrereqError(msg)
2425
2426     if (self.op.master_candidate == True and
2427         ((node.offline and not self.op.offline == False) or
2428          (node.drained and not self.op.drained == False))):
2429       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2430                                  " to master_candidate" % node.name)
2431
2432     return
2433
2434   def Exec(self, feedback_fn):
2435     """Modifies a node.
2436
2437     """
2438     node = self.node
2439
2440     result = []
2441     changed_mc = False
2442
2443     if self.op.offline is not None:
2444       node.offline = self.op.offline
2445       result.append(("offline", str(self.op.offline)))
2446       if self.op.offline == True:
2447         if node.master_candidate:
2448           node.master_candidate = False
2449           changed_mc = True
2450           result.append(("master_candidate", "auto-demotion due to offline"))
2451         if node.drained:
2452           node.drained = False
2453           result.append(("drained", "clear drained status due to offline"))
2454
2455     if self.op.master_candidate is not None:
2456       node.master_candidate = self.op.master_candidate
2457       changed_mc = True
2458       result.append(("master_candidate", str(self.op.master_candidate)))
2459       if self.op.master_candidate == False:
2460         rrc = self.rpc.call_node_demote_from_mc(node.name)
2461         msg = rrc.RemoteFailMsg()
2462         if msg:
2463           self.LogWarning("Node failed to demote itself: %s" % msg)
2464
2465     if self.op.drained is not None:
2466       node.drained = self.op.drained
2467       result.append(("drained", str(self.op.drained)))
2468       if self.op.drained == True:
2469         if node.master_candidate:
2470           node.master_candidate = False
2471           changed_mc = True
2472           result.append(("master_candidate", "auto-demotion due to drain"))
2473         if node.offline:
2474           node.offline = False
2475           result.append(("offline", "clear offline status due to drain"))
2476
2477     # this will trigger configuration file update, if needed
2478     self.cfg.Update(node)
2479     # this will trigger job queue propagation or cleanup
2480     if changed_mc:
2481       self.context.ReaddNode(node)
2482
2483     return result
2484
2485
2486 class LUPowercycleNode(NoHooksLU):
2487   """Powercycles a node.
2488
2489   """
2490   _OP_REQP = ["node_name", "force"]
2491   REQ_BGL = False
2492
2493   def CheckArguments(self):
2494     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2495     if node_name is None:
2496       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2497     self.op.node_name = node_name
2498     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2499       raise errors.OpPrereqError("The node is the master and the force"
2500                                  " parameter was not set")
2501
2502   def ExpandNames(self):
2503     """Locking for PowercycleNode.
2504
2505     This is a last-resource option and shouldn't block on other
2506     jobs. Therefore, we grab no locks.
2507
2508     """
2509     self.needed_locks = {}
2510
2511   def CheckPrereq(self):
2512     """Check prerequisites.
2513
2514     This LU has no prereqs.
2515
2516     """
2517     pass
2518
2519   def Exec(self, feedback_fn):
2520     """Reboots a node.
2521
2522     """
2523     result = self.rpc.call_node_powercycle(self.op.node_name,
2524                                            self.cfg.GetHypervisorType())
2525     msg = result.RemoteFailMsg()
2526     if msg:
2527       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2528     return result.payload
2529
2530
2531 class LUQueryClusterInfo(NoHooksLU):
2532   """Query cluster configuration.
2533
2534   """
2535   _OP_REQP = []
2536   REQ_BGL = False
2537
2538   def ExpandNames(self):
2539     self.needed_locks = {}
2540
2541   def CheckPrereq(self):
2542     """No prerequsites needed for this LU.
2543
2544     """
2545     pass
2546
2547   def Exec(self, feedback_fn):
2548     """Return cluster config.
2549
2550     """
2551     cluster = self.cfg.GetClusterInfo()
2552     result = {
2553       "software_version": constants.RELEASE_VERSION,
2554       "protocol_version": constants.PROTOCOL_VERSION,
2555       "config_version": constants.CONFIG_VERSION,
2556       "os_api_version": constants.OS_API_VERSION,
2557       "export_version": constants.EXPORT_VERSION,
2558       "architecture": (platform.architecture()[0], platform.machine()),
2559       "name": cluster.cluster_name,
2560       "master": cluster.master_node,
2561       "default_hypervisor": cluster.default_hypervisor,
2562       "enabled_hypervisors": cluster.enabled_hypervisors,
2563       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2564                         for hypervisor in cluster.enabled_hypervisors]),
2565       "beparams": cluster.beparams,
2566       "nicparams": cluster.nicparams,
2567       "candidate_pool_size": cluster.candidate_pool_size,
2568       "master_netdev": cluster.master_netdev,
2569       "volume_group_name": cluster.volume_group_name,
2570       "file_storage_dir": cluster.file_storage_dir,
2571       }
2572
2573     return result
2574
2575
2576 class LUQueryConfigValues(NoHooksLU):
2577   """Return configuration values.
2578
2579   """
2580   _OP_REQP = []
2581   REQ_BGL = False
2582   _FIELDS_DYNAMIC = utils.FieldSet()
2583   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2584
2585   def ExpandNames(self):
2586     self.needed_locks = {}
2587
2588     _CheckOutputFields(static=self._FIELDS_STATIC,
2589                        dynamic=self._FIELDS_DYNAMIC,
2590                        selected=self.op.output_fields)
2591
2592   def CheckPrereq(self):
2593     """No prerequisites.
2594
2595     """
2596     pass
2597
2598   def Exec(self, feedback_fn):
2599     """Dump a representation of the cluster config to the standard output.
2600
2601     """
2602     values = []
2603     for field in self.op.output_fields:
2604       if field == "cluster_name":
2605         entry = self.cfg.GetClusterName()
2606       elif field == "master_node":
2607         entry = self.cfg.GetMasterNode()
2608       elif field == "drain_flag":
2609         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2610       else:
2611         raise errors.ParameterError(field)
2612       values.append(entry)
2613     return values
2614
2615
2616 class LUActivateInstanceDisks(NoHooksLU):
2617   """Bring up an instance's disks.
2618
2619   """
2620   _OP_REQP = ["instance_name"]
2621   REQ_BGL = False
2622
2623   def ExpandNames(self):
2624     self._ExpandAndLockInstance()
2625     self.needed_locks[locking.LEVEL_NODE] = []
2626     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2627
2628   def DeclareLocks(self, level):
2629     if level == locking.LEVEL_NODE:
2630       self._LockInstancesNodes()
2631
2632   def CheckPrereq(self):
2633     """Check prerequisites.
2634
2635     This checks that the instance is in the cluster.
2636
2637     """
2638     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2639     assert self.instance is not None, \
2640       "Cannot retrieve locked instance %s" % self.op.instance_name
2641     _CheckNodeOnline(self, self.instance.primary_node)
2642
2643   def Exec(self, feedback_fn):
2644     """Activate the disks.
2645
2646     """
2647     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2648     if not disks_ok:
2649       raise errors.OpExecError("Cannot activate block devices")
2650
2651     return disks_info
2652
2653
2654 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2655   """Prepare the block devices for an instance.
2656
2657   This sets up the block devices on all nodes.
2658
2659   @type lu: L{LogicalUnit}
2660   @param lu: the logical unit on whose behalf we execute
2661   @type instance: L{objects.Instance}
2662   @param instance: the instance for whose disks we assemble
2663   @type ignore_secondaries: boolean
2664   @param ignore_secondaries: if true, errors on secondary nodes
2665       won't result in an error return from the function
2666   @return: False if the operation failed, otherwise a list of
2667       (host, instance_visible_name, node_visible_name)
2668       with the mapping from node devices to instance devices
2669
2670   """
2671   device_info = []
2672   disks_ok = True
2673   iname = instance.name
2674   # With the two passes mechanism we try to reduce the window of
2675   # opportunity for the race condition of switching DRBD to primary
2676   # before handshaking occured, but we do not eliminate it
2677
2678   # The proper fix would be to wait (with some limits) until the
2679   # connection has been made and drbd transitions from WFConnection
2680   # into any other network-connected state (Connected, SyncTarget,
2681   # SyncSource, etc.)
2682
2683   # 1st pass, assemble on all nodes in secondary mode
2684   for inst_disk in instance.disks:
2685     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2686       lu.cfg.SetDiskID(node_disk, node)
2687       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2688       msg = result.RemoteFailMsg()
2689       if msg:
2690         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2691                            " (is_primary=False, pass=1): %s",
2692                            inst_disk.iv_name, node, msg)
2693         if not ignore_secondaries:
2694           disks_ok = False
2695
2696   # FIXME: race condition on drbd migration to primary
2697
2698   # 2nd pass, do only the primary node
2699   for inst_disk in instance.disks:
2700     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2701       if node != instance.primary_node:
2702         continue
2703       lu.cfg.SetDiskID(node_disk, node)
2704       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2705       msg = result.RemoteFailMsg()
2706       if msg:
2707         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2708                            " (is_primary=True, pass=2): %s",
2709                            inst_disk.iv_name, node, msg)
2710         disks_ok = False
2711     device_info.append((instance.primary_node, inst_disk.iv_name,
2712                         result.payload))
2713
2714   # leave the disks configured for the primary node
2715   # this is a workaround that would be fixed better by
2716   # improving the logical/physical id handling
2717   for disk in instance.disks:
2718     lu.cfg.SetDiskID(disk, instance.primary_node)
2719
2720   return disks_ok, device_info
2721
2722
2723 def _StartInstanceDisks(lu, instance, force):
2724   """Start the disks of an instance.
2725
2726   """
2727   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2728                                            ignore_secondaries=force)
2729   if not disks_ok:
2730     _ShutdownInstanceDisks(lu, instance)
2731     if force is not None and not force:
2732       lu.proc.LogWarning("", hint="If the message above refers to a"
2733                          " secondary node,"
2734                          " you can retry the operation using '--force'.")
2735     raise errors.OpExecError("Disk consistency error")
2736
2737
2738 class LUDeactivateInstanceDisks(NoHooksLU):
2739   """Shutdown an instance's disks.
2740
2741   """
2742   _OP_REQP = ["instance_name"]
2743   REQ_BGL = False
2744
2745   def ExpandNames(self):
2746     self._ExpandAndLockInstance()
2747     self.needed_locks[locking.LEVEL_NODE] = []
2748     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2749
2750   def DeclareLocks(self, level):
2751     if level == locking.LEVEL_NODE:
2752       self._LockInstancesNodes()
2753
2754   def CheckPrereq(self):
2755     """Check prerequisites.
2756
2757     This checks that the instance is in the cluster.
2758
2759     """
2760     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2761     assert self.instance is not None, \
2762       "Cannot retrieve locked instance %s" % self.op.instance_name
2763
2764   def Exec(self, feedback_fn):
2765     """Deactivate the disks
2766
2767     """
2768     instance = self.instance
2769     _SafeShutdownInstanceDisks(self, instance)
2770
2771
2772 def _SafeShutdownInstanceDisks(lu, instance):
2773   """Shutdown block devices of an instance.
2774
2775   This function checks if an instance is running, before calling
2776   _ShutdownInstanceDisks.
2777
2778   """
2779   pnode = instance.primary_node
2780   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2781   ins_l = ins_l[pnode]
2782   msg = ins_l.RemoteFailMsg()
2783   if msg:
2784     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2785
2786   if instance.name in ins_l.payload:
2787     raise errors.OpExecError("Instance is running, can't shutdown"
2788                              " block devices.")
2789
2790   _ShutdownInstanceDisks(lu, instance)
2791
2792
2793 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2794   """Shutdown block devices of an instance.
2795
2796   This does the shutdown on all nodes of the instance.
2797
2798   If the ignore_primary is false, errors on the primary node are
2799   ignored.
2800
2801   """
2802   all_result = True
2803   for disk in instance.disks:
2804     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2805       lu.cfg.SetDiskID(top_disk, node)
2806       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2807       msg = result.RemoteFailMsg()
2808       if msg:
2809         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2810                       disk.iv_name, node, msg)
2811         if not ignore_primary or node != instance.primary_node:
2812           all_result = False
2813   return all_result
2814
2815
2816 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2817   """Checks if a node has enough free memory.
2818
2819   This function check if a given node has the needed amount of free
2820   memory. In case the node has less memory or we cannot get the
2821   information from the node, this function raise an OpPrereqError
2822   exception.
2823
2824   @type lu: C{LogicalUnit}
2825   @param lu: a logical unit from which we get configuration data
2826   @type node: C{str}
2827   @param node: the node to check
2828   @type reason: C{str}
2829   @param reason: string to use in the error message
2830   @type requested: C{int}
2831   @param requested: the amount of memory in MiB to check for
2832   @type hypervisor_name: C{str}
2833   @param hypervisor_name: the hypervisor to ask for memory stats
2834   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2835       we cannot check the node
2836
2837   """
2838   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2839   msg = nodeinfo[node].RemoteFailMsg()
2840   if msg:
2841     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2842   free_mem = nodeinfo[node].payload.get('memory_free', None)
2843   if not isinstance(free_mem, int):
2844     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2845                                " was '%s'" % (node, free_mem))
2846   if requested > free_mem:
2847     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2848                                " needed %s MiB, available %s MiB" %
2849                                (node, reason, requested, free_mem))
2850
2851
2852 class LUStartupInstance(LogicalUnit):
2853   """Starts an instance.
2854
2855   """
2856   HPATH = "instance-start"
2857   HTYPE = constants.HTYPE_INSTANCE
2858   _OP_REQP = ["instance_name", "force"]
2859   REQ_BGL = False
2860
2861   def ExpandNames(self):
2862     self._ExpandAndLockInstance()
2863
2864   def BuildHooksEnv(self):
2865     """Build hooks env.
2866
2867     This runs on master, primary and secondary nodes of the instance.
2868
2869     """
2870     env = {
2871       "FORCE": self.op.force,
2872       }
2873     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2874     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2875     return env, nl, nl
2876
2877   def CheckPrereq(self):
2878     """Check prerequisites.
2879
2880     This checks that the instance is in the cluster.
2881
2882     """
2883     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2884     assert self.instance is not None, \
2885       "Cannot retrieve locked instance %s" % self.op.instance_name
2886
2887     # extra beparams
2888     self.beparams = getattr(self.op, "beparams", {})
2889     if self.beparams:
2890       if not isinstance(self.beparams, dict):
2891         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2892                                    " dict" % (type(self.beparams), ))
2893       # fill the beparams dict
2894       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2895       self.op.beparams = self.beparams
2896
2897     # extra hvparams
2898     self.hvparams = getattr(self.op, "hvparams", {})
2899     if self.hvparams:
2900       if not isinstance(self.hvparams, dict):
2901         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2902                                    " dict" % (type(self.hvparams), ))
2903
2904       # check hypervisor parameter syntax (locally)
2905       cluster = self.cfg.GetClusterInfo()
2906       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2907       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2908                                     instance.hvparams)
2909       filled_hvp.update(self.hvparams)
2910       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2911       hv_type.CheckParameterSyntax(filled_hvp)
2912       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2913       self.op.hvparams = self.hvparams
2914
2915     _CheckNodeOnline(self, instance.primary_node)
2916
2917     bep = self.cfg.GetClusterInfo().FillBE(instance)
2918     # check bridges existance
2919     _CheckInstanceBridgesExist(self, instance)
2920
2921     remote_info = self.rpc.call_instance_info(instance.primary_node,
2922                                               instance.name,
2923                                               instance.hypervisor)
2924     msg = remote_info.RemoteFailMsg()
2925     if msg:
2926       raise errors.OpPrereqError("Error checking node %s: %s" %
2927                                  (instance.primary_node, msg))
2928     if not remote_info.payload: # not running already
2929       _CheckNodeFreeMemory(self, instance.primary_node,
2930                            "starting instance %s" % instance.name,
2931                            bep[constants.BE_MEMORY], instance.hypervisor)
2932
2933   def Exec(self, feedback_fn):
2934     """Start the instance.
2935
2936     """
2937     instance = self.instance
2938     force = self.op.force
2939
2940     self.cfg.MarkInstanceUp(instance.name)
2941
2942     node_current = instance.primary_node
2943
2944     _StartInstanceDisks(self, instance, force)
2945
2946     result = self.rpc.call_instance_start(node_current, instance,
2947                                           self.hvparams, self.beparams)
2948     msg = result.RemoteFailMsg()
2949     if msg:
2950       _ShutdownInstanceDisks(self, instance)
2951       raise errors.OpExecError("Could not start instance: %s" % msg)
2952
2953
2954 class LURebootInstance(LogicalUnit):
2955   """Reboot an instance.
2956
2957   """
2958   HPATH = "instance-reboot"
2959   HTYPE = constants.HTYPE_INSTANCE
2960   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2961   REQ_BGL = False
2962
2963   def ExpandNames(self):
2964     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2965                                    constants.INSTANCE_REBOOT_HARD,
2966                                    constants.INSTANCE_REBOOT_FULL]:
2967       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2968                                   (constants.INSTANCE_REBOOT_SOFT,
2969                                    constants.INSTANCE_REBOOT_HARD,
2970                                    constants.INSTANCE_REBOOT_FULL))
2971     self._ExpandAndLockInstance()
2972
2973   def BuildHooksEnv(self):
2974     """Build hooks env.
2975
2976     This runs on master, primary and secondary nodes of the instance.
2977
2978     """
2979     env = {
2980       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2981       "REBOOT_TYPE": self.op.reboot_type,
2982       }
2983     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2984     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2985     return env, nl, nl
2986
2987   def CheckPrereq(self):
2988     """Check prerequisites.
2989
2990     This checks that the instance is in the cluster.
2991
2992     """
2993     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2994     assert self.instance is not None, \
2995       "Cannot retrieve locked instance %s" % self.op.instance_name
2996
2997     _CheckNodeOnline(self, instance.primary_node)
2998
2999     # check bridges existance
3000     _CheckInstanceBridgesExist(self, instance)
3001
3002   def Exec(self, feedback_fn):
3003     """Reboot the instance.
3004
3005     """
3006     instance = self.instance
3007     ignore_secondaries = self.op.ignore_secondaries
3008     reboot_type = self.op.reboot_type
3009
3010     node_current = instance.primary_node
3011
3012     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3013                        constants.INSTANCE_REBOOT_HARD]:
3014       for disk in instance.disks:
3015         self.cfg.SetDiskID(disk, node_current)
3016       result = self.rpc.call_instance_reboot(node_current, instance,
3017                                              reboot_type)
3018       msg = result.RemoteFailMsg()
3019       if msg:
3020         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3021     else:
3022       result = self.rpc.call_instance_shutdown(node_current, instance)
3023       msg = result.RemoteFailMsg()
3024       if msg:
3025         raise errors.OpExecError("Could not shutdown instance for"
3026                                  " full reboot: %s" % msg)
3027       _ShutdownInstanceDisks(self, instance)
3028       _StartInstanceDisks(self, instance, ignore_secondaries)
3029       result = self.rpc.call_instance_start(node_current, instance, None, None)
3030       msg = result.RemoteFailMsg()
3031       if msg:
3032         _ShutdownInstanceDisks(self, instance)
3033         raise errors.OpExecError("Could not start instance for"
3034                                  " full reboot: %s" % msg)
3035
3036     self.cfg.MarkInstanceUp(instance.name)
3037
3038
3039 class LUShutdownInstance(LogicalUnit):
3040   """Shutdown an instance.
3041
3042   """
3043   HPATH = "instance-stop"
3044   HTYPE = constants.HTYPE_INSTANCE
3045   _OP_REQP = ["instance_name"]
3046   REQ_BGL = False
3047
3048   def ExpandNames(self):
3049     self._ExpandAndLockInstance()
3050
3051   def BuildHooksEnv(self):
3052     """Build hooks env.
3053
3054     This runs on master, primary and secondary nodes of the instance.
3055
3056     """
3057     env = _BuildInstanceHookEnvByObject(self, self.instance)
3058     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3059     return env, nl, nl
3060
3061   def CheckPrereq(self):
3062     """Check prerequisites.
3063
3064     This checks that the instance is in the cluster.
3065
3066     """
3067     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3068     assert self.instance is not None, \
3069       "Cannot retrieve locked instance %s" % self.op.instance_name
3070     _CheckNodeOnline(self, self.instance.primary_node)
3071
3072   def Exec(self, feedback_fn):
3073     """Shutdown the instance.
3074
3075     """
3076     instance = self.instance
3077     node_current = instance.primary_node
3078     self.cfg.MarkInstanceDown(instance.name)
3079     result = self.rpc.call_instance_shutdown(node_current, instance)
3080     msg = result.RemoteFailMsg()
3081     if msg:
3082       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3083
3084     _ShutdownInstanceDisks(self, instance)
3085
3086
3087 class LUReinstallInstance(LogicalUnit):
3088   """Reinstall an instance.
3089
3090   """
3091   HPATH = "instance-reinstall"
3092   HTYPE = constants.HTYPE_INSTANCE
3093   _OP_REQP = ["instance_name"]
3094   REQ_BGL = False
3095
3096   def ExpandNames(self):
3097     self._ExpandAndLockInstance()
3098
3099   def BuildHooksEnv(self):
3100     """Build hooks env.
3101
3102     This runs on master, primary and secondary nodes of the instance.
3103
3104     """
3105     env = _BuildInstanceHookEnvByObject(self, self.instance)
3106     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3107     return env, nl, nl
3108
3109   def CheckPrereq(self):
3110     """Check prerequisites.
3111
3112     This checks that the instance is in the cluster and is not running.
3113
3114     """
3115     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3116     assert instance is not None, \
3117       "Cannot retrieve locked instance %s" % self.op.instance_name
3118     _CheckNodeOnline(self, instance.primary_node)
3119
3120     if instance.disk_template == constants.DT_DISKLESS:
3121       raise errors.OpPrereqError("Instance '%s' has no disks" %
3122                                  self.op.instance_name)
3123     if instance.admin_up:
3124       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3125                                  self.op.instance_name)
3126     remote_info = self.rpc.call_instance_info(instance.primary_node,
3127                                               instance.name,
3128                                               instance.hypervisor)
3129     msg = remote_info.RemoteFailMsg()
3130     if msg:
3131       raise errors.OpPrereqError("Error checking node %s: %s" %
3132                                  (instance.primary_node, msg))
3133     if remote_info.payload:
3134       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3135                                  (self.op.instance_name,
3136                                   instance.primary_node))
3137
3138     self.op.os_type = getattr(self.op, "os_type", None)
3139     if self.op.os_type is not None:
3140       # OS verification
3141       pnode = self.cfg.GetNodeInfo(
3142         self.cfg.ExpandNodeName(instance.primary_node))
3143       if pnode is None:
3144         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3145                                    self.op.pnode)
3146       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3147       result.Raise()
3148       if not isinstance(result.data, objects.OS):
3149         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3150                                    " primary node"  % self.op.os_type)
3151
3152     self.instance = instance
3153
3154   def Exec(self, feedback_fn):
3155     """Reinstall the instance.
3156
3157     """
3158     inst = self.instance
3159
3160     if self.op.os_type is not None:
3161       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3162       inst.os = self.op.os_type
3163       self.cfg.Update(inst)
3164
3165     _StartInstanceDisks(self, inst, None)
3166     try:
3167       feedback_fn("Running the instance OS create scripts...")
3168       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3169       msg = result.RemoteFailMsg()
3170       if msg:
3171         raise errors.OpExecError("Could not install OS for instance %s"
3172                                  " on node %s: %s" %
3173                                  (inst.name, inst.primary_node, msg))
3174     finally:
3175       _ShutdownInstanceDisks(self, inst)
3176
3177
3178 class LURenameInstance(LogicalUnit):
3179   """Rename an instance.
3180
3181   """
3182   HPATH = "instance-rename"
3183   HTYPE = constants.HTYPE_INSTANCE
3184   _OP_REQP = ["instance_name", "new_name"]
3185
3186   def BuildHooksEnv(self):
3187     """Build hooks env.
3188
3189     This runs on master, primary and secondary nodes of the instance.
3190
3191     """
3192     env = _BuildInstanceHookEnvByObject(self, self.instance)
3193     env["INSTANCE_NEW_NAME"] = self.op.new_name
3194     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3195     return env, nl, nl
3196
3197   def CheckPrereq(self):
3198     """Check prerequisites.
3199
3200     This checks that the instance is in the cluster and is not running.
3201
3202     """
3203     instance = self.cfg.GetInstanceInfo(
3204       self.cfg.ExpandInstanceName(self.op.instance_name))
3205     if instance is None:
3206       raise errors.OpPrereqError("Instance '%s' not known" %
3207                                  self.op.instance_name)
3208     _CheckNodeOnline(self, instance.primary_node)
3209
3210     if instance.admin_up:
3211       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3212                                  self.op.instance_name)
3213     remote_info = self.rpc.call_instance_info(instance.primary_node,
3214                                               instance.name,
3215                                               instance.hypervisor)
3216     msg = remote_info.RemoteFailMsg()
3217     if msg:
3218       raise errors.OpPrereqError("Error checking node %s: %s" %
3219                                  (instance.primary_node, msg))
3220     if remote_info.payload:
3221       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3222                                  (self.op.instance_name,
3223                                   instance.primary_node))
3224     self.instance = instance
3225
3226     # new name verification
3227     name_info = utils.HostInfo(self.op.new_name)
3228
3229     self.op.new_name = new_name = name_info.name
3230     instance_list = self.cfg.GetInstanceList()
3231     if new_name in instance_list:
3232       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3233                                  new_name)
3234
3235     if not getattr(self.op, "ignore_ip", False):
3236       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3237         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3238                                    (name_info.ip, new_name))
3239
3240
3241   def Exec(self, feedback_fn):
3242     """Reinstall the instance.
3243
3244     """
3245     inst = self.instance
3246     old_name = inst.name
3247
3248     if inst.disk_template == constants.DT_FILE:
3249       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3250
3251     self.cfg.RenameInstance(inst.name, self.op.new_name)
3252     # Change the instance lock. This is definitely safe while we hold the BGL
3253     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3254     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3255
3256     # re-read the instance from the configuration after rename
3257     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3258
3259     if inst.disk_template == constants.DT_FILE:
3260       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3261       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3262                                                      old_file_storage_dir,
3263                                                      new_file_storage_dir)
3264       result.Raise()
3265       if not result.data:
3266         raise errors.OpExecError("Could not connect to node '%s' to rename"
3267                                  " directory '%s' to '%s' (but the instance"
3268                                  " has been renamed in Ganeti)" % (
3269                                  inst.primary_node, old_file_storage_dir,
3270                                  new_file_storage_dir))
3271
3272       if not result.data[0]:
3273         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3274                                  " (but the instance has been renamed in"
3275                                  " Ganeti)" % (old_file_storage_dir,
3276                                                new_file_storage_dir))
3277
3278     _StartInstanceDisks(self, inst, None)
3279     try:
3280       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3281                                                  old_name)
3282       msg = result.RemoteFailMsg()
3283       if msg:
3284         msg = ("Could not run OS rename script for instance %s on node %s"
3285                " (but the instance has been renamed in Ganeti): %s" %
3286                (inst.name, inst.primary_node, msg))
3287         self.proc.LogWarning(msg)
3288     finally:
3289       _ShutdownInstanceDisks(self, inst)
3290
3291
3292 class LURemoveInstance(LogicalUnit):
3293   """Remove an instance.
3294
3295   """
3296   HPATH = "instance-remove"
3297   HTYPE = constants.HTYPE_INSTANCE
3298   _OP_REQP = ["instance_name", "ignore_failures"]
3299   REQ_BGL = False
3300
3301   def ExpandNames(self):
3302     self._ExpandAndLockInstance()
3303     self.needed_locks[locking.LEVEL_NODE] = []
3304     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3305
3306   def DeclareLocks(self, level):
3307     if level == locking.LEVEL_NODE:
3308       self._LockInstancesNodes()
3309
3310   def BuildHooksEnv(self):
3311     """Build hooks env.
3312
3313     This runs on master, primary and secondary nodes of the instance.
3314
3315     """
3316     env = _BuildInstanceHookEnvByObject(self, self.instance)
3317     nl = [self.cfg.GetMasterNode()]
3318     return env, nl, nl
3319
3320   def CheckPrereq(self):
3321     """Check prerequisites.
3322
3323     This checks that the instance is in the cluster.
3324
3325     """
3326     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3327     assert self.instance is not None, \
3328       "Cannot retrieve locked instance %s" % self.op.instance_name
3329
3330   def Exec(self, feedback_fn):
3331     """Remove the instance.
3332
3333     """
3334     instance = self.instance
3335     logging.info("Shutting down instance %s on node %s",
3336                  instance.name, instance.primary_node)
3337
3338     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3339     msg = result.RemoteFailMsg()
3340     if msg:
3341       if self.op.ignore_failures:
3342         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3343       else:
3344         raise errors.OpExecError("Could not shutdown instance %s on"
3345                                  " node %s: %s" %
3346                                  (instance.name, instance.primary_node, msg))
3347
3348     logging.info("Removing block devices for instance %s", instance.name)
3349
3350     if not _RemoveDisks(self, instance):
3351       if self.op.ignore_failures:
3352         feedback_fn("Warning: can't remove instance's disks")
3353       else:
3354         raise errors.OpExecError("Can't remove instance's disks")
3355
3356     logging.info("Removing instance %s out of cluster config", instance.name)
3357
3358     self.cfg.RemoveInstance(instance.name)
3359     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3360
3361
3362 class LUQueryInstances(NoHooksLU):
3363   """Logical unit for querying instances.
3364
3365   """
3366   _OP_REQP = ["output_fields", "names", "use_locking"]
3367   REQ_BGL = False
3368   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3369                                     "admin_state",
3370                                     "disk_template", "ip", "mac", "bridge",
3371                                     "sda_size", "sdb_size", "vcpus", "tags",
3372                                     "network_port", "beparams",
3373                                     r"(disk)\.(size)/([0-9]+)",
3374                                     r"(disk)\.(sizes)", "disk_usage",
3375                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3376                                     r"(nic)\.(macs|ips|bridges)",
3377                                     r"(disk|nic)\.(count)",
3378                                     "serial_no", "hypervisor", "hvparams",] +
3379                                   ["hv/%s" % name
3380                                    for name in constants.HVS_PARAMETERS] +
3381                                   ["be/%s" % name
3382                                    for name in constants.BES_PARAMETERS])
3383   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3384
3385
3386   def ExpandNames(self):
3387     _CheckOutputFields(static=self._FIELDS_STATIC,
3388                        dynamic=self._FIELDS_DYNAMIC,
3389                        selected=self.op.output_fields)
3390
3391     self.needed_locks = {}
3392     self.share_locks[locking.LEVEL_INSTANCE] = 1
3393     self.share_locks[locking.LEVEL_NODE] = 1
3394
3395     if self.op.names:
3396       self.wanted = _GetWantedInstances(self, self.op.names)
3397     else:
3398       self.wanted = locking.ALL_SET
3399
3400     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3401     self.do_locking = self.do_node_query and self.op.use_locking
3402     if self.do_locking:
3403       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3404       self.needed_locks[locking.LEVEL_NODE] = []
3405       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3406
3407   def DeclareLocks(self, level):
3408     if level == locking.LEVEL_NODE and self.do_locking:
3409       self._LockInstancesNodes()
3410
3411   def CheckPrereq(self):
3412     """Check prerequisites.
3413
3414     """
3415     pass
3416
3417   def Exec(self, feedback_fn):
3418     """Computes the list of nodes and their attributes.
3419
3420     """
3421     all_info = self.cfg.GetAllInstancesInfo()
3422     if self.wanted == locking.ALL_SET:
3423       # caller didn't specify instance names, so ordering is not important
3424       if self.do_locking:
3425         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3426       else:
3427         instance_names = all_info.keys()
3428       instance_names = utils.NiceSort(instance_names)
3429     else:
3430       # caller did specify names, so we must keep the ordering
3431       if self.do_locking:
3432         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3433       else:
3434         tgt_set = all_info.keys()
3435       missing = set(self.wanted).difference(tgt_set)
3436       if missing:
3437         raise errors.OpExecError("Some instances were removed before"
3438                                  " retrieving their data: %s" % missing)
3439       instance_names = self.wanted
3440
3441     instance_list = [all_info[iname] for iname in instance_names]
3442
3443     # begin data gathering
3444
3445     nodes = frozenset([inst.primary_node for inst in instance_list])
3446     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3447
3448     bad_nodes = []
3449     off_nodes = []
3450     if self.do_node_query:
3451       live_data = {}
3452       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3453       for name in nodes:
3454         result = node_data[name]
3455         if result.offline:
3456           # offline nodes will be in both lists
3457           off_nodes.append(name)
3458         if result.failed or result.RemoteFailMsg():
3459           bad_nodes.append(name)
3460         else:
3461           if result.payload:
3462             live_data.update(result.payload)
3463           # else no instance is alive
3464     else:
3465       live_data = dict([(name, {}) for name in instance_names])
3466
3467     # end data gathering
3468
3469     HVPREFIX = "hv/"
3470     BEPREFIX = "be/"
3471     output = []
3472     for instance in instance_list:
3473       iout = []
3474       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3475       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3476       for field in self.op.output_fields:
3477         st_match = self._FIELDS_STATIC.Matches(field)
3478         if field == "name":
3479           val = instance.name
3480         elif field == "os":
3481           val = instance.os
3482         elif field == "pnode":
3483           val = instance.primary_node
3484         elif field == "snodes":
3485           val = list(instance.secondary_nodes)
3486         elif field == "admin_state":
3487           val = instance.admin_up
3488         elif field == "oper_state":
3489           if instance.primary_node in bad_nodes:
3490             val = None
3491           else:
3492             val = bool(live_data.get(instance.name))
3493         elif field == "status":
3494           if instance.primary_node in off_nodes:
3495             val = "ERROR_nodeoffline"
3496           elif instance.primary_node in bad_nodes:
3497             val = "ERROR_nodedown"
3498           else:
3499             running = bool(live_data.get(instance.name))
3500             if running:
3501               if instance.admin_up:
3502                 val = "running"
3503               else:
3504                 val = "ERROR_up"
3505             else:
3506               if instance.admin_up:
3507                 val = "ERROR_down"
3508               else:
3509                 val = "ADMIN_down"
3510         elif field == "oper_ram":
3511           if instance.primary_node in bad_nodes:
3512             val = None
3513           elif instance.name in live_data:
3514             val = live_data[instance.name].get("memory", "?")
3515           else:
3516             val = "-"
3517         elif field == "disk_template":
3518           val = instance.disk_template
3519         elif field == "ip":
3520           val = instance.nics[0].ip
3521         elif field == "bridge":
3522           val = instance.nics[0].bridge
3523         elif field == "mac":
3524           val = instance.nics[0].mac
3525         elif field == "sda_size" or field == "sdb_size":
3526           idx = ord(field[2]) - ord('a')
3527           try:
3528             val = instance.FindDisk(idx).size
3529           except errors.OpPrereqError:
3530             val = None
3531         elif field == "disk_usage": # total disk usage per node
3532           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3533           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3534         elif field == "tags":
3535           val = list(instance.GetTags())
3536         elif field == "serial_no":
3537           val = instance.serial_no
3538         elif field == "network_port":
3539           val = instance.network_port
3540         elif field == "hypervisor":
3541           val = instance.hypervisor
3542         elif field == "hvparams":
3543           val = i_hv
3544         elif (field.startswith(HVPREFIX) and
3545               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3546           val = i_hv.get(field[len(HVPREFIX):], None)
3547         elif field == "beparams":
3548           val = i_be
3549         elif (field.startswith(BEPREFIX) and
3550               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3551           val = i_be.get(field[len(BEPREFIX):], None)
3552         elif st_match and st_match.groups():
3553           # matches a variable list
3554           st_groups = st_match.groups()
3555           if st_groups and st_groups[0] == "disk":
3556             if st_groups[1] == "count":
3557               val = len(instance.disks)
3558             elif st_groups[1] == "sizes":
3559               val = [disk.size for disk in instance.disks]
3560             elif st_groups[1] == "size":
3561               try:
3562                 val = instance.FindDisk(st_groups[2]).size
3563               except errors.OpPrereqError:
3564                 val = None
3565             else:
3566               assert False, "Unhandled disk parameter"
3567           elif st_groups[0] == "nic":
3568             if st_groups[1] == "count":
3569               val = len(instance.nics)
3570             elif st_groups[1] == "macs":
3571               val = [nic.mac for nic in instance.nics]
3572             elif st_groups[1] == "ips":
3573               val = [nic.ip for nic in instance.nics]
3574             elif st_groups[1] == "bridges":
3575               val = [nic.bridge for nic in instance.nics]
3576             else:
3577               # index-based item
3578               nic_idx = int(st_groups[2])
3579               if nic_idx >= len(instance.nics):
3580                 val = None
3581               else:
3582                 if st_groups[1] == "mac":
3583                   val = instance.nics[nic_idx].mac
3584                 elif st_groups[1] == "ip":
3585                   val = instance.nics[nic_idx].ip
3586                 elif st_groups[1] == "bridge":
3587                   val = instance.nics[nic_idx].bridge
3588                 else:
3589                   assert False, "Unhandled NIC parameter"
3590           else:
3591             assert False, "Unhandled variable parameter"
3592         else:
3593           raise errors.ParameterError(field)
3594         iout.append(val)
3595       output.append(iout)
3596
3597     return output
3598
3599
3600 class LUFailoverInstance(LogicalUnit):
3601   """Failover an instance.
3602
3603   """
3604   HPATH = "instance-failover"
3605   HTYPE = constants.HTYPE_INSTANCE
3606   _OP_REQP = ["instance_name", "ignore_consistency"]
3607   REQ_BGL = False
3608
3609   def ExpandNames(self):
3610     self._ExpandAndLockInstance()
3611     self.needed_locks[locking.LEVEL_NODE] = []
3612     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3613
3614   def DeclareLocks(self, level):
3615     if level == locking.LEVEL_NODE:
3616       self._LockInstancesNodes()
3617
3618   def BuildHooksEnv(self):
3619     """Build hooks env.
3620
3621     This runs on master, primary and secondary nodes of the instance.
3622
3623     """
3624     env = {
3625       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3626       }
3627     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3628     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3629     return env, nl, nl
3630
3631   def CheckPrereq(self):
3632     """Check prerequisites.
3633
3634     This checks that the instance is in the cluster.
3635
3636     """
3637     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3638     assert self.instance is not None, \
3639       "Cannot retrieve locked instance %s" % self.op.instance_name
3640
3641     bep = self.cfg.GetClusterInfo().FillBE(instance)
3642     if instance.disk_template not in constants.DTS_NET_MIRROR:
3643       raise errors.OpPrereqError("Instance's disk layout is not"
3644                                  " network mirrored, cannot failover.")
3645
3646     secondary_nodes = instance.secondary_nodes
3647     if not secondary_nodes:
3648       raise errors.ProgrammerError("no secondary node but using "
3649                                    "a mirrored disk template")
3650
3651     target_node = secondary_nodes[0]
3652     _CheckNodeOnline(self, target_node)
3653     _CheckNodeNotDrained(self, target_node)
3654     # check memory requirements on the secondary node
3655     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3656                          instance.name, bep[constants.BE_MEMORY],
3657                          instance.hypervisor)
3658     # check bridge existance
3659     _CheckInstanceBridgesExist(self, instance, node=target_node)
3660
3661   def Exec(self, feedback_fn):
3662     """Failover an instance.
3663
3664     The failover is done by shutting it down on its present node and
3665     starting it on the secondary.
3666
3667     """
3668     instance = self.instance
3669
3670     source_node = instance.primary_node
3671     target_node = instance.secondary_nodes[0]
3672
3673     feedback_fn("* checking disk consistency between source and target")
3674     for dev in instance.disks:
3675       # for drbd, these are drbd over lvm
3676       if not _CheckDiskConsistency(self, dev, target_node, False):
3677         if instance.admin_up and not self.op.ignore_consistency:
3678           raise errors.OpExecError("Disk %s is degraded on target node,"
3679                                    " aborting failover." % dev.iv_name)
3680
3681     feedback_fn("* shutting down instance on source node")
3682     logging.info("Shutting down instance %s on node %s",
3683                  instance.name, source_node)
3684
3685     result = self.rpc.call_instance_shutdown(source_node, instance)
3686     msg = result.RemoteFailMsg()
3687     if msg:
3688       if self.op.ignore_consistency:
3689         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3690                              " Proceeding anyway. Please make sure node"
3691                              " %s is down. Error details: %s",
3692                              instance.name, source_node, source_node, msg)
3693       else:
3694         raise errors.OpExecError("Could not shutdown instance %s on"
3695                                  " node %s: %s" %
3696                                  (instance.name, source_node, msg))
3697
3698     feedback_fn("* deactivating the instance's disks on source node")
3699     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3700       raise errors.OpExecError("Can't shut down the instance's disks.")
3701
3702     instance.primary_node = target_node
3703     # distribute new instance config to the other nodes
3704     self.cfg.Update(instance)
3705
3706     # Only start the instance if it's marked as up
3707     if instance.admin_up:
3708       feedback_fn("* activating the instance's disks on target node")
3709       logging.info("Starting instance %s on node %s",
3710                    instance.name, target_node)
3711
3712       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3713                                                ignore_secondaries=True)
3714       if not disks_ok:
3715         _ShutdownInstanceDisks(self, instance)
3716         raise errors.OpExecError("Can't activate the instance's disks")
3717
3718       feedback_fn("* starting the instance on the target node")
3719       result = self.rpc.call_instance_start(target_node, instance, None, None)
3720       msg = result.RemoteFailMsg()
3721       if msg:
3722         _ShutdownInstanceDisks(self, instance)
3723         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3724                                  (instance.name, target_node, msg))
3725
3726
3727 class LUMigrateInstance(LogicalUnit):
3728   """Migrate an instance.
3729
3730   This is migration without shutting down, compared to the failover,
3731   which is done with shutdown.
3732
3733   """
3734   HPATH = "instance-migrate"
3735   HTYPE = constants.HTYPE_INSTANCE
3736   _OP_REQP = ["instance_name", "live", "cleanup"]
3737
3738   REQ_BGL = False
3739
3740   def ExpandNames(self):
3741     self._ExpandAndLockInstance()
3742     self.needed_locks[locking.LEVEL_NODE] = []
3743     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3744
3745   def DeclareLocks(self, level):
3746     if level == locking.LEVEL_NODE:
3747       self._LockInstancesNodes()
3748
3749   def BuildHooksEnv(self):
3750     """Build hooks env.
3751
3752     This runs on master, primary and secondary nodes of the instance.
3753
3754     """
3755     env = _BuildInstanceHookEnvByObject(self, self.instance)
3756     env["MIGRATE_LIVE"] = self.op.live
3757     env["MIGRATE_CLEANUP"] = self.op.cleanup
3758     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3759     return env, nl, nl
3760
3761   def CheckPrereq(self):
3762     """Check prerequisites.
3763
3764     This checks that the instance is in the cluster.
3765
3766     """
3767     instance = self.cfg.GetInstanceInfo(
3768       self.cfg.ExpandInstanceName(self.op.instance_name))
3769     if instance is None:
3770       raise errors.OpPrereqError("Instance '%s' not known" %
3771                                  self.op.instance_name)
3772
3773     if instance.disk_template != constants.DT_DRBD8:
3774       raise errors.OpPrereqError("Instance's disk layout is not"
3775                                  " drbd8, cannot migrate.")
3776
3777     secondary_nodes = instance.secondary_nodes
3778     if not secondary_nodes:
3779       raise errors.ConfigurationError("No secondary node but using"
3780                                       " drbd8 disk template")
3781
3782     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3783
3784     target_node = secondary_nodes[0]
3785     # check memory requirements on the secondary node
3786     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3787                          instance.name, i_be[constants.BE_MEMORY],
3788                          instance.hypervisor)
3789
3790     # check bridge existance
3791     _CheckInstanceBridgesExist(self, instance, node=target_node)
3792
3793     if not self.op.cleanup:
3794       _CheckNodeNotDrained(self, target_node)
3795       result = self.rpc.call_instance_migratable(instance.primary_node,
3796                                                  instance)
3797       msg = result.RemoteFailMsg()
3798       if msg:
3799         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3800                                    msg)
3801
3802     self.instance = instance
3803
3804   def _WaitUntilSync(self):
3805     """Poll with custom rpc for disk sync.
3806
3807     This uses our own step-based rpc call.
3808
3809     """
3810     self.feedback_fn("* wait until resync is done")
3811     all_done = False
3812     while not all_done:
3813       all_done = True
3814       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3815                                             self.nodes_ip,
3816                                             self.instance.disks)
3817       min_percent = 100
3818       for node, nres in result.items():
3819         msg = nres.RemoteFailMsg()
3820         if msg:
3821           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3822                                    (node, msg))
3823         node_done, node_percent = nres.payload
3824         all_done = all_done and node_done
3825         if node_percent is not None:
3826           min_percent = min(min_percent, node_percent)
3827       if not all_done:
3828         if min_percent < 100:
3829           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3830         time.sleep(2)
3831
3832   def _EnsureSecondary(self, node):
3833     """Demote a node to secondary.
3834
3835     """
3836     self.feedback_fn("* switching node %s to secondary mode" % node)
3837
3838     for dev in self.instance.disks:
3839       self.cfg.SetDiskID(dev, node)
3840
3841     result = self.rpc.call_blockdev_close(node, self.instance.name,
3842                                           self.instance.disks)
3843     msg = result.RemoteFailMsg()
3844     if msg:
3845       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3846                                " error %s" % (node, msg))
3847
3848   def _GoStandalone(self):
3849     """Disconnect from the network.
3850
3851     """
3852     self.feedback_fn("* changing into standalone mode")
3853     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3854                                                self.instance.disks)
3855     for node, nres in result.items():
3856       msg = nres.RemoteFailMsg()
3857       if msg:
3858         raise errors.OpExecError("Cannot disconnect disks node %s,"
3859                                  " error %s" % (node, msg))
3860
3861   def _GoReconnect(self, multimaster):
3862     """Reconnect to the network.
3863
3864     """
3865     if multimaster:
3866       msg = "dual-master"
3867     else:
3868       msg = "single-master"
3869     self.feedback_fn("* changing disks into %s mode" % msg)
3870     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3871                                            self.instance.disks,
3872                                            self.instance.name, multimaster)
3873     for node, nres in result.items():
3874       msg = nres.RemoteFailMsg()
3875       if msg:
3876         raise errors.OpExecError("Cannot change disks config on node %s,"
3877                                  " error: %s" % (node, msg))
3878
3879   def _ExecCleanup(self):
3880     """Try to cleanup after a failed migration.
3881
3882     The cleanup is done by:
3883       - check that the instance is running only on one node
3884         (and update the config if needed)
3885       - change disks on its secondary node to secondary
3886       - wait until disks are fully synchronized
3887       - disconnect from the network
3888       - change disks into single-master mode
3889       - wait again until disks are fully synchronized
3890
3891     """
3892     instance = self.instance
3893     target_node = self.target_node
3894     source_node = self.source_node
3895
3896     # check running on only one node
3897     self.feedback_fn("* checking where the instance actually runs"
3898                      " (if this hangs, the hypervisor might be in"
3899                      " a bad state)")
3900     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3901     for node, result in ins_l.items():
3902       msg = result.RemoteFailMsg()
3903       if msg:
3904         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3905
3906     runningon_source = instance.name in ins_l[source_node].payload
3907     runningon_target = instance.name in ins_l[target_node].payload
3908
3909     if runningon_source and runningon_target:
3910       raise errors.OpExecError("Instance seems to be running on two nodes,"
3911                                " or the hypervisor is confused. You will have"
3912                                " to ensure manually that it runs only on one"
3913                                " and restart this operation.")
3914
3915     if not (runningon_source or runningon_target):
3916       raise errors.OpExecError("Instance does not seem to be running at all."
3917                                " In this case, it's safer to repair by"
3918                                " running 'gnt-instance stop' to ensure disk"
3919                                " shutdown, and then restarting it.")
3920
3921     if runningon_target:
3922       # the migration has actually succeeded, we need to update the config
3923       self.feedback_fn("* instance running on secondary node (%s),"
3924                        " updating config" % target_node)
3925       instance.primary_node = target_node
3926       self.cfg.Update(instance)
3927       demoted_node = source_node
3928     else:
3929       self.feedback_fn("* instance confirmed to be running on its"
3930                        " primary node (%s)" % source_node)
3931       demoted_node = target_node
3932
3933     self._EnsureSecondary(demoted_node)
3934     try:
3935       self._WaitUntilSync()
3936     except errors.OpExecError:
3937       # we ignore here errors, since if the device is standalone, it
3938       # won't be able to sync
3939       pass
3940     self._GoStandalone()
3941     self._GoReconnect(False)
3942     self._WaitUntilSync()
3943
3944     self.feedback_fn("* done")
3945
3946   def _RevertDiskStatus(self):
3947     """Try to revert the disk status after a failed migration.
3948
3949     """
3950     target_node = self.target_node
3951     try:
3952       self._EnsureSecondary(target_node)
3953       self._GoStandalone()
3954       self._GoReconnect(False)
3955       self._WaitUntilSync()
3956     except errors.OpExecError, err:
3957       self.LogWarning("Migration failed and I can't reconnect the"
3958                       " drives: error '%s'\n"
3959                       "Please look and recover the instance status" %
3960                       str(err))
3961
3962   def _AbortMigration(self):
3963     """Call the hypervisor code to abort a started migration.
3964
3965     """
3966     instance = self.instance
3967     target_node = self.target_node
3968     migration_info = self.migration_info
3969
3970     abort_result = self.rpc.call_finalize_migration(target_node,
3971                                                     instance,
3972                                                     migration_info,
3973                                                     False)
3974     abort_msg = abort_result.RemoteFailMsg()
3975     if abort_msg:
3976       logging.error("Aborting migration failed on target node %s: %s" %
3977                     (target_node, abort_msg))
3978       # Don't raise an exception here, as we stil have to try to revert the
3979       # disk status, even if this step failed.
3980
3981   def _ExecMigration(self):
3982     """Migrate an instance.
3983
3984     The migrate is done by:
3985       - change the disks into dual-master mode
3986       - wait until disks are fully synchronized again
3987       - migrate the instance
3988       - change disks on the new secondary node (the old primary) to secondary
3989       - wait until disks are fully synchronized
3990       - change disks into single-master mode
3991
3992     """
3993     instance = self.instance
3994     target_node = self.target_node
3995     source_node = self.source_node
3996
3997     self.feedback_fn("* checking disk consistency between source and target")
3998     for dev in instance.disks:
3999       if not _CheckDiskConsistency(self, dev, target_node, False):
4000         raise errors.OpExecError("Disk %s is degraded or not fully"
4001                                  " synchronized on target node,"
4002                                  " aborting migrate." % dev.iv_name)
4003
4004     # First get the migration information from the remote node
4005     result = self.rpc.call_migration_info(source_node, instance)
4006     msg = result.RemoteFailMsg()
4007     if msg:
4008       log_err = ("Failed fetching source migration information from %s: %s" %
4009                  (source_node, msg))
4010       logging.error(log_err)
4011       raise errors.OpExecError(log_err)
4012
4013     self.migration_info = migration_info = result.payload
4014
4015     # Then switch the disks to master/master mode
4016     self._EnsureSecondary(target_node)
4017     self._GoStandalone()
4018     self._GoReconnect(True)
4019     self._WaitUntilSync()
4020
4021     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4022     result = self.rpc.call_accept_instance(target_node,
4023                                            instance,
4024                                            migration_info,
4025                                            self.nodes_ip[target_node])
4026
4027     msg = result.RemoteFailMsg()
4028     if msg:
4029       logging.error("Instance pre-migration failed, trying to revert"
4030                     " disk status: %s", msg)
4031       self._AbortMigration()
4032       self._RevertDiskStatus()
4033       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4034                                (instance.name, msg))
4035
4036     self.feedback_fn("* migrating instance to %s" % target_node)
4037     time.sleep(10)
4038     result = self.rpc.call_instance_migrate(source_node, instance,
4039                                             self.nodes_ip[target_node],
4040                                             self.op.live)
4041     msg = result.RemoteFailMsg()
4042     if msg:
4043       logging.error("Instance migration failed, trying to revert"
4044                     " disk status: %s", msg)
4045       self._AbortMigration()
4046       self._RevertDiskStatus()
4047       raise errors.OpExecError("Could not migrate instance %s: %s" %
4048                                (instance.name, msg))
4049     time.sleep(10)
4050
4051     instance.primary_node = target_node
4052     # distribute new instance config to the other nodes
4053     self.cfg.Update(instance)
4054
4055     result = self.rpc.call_finalize_migration(target_node,
4056                                               instance,
4057                                               migration_info,
4058                                               True)
4059     msg = result.RemoteFailMsg()
4060     if msg:
4061       logging.error("Instance migration succeeded, but finalization failed:"
4062                     " %s" % msg)
4063       raise errors.OpExecError("Could not finalize instance migration: %s" %
4064                                msg)
4065
4066     self._EnsureSecondary(source_node)
4067     self._WaitUntilSync()
4068     self._GoStandalone()
4069     self._GoReconnect(False)
4070     self._WaitUntilSync()
4071
4072     self.feedback_fn("* done")
4073
4074   def Exec(self, feedback_fn):
4075     """Perform the migration.
4076
4077     """
4078     self.feedback_fn = feedback_fn
4079
4080     self.source_node = self.instance.primary_node
4081     self.target_node = self.instance.secondary_nodes[0]
4082     self.all_nodes = [self.source_node, self.target_node]
4083     self.nodes_ip = {
4084       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4085       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4086       }
4087     if self.op.cleanup:
4088       return self._ExecCleanup()
4089     else:
4090       return self._ExecMigration()
4091
4092
4093 def _CreateBlockDev(lu, node, instance, device, force_create,
4094                     info, force_open):
4095   """Create a tree of block devices on a given node.
4096
4097   If this device type has to be created on secondaries, create it and
4098   all its children.
4099
4100   If not, just recurse to children keeping the same 'force' value.
4101
4102   @param lu: the lu on whose behalf we execute
4103   @param node: the node on which to create the device
4104   @type instance: L{objects.Instance}
4105   @param instance: the instance which owns the device
4106   @type device: L{objects.Disk}
4107   @param device: the device to create
4108   @type force_create: boolean
4109   @param force_create: whether to force creation of this device; this
4110       will be change to True whenever we find a device which has
4111       CreateOnSecondary() attribute
4112   @param info: the extra 'metadata' we should attach to the device
4113       (this will be represented as a LVM tag)
4114   @type force_open: boolean
4115   @param force_open: this parameter will be passes to the
4116       L{backend.BlockdevCreate} function where it specifies
4117       whether we run on primary or not, and it affects both
4118       the child assembly and the device own Open() execution
4119
4120   """
4121   if device.CreateOnSecondary():
4122     force_create = True
4123
4124   if device.children:
4125     for child in device.children:
4126       _CreateBlockDev(lu, node, instance, child, force_create,
4127                       info, force_open)
4128
4129   if not force_create:
4130     return
4131
4132   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4133
4134
4135 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4136   """Create a single block device on a given node.
4137
4138   This will not recurse over children of the device, so they must be
4139   created in advance.
4140
4141   @param lu: the lu on whose behalf we execute
4142   @param node: the node on which to create the device
4143   @type instance: L{objects.Instance}
4144   @param instance: the instance which owns the device
4145   @type device: L{objects.Disk}
4146   @param device: the device to create
4147   @param info: the extra 'metadata' we should attach to the device
4148       (this will be represented as a LVM tag)
4149   @type force_open: boolean
4150   @param force_open: this parameter will be passes to the
4151       L{backend.BlockdevCreate} function where it specifies
4152       whether we run on primary or not, and it affects both
4153       the child assembly and the device own Open() execution
4154
4155   """
4156   lu.cfg.SetDiskID(device, node)
4157   result = lu.rpc.call_blockdev_create(node, device, device.size,
4158                                        instance.name, force_open, info)
4159   msg = result.RemoteFailMsg()
4160   if msg:
4161     raise errors.OpExecError("Can't create block device %s on"
4162                              " node %s for instance %s: %s" %
4163                              (device, node, instance.name, msg))
4164   if device.physical_id is None:
4165     device.physical_id = result.payload
4166
4167
4168 def _GenerateUniqueNames(lu, exts):
4169   """Generate a suitable LV name.
4170
4171   This will generate a logical volume name for the given instance.
4172
4173   """
4174   results = []
4175   for val in exts:
4176     new_id = lu.cfg.GenerateUniqueID()
4177     results.append("%s%s" % (new_id, val))
4178   return results
4179
4180
4181 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4182                          p_minor, s_minor):
4183   """Generate a drbd8 device complete with its children.
4184
4185   """
4186   port = lu.cfg.AllocatePort()
4187   vgname = lu.cfg.GetVGName()
4188   shared_secret = lu.cfg.GenerateDRBDSecret()
4189   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4190                           logical_id=(vgname, names[0]))
4191   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4192                           logical_id=(vgname, names[1]))
4193   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4194                           logical_id=(primary, secondary, port,
4195                                       p_minor, s_minor,
4196                                       shared_secret),
4197                           children=[dev_data, dev_meta],
4198                           iv_name=iv_name)
4199   return drbd_dev
4200
4201
4202 def _GenerateDiskTemplate(lu, template_name,
4203                           instance_name, primary_node,
4204                           secondary_nodes, disk_info,
4205                           file_storage_dir, file_driver,
4206                           base_index):
4207   """Generate the entire disk layout for a given template type.
4208
4209   """
4210   #TODO: compute space requirements
4211
4212   vgname = lu.cfg.GetVGName()
4213   disk_count = len(disk_info)
4214   disks = []
4215   if template_name == constants.DT_DISKLESS:
4216     pass
4217   elif template_name == constants.DT_PLAIN:
4218     if len(secondary_nodes) != 0:
4219       raise errors.ProgrammerError("Wrong template configuration")
4220
4221     names = _GenerateUniqueNames(lu, [".disk%d" % i
4222                                       for i in range(disk_count)])
4223     for idx, disk in enumerate(disk_info):
4224       disk_index = idx + base_index
4225       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4226                               logical_id=(vgname, names[idx]),
4227                               iv_name="disk/%d" % disk_index,
4228                               mode=disk["mode"])
4229       disks.append(disk_dev)
4230   elif template_name == constants.DT_DRBD8:
4231     if len(secondary_nodes) != 1:
4232       raise errors.ProgrammerError("Wrong template configuration")
4233     remote_node = secondary_nodes[0]
4234     minors = lu.cfg.AllocateDRBDMinor(
4235       [primary_node, remote_node] * len(disk_info), instance_name)
4236
4237     names = []
4238     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4239                                                for i in range(disk_count)]):
4240       names.append(lv_prefix + "_data")
4241       names.append(lv_prefix + "_meta")
4242     for idx, disk in enumerate(disk_info):
4243       disk_index = idx + base_index
4244       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4245                                       disk["size"], names[idx*2:idx*2+2],
4246                                       "disk/%d" % disk_index,
4247                                       minors[idx*2], minors[idx*2+1])
4248       disk_dev.mode = disk["mode"]
4249       disks.append(disk_dev)
4250   elif template_name == constants.DT_FILE:
4251     if len(secondary_nodes) != 0:
4252       raise errors.ProgrammerError("Wrong template configuration")
4253
4254     for idx, disk in enumerate(disk_info):
4255       disk_index = idx + base_index
4256       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4257                               iv_name="disk/%d" % disk_index,
4258                               logical_id=(file_driver,
4259                                           "%s/disk%d" % (file_storage_dir,
4260                                                          disk_index)),
4261                               mode=disk["mode"])
4262       disks.append(disk_dev)
4263   else:
4264     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4265   return disks
4266
4267
4268 def _GetInstanceInfoText(instance):
4269   """Compute that text that should be added to the disk's metadata.
4270
4271   """
4272   return "originstname+%s" % instance.name
4273
4274
4275 def _CreateDisks(lu, instance):
4276   """Create all disks for an instance.
4277
4278   This abstracts away some work from AddInstance.
4279
4280   @type lu: L{LogicalUnit}
4281   @param lu: the logical unit on whose behalf we execute
4282   @type instance: L{objects.Instance}
4283   @param instance: the instance whose disks we should create
4284   @rtype: boolean
4285   @return: the success of the creation
4286
4287   """
4288   info = _GetInstanceInfoText(instance)
4289   pnode = instance.primary_node
4290
4291   if instance.disk_template == constants.DT_FILE:
4292     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4293     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4294
4295     if result.failed or not result.data:
4296       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4297
4298     if not result.data[0]:
4299       raise errors.OpExecError("Failed to create directory '%s'" %
4300                                file_storage_dir)
4301
4302   # Note: this needs to be kept in sync with adding of disks in
4303   # LUSetInstanceParams
4304   for device in instance.disks:
4305     logging.info("Creating volume %s for instance %s",
4306                  device.iv_name, instance.name)
4307     #HARDCODE
4308     for node in instance.all_nodes:
4309       f_create = node == pnode
4310       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4311
4312
4313 def _RemoveDisks(lu, instance):
4314   """Remove all disks for an instance.
4315
4316   This abstracts away some work from `AddInstance()` and
4317   `RemoveInstance()`. Note that in case some of the devices couldn't
4318   be removed, the removal will continue with the other ones (compare
4319   with `_CreateDisks()`).
4320
4321   @type lu: L{LogicalUnit}
4322   @param lu: the logical unit on whose behalf we execute
4323   @type instance: L{objects.Instance}
4324   @param instance: the instance whose disks we should remove
4325   @rtype: boolean
4326   @return: the success of the removal
4327
4328   """
4329   logging.info("Removing block devices for instance %s", instance.name)
4330
4331   all_result = True
4332   for device in instance.disks:
4333     for node, disk in device.ComputeNodeTree(instance.primary_node):
4334       lu.cfg.SetDiskID(disk, node)
4335       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4336       if msg:
4337         lu.LogWarning("Could not remove block device %s on node %s,"
4338                       " continuing anyway: %s", device.iv_name, node, msg)
4339         all_result = False
4340
4341   if instance.disk_template == constants.DT_FILE:
4342     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4343     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4344                                                  file_storage_dir)
4345     if result.failed or not result.data:
4346       logging.error("Could not remove directory '%s'", file_storage_dir)
4347       all_result = False
4348
4349   return all_result
4350
4351
4352 def _ComputeDiskSize(disk_template, disks):
4353   """Compute disk size requirements in the volume group
4354
4355   """
4356   # Required free disk space as a function of disk and swap space
4357   req_size_dict = {
4358     constants.DT_DISKLESS: None,
4359     constants.DT_PLAIN: sum(d["size"] for d in disks),
4360     # 128 MB are added for drbd metadata for each disk
4361     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4362     constants.DT_FILE: None,
4363   }
4364
4365   if disk_template not in req_size_dict:
4366     raise errors.ProgrammerError("Disk template '%s' size requirement"
4367                                  " is unknown" %  disk_template)
4368
4369   return req_size_dict[disk_template]
4370
4371
4372 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4373   """Hypervisor parameter validation.
4374
4375   This function abstract the hypervisor parameter validation to be
4376   used in both instance create and instance modify.
4377
4378   @type lu: L{LogicalUnit}
4379   @param lu: the logical unit for which we check
4380   @type nodenames: list
4381   @param nodenames: the list of nodes on which we should check
4382   @type hvname: string
4383   @param hvname: the name of the hypervisor we should use
4384   @type hvparams: dict
4385   @param hvparams: the parameters which we need to check
4386   @raise errors.OpPrereqError: if the parameters are not valid
4387
4388   """
4389   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4390                                                   hvname,
4391                                                   hvparams)
4392   for node in nodenames:
4393     info = hvinfo[node]
4394     if info.offline:
4395       continue
4396     msg = info.RemoteFailMsg()
4397     if msg:
4398       raise errors.OpPrereqError("Hypervisor parameter validation"
4399                                  " failed on node %s: %s" % (node, msg))
4400
4401
4402 class LUCreateInstance(LogicalUnit):
4403   """Create an instance.
4404
4405   """
4406   HPATH = "instance-add"
4407   HTYPE = constants.HTYPE_INSTANCE
4408   _OP_REQP = ["instance_name", "disks", "disk_template",
4409               "mode", "start",
4410               "wait_for_sync", "ip_check", "nics",
4411               "hvparams", "beparams"]
4412   REQ_BGL = False
4413
4414   def _ExpandNode(self, node):
4415     """Expands and checks one node name.
4416
4417     """
4418     node_full = self.cfg.ExpandNodeName(node)
4419     if node_full is None:
4420       raise errors.OpPrereqError("Unknown node %s" % node)
4421     return node_full
4422
4423   def ExpandNames(self):
4424     """ExpandNames for CreateInstance.
4425
4426     Figure out the right locks for instance creation.
4427
4428     """
4429     self.needed_locks = {}
4430
4431     # set optional parameters to none if they don't exist
4432     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4433       if not hasattr(self.op, attr):
4434         setattr(self.op, attr, None)
4435
4436     # cheap checks, mostly valid constants given
4437
4438     # verify creation mode
4439     if self.op.mode not in (constants.INSTANCE_CREATE,
4440                             constants.INSTANCE_IMPORT):
4441       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4442                                  self.op.mode)
4443
4444     # disk template and mirror node verification
4445     if self.op.disk_template not in constants.DISK_TEMPLATES:
4446       raise errors.OpPrereqError("Invalid disk template name")
4447
4448     if self.op.hypervisor is None:
4449       self.op.hypervisor = self.cfg.GetHypervisorType()
4450
4451     cluster = self.cfg.GetClusterInfo()
4452     enabled_hvs = cluster.enabled_hypervisors
4453     if self.op.hypervisor not in enabled_hvs:
4454       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4455                                  " cluster (%s)" % (self.op.hypervisor,
4456                                   ",".join(enabled_hvs)))
4457
4458     # check hypervisor parameter syntax (locally)
4459     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4460     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4461                                   self.op.hvparams)
4462     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4463     hv_type.CheckParameterSyntax(filled_hvp)
4464
4465     # fill and remember the beparams dict
4466     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4467     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4468                                     self.op.beparams)
4469
4470     #### instance parameters check
4471
4472     # instance name verification
4473     hostname1 = utils.HostInfo(self.op.instance_name)
4474     self.op.instance_name = instance_name = hostname1.name
4475
4476     # this is just a preventive check, but someone might still add this
4477     # instance in the meantime, and creation will fail at lock-add time
4478     if instance_name in self.cfg.GetInstanceList():
4479       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4480                                  instance_name)
4481
4482     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4483
4484     # NIC buildup
4485     self.nics = []
4486     for idx, nic in enumerate(self.op.nics):
4487       nic_mode_req = nic.get("mode", None)
4488       nic_mode = nic_mode_req
4489       if nic_mode is None:
4490         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4491
4492       # in routed mode, for the first nic, the default ip is 'auto'
4493       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4494         default_ip_mode = constants.VALUE_AUTO
4495       else:
4496         default_ip_mode = constants.VALUE_NONE
4497
4498       # ip validity checks
4499       ip = nic.get("ip", default_ip_mode)
4500       if ip is None or ip.lower() == constants.VALUE_NONE:
4501         nic_ip = None
4502       elif ip.lower() == constants.VALUE_AUTO:
4503         nic_ip = hostname1.ip
4504       else:
4505         if not utils.IsValidIP(ip):
4506           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4507                                      " like a valid IP" % ip)
4508         nic_ip = ip
4509
4510       # TODO: check the ip for uniqueness !!
4511       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4512         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4513
4514       # MAC address verification
4515       mac = nic.get("mac", constants.VALUE_AUTO)
4516       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4517         if not utils.IsValidMac(mac.lower()):
4518           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4519                                      mac)
4520       # bridge verification
4521       bridge = nic.get("bridge", None)
4522       link = nic.get("link", None)
4523       if bridge and link:
4524         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4525       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4526         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4527       elif bridge:
4528         link = bridge
4529
4530       nicparams = {}
4531       if nic_mode_req:
4532         nicparams[constants.NIC_MODE] = nic_mode_req
4533       if link:
4534         nicparams[constants.NIC_LINK] = link
4535
4536       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4537                                       nicparams)
4538       objects.NIC.CheckParameterSyntax(check_params)
4539       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4540
4541     # disk checks/pre-build
4542     self.disks = []
4543     for disk in self.op.disks:
4544       mode = disk.get("mode", constants.DISK_RDWR)
4545       if mode not in constants.DISK_ACCESS_SET:
4546         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4547                                    mode)
4548       size = disk.get("size", None)
4549       if size is None:
4550         raise errors.OpPrereqError("Missing disk size")
4551       try:
4552         size = int(size)
4553       except ValueError:
4554         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4555       self.disks.append({"size": size, "mode": mode})
4556
4557     # used in CheckPrereq for ip ping check
4558     self.check_ip = hostname1.ip
4559
4560     # file storage checks
4561     if (self.op.file_driver and
4562         not self.op.file_driver in constants.FILE_DRIVER):
4563       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4564                                  self.op.file_driver)
4565
4566     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4567       raise errors.OpPrereqError("File storage directory path not absolute")
4568
4569     ### Node/iallocator related checks
4570     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4571       raise errors.OpPrereqError("One and only one of iallocator and primary"
4572                                  " node must be given")
4573
4574     if self.op.iallocator:
4575       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4576     else:
4577       self.op.pnode = self._ExpandNode(self.op.pnode)
4578       nodelist = [self.op.pnode]
4579       if self.op.snode is not None:
4580         self.op.snode = self._ExpandNode(self.op.snode)
4581         nodelist.append(self.op.snode)
4582       self.needed_locks[locking.LEVEL_NODE] = nodelist
4583
4584     # in case of import lock the source node too
4585     if self.op.mode == constants.INSTANCE_IMPORT:
4586       src_node = getattr(self.op, "src_node", None)
4587       src_path = getattr(self.op, "src_path", None)
4588
4589       if src_path is None:
4590         self.op.src_path = src_path = self.op.instance_name
4591
4592       if src_node is None:
4593         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4594         self.op.src_node = None
4595         if os.path.isabs(src_path):
4596           raise errors.OpPrereqError("Importing an instance from an absolute"
4597                                      " path requires a source node option.")
4598       else:
4599         self.op.src_node = src_node = self._ExpandNode(src_node)
4600         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4601           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4602         if not os.path.isabs(src_path):
4603           self.op.src_path = src_path = \
4604             os.path.join(constants.EXPORT_DIR, src_path)
4605
4606     else: # INSTANCE_CREATE
4607       if getattr(self.op, "os_type", None) is None:
4608         raise errors.OpPrereqError("No guest OS specified")
4609
4610   def _RunAllocator(self):
4611     """Run the allocator based on input opcode.
4612
4613     """
4614     nics = [n.ToDict() for n in self.nics]
4615     ial = IAllocator(self,
4616                      mode=constants.IALLOCATOR_MODE_ALLOC,
4617                      name=self.op.instance_name,
4618                      disk_template=self.op.disk_template,
4619                      tags=[],
4620                      os=self.op.os_type,
4621                      vcpus=self.be_full[constants.BE_VCPUS],
4622                      mem_size=self.be_full[constants.BE_MEMORY],
4623                      disks=self.disks,
4624                      nics=nics,
4625                      hypervisor=self.op.hypervisor,
4626                      )
4627
4628     ial.Run(self.op.iallocator)
4629
4630     if not ial.success:
4631       raise errors.OpPrereqError("Can't compute nodes using"
4632                                  " iallocator '%s': %s" % (self.op.iallocator,
4633                                                            ial.info))
4634     if len(ial.nodes) != ial.required_nodes:
4635       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4636                                  " of nodes (%s), required %s" %
4637                                  (self.op.iallocator, len(ial.nodes),
4638                                   ial.required_nodes))
4639     self.op.pnode = ial.nodes[0]
4640     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4641                  self.op.instance_name, self.op.iallocator,
4642                  ", ".join(ial.nodes))
4643     if ial.required_nodes == 2:
4644       self.op.snode = ial.nodes[1]
4645
4646   def BuildHooksEnv(self):
4647     """Build hooks env.
4648
4649     This runs on master, primary and secondary nodes of the instance.
4650
4651     """
4652     env = {
4653       "ADD_MODE": self.op.mode,
4654       }
4655     if self.op.mode == constants.INSTANCE_IMPORT:
4656       env["SRC_NODE"] = self.op.src_node
4657       env["SRC_PATH"] = self.op.src_path
4658       env["SRC_IMAGES"] = self.src_images
4659
4660     env.update(_BuildInstanceHookEnv(
4661       name=self.op.instance_name,
4662       primary_node=self.op.pnode,
4663       secondary_nodes=self.secondaries,
4664       status=self.op.start,
4665       os_type=self.op.os_type,
4666       memory=self.be_full[constants.BE_MEMORY],
4667       vcpus=self.be_full[constants.BE_VCPUS],
4668       nics=_PreBuildNICHooksList(self, self.nics),
4669       disk_template=self.op.disk_template,
4670       disks=[(d["size"], d["mode"]) for d in self.disks],
4671     ))
4672
4673     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4674           self.secondaries)
4675     return env, nl, nl
4676
4677
4678   def CheckPrereq(self):
4679     """Check prerequisites.
4680
4681     """
4682     if (not self.cfg.GetVGName() and
4683         self.op.disk_template not in constants.DTS_NOT_LVM):
4684       raise errors.OpPrereqError("Cluster does not support lvm-based"
4685                                  " instances")
4686
4687     if self.op.mode == constants.INSTANCE_IMPORT:
4688       src_node = self.op.src_node
4689       src_path = self.op.src_path
4690
4691       if src_node is None:
4692         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4693         exp_list = self.rpc.call_export_list(locked_nodes)
4694         found = False
4695         for node in exp_list:
4696           if exp_list[node].RemoteFailMsg():
4697             continue
4698           if src_path in exp_list[node].payload:
4699             found = True
4700             self.op.src_node = src_node = node
4701             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4702                                                        src_path)
4703             break
4704         if not found:
4705           raise errors.OpPrereqError("No export found for relative path %s" %
4706                                       src_path)
4707
4708       _CheckNodeOnline(self, src_node)
4709       result = self.rpc.call_export_info(src_node, src_path)
4710       msg = result.RemoteFailMsg()
4711       if msg:
4712         raise errors.OpPrereqError("No export or invalid export found in"
4713                                    " dir %s: %s" % (src_path, msg))
4714
4715       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4716       if not export_info.has_section(constants.INISECT_EXP):
4717         raise errors.ProgrammerError("Corrupted export config")
4718
4719       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4720       if (int(ei_version) != constants.EXPORT_VERSION):
4721         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4722                                    (ei_version, constants.EXPORT_VERSION))
4723
4724       # Check that the new instance doesn't have less disks than the export
4725       instance_disks = len(self.disks)
4726       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4727       if instance_disks < export_disks:
4728         raise errors.OpPrereqError("Not enough disks to import."
4729                                    " (instance: %d, export: %d)" %
4730                                    (instance_disks, export_disks))
4731
4732       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4733       disk_images = []
4734       for idx in range(export_disks):
4735         option = 'disk%d_dump' % idx
4736         if export_info.has_option(constants.INISECT_INS, option):
4737           # FIXME: are the old os-es, disk sizes, etc. useful?
4738           export_name = export_info.get(constants.INISECT_INS, option)
4739           image = os.path.join(src_path, export_name)
4740           disk_images.append(image)
4741         else:
4742           disk_images.append(False)
4743
4744       self.src_images = disk_images
4745
4746       old_name = export_info.get(constants.INISECT_INS, 'name')
4747       # FIXME: int() here could throw a ValueError on broken exports
4748       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4749       if self.op.instance_name == old_name:
4750         for idx, nic in enumerate(self.nics):
4751           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4752             nic_mac_ini = 'nic%d_mac' % idx
4753             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4754
4755     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4756     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4757     if self.op.start and not self.op.ip_check:
4758       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4759                                  " adding an instance in start mode")
4760
4761     if self.op.ip_check:
4762       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4763         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4764                                    (self.check_ip, self.op.instance_name))
4765
4766     #### mac address generation
4767     # By generating here the mac address both the allocator and the hooks get
4768     # the real final mac address rather than the 'auto' or 'generate' value.
4769     # There is a race condition between the generation and the instance object
4770     # creation, which means that we know the mac is valid now, but we're not
4771     # sure it will be when we actually add the instance. If things go bad
4772     # adding the instance will abort because of a duplicate mac, and the
4773     # creation job will fail.
4774     for nic in self.nics:
4775       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4776         nic.mac = self.cfg.GenerateMAC()
4777
4778     #### allocator run
4779
4780     if self.op.iallocator is not None:
4781       self._RunAllocator()
4782
4783     #### node related checks
4784
4785     # check primary node
4786     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4787     assert self.pnode is not None, \
4788       "Cannot retrieve locked node %s" % self.op.pnode
4789     if pnode.offline:
4790       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4791                                  pnode.name)
4792     if pnode.drained:
4793       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4794                                  pnode.name)
4795
4796     self.secondaries = []
4797
4798     # mirror node verification
4799     if self.op.disk_template in constants.DTS_NET_MIRROR:
4800       if self.op.snode is None:
4801         raise errors.OpPrereqError("The networked disk templates need"
4802                                    " a mirror node")
4803       if self.op.snode == pnode.name:
4804         raise errors.OpPrereqError("The secondary node cannot be"
4805                                    " the primary node.")
4806       _CheckNodeOnline(self, self.op.snode)
4807       _CheckNodeNotDrained(self, self.op.snode)
4808       self.secondaries.append(self.op.snode)
4809
4810     nodenames = [pnode.name] + self.secondaries
4811
4812     req_size = _ComputeDiskSize(self.op.disk_template,
4813                                 self.disks)
4814
4815     # Check lv size requirements
4816     if req_size is not None:
4817       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4818                                          self.op.hypervisor)
4819       for node in nodenames:
4820         info = nodeinfo[node]
4821         msg = info.RemoteFailMsg()
4822         if msg:
4823           raise errors.OpPrereqError("Cannot get current information"
4824                                      " from node %s: %s" % (node, msg))
4825         info = info.payload
4826         vg_free = info.get('vg_free', None)
4827         if not isinstance(vg_free, int):
4828           raise errors.OpPrereqError("Can't compute free disk space on"
4829                                      " node %s" % node)
4830         if req_size > vg_free:
4831           raise errors.OpPrereqError("Not enough disk space on target node %s."
4832                                      " %d MB available, %d MB required" %
4833                                      (node, vg_free, req_size))
4834
4835     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4836
4837     # os verification
4838     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4839     result.Raise()
4840     if not isinstance(result.data, objects.OS):
4841       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4842                                  " primary node"  % self.op.os_type)
4843
4844     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4845
4846     # memory check on primary node
4847     if self.op.start:
4848       _CheckNodeFreeMemory(self, self.pnode.name,
4849                            "creating instance %s" % self.op.instance_name,
4850                            self.be_full[constants.BE_MEMORY],
4851                            self.op.hypervisor)
4852
4853   def Exec(self, feedback_fn):
4854     """Create and add the instance to the cluster.
4855
4856     """
4857     instance = self.op.instance_name
4858     pnode_name = self.pnode.name
4859
4860     ht_kind = self.op.hypervisor
4861     if ht_kind in constants.HTS_REQ_PORT:
4862       network_port = self.cfg.AllocatePort()
4863     else:
4864       network_port = None
4865
4866     ##if self.op.vnc_bind_address is None:
4867     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4868
4869     # this is needed because os.path.join does not accept None arguments
4870     if self.op.file_storage_dir is None:
4871       string_file_storage_dir = ""
4872     else:
4873       string_file_storage_dir = self.op.file_storage_dir
4874
4875     # build the full file storage dir path
4876     file_storage_dir = os.path.normpath(os.path.join(
4877                                         self.cfg.GetFileStorageDir(),
4878                                         string_file_storage_dir, instance))
4879
4880
4881     disks = _GenerateDiskTemplate(self,
4882                                   self.op.disk_template,
4883                                   instance, pnode_name,
4884                                   self.secondaries,
4885                                   self.disks,
4886                                   file_storage_dir,
4887                                   self.op.file_driver,
4888                                   0)
4889
4890     iobj = objects.Instance(name=instance, os=self.op.os_type,
4891                             primary_node=pnode_name,
4892                             nics=self.nics, disks=disks,
4893                             disk_template=self.op.disk_template,
4894                             admin_up=False,
4895                             network_port=network_port,
4896                             beparams=self.op.beparams,
4897                             hvparams=self.op.hvparams,
4898                             hypervisor=self.op.hypervisor,
4899                             )
4900
4901     feedback_fn("* creating instance disks...")
4902     try:
4903       _CreateDisks(self, iobj)
4904     except errors.OpExecError:
4905       self.LogWarning("Device creation failed, reverting...")
4906       try:
4907         _RemoveDisks(self, iobj)
4908       finally:
4909         self.cfg.ReleaseDRBDMinors(instance)
4910         raise
4911
4912     feedback_fn("adding instance %s to cluster config" % instance)
4913
4914     self.cfg.AddInstance(iobj)
4915     # Declare that we don't want to remove the instance lock anymore, as we've
4916     # added the instance to the config
4917     del self.remove_locks[locking.LEVEL_INSTANCE]
4918     # Unlock all the nodes
4919     if self.op.mode == constants.INSTANCE_IMPORT:
4920       nodes_keep = [self.op.src_node]
4921       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4922                        if node != self.op.src_node]
4923       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4924       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4925     else:
4926       self.context.glm.release(locking.LEVEL_NODE)
4927       del self.acquired_locks[locking.LEVEL_NODE]
4928
4929     if self.op.wait_for_sync:
4930       disk_abort = not _WaitForSync(self, iobj)
4931     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4932       # make sure the disks are not degraded (still sync-ing is ok)
4933       time.sleep(15)
4934       feedback_fn("* checking mirrors status")
4935       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4936     else:
4937       disk_abort = False
4938
4939     if disk_abort:
4940       _RemoveDisks(self, iobj)
4941       self.cfg.RemoveInstance(iobj.name)
4942       # Make sure the instance lock gets removed
4943       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4944       raise errors.OpExecError("There are some degraded disks for"
4945                                " this instance")
4946
4947     feedback_fn("creating os for instance %s on node %s" %
4948                 (instance, pnode_name))
4949
4950     if iobj.disk_template != constants.DT_DISKLESS:
4951       if self.op.mode == constants.INSTANCE_CREATE:
4952         feedback_fn("* running the instance OS create scripts...")
4953         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4954         msg = result.RemoteFailMsg()
4955         if msg:
4956           raise errors.OpExecError("Could not add os for instance %s"
4957                                    " on node %s: %s" %
4958                                    (instance, pnode_name, msg))
4959
4960       elif self.op.mode == constants.INSTANCE_IMPORT:
4961         feedback_fn("* running the instance OS import scripts...")
4962         src_node = self.op.src_node
4963         src_images = self.src_images
4964         cluster_name = self.cfg.GetClusterName()
4965         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4966                                                          src_node, src_images,
4967                                                          cluster_name)
4968         msg = import_result.RemoteFailMsg()
4969         if msg:
4970           self.LogWarning("Error while importing the disk images for instance"
4971                           " %s on node %s: %s" % (instance, pnode_name, msg))
4972       else:
4973         # also checked in the prereq part
4974         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4975                                      % self.op.mode)
4976
4977     if self.op.start:
4978       iobj.admin_up = True
4979       self.cfg.Update(iobj)
4980       logging.info("Starting instance %s on node %s", instance, pnode_name)
4981       feedback_fn("* starting instance...")
4982       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4983       msg = result.RemoteFailMsg()
4984       if msg:
4985         raise errors.OpExecError("Could not start instance: %s" % msg)
4986
4987
4988 class LUConnectConsole(NoHooksLU):
4989   """Connect to an instance's console.
4990
4991   This is somewhat special in that it returns the command line that
4992   you need to run on the master node in order to connect to the
4993   console.
4994
4995   """
4996   _OP_REQP = ["instance_name"]
4997   REQ_BGL = False
4998
4999   def ExpandNames(self):
5000     self._ExpandAndLockInstance()
5001
5002   def CheckPrereq(self):
5003     """Check prerequisites.
5004
5005     This checks that the instance is in the cluster.
5006
5007     """
5008     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5009     assert self.instance is not None, \
5010       "Cannot retrieve locked instance %s" % self.op.instance_name
5011     _CheckNodeOnline(self, self.instance.primary_node)
5012
5013   def Exec(self, feedback_fn):
5014     """Connect to the console of an instance
5015
5016     """
5017     instance = self.instance
5018     node = instance.primary_node
5019
5020     node_insts = self.rpc.call_instance_list([node],
5021                                              [instance.hypervisor])[node]
5022     msg = node_insts.RemoteFailMsg()
5023     if msg:
5024       raise errors.OpExecError("Can't get node information from %s: %s" %
5025                                (node, msg))
5026
5027     if instance.name not in node_insts.payload:
5028       raise errors.OpExecError("Instance %s is not running." % instance.name)
5029
5030     logging.debug("Connecting to console of %s on %s", instance.name, node)
5031
5032     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5033     cluster = self.cfg.GetClusterInfo()
5034     # beparams and hvparams are passed separately, to avoid editing the
5035     # instance and then saving the defaults in the instance itself.
5036     hvparams = cluster.FillHV(instance)
5037     beparams = cluster.FillBE(instance)
5038     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5039
5040     # build ssh cmdline
5041     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5042
5043
5044 class LUReplaceDisks(LogicalUnit):
5045   """Replace the disks of an instance.
5046
5047   """
5048   HPATH = "mirrors-replace"
5049   HTYPE = constants.HTYPE_INSTANCE
5050   _OP_REQP = ["instance_name", "mode", "disks"]
5051   REQ_BGL = False
5052
5053   def CheckArguments(self):
5054     if not hasattr(self.op, "remote_node"):
5055       self.op.remote_node = None
5056     if not hasattr(self.op, "iallocator"):
5057       self.op.iallocator = None
5058
5059     # check for valid parameter combination
5060     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5061     if self.op.mode == constants.REPLACE_DISK_CHG:
5062       if cnt == 2:
5063         raise errors.OpPrereqError("When changing the secondary either an"
5064                                    " iallocator script must be used or the"
5065                                    " new node given")
5066       elif cnt == 0:
5067         raise errors.OpPrereqError("Give either the iallocator or the new"
5068                                    " secondary, not both")
5069     else: # not replacing the secondary
5070       if cnt != 2:
5071         raise errors.OpPrereqError("The iallocator and new node options can"
5072                                    " be used only when changing the"
5073                                    " secondary node")
5074
5075   def ExpandNames(self):
5076     self._ExpandAndLockInstance()
5077
5078     if self.op.iallocator is not None:
5079       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5080     elif self.op.remote_node is not None:
5081       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5082       if remote_node is None:
5083         raise errors.OpPrereqError("Node '%s' not known" %
5084                                    self.op.remote_node)
5085       self.op.remote_node = remote_node
5086       # Warning: do not remove the locking of the new secondary here
5087       # unless DRBD8.AddChildren is changed to work in parallel;
5088       # currently it doesn't since parallel invocations of
5089       # FindUnusedMinor will conflict
5090       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5091       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5092     else:
5093       self.needed_locks[locking.LEVEL_NODE] = []
5094       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5095
5096   def DeclareLocks(self, level):
5097     # If we're not already locking all nodes in the set we have to declare the
5098     # instance's primary/secondary nodes.
5099     if (level == locking.LEVEL_NODE and
5100         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5101       self._LockInstancesNodes()
5102
5103   def _RunAllocator(self):
5104     """Compute a new secondary node using an IAllocator.
5105
5106     """
5107     ial = IAllocator(self,
5108                      mode=constants.IALLOCATOR_MODE_RELOC,
5109                      name=self.op.instance_name,
5110                      relocate_from=[self.sec_node])
5111
5112     ial.Run(self.op.iallocator)
5113
5114     if not ial.success:
5115       raise errors.OpPrereqError("Can't compute nodes using"
5116                                  " iallocator '%s': %s" % (self.op.iallocator,
5117                                                            ial.info))
5118     if len(ial.nodes) != ial.required_nodes:
5119       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5120                                  " of nodes (%s), required %s" %
5121                                  (len(ial.nodes), ial.required_nodes))
5122     self.op.remote_node = ial.nodes[0]
5123     self.LogInfo("Selected new secondary for the instance: %s",
5124                  self.op.remote_node)
5125
5126   def BuildHooksEnv(self):
5127     """Build hooks env.
5128
5129     This runs on the master, the primary and all the secondaries.
5130
5131     """
5132     env = {
5133       "MODE": self.op.mode,
5134       "NEW_SECONDARY": self.op.remote_node,
5135       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5136       }
5137     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5138     nl = [
5139       self.cfg.GetMasterNode(),
5140       self.instance.primary_node,
5141       ]
5142     if self.op.remote_node is not None:
5143       nl.append(self.op.remote_node)
5144     return env, nl, nl
5145
5146   def CheckPrereq(self):
5147     """Check prerequisites.
5148
5149     This checks that the instance is in the cluster.
5150
5151     """
5152     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5153     assert instance is not None, \
5154       "Cannot retrieve locked instance %s" % self.op.instance_name
5155     self.instance = instance
5156
5157     if instance.disk_template != constants.DT_DRBD8:
5158       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5159                                  " instances")
5160
5161     if len(instance.secondary_nodes) != 1:
5162       raise errors.OpPrereqError("The instance has a strange layout,"
5163                                  " expected one secondary but found %d" %
5164                                  len(instance.secondary_nodes))
5165
5166     self.sec_node = instance.secondary_nodes[0]
5167
5168     if self.op.iallocator is not None:
5169       self._RunAllocator()
5170
5171     remote_node = self.op.remote_node
5172     if remote_node is not None:
5173       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5174       assert self.remote_node_info is not None, \
5175         "Cannot retrieve locked node %s" % remote_node
5176     else:
5177       self.remote_node_info = None
5178     if remote_node == instance.primary_node:
5179       raise errors.OpPrereqError("The specified node is the primary node of"
5180                                  " the instance.")
5181     elif remote_node == self.sec_node:
5182       raise errors.OpPrereqError("The specified node is already the"
5183                                  " secondary node of the instance.")
5184
5185     if self.op.mode == constants.REPLACE_DISK_PRI:
5186       n1 = self.tgt_node = instance.primary_node
5187       n2 = self.oth_node = self.sec_node
5188     elif self.op.mode == constants.REPLACE_DISK_SEC:
5189       n1 = self.tgt_node = self.sec_node
5190       n2 = self.oth_node = instance.primary_node
5191     elif self.op.mode == constants.REPLACE_DISK_CHG:
5192       n1 = self.new_node = remote_node
5193       n2 = self.oth_node = instance.primary_node
5194       self.tgt_node = self.sec_node
5195       _CheckNodeNotDrained(self, remote_node)
5196     else:
5197       raise errors.ProgrammerError("Unhandled disk replace mode")
5198
5199     _CheckNodeOnline(self, n1)
5200     _CheckNodeOnline(self, n2)
5201
5202     if not self.op.disks:
5203       self.op.disks = range(len(instance.disks))
5204
5205     for disk_idx in self.op.disks:
5206       instance.FindDisk(disk_idx)
5207
5208   def _ExecD8DiskOnly(self, feedback_fn):
5209     """Replace a disk on the primary or secondary for dbrd8.
5210
5211     The algorithm for replace is quite complicated:
5212
5213       1. for each disk to be replaced:
5214
5215         1. create new LVs on the target node with unique names
5216         1. detach old LVs from the drbd device
5217         1. rename old LVs to name_replaced.<time_t>
5218         1. rename new LVs to old LVs
5219         1. attach the new LVs (with the old names now) to the drbd device
5220
5221       1. wait for sync across all devices
5222
5223       1. for each modified disk:
5224
5225         1. remove old LVs (which have the name name_replaces.<time_t>)
5226
5227     Failures are not very well handled.
5228
5229     """
5230     steps_total = 6
5231     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5232     instance = self.instance
5233     iv_names = {}
5234     vgname = self.cfg.GetVGName()
5235     # start of work
5236     cfg = self.cfg
5237     tgt_node = self.tgt_node
5238     oth_node = self.oth_node
5239
5240     # Step: check device activation
5241     self.proc.LogStep(1, steps_total, "check device existence")
5242     info("checking volume groups")
5243     my_vg = cfg.GetVGName()
5244     results = self.rpc.call_vg_list([oth_node, tgt_node])
5245     if not results:
5246       raise errors.OpExecError("Can't list volume groups on the nodes")
5247     for node in oth_node, tgt_node:
5248       res = results[node]
5249       msg = res.RemoteFailMsg()
5250       if msg:
5251         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5252       if my_vg not in res.payload:
5253         raise errors.OpExecError("Volume group '%s' not found on %s" %
5254                                  (my_vg, node))
5255     for idx, dev in enumerate(instance.disks):
5256       if idx not in self.op.disks:
5257         continue
5258       for node in tgt_node, oth_node:
5259         info("checking disk/%d on %s" % (idx, node))
5260         cfg.SetDiskID(dev, node)
5261         result = self.rpc.call_blockdev_find(node, dev)
5262         msg = result.RemoteFailMsg()
5263         if not msg and not result.payload:
5264           msg = "disk not found"
5265         if msg:
5266           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5267                                    (idx, node, msg))
5268
5269     # Step: check other node consistency
5270     self.proc.LogStep(2, steps_total, "check peer consistency")
5271     for idx, dev in enumerate(instance.disks):
5272       if idx not in self.op.disks:
5273         continue
5274       info("checking disk/%d consistency on %s" % (idx, oth_node))
5275       if not _CheckDiskConsistency(self, dev, oth_node,
5276                                    oth_node==instance.primary_node):
5277         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5278                                  " to replace disks on this node (%s)" %
5279                                  (oth_node, tgt_node))
5280
5281     # Step: create new storage
5282     self.proc.LogStep(3, steps_total, "allocate new storage")
5283     for idx, dev in enumerate(instance.disks):
5284       if idx not in self.op.disks:
5285         continue
5286       size = dev.size
5287       cfg.SetDiskID(dev, tgt_node)
5288       lv_names = [".disk%d_%s" % (idx, suf)
5289                   for suf in ["data", "meta"]]
5290       names = _GenerateUniqueNames(self, lv_names)
5291       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5292                              logical_id=(vgname, names[0]))
5293       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5294                              logical_id=(vgname, names[1]))
5295       new_lvs = [lv_data, lv_meta]
5296       old_lvs = dev.children
5297       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5298       info("creating new local storage on %s for %s" %
5299            (tgt_node, dev.iv_name))
5300       # we pass force_create=True to force the LVM creation
5301       for new_lv in new_lvs:
5302         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5303                         _GetInstanceInfoText(instance), False)
5304
5305     # Step: for each lv, detach+rename*2+attach
5306     self.proc.LogStep(4, steps_total, "change drbd configuration")
5307     for dev, old_lvs, new_lvs in iv_names.itervalues():
5308       info("detaching %s drbd from local storage" % dev.iv_name)
5309       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5310       msg = result.RemoteFailMsg()
5311       if msg:
5312         raise errors.OpExecError("Can't detach drbd from local storage on node"
5313                                  " %s for device %s: %s" %
5314                                  (tgt_node, dev.iv_name, msg))
5315       #dev.children = []
5316       #cfg.Update(instance)
5317
5318       # ok, we created the new LVs, so now we know we have the needed
5319       # storage; as such, we proceed on the target node to rename
5320       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5321       # using the assumption that logical_id == physical_id (which in
5322       # turn is the unique_id on that node)
5323
5324       # FIXME(iustin): use a better name for the replaced LVs
5325       temp_suffix = int(time.time())
5326       ren_fn = lambda d, suff: (d.physical_id[0],
5327                                 d.physical_id[1] + "_replaced-%s" % suff)
5328       # build the rename list based on what LVs exist on the node
5329       rlist = []
5330       for to_ren in old_lvs:
5331         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5332         if not result.RemoteFailMsg() and result.payload:
5333           # device exists
5334           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5335
5336       info("renaming the old LVs on the target node")
5337       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5338       msg = result.RemoteFailMsg()
5339       if msg:
5340         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5341                                  (tgt_node, msg))
5342       # now we rename the new LVs to the old LVs
5343       info("renaming the new LVs on the target node")
5344       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5345       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5346       msg = result.RemoteFailMsg()
5347       if msg:
5348         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5349                                  (tgt_node, msg))
5350
5351       for old, new in zip(old_lvs, new_lvs):
5352         new.logical_id = old.logical_id
5353         cfg.SetDiskID(new, tgt_node)
5354
5355       for disk in old_lvs:
5356         disk.logical_id = ren_fn(disk, temp_suffix)
5357         cfg.SetDiskID(disk, tgt_node)
5358
5359       # now that the new lvs have the old name, we can add them to the device
5360       info("adding new mirror component on %s" % tgt_node)
5361       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5362       msg = result.RemoteFailMsg()
5363       if msg:
5364         for new_lv in new_lvs:
5365           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5366           if msg:
5367             warning("Can't rollback device %s: %s", dev, msg,
5368                     hint="cleanup manually the unused logical volumes")
5369         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5370
5371       dev.children = new_lvs
5372       cfg.Update(instance)
5373
5374     # Step: wait for sync
5375
5376     # this can fail as the old devices are degraded and _WaitForSync
5377     # does a combined result over all disks, so we don't check its
5378     # return value
5379     self.proc.LogStep(5, steps_total, "sync devices")
5380     _WaitForSync(self, instance, unlock=True)
5381
5382     # so check manually all the devices
5383     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5384       cfg.SetDiskID(dev, instance.primary_node)
5385       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5386       msg = result.RemoteFailMsg()
5387       if not msg and not result.payload:
5388         msg = "disk not found"
5389       if msg:
5390         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5391                                  (name, msg))
5392       if result.payload[5]:
5393         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5394
5395     # Step: remove old storage
5396     self.proc.LogStep(6, steps_total, "removing old storage")
5397     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5398       info("remove logical volumes for %s" % name)
5399       for lv in old_lvs:
5400         cfg.SetDiskID(lv, tgt_node)
5401         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5402         if msg:
5403           warning("Can't remove old LV: %s" % msg,
5404                   hint="manually remove unused LVs")
5405           continue
5406
5407   def _ExecD8Secondary(self, feedback_fn):
5408     """Replace the secondary node for drbd8.
5409
5410     The algorithm for replace is quite complicated:
5411       - for all disks of the instance:
5412         - create new LVs on the new node with same names
5413         - shutdown the drbd device on the old secondary
5414         - disconnect the drbd network on the primary
5415         - create the drbd device on the new secondary
5416         - network attach the drbd on the primary, using an artifice:
5417           the drbd code for Attach() will connect to the network if it
5418           finds a device which is connected to the good local disks but
5419           not network enabled
5420       - wait for sync across all devices
5421       - remove all disks from the old secondary
5422
5423     Failures are not very well handled.
5424
5425     """
5426     steps_total = 6
5427     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5428     instance = self.instance
5429     iv_names = {}
5430     # start of work
5431     cfg = self.cfg
5432     old_node = self.tgt_node
5433     new_node = self.new_node
5434     pri_node = instance.primary_node
5435     nodes_ip = {
5436       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5437       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5438       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5439       }
5440
5441     # Step: check device activation
5442     self.proc.LogStep(1, steps_total, "check device existence")
5443     info("checking volume groups")
5444     my_vg = cfg.GetVGName()
5445     results = self.rpc.call_vg_list([pri_node, new_node])
5446     for node in pri_node, new_node:
5447       res = results[node]
5448       msg = res.RemoteFailMsg()
5449       if msg:
5450         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5451       if my_vg not in res.payload:
5452         raise errors.OpExecError("Volume group '%s' not found on %s" %
5453                                  (my_vg, node))
5454     for idx, dev in enumerate(instance.disks):
5455       if idx not in self.op.disks:
5456         continue
5457       info("checking disk/%d on %s" % (idx, pri_node))
5458       cfg.SetDiskID(dev, pri_node)
5459       result = self.rpc.call_blockdev_find(pri_node, dev)
5460       msg = result.RemoteFailMsg()
5461       if not msg and not result.payload:
5462         msg = "disk not found"
5463       if msg:
5464         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5465                                  (idx, pri_node, msg))
5466
5467     # Step: check other node consistency
5468     self.proc.LogStep(2, steps_total, "check peer consistency")
5469     for idx, dev in enumerate(instance.disks):
5470       if idx not in self.op.disks:
5471         continue
5472       info("checking disk/%d consistency on %s" % (idx, pri_node))
5473       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5474         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5475                                  " unsafe to replace the secondary" %
5476                                  pri_node)
5477
5478     # Step: create new storage
5479     self.proc.LogStep(3, steps_total, "allocate new storage")
5480     for idx, dev in enumerate(instance.disks):
5481       info("adding new local storage on %s for disk/%d" %
5482            (new_node, idx))
5483       # we pass force_create=True to force LVM creation
5484       for new_lv in dev.children:
5485         _CreateBlockDev(self, new_node, instance, new_lv, True,
5486                         _GetInstanceInfoText(instance), False)
5487
5488     # Step 4: dbrd minors and drbd setups changes
5489     # after this, we must manually remove the drbd minors on both the
5490     # error and the success paths
5491     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5492                                    instance.name)
5493     logging.debug("Allocated minors %s" % (minors,))
5494     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5495     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5496       size = dev.size
5497       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5498       # create new devices on new_node; note that we create two IDs:
5499       # one without port, so the drbd will be activated without
5500       # networking information on the new node at this stage, and one
5501       # with network, for the latter activation in step 4
5502       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5503       if pri_node == o_node1:
5504         p_minor = o_minor1
5505       else:
5506         p_minor = o_minor2
5507
5508       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5509       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5510
5511       iv_names[idx] = (dev, dev.children, new_net_id)
5512       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5513                     new_net_id)
5514       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5515                               logical_id=new_alone_id,
5516                               children=dev.children)
5517       try:
5518         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5519                               _GetInstanceInfoText(instance), False)
5520       except errors.GenericError:
5521         self.cfg.ReleaseDRBDMinors(instance.name)
5522         raise
5523
5524     for idx, dev in enumerate(instance.disks):
5525       # we have new devices, shutdown the drbd on the old secondary
5526       info("shutting down drbd for disk/%d on old node" % idx)
5527       cfg.SetDiskID(dev, old_node)
5528       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5529       if msg:
5530         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5531                 (idx, msg),
5532                 hint="Please cleanup this device manually as soon as possible")
5533
5534     info("detaching primary drbds from the network (=> standalone)")
5535     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5536                                                instance.disks)[pri_node]
5537
5538     msg = result.RemoteFailMsg()
5539     if msg:
5540       # detaches didn't succeed (unlikely)
5541       self.cfg.ReleaseDRBDMinors(instance.name)
5542       raise errors.OpExecError("Can't detach the disks from the network on"
5543                                " old node: %s" % (msg,))
5544
5545     # if we managed to detach at least one, we update all the disks of
5546     # the instance to point to the new secondary
5547     info("updating instance configuration")
5548     for dev, _, new_logical_id in iv_names.itervalues():
5549       dev.logical_id = new_logical_id
5550       cfg.SetDiskID(dev, pri_node)
5551     cfg.Update(instance)
5552
5553     # and now perform the drbd attach
5554     info("attaching primary drbds to new secondary (standalone => connected)")
5555     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5556                                            instance.disks, instance.name,
5557                                            False)
5558     for to_node, to_result in result.items():
5559       msg = to_result.RemoteFailMsg()
5560       if msg:
5561         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5562                 hint="please do a gnt-instance info to see the"
5563                 " status of disks")
5564
5565     # this can fail as the old devices are degraded and _WaitForSync
5566     # does a combined result over all disks, so we don't check its
5567     # return value
5568     self.proc.LogStep(5, steps_total, "sync devices")
5569     _WaitForSync(self, instance, unlock=True)
5570
5571     # so check manually all the devices
5572     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5573       cfg.SetDiskID(dev, pri_node)
5574       result = self.rpc.call_blockdev_find(pri_node, dev)
5575       msg = result.RemoteFailMsg()
5576       if not msg and not result.payload:
5577         msg = "disk not found"
5578       if msg:
5579         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5580                                  (idx, msg))
5581       if result.payload[5]:
5582         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5583
5584     self.proc.LogStep(6, steps_total, "removing old storage")
5585     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5586       info("remove logical volumes for disk/%d" % idx)
5587       for lv in old_lvs:
5588         cfg.SetDiskID(lv, old_node)
5589         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5590         if msg:
5591           warning("Can't remove LV on old secondary: %s", msg,
5592                   hint="Cleanup stale volumes by hand")
5593
5594   def Exec(self, feedback_fn):
5595     """Execute disk replacement.
5596
5597     This dispatches the disk replacement to the appropriate handler.
5598
5599     """
5600     instance = self.instance
5601
5602     # Activate the instance disks if we're replacing them on a down instance
5603     if not instance.admin_up:
5604       _StartInstanceDisks(self, instance, True)
5605
5606     if self.op.mode == constants.REPLACE_DISK_CHG:
5607       fn = self._ExecD8Secondary
5608     else:
5609       fn = self._ExecD8DiskOnly
5610
5611     ret = fn(feedback_fn)
5612
5613     # Deactivate the instance disks if we're replacing them on a down instance
5614     if not instance.admin_up:
5615       _SafeShutdownInstanceDisks(self, instance)
5616
5617     return ret
5618
5619
5620 class LUGrowDisk(LogicalUnit):
5621   """Grow a disk of an instance.
5622
5623   """
5624   HPATH = "disk-grow"
5625   HTYPE = constants.HTYPE_INSTANCE
5626   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5627   REQ_BGL = False
5628
5629   def ExpandNames(self):
5630     self._ExpandAndLockInstance()
5631     self.needed_locks[locking.LEVEL_NODE] = []
5632     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5633
5634   def DeclareLocks(self, level):
5635     if level == locking.LEVEL_NODE:
5636       self._LockInstancesNodes()
5637
5638   def BuildHooksEnv(self):
5639     """Build hooks env.
5640
5641     This runs on the master, the primary and all the secondaries.
5642
5643     """
5644     env = {
5645       "DISK": self.op.disk,
5646       "AMOUNT": self.op.amount,
5647       }
5648     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5649     nl = [
5650       self.cfg.GetMasterNode(),
5651       self.instance.primary_node,
5652       ]
5653     return env, nl, nl
5654
5655   def CheckPrereq(self):
5656     """Check prerequisites.
5657
5658     This checks that the instance is in the cluster.
5659
5660     """
5661     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5662     assert instance is not None, \
5663       "Cannot retrieve locked instance %s" % self.op.instance_name
5664     nodenames = list(instance.all_nodes)
5665     for node in nodenames:
5666       _CheckNodeOnline(self, node)
5667
5668
5669     self.instance = instance
5670
5671     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5672       raise errors.OpPrereqError("Instance's disk layout does not support"
5673                                  " growing.")
5674
5675     self.disk = instance.FindDisk(self.op.disk)
5676
5677     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5678                                        instance.hypervisor)
5679     for node in nodenames:
5680       info = nodeinfo[node]
5681       msg = info.RemoteFailMsg()
5682       if msg:
5683         raise errors.OpPrereqError("Cannot get current information"
5684                                    " from node %s:" % (node, msg))
5685       vg_free = info.payload.get('vg_free', None)
5686       if not isinstance(vg_free, int):
5687         raise errors.OpPrereqError("Can't compute free disk space on"
5688                                    " node %s" % node)
5689       if self.op.amount > vg_free:
5690         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5691                                    " %d MiB available, %d MiB required" %
5692                                    (node, vg_free, self.op.amount))
5693
5694   def Exec(self, feedback_fn):
5695     """Execute disk grow.
5696
5697     """
5698     instance = self.instance
5699     disk = self.disk
5700     for node in instance.all_nodes:
5701       self.cfg.SetDiskID(disk, node)
5702       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5703       msg = result.RemoteFailMsg()
5704       if msg:
5705         raise errors.OpExecError("Grow request failed to node %s: %s" %
5706                                  (node, msg))
5707     disk.RecordGrow(self.op.amount)
5708     self.cfg.Update(instance)
5709     if self.op.wait_for_sync:
5710       disk_abort = not _WaitForSync(self, instance)
5711       if disk_abort:
5712         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5713                              " status.\nPlease check the instance.")
5714
5715
5716 class LUQueryInstanceData(NoHooksLU):
5717   """Query runtime instance data.
5718
5719   """
5720   _OP_REQP = ["instances", "static"]
5721   REQ_BGL = False
5722
5723   def ExpandNames(self):
5724     self.needed_locks = {}
5725     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5726
5727     if not isinstance(self.op.instances, list):
5728       raise errors.OpPrereqError("Invalid argument type 'instances'")
5729
5730     if self.op.instances:
5731       self.wanted_names = []
5732       for name in self.op.instances:
5733         full_name = self.cfg.ExpandInstanceName(name)
5734         if full_name is None:
5735           raise errors.OpPrereqError("Instance '%s' not known" % name)
5736         self.wanted_names.append(full_name)
5737       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5738     else:
5739       self.wanted_names = None
5740       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5741
5742     self.needed_locks[locking.LEVEL_NODE] = []
5743     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5744
5745   def DeclareLocks(self, level):
5746     if level == locking.LEVEL_NODE:
5747       self._LockInstancesNodes()
5748
5749   def CheckPrereq(self):
5750     """Check prerequisites.
5751
5752     This only checks the optional instance list against the existing names.
5753
5754     """
5755     if self.wanted_names is None:
5756       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5757
5758     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5759                              in self.wanted_names]
5760     return
5761
5762   def _ComputeDiskStatus(self, instance, snode, dev):
5763     """Compute block device status.
5764
5765     """
5766     static = self.op.static
5767     if not static:
5768       self.cfg.SetDiskID(dev, instance.primary_node)
5769       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5770       if dev_pstatus.offline:
5771         dev_pstatus = None
5772       else:
5773         msg = dev_pstatus.RemoteFailMsg()
5774         if msg:
5775           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5776                                    (instance.name, msg))
5777         dev_pstatus = dev_pstatus.payload
5778     else:
5779       dev_pstatus = None
5780
5781     if dev.dev_type in constants.LDS_DRBD:
5782       # we change the snode then (otherwise we use the one passed in)
5783       if dev.logical_id[0] == instance.primary_node:
5784         snode = dev.logical_id[1]
5785       else:
5786         snode = dev.logical_id[0]
5787
5788     if snode and not static:
5789       self.cfg.SetDiskID(dev, snode)
5790       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5791       if dev_sstatus.offline:
5792         dev_sstatus = None
5793       else:
5794         msg = dev_sstatus.RemoteFailMsg()
5795         if msg:
5796           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5797                                    (instance.name, msg))
5798         dev_sstatus = dev_sstatus.payload
5799     else:
5800       dev_sstatus = None
5801
5802     if dev.children:
5803       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5804                       for child in dev.children]
5805     else:
5806       dev_children = []
5807
5808     data = {
5809       "iv_name": dev.iv_name,
5810       "dev_type": dev.dev_type,
5811       "logical_id": dev.logical_id,
5812       "physical_id": dev.physical_id,
5813       "pstatus": dev_pstatus,
5814       "sstatus": dev_sstatus,
5815       "children": dev_children,
5816       "mode": dev.mode,
5817       }
5818
5819     return data
5820
5821   def Exec(self, feedback_fn):
5822     """Gather and return data"""
5823     result = {}
5824
5825     cluster = self.cfg.GetClusterInfo()
5826
5827     for instance in self.wanted_instances:
5828       if not self.op.static:
5829         remote_info = self.rpc.call_instance_info(instance.primary_node,
5830                                                   instance.name,
5831                                                   instance.hypervisor)
5832         msg = remote_info.RemoteFailMsg()
5833         if msg:
5834           raise errors.OpExecError("Error checking node %s: %s" %
5835                                    (instance.primary_node, msg))
5836         remote_info = remote_info.payload
5837         if remote_info and "state" in remote_info:
5838           remote_state = "up"
5839         else:
5840           remote_state = "down"
5841       else:
5842         remote_state = None
5843       if instance.admin_up:
5844         config_state = "up"
5845       else:
5846         config_state = "down"
5847
5848       disks = [self._ComputeDiskStatus(instance, None, device)
5849                for device in instance.disks]
5850
5851       idict = {
5852         "name": instance.name,
5853         "config_state": config_state,
5854         "run_state": remote_state,
5855         "pnode": instance.primary_node,
5856         "snodes": instance.secondary_nodes,
5857         "os": instance.os,
5858         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5859         "disks": disks,
5860         "hypervisor": instance.hypervisor,
5861         "network_port": instance.network_port,
5862         "hv_instance": instance.hvparams,
5863         "hv_actual": cluster.FillHV(instance),
5864         "be_instance": instance.beparams,
5865         "be_actual": cluster.FillBE(instance),
5866         }
5867
5868       result[instance.name] = idict
5869
5870     return result
5871
5872
5873 class LUSetInstanceParams(LogicalUnit):
5874   """Modifies an instances's parameters.
5875
5876   """
5877   HPATH = "instance-modify"
5878   HTYPE = constants.HTYPE_INSTANCE
5879   _OP_REQP = ["instance_name"]
5880   REQ_BGL = False
5881
5882   def CheckArguments(self):
5883     if not hasattr(self.op, 'nics'):
5884       self.op.nics = []
5885     if not hasattr(self.op, 'disks'):
5886       self.op.disks = []
5887     if not hasattr(self.op, 'beparams'):
5888       self.op.beparams = {}
5889     if not hasattr(self.op, 'hvparams'):
5890       self.op.hvparams = {}
5891     self.op.force = getattr(self.op, "force", False)
5892     if not (self.op.nics or self.op.disks or
5893             self.op.hvparams or self.op.beparams):
5894       raise errors.OpPrereqError("No changes submitted")
5895
5896     # Disk validation
5897     disk_addremove = 0
5898     for disk_op, disk_dict in self.op.disks:
5899       if disk_op == constants.DDM_REMOVE:
5900         disk_addremove += 1
5901         continue
5902       elif disk_op == constants.DDM_ADD:
5903         disk_addremove += 1
5904       else:
5905         if not isinstance(disk_op, int):
5906           raise errors.OpPrereqError("Invalid disk index")
5907       if disk_op == constants.DDM_ADD:
5908         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5909         if mode not in constants.DISK_ACCESS_SET:
5910           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5911         size = disk_dict.get('size', None)
5912         if size is None:
5913           raise errors.OpPrereqError("Required disk parameter size missing")
5914         try:
5915           size = int(size)
5916         except ValueError, err:
5917           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5918                                      str(err))
5919         disk_dict['size'] = size
5920       else:
5921         # modification of disk
5922         if 'size' in disk_dict:
5923           raise errors.OpPrereqError("Disk size change not possible, use"
5924                                      " grow-disk")
5925
5926     if disk_addremove > 1:
5927       raise errors.OpPrereqError("Only one disk add or remove operation"
5928                                  " supported at a time")
5929
5930     # NIC validation
5931     nic_addremove = 0
5932     for nic_op, nic_dict in self.op.nics:
5933       if nic_op == constants.DDM_REMOVE:
5934         nic_addremove += 1
5935         continue
5936       elif nic_op == constants.DDM_ADD:
5937         nic_addremove += 1
5938       else:
5939         if not isinstance(nic_op, int):
5940           raise errors.OpPrereqError("Invalid nic index")
5941
5942       # nic_dict should be a dict
5943       nic_ip = nic_dict.get('ip', None)
5944       if nic_ip is not None:
5945         if nic_ip.lower() == constants.VALUE_NONE:
5946           nic_dict['ip'] = None
5947         else:
5948           if not utils.IsValidIP(nic_ip):
5949             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5950
5951       nic_bridge = nic_dict.get('bridge', None)
5952       nic_link = nic_dict.get('link', None)
5953       if nic_bridge and nic_link:
5954         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5955       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5956         nic_dict['bridge'] = None
5957       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5958         nic_dict['link'] = None
5959
5960       if nic_op == constants.DDM_ADD:
5961         nic_mac = nic_dict.get('mac', None)
5962         if nic_mac is None:
5963           nic_dict['mac'] = constants.VALUE_AUTO
5964
5965       if 'mac' in nic_dict:
5966         nic_mac = nic_dict['mac']
5967         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5968           if not utils.IsValidMac(nic_mac):
5969             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5970         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5971           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5972                                      " modifying an existing nic")
5973
5974     if nic_addremove > 1:
5975       raise errors.OpPrereqError("Only one NIC add or remove operation"
5976                                  " supported at a time")
5977
5978   def ExpandNames(self):
5979     self._ExpandAndLockInstance()
5980     self.needed_locks[locking.LEVEL_NODE] = []
5981     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5982
5983   def DeclareLocks(self, level):
5984     if level == locking.LEVEL_NODE:
5985       self._LockInstancesNodes()
5986
5987   def BuildHooksEnv(self):
5988     """Build hooks env.
5989
5990     This runs on the master, primary and secondaries.
5991
5992     """
5993     args = dict()
5994     if constants.BE_MEMORY in self.be_new:
5995       args['memory'] = self.be_new[constants.BE_MEMORY]
5996     if constants.BE_VCPUS in self.be_new:
5997       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5998     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5999     # information at all.
6000     if self.op.nics:
6001       args['nics'] = []
6002       nic_override = dict(self.op.nics)
6003       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6004       for idx, nic in enumerate(self.instance.nics):
6005         if idx in nic_override:
6006           this_nic_override = nic_override[idx]
6007         else:
6008           this_nic_override = {}
6009         if 'ip' in this_nic_override:
6010           ip = this_nic_override['ip']
6011         else:
6012           ip = nic.ip
6013         if 'mac' in this_nic_override:
6014           mac = this_nic_override['mac']
6015         else:
6016           mac = nic.mac
6017         if idx in self.nic_pnew:
6018           nicparams = self.nic_pnew[idx]
6019         else:
6020           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6021         mode = nicparams[constants.NIC_MODE]
6022         link = nicparams[constants.NIC_LINK]
6023         args['nics'].append((ip, mac, mode, link))
6024       if constants.DDM_ADD in nic_override:
6025         ip = nic_override[constants.DDM_ADD].get('ip', None)
6026         mac = nic_override[constants.DDM_ADD]['mac']
6027         nicparams = self.nic_pnew[constants.DDM_ADD]
6028         mode = nicparams[constants.NIC_MODE]
6029         link = nicparams[constants.NIC_LINK]
6030         args['nics'].append((ip, mac, mode, link))
6031       elif constants.DDM_REMOVE in nic_override:
6032         del args['nics'][-1]
6033
6034     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6035     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6036     return env, nl, nl
6037
6038   def _GetUpdatedParams(self, old_params, update_dict,
6039                         default_values, parameter_types):
6040     """Return the new params dict for the given params.
6041
6042     @type old_params: dict
6043     @type old_params: old parameters
6044     @type update_dict: dict
6045     @type update_dict: dict containing new parameter values,
6046                        or constants.VALUE_DEFAULT to reset the
6047                        parameter to its default value
6048     @type default_values: dict
6049     @param default_values: default values for the filled parameters
6050     @type parameter_types: dict
6051     @param parameter_types: dict mapping target dict keys to types
6052                             in constants.ENFORCEABLE_TYPES
6053     @rtype: (dict, dict)
6054     @return: (new_parameters, filled_parameters)
6055
6056     """
6057     params_copy = copy.deepcopy(old_params)
6058     for key, val in update_dict.iteritems():
6059       if val == constants.VALUE_DEFAULT:
6060         try:
6061           del params_copy[key]
6062         except KeyError:
6063           pass
6064       else:
6065         params_copy[key] = val
6066     utils.ForceDictType(params_copy, parameter_types)
6067     params_filled = objects.FillDict(default_values, params_copy)
6068     return (params_copy, params_filled)
6069
6070   def CheckPrereq(self):
6071     """Check prerequisites.
6072
6073     This only checks the instance list against the existing names.
6074
6075     """
6076     force = self.force = self.op.force
6077
6078     # checking the new params on the primary/secondary nodes
6079
6080     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6081     cluster = self.cluster = self.cfg.GetClusterInfo()
6082     assert self.instance is not None, \
6083       "Cannot retrieve locked instance %s" % self.op.instance_name
6084     pnode = instance.primary_node
6085     nodelist = list(instance.all_nodes)
6086
6087     # hvparams processing
6088     if self.op.hvparams:
6089       i_hvdict, hv_new = self._GetUpdatedParams(
6090                              instance.hvparams, self.op.hvparams,
6091                              cluster.hvparams[instance.hypervisor],
6092                              constants.HVS_PARAMETER_TYPES)
6093       # local check
6094       hypervisor.GetHypervisor(
6095         instance.hypervisor).CheckParameterSyntax(hv_new)
6096       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6097       self.hv_new = hv_new # the new actual values
6098       self.hv_inst = i_hvdict # the new dict (without defaults)
6099     else:
6100       self.hv_new = self.hv_inst = {}
6101
6102     # beparams processing
6103     if self.op.beparams:
6104       i_bedict, be_new = self._GetUpdatedParams(
6105                              instance.beparams, self.op.beparams,
6106                              cluster.beparams[constants.PP_DEFAULT],
6107                              constants.BES_PARAMETER_TYPES)
6108       self.be_new = be_new # the new actual values
6109       self.be_inst = i_bedict # the new dict (without defaults)
6110     else:
6111       self.be_new = self.be_inst = {}
6112
6113     self.warn = []
6114
6115     if constants.BE_MEMORY in self.op.beparams and not self.force:
6116       mem_check_list = [pnode]
6117       if be_new[constants.BE_AUTO_BALANCE]:
6118         # either we changed auto_balance to yes or it was from before
6119         mem_check_list.extend(instance.secondary_nodes)
6120       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6121                                                   instance.hypervisor)
6122       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6123                                          instance.hypervisor)
6124       pninfo = nodeinfo[pnode]
6125       msg = pninfo.RemoteFailMsg()
6126       if msg:
6127         # Assume the primary node is unreachable and go ahead
6128         self.warn.append("Can't get info from primary node %s: %s" %
6129                          (pnode,  msg))
6130       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6131         self.warn.append("Node data from primary node %s doesn't contain"
6132                          " free memory information" % pnode)
6133       elif instance_info.RemoteFailMsg():
6134         self.warn.append("Can't get instance runtime information: %s" %
6135                         instance_info.RemoteFailMsg())
6136       else:
6137         if instance_info.payload:
6138           current_mem = int(instance_info.payload['memory'])
6139         else:
6140           # Assume instance not running
6141           # (there is a slight race condition here, but it's not very probable,
6142           # and we have no other way to check)
6143           current_mem = 0
6144         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6145                     pninfo.payload['memory_free'])
6146         if miss_mem > 0:
6147           raise errors.OpPrereqError("This change will prevent the instance"
6148                                      " from starting, due to %d MB of memory"
6149                                      " missing on its primary node" % miss_mem)
6150
6151       if be_new[constants.BE_AUTO_BALANCE]:
6152         for node, nres in nodeinfo.items():
6153           if node not in instance.secondary_nodes:
6154             continue
6155           msg = nres.RemoteFailMsg()
6156           if msg:
6157             self.warn.append("Can't get info from secondary node %s: %s" %
6158                              (node, msg))
6159           elif not isinstance(nres.payload.get('memory_free', None), int):
6160             self.warn.append("Secondary node %s didn't return free"
6161                              " memory information" % node)
6162           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6163             self.warn.append("Not enough memory to failover instance to"
6164                              " secondary node %s" % node)
6165
6166     # NIC processing
6167     self.nic_pnew = {}
6168     self.nic_pinst = {}
6169     for nic_op, nic_dict in self.op.nics:
6170       if nic_op == constants.DDM_REMOVE:
6171         if not instance.nics:
6172           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6173         continue
6174       if nic_op != constants.DDM_ADD:
6175         # an existing nic
6176         if nic_op < 0 or nic_op >= len(instance.nics):
6177           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6178                                      " are 0 to %d" %
6179                                      (nic_op, len(instance.nics)))
6180         old_nic_params = instance.nics[nic_op].nicparams
6181         old_nic_ip = instance.nics[nic_op].ip
6182       else:
6183         old_nic_params = {}
6184         old_nic_ip = None
6185
6186       update_params_dict = dict([(key, nic_dict[key])
6187                                  for key in constants.NICS_PARAMETERS
6188                                  if key in nic_dict])
6189
6190       if 'bridge' in nic_dict:
6191         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6192
6193       new_nic_params, new_filled_nic_params = \
6194           self._GetUpdatedParams(old_nic_params, update_params_dict,
6195                                  cluster.nicparams[constants.PP_DEFAULT],
6196                                  constants.NICS_PARAMETER_TYPES)
6197       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6198       self.nic_pinst[nic_op] = new_nic_params
6199       self.nic_pnew[nic_op] = new_filled_nic_params
6200       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6201
6202       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6203         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6204         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6205         msg = result.RemoteFailMsg()
6206         if msg:
6207           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6208           if self.force:
6209             self.warn.append(msg)
6210           else:
6211             raise errors.OpPrereqError(msg)
6212       if new_nic_mode == constants.NIC_MODE_ROUTED:
6213         if 'ip' in nic_dict:
6214           nic_ip = nic_dict['ip']
6215         else:
6216           nic_ip = old_nic_ip
6217         if nic_ip is None:
6218           raise errors.OpPrereqError('Cannot set the nic ip to None'
6219                                      ' on a routed nic')
6220       if 'mac' in nic_dict:
6221         nic_mac = nic_dict['mac']
6222         if nic_mac is None:
6223           raise errors.OpPrereqError('Cannot set the nic mac to None')
6224         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6225           # otherwise generate the mac
6226           nic_dict['mac'] = self.cfg.GenerateMAC()
6227         else:
6228           # or validate/reserve the current one
6229           if self.cfg.IsMacInUse(nic_mac):
6230             raise errors.OpPrereqError("MAC address %s already in use"
6231                                        " in cluster" % nic_mac)
6232
6233     # DISK processing
6234     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6235       raise errors.OpPrereqError("Disk operations not supported for"
6236                                  " diskless instances")
6237     for disk_op, disk_dict in self.op.disks:
6238       if disk_op == constants.DDM_REMOVE:
6239         if len(instance.disks) == 1:
6240           raise errors.OpPrereqError("Cannot remove the last disk of"
6241                                      " an instance")
6242         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6243         ins_l = ins_l[pnode]
6244         msg = ins_l.RemoteFailMsg()
6245         if msg:
6246           raise errors.OpPrereqError("Can't contact node %s: %s" %
6247                                      (pnode, msg))
6248         if instance.name in ins_l.payload:
6249           raise errors.OpPrereqError("Instance is running, can't remove"
6250                                      " disks.")
6251
6252       if (disk_op == constants.DDM_ADD and
6253           len(instance.nics) >= constants.MAX_DISKS):
6254         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6255                                    " add more" % constants.MAX_DISKS)
6256       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6257         # an existing disk
6258         if disk_op < 0 or disk_op >= len(instance.disks):
6259           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6260                                      " are 0 to %d" %
6261                                      (disk_op, len(instance.disks)))
6262
6263     return
6264
6265   def Exec(self, feedback_fn):
6266     """Modifies an instance.
6267
6268     All parameters take effect only at the next restart of the instance.
6269
6270     """
6271     # Process here the warnings from CheckPrereq, as we don't have a
6272     # feedback_fn there.
6273     for warn in self.warn:
6274       feedback_fn("WARNING: %s" % warn)
6275
6276     result = []
6277     instance = self.instance
6278     cluster = self.cluster
6279     # disk changes
6280     for disk_op, disk_dict in self.op.disks:
6281       if disk_op == constants.DDM_REMOVE:
6282         # remove the last disk
6283         device = instance.disks.pop()
6284         device_idx = len(instance.disks)
6285         for node, disk in device.ComputeNodeTree(instance.primary_node):
6286           self.cfg.SetDiskID(disk, node)
6287           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6288           if msg:
6289             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6290                             " continuing anyway", device_idx, node, msg)
6291         result.append(("disk/%d" % device_idx, "remove"))
6292       elif disk_op == constants.DDM_ADD:
6293         # add a new disk
6294         if instance.disk_template == constants.DT_FILE:
6295           file_driver, file_path = instance.disks[0].logical_id
6296           file_path = os.path.dirname(file_path)
6297         else:
6298           file_driver = file_path = None
6299         disk_idx_base = len(instance.disks)
6300         new_disk = _GenerateDiskTemplate(self,
6301                                          instance.disk_template,
6302                                          instance.name, instance.primary_node,
6303                                          instance.secondary_nodes,
6304                                          [disk_dict],
6305                                          file_path,
6306                                          file_driver,
6307                                          disk_idx_base)[0]
6308         instance.disks.append(new_disk)
6309         info = _GetInstanceInfoText(instance)
6310
6311         logging.info("Creating volume %s for instance %s",
6312                      new_disk.iv_name, instance.name)
6313         # Note: this needs to be kept in sync with _CreateDisks
6314         #HARDCODE
6315         for node in instance.all_nodes:
6316           f_create = node == instance.primary_node
6317           try:
6318             _CreateBlockDev(self, node, instance, new_disk,
6319                             f_create, info, f_create)
6320           except errors.OpExecError, err:
6321             self.LogWarning("Failed to create volume %s (%s) on"
6322                             " node %s: %s",
6323                             new_disk.iv_name, new_disk, node, err)
6324         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6325                        (new_disk.size, new_disk.mode)))
6326       else:
6327         # change a given disk
6328         instance.disks[disk_op].mode = disk_dict['mode']
6329         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6330     # NIC changes
6331     for nic_op, nic_dict in self.op.nics:
6332       if nic_op == constants.DDM_REMOVE:
6333         # remove the last nic
6334         del instance.nics[-1]
6335         result.append(("nic.%d" % len(instance.nics), "remove"))
6336       elif nic_op == constants.DDM_ADD:
6337         # mac and bridge should be set, by now
6338         mac = nic_dict['mac']
6339         ip = nic_dict.get('ip', None)
6340         nicparams = self.nic_pinst[constants.DDM_ADD]
6341         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6342         instance.nics.append(new_nic)
6343         result.append(("nic.%d" % (len(instance.nics) - 1),
6344                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6345                        (new_nic.mac, new_nic.ip,
6346                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6347                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6348                        )))
6349       else:
6350         for key in 'mac', 'ip':
6351           if key in nic_dict:
6352             setattr(instance.nics[nic_op], key, nic_dict[key])
6353         if nic_op in self.nic_pnew:
6354           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6355         for key, val in nic_dict.iteritems():
6356           result.append(("nic.%s/%d" % (key, nic_op), val))
6357
6358     # hvparams changes
6359     if self.op.hvparams:
6360       instance.hvparams = self.hv_inst
6361       for key, val in self.op.hvparams.iteritems():
6362         result.append(("hv/%s" % key, val))
6363
6364     # beparams changes
6365     if self.op.beparams:
6366       instance.beparams = self.be_inst
6367       for key, val in self.op.beparams.iteritems():
6368         result.append(("be/%s" % key, val))
6369
6370     self.cfg.Update(instance)
6371
6372     return result
6373
6374
6375 class LUQueryExports(NoHooksLU):
6376   """Query the exports list
6377
6378   """
6379   _OP_REQP = ['nodes']
6380   REQ_BGL = False
6381
6382   def ExpandNames(self):
6383     self.needed_locks = {}
6384     self.share_locks[locking.LEVEL_NODE] = 1
6385     if not self.op.nodes:
6386       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6387     else:
6388       self.needed_locks[locking.LEVEL_NODE] = \
6389         _GetWantedNodes(self, self.op.nodes)
6390
6391   def CheckPrereq(self):
6392     """Check prerequisites.
6393
6394     """
6395     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6396
6397   def Exec(self, feedback_fn):
6398     """Compute the list of all the exported system images.
6399
6400     @rtype: dict
6401     @return: a dictionary with the structure node->(export-list)
6402         where export-list is a list of the instances exported on
6403         that node.
6404
6405     """
6406     rpcresult = self.rpc.call_export_list(self.nodes)
6407     result = {}
6408     for node in rpcresult:
6409       if rpcresult[node].RemoteFailMsg():
6410         result[node] = False
6411       else:
6412         result[node] = rpcresult[node].payload
6413
6414     return result
6415
6416
6417 class LUExportInstance(LogicalUnit):
6418   """Export an instance to an image in the cluster.
6419
6420   """
6421   HPATH = "instance-export"
6422   HTYPE = constants.HTYPE_INSTANCE
6423   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6424   REQ_BGL = False
6425
6426   def ExpandNames(self):
6427     self._ExpandAndLockInstance()
6428     # FIXME: lock only instance primary and destination node
6429     #
6430     # Sad but true, for now we have do lock all nodes, as we don't know where
6431     # the previous export might be, and and in this LU we search for it and
6432     # remove it from its current node. In the future we could fix this by:
6433     #  - making a tasklet to search (share-lock all), then create the new one,
6434     #    then one to remove, after
6435     #  - removing the removal operation altoghether
6436     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6437
6438   def DeclareLocks(self, level):
6439     """Last minute lock declaration."""
6440     # All nodes are locked anyway, so nothing to do here.
6441
6442   def BuildHooksEnv(self):
6443     """Build hooks env.
6444
6445     This will run on the master, primary node and target node.
6446
6447     """
6448     env = {
6449       "EXPORT_NODE": self.op.target_node,
6450       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6451       }
6452     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6453     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6454           self.op.target_node]
6455     return env, nl, nl
6456
6457   def CheckPrereq(self):
6458     """Check prerequisites.
6459
6460     This checks that the instance and node names are valid.
6461
6462     """
6463     instance_name = self.op.instance_name
6464     self.instance = self.cfg.GetInstanceInfo(instance_name)
6465     assert self.instance is not None, \
6466           "Cannot retrieve locked instance %s" % self.op.instance_name
6467     _CheckNodeOnline(self, self.instance.primary_node)
6468
6469     self.dst_node = self.cfg.GetNodeInfo(
6470       self.cfg.ExpandNodeName(self.op.target_node))
6471
6472     if self.dst_node is None:
6473       # This is wrong node name, not a non-locked node
6474       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6475     _CheckNodeOnline(self, self.dst_node.name)
6476     _CheckNodeNotDrained(self, self.dst_node.name)
6477
6478     # instance disk type verification
6479     for disk in self.instance.disks:
6480       if disk.dev_type == constants.LD_FILE:
6481         raise errors.OpPrereqError("Export not supported for instances with"
6482                                    " file-based disks")
6483
6484   def Exec(self, feedback_fn):
6485     """Export an instance to an image in the cluster.
6486
6487     """
6488     instance = self.instance
6489     dst_node = self.dst_node
6490     src_node = instance.primary_node
6491     if self.op.shutdown:
6492       # shutdown the instance, but not the disks
6493       result = self.rpc.call_instance_shutdown(src_node, instance)
6494       msg = result.RemoteFailMsg()
6495       if msg:
6496         raise errors.OpExecError("Could not shutdown instance %s on"
6497                                  " node %s: %s" %
6498                                  (instance.name, src_node, msg))
6499
6500     vgname = self.cfg.GetVGName()
6501
6502     snap_disks = []
6503
6504     # set the disks ID correctly since call_instance_start needs the
6505     # correct drbd minor to create the symlinks
6506     for disk in instance.disks:
6507       self.cfg.SetDiskID(disk, src_node)
6508
6509     try:
6510       for disk in instance.disks:
6511         # result.payload will be a snapshot of an lvm leaf of the one we passed
6512         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6513         msg = result.RemoteFailMsg()
6514         if msg:
6515           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6516                           disk.logical_id[1], src_node, msg)
6517           snap_disks.append(False)
6518         else:
6519           disk_id = (vgname, result.payload)
6520           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6521                                  logical_id=disk_id, physical_id=disk_id,
6522                                  iv_name=disk.iv_name)
6523           snap_disks.append(new_dev)
6524
6525     finally:
6526       if self.op.shutdown and instance.admin_up:
6527         result = self.rpc.call_instance_start(src_node, instance, None, None)
6528         msg = result.RemoteFailMsg()
6529         if msg:
6530           _ShutdownInstanceDisks(self, instance)
6531           raise errors.OpExecError("Could not start instance: %s" % msg)
6532
6533     # TODO: check for size
6534
6535     cluster_name = self.cfg.GetClusterName()
6536     for idx, dev in enumerate(snap_disks):
6537       if dev:
6538         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6539                                                instance, cluster_name, idx)
6540         msg = result.RemoteFailMsg()
6541         if msg:
6542           self.LogWarning("Could not export block device %s from node %s to"
6543                           " node %s: %s", dev.logical_id[1], src_node,
6544                           dst_node.name, msg)
6545         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6546         if msg:
6547           self.LogWarning("Could not remove snapshot block device %s from node"
6548                           " %s: %s", dev.logical_id[1], src_node, msg)
6549
6550     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6551     msg = result.RemoteFailMsg()
6552     if msg:
6553       self.LogWarning("Could not finalize export for instance %s"
6554                       " on node %s: %s", instance.name, dst_node.name, msg)
6555
6556     nodelist = self.cfg.GetNodeList()
6557     nodelist.remove(dst_node.name)
6558
6559     # on one-node clusters nodelist will be empty after the removal
6560     # if we proceed the backup would be removed because OpQueryExports
6561     # substitutes an empty list with the full cluster node list.
6562     iname = instance.name
6563     if nodelist:
6564       exportlist = self.rpc.call_export_list(nodelist)
6565       for node in exportlist:
6566         if exportlist[node].RemoteFailMsg():
6567           continue
6568         if iname in exportlist[node].payload:
6569           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6570           if msg:
6571             self.LogWarning("Could not remove older export for instance %s"
6572                             " on node %s: %s", iname, node, msg)
6573
6574
6575 class LURemoveExport(NoHooksLU):
6576   """Remove exports related to the named instance.
6577
6578   """
6579   _OP_REQP = ["instance_name"]
6580   REQ_BGL = False
6581
6582   def ExpandNames(self):
6583     self.needed_locks = {}
6584     # We need all nodes to be locked in order for RemoveExport to work, but we
6585     # don't need to lock the instance itself, as nothing will happen to it (and
6586     # we can remove exports also for a removed instance)
6587     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6588
6589   def CheckPrereq(self):
6590     """Check prerequisites.
6591     """
6592     pass
6593
6594   def Exec(self, feedback_fn):
6595     """Remove any export.
6596
6597     """
6598     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6599     # If the instance was not found we'll try with the name that was passed in.
6600     # This will only work if it was an FQDN, though.
6601     fqdn_warn = False
6602     if not instance_name:
6603       fqdn_warn = True
6604       instance_name = self.op.instance_name
6605
6606     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6607     exportlist = self.rpc.call_export_list(locked_nodes)
6608     found = False
6609     for node in exportlist:
6610       msg = exportlist[node].RemoteFailMsg()
6611       if msg:
6612         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6613         continue
6614       if instance_name in exportlist[node].payload:
6615         found = True
6616         result = self.rpc.call_export_remove(node, instance_name)
6617         msg = result.RemoteFailMsg()
6618         if msg:
6619           logging.error("Could not remove export for instance %s"
6620                         " on node %s: %s", instance_name, node, msg)
6621
6622     if fqdn_warn and not found:
6623       feedback_fn("Export not found. If trying to remove an export belonging"
6624                   " to a deleted instance please use its Fully Qualified"
6625                   " Domain Name.")
6626
6627
6628 class TagsLU(NoHooksLU):
6629   """Generic tags LU.
6630
6631   This is an abstract class which is the parent of all the other tags LUs.
6632
6633   """
6634
6635   def ExpandNames(self):
6636     self.needed_locks = {}
6637     if self.op.kind == constants.TAG_NODE:
6638       name = self.cfg.ExpandNodeName(self.op.name)
6639       if name is None:
6640         raise errors.OpPrereqError("Invalid node name (%s)" %
6641                                    (self.op.name,))
6642       self.op.name = name
6643       self.needed_locks[locking.LEVEL_NODE] = name
6644     elif self.op.kind == constants.TAG_INSTANCE:
6645       name = self.cfg.ExpandInstanceName(self.op.name)
6646       if name is None:
6647         raise errors.OpPrereqError("Invalid instance name (%s)" %
6648                                    (self.op.name,))
6649       self.op.name = name
6650       self.needed_locks[locking.LEVEL_INSTANCE] = name
6651
6652   def CheckPrereq(self):
6653     """Check prerequisites.
6654
6655     """
6656     if self.op.kind == constants.TAG_CLUSTER:
6657       self.target = self.cfg.GetClusterInfo()
6658     elif self.op.kind == constants.TAG_NODE:
6659       self.target = self.cfg.GetNodeInfo(self.op.name)
6660     elif self.op.kind == constants.TAG_INSTANCE:
6661       self.target = self.cfg.GetInstanceInfo(self.op.name)
6662     else:
6663       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6664                                  str(self.op.kind))
6665
6666
6667 class LUGetTags(TagsLU):
6668   """Returns the tags of a given object.
6669
6670   """
6671   _OP_REQP = ["kind", "name"]
6672   REQ_BGL = False
6673
6674   def Exec(self, feedback_fn):
6675     """Returns the tag list.
6676
6677     """
6678     return list(self.target.GetTags())
6679
6680
6681 class LUSearchTags(NoHooksLU):
6682   """Searches the tags for a given pattern.
6683
6684   """
6685   _OP_REQP = ["pattern"]
6686   REQ_BGL = False
6687
6688   def ExpandNames(self):
6689     self.needed_locks = {}
6690
6691   def CheckPrereq(self):
6692     """Check prerequisites.
6693
6694     This checks the pattern passed for validity by compiling it.
6695
6696     """
6697     try:
6698       self.re = re.compile(self.op.pattern)
6699     except re.error, err:
6700       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6701                                  (self.op.pattern, err))
6702
6703   def Exec(self, feedback_fn):
6704     """Returns the tag list.
6705
6706     """
6707     cfg = self.cfg
6708     tgts = [("/cluster", cfg.GetClusterInfo())]
6709     ilist = cfg.GetAllInstancesInfo().values()
6710     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6711     nlist = cfg.GetAllNodesInfo().values()
6712     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6713     results = []
6714     for path, target in tgts:
6715       for tag in target.GetTags():
6716         if self.re.search(tag):
6717           results.append((path, tag))
6718     return results
6719
6720
6721 class LUAddTags(TagsLU):
6722   """Sets a tag on a given object.
6723
6724   """
6725   _OP_REQP = ["kind", "name", "tags"]
6726   REQ_BGL = False
6727
6728   def CheckPrereq(self):
6729     """Check prerequisites.
6730
6731     This checks the type and length of the tag name and value.
6732
6733     """
6734     TagsLU.CheckPrereq(self)
6735     for tag in self.op.tags:
6736       objects.TaggableObject.ValidateTag(tag)
6737
6738   def Exec(self, feedback_fn):
6739     """Sets the tag.
6740
6741     """
6742     try:
6743       for tag in self.op.tags:
6744         self.target.AddTag(tag)
6745     except errors.TagError, err:
6746       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6747     try:
6748       self.cfg.Update(self.target)
6749     except errors.ConfigurationError:
6750       raise errors.OpRetryError("There has been a modification to the"
6751                                 " config file and the operation has been"
6752                                 " aborted. Please retry.")
6753
6754
6755 class LUDelTags(TagsLU):
6756   """Delete a list of tags from a given object.
6757
6758   """
6759   _OP_REQP = ["kind", "name", "tags"]
6760   REQ_BGL = False
6761
6762   def CheckPrereq(self):
6763     """Check prerequisites.
6764
6765     This checks that we have the given tag.
6766
6767     """
6768     TagsLU.CheckPrereq(self)
6769     for tag in self.op.tags:
6770       objects.TaggableObject.ValidateTag(tag)
6771     del_tags = frozenset(self.op.tags)
6772     cur_tags = self.target.GetTags()
6773     if not del_tags <= cur_tags:
6774       diff_tags = del_tags - cur_tags
6775       diff_names = ["'%s'" % tag for tag in diff_tags]
6776       diff_names.sort()
6777       raise errors.OpPrereqError("Tag(s) %s not found" %
6778                                  (",".join(diff_names)))
6779
6780   def Exec(self, feedback_fn):
6781     """Remove the tag from the object.
6782
6783     """
6784     for tag in self.op.tags:
6785       self.target.RemoveTag(tag)
6786     try:
6787       self.cfg.Update(self.target)
6788     except errors.ConfigurationError:
6789       raise errors.OpRetryError("There has been a modification to the"
6790                                 " config file and the operation has been"
6791                                 " aborted. Please retry.")
6792
6793
6794 class LUTestDelay(NoHooksLU):
6795   """Sleep for a specified amount of time.
6796
6797   This LU sleeps on the master and/or nodes for a specified amount of
6798   time.
6799
6800   """
6801   _OP_REQP = ["duration", "on_master", "on_nodes"]
6802   REQ_BGL = False
6803
6804   def ExpandNames(self):
6805     """Expand names and set required locks.
6806
6807     This expands the node list, if any.
6808
6809     """
6810     self.needed_locks = {}
6811     if self.op.on_nodes:
6812       # _GetWantedNodes can be used here, but is not always appropriate to use
6813       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6814       # more information.
6815       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6816       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6817
6818   def CheckPrereq(self):
6819     """Check prerequisites.
6820
6821     """
6822
6823   def Exec(self, feedback_fn):
6824     """Do the actual sleep.
6825
6826     """
6827     if self.op.on_master:
6828       if not utils.TestDelay(self.op.duration):
6829         raise errors.OpExecError("Error during master delay test")
6830     if self.op.on_nodes:
6831       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6832       if not result:
6833         raise errors.OpExecError("Complete failure from rpc call")
6834       for node, node_result in result.items():
6835         node_result.Raise()
6836         if not node_result.data:
6837           raise errors.OpExecError("Failure during rpc call to node %s,"
6838                                    " result: %s" % (node, node_result.data))
6839
6840
6841 class IAllocator(object):
6842   """IAllocator framework.
6843
6844   An IAllocator instance has three sets of attributes:
6845     - cfg that is needed to query the cluster
6846     - input data (all members of the _KEYS class attribute are required)
6847     - four buffer attributes (in|out_data|text), that represent the
6848       input (to the external script) in text and data structure format,
6849       and the output from it, again in two formats
6850     - the result variables from the script (success, info, nodes) for
6851       easy usage
6852
6853   """
6854   _ALLO_KEYS = [
6855     "mem_size", "disks", "disk_template",
6856     "os", "tags", "nics", "vcpus", "hypervisor",
6857     ]
6858   _RELO_KEYS = [
6859     "relocate_from",
6860     ]
6861
6862   def __init__(self, lu, mode, name, **kwargs):
6863     self.lu = lu
6864     # init buffer variables
6865     self.in_text = self.out_text = self.in_data = self.out_data = None
6866     # init all input fields so that pylint is happy
6867     self.mode = mode
6868     self.name = name
6869     self.mem_size = self.disks = self.disk_template = None
6870     self.os = self.tags = self.nics = self.vcpus = None
6871     self.hypervisor = None
6872     self.relocate_from = None
6873     # computed fields
6874     self.required_nodes = None
6875     # init result fields
6876     self.success = self.info = self.nodes = None
6877     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6878       keyset = self._ALLO_KEYS
6879     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6880       keyset = self._RELO_KEYS
6881     else:
6882       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6883                                    " IAllocator" % self.mode)
6884     for key in kwargs:
6885       if key not in keyset:
6886         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6887                                      " IAllocator" % key)
6888       setattr(self, key, kwargs[key])
6889     for key in keyset:
6890       if key not in kwargs:
6891         raise errors.ProgrammerError("Missing input parameter '%s' to"
6892                                      " IAllocator" % key)
6893     self._BuildInputData()
6894
6895   def _ComputeClusterData(self):
6896     """Compute the generic allocator input data.
6897
6898     This is the data that is independent of the actual operation.
6899
6900     """
6901     cfg = self.lu.cfg
6902     cluster_info = cfg.GetClusterInfo()
6903     # cluster data
6904     data = {
6905       "version": constants.IALLOCATOR_VERSION,
6906       "cluster_name": cfg.GetClusterName(),
6907       "cluster_tags": list(cluster_info.GetTags()),
6908       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6909       # we don't have job IDs
6910       }
6911     iinfo = cfg.GetAllInstancesInfo().values()
6912     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6913
6914     # node data
6915     node_results = {}
6916     node_list = cfg.GetNodeList()
6917
6918     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6919       hypervisor_name = self.hypervisor
6920     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6921       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6922
6923     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6924                                            hypervisor_name)
6925     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6926                        cluster_info.enabled_hypervisors)
6927     for nname, nresult in node_data.items():
6928       # first fill in static (config-based) values
6929       ninfo = cfg.GetNodeInfo(nname)
6930       pnr = {
6931         "tags": list(ninfo.GetTags()),
6932         "primary_ip": ninfo.primary_ip,
6933         "secondary_ip": ninfo.secondary_ip,
6934         "offline": ninfo.offline,
6935         "drained": ninfo.drained,
6936         "master_candidate": ninfo.master_candidate,
6937         }
6938
6939       if not ninfo.offline:
6940         msg = nresult.RemoteFailMsg()
6941         if msg:
6942           raise errors.OpExecError("Can't get data for node %s: %s" %
6943                                    (nname, msg))
6944         msg = node_iinfo[nname].RemoteFailMsg()
6945         if msg:
6946           raise errors.OpExecError("Can't get node instance info"
6947                                    " from node %s: %s" % (nname, msg))
6948         remote_info = nresult.payload
6949         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6950                      'vg_size', 'vg_free', 'cpu_total']:
6951           if attr not in remote_info:
6952             raise errors.OpExecError("Node '%s' didn't return attribute"
6953                                      " '%s'" % (nname, attr))
6954           if not isinstance(remote_info[attr], int):
6955             raise errors.OpExecError("Node '%s' returned invalid value"
6956                                      " for '%s': %s" %
6957                                      (nname, attr, remote_info[attr]))
6958         # compute memory used by primary instances
6959         i_p_mem = i_p_up_mem = 0
6960         for iinfo, beinfo in i_list:
6961           if iinfo.primary_node == nname:
6962             i_p_mem += beinfo[constants.BE_MEMORY]
6963             if iinfo.name not in node_iinfo[nname].payload:
6964               i_used_mem = 0
6965             else:
6966               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6967             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6968             remote_info['memory_free'] -= max(0, i_mem_diff)
6969
6970             if iinfo.admin_up:
6971               i_p_up_mem += beinfo[constants.BE_MEMORY]
6972
6973         # compute memory used by instances
6974         pnr_dyn = {
6975           "total_memory": remote_info['memory_total'],
6976           "reserved_memory": remote_info['memory_dom0'],
6977           "free_memory": remote_info['memory_free'],
6978           "total_disk": remote_info['vg_size'],
6979           "free_disk": remote_info['vg_free'],
6980           "total_cpus": remote_info['cpu_total'],
6981           "i_pri_memory": i_p_mem,
6982           "i_pri_up_memory": i_p_up_mem,
6983           }
6984         pnr.update(pnr_dyn)
6985
6986       node_results[nname] = pnr
6987     data["nodes"] = node_results
6988
6989     # instance data
6990     instance_data = {}
6991     for iinfo, beinfo in i_list:
6992       nic_data = []
6993       for nic in iinfo.nics:
6994         filled_params = objects.FillDict(
6995             cluster_info.nicparams[constants.PP_DEFAULT],
6996             nic.nicparams)
6997         nic_dict = {"mac": nic.mac,
6998                     "ip": nic.ip,
6999                     "mode": filled_params[constants.NIC_MODE],
7000                     "link": filled_params[constants.NIC_LINK],
7001                    }
7002         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7003           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7004         nic_data.append(nic_dict)
7005       pir = {
7006         "tags": list(iinfo.GetTags()),
7007         "admin_up": iinfo.admin_up,
7008         "vcpus": beinfo[constants.BE_VCPUS],
7009         "memory": beinfo[constants.BE_MEMORY],
7010         "os": iinfo.os,
7011         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7012         "nics": nic_data,
7013         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7014         "disk_template": iinfo.disk_template,
7015         "hypervisor": iinfo.hypervisor,
7016         }
7017       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7018                                                  pir["disks"])
7019       instance_data[iinfo.name] = pir
7020
7021     data["instances"] = instance_data
7022
7023     self.in_data = data
7024
7025   def _AddNewInstance(self):
7026     """Add new instance data to allocator structure.
7027
7028     This in combination with _AllocatorGetClusterData will create the
7029     correct structure needed as input for the allocator.
7030
7031     The checks for the completeness of the opcode must have already been
7032     done.
7033
7034     """
7035     data = self.in_data
7036
7037     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7038
7039     if self.disk_template in constants.DTS_NET_MIRROR:
7040       self.required_nodes = 2
7041     else:
7042       self.required_nodes = 1
7043     request = {
7044       "type": "allocate",
7045       "name": self.name,
7046       "disk_template": self.disk_template,
7047       "tags": self.tags,
7048       "os": self.os,
7049       "vcpus": self.vcpus,
7050       "memory": self.mem_size,
7051       "disks": self.disks,
7052       "disk_space_total": disk_space,
7053       "nics": self.nics,
7054       "required_nodes": self.required_nodes,
7055       }
7056     data["request"] = request
7057
7058   def _AddRelocateInstance(self):
7059     """Add relocate instance data to allocator structure.
7060
7061     This in combination with _IAllocatorGetClusterData will create the
7062     correct structure needed as input for the allocator.
7063
7064     The checks for the completeness of the opcode must have already been
7065     done.
7066
7067     """
7068     instance = self.lu.cfg.GetInstanceInfo(self.name)
7069     if instance is None:
7070       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7071                                    " IAllocator" % self.name)
7072
7073     if instance.disk_template not in constants.DTS_NET_MIRROR:
7074       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7075
7076     if len(instance.secondary_nodes) != 1:
7077       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7078
7079     self.required_nodes = 1
7080     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7081     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7082
7083     request = {
7084       "type": "relocate",
7085       "name": self.name,
7086       "disk_space_total": disk_space,
7087       "required_nodes": self.required_nodes,
7088       "relocate_from": self.relocate_from,
7089       }
7090     self.in_data["request"] = request
7091
7092   def _BuildInputData(self):
7093     """Build input data structures.
7094
7095     """
7096     self._ComputeClusterData()
7097
7098     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7099       self._AddNewInstance()
7100     else:
7101       self._AddRelocateInstance()
7102
7103     self.in_text = serializer.Dump(self.in_data)
7104
7105   def Run(self, name, validate=True, call_fn=None):
7106     """Run an instance allocator and return the results.
7107
7108     """
7109     if call_fn is None:
7110       call_fn = self.lu.rpc.call_iallocator_runner
7111     data = self.in_text
7112
7113     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7114     result.Raise()
7115
7116     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7117       raise errors.OpExecError("Invalid result from master iallocator runner")
7118
7119     rcode, stdout, stderr, fail = result.data
7120
7121     if rcode == constants.IARUN_NOTFOUND:
7122       raise errors.OpExecError("Can't find allocator '%s'" % name)
7123     elif rcode == constants.IARUN_FAILURE:
7124       raise errors.OpExecError("Instance allocator call failed: %s,"
7125                                " output: %s" % (fail, stdout+stderr))
7126     self.out_text = stdout
7127     if validate:
7128       self._ValidateResult()
7129
7130   def _ValidateResult(self):
7131     """Process the allocator results.
7132
7133     This will process and if successful save the result in
7134     self.out_data and the other parameters.
7135
7136     """
7137     try:
7138       rdict = serializer.Load(self.out_text)
7139     except Exception, err:
7140       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7141
7142     if not isinstance(rdict, dict):
7143       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7144
7145     for key in "success", "info", "nodes":
7146       if key not in rdict:
7147         raise errors.OpExecError("Can't parse iallocator results:"
7148                                  " missing key '%s'" % key)
7149       setattr(self, key, rdict[key])
7150
7151     if not isinstance(rdict["nodes"], list):
7152       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7153                                " is not a list")
7154     self.out_data = rdict
7155
7156
7157 class LUTestAllocator(NoHooksLU):
7158   """Run allocator tests.
7159
7160   This LU runs the allocator tests
7161
7162   """
7163   _OP_REQP = ["direction", "mode", "name"]
7164
7165   def CheckPrereq(self):
7166     """Check prerequisites.
7167
7168     This checks the opcode parameters depending on the director and mode test.
7169
7170     """
7171     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7172       for attr in ["name", "mem_size", "disks", "disk_template",
7173                    "os", "tags", "nics", "vcpus"]:
7174         if not hasattr(self.op, attr):
7175           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7176                                      attr)
7177       iname = self.cfg.ExpandInstanceName(self.op.name)
7178       if iname is not None:
7179         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7180                                    iname)
7181       if not isinstance(self.op.nics, list):
7182         raise errors.OpPrereqError("Invalid parameter 'nics'")
7183       for row in self.op.nics:
7184         if (not isinstance(row, dict) or
7185             "mac" not in row or
7186             "ip" not in row or
7187             "bridge" not in row):
7188           raise errors.OpPrereqError("Invalid contents of the"
7189                                      " 'nics' parameter")
7190       if not isinstance(self.op.disks, list):
7191         raise errors.OpPrereqError("Invalid parameter 'disks'")
7192       for row in self.op.disks:
7193         if (not isinstance(row, dict) or
7194             "size" not in row or
7195             not isinstance(row["size"], int) or
7196             "mode" not in row or
7197             row["mode"] not in ['r', 'w']):
7198           raise errors.OpPrereqError("Invalid contents of the"
7199                                      " 'disks' parameter")
7200       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7201         self.op.hypervisor = self.cfg.GetHypervisorType()
7202     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7203       if not hasattr(self.op, "name"):
7204         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7205       fname = self.cfg.ExpandInstanceName(self.op.name)
7206       if fname is None:
7207         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7208                                    self.op.name)
7209       self.op.name = fname
7210       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7211     else:
7212       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7213                                  self.op.mode)
7214
7215     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7216       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7217         raise errors.OpPrereqError("Missing allocator name")
7218     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7219       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7220                                  self.op.direction)
7221
7222   def Exec(self, feedback_fn):
7223     """Run the allocator test.
7224
7225     """
7226     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7227       ial = IAllocator(self,
7228                        mode=self.op.mode,
7229                        name=self.op.name,
7230                        mem_size=self.op.mem_size,
7231                        disks=self.op.disks,
7232                        disk_template=self.op.disk_template,
7233                        os=self.op.os,
7234                        tags=self.op.tags,
7235                        nics=self.op.nics,
7236                        vcpus=self.op.vcpus,
7237                        hypervisor=self.op.hypervisor,
7238                        )
7239     else:
7240       ial = IAllocator(self,
7241                        mode=self.op.mode,
7242                        name=self.op.name,
7243                        relocate_from=list(self.relocate_from),
7244                        )
7245
7246     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7247       result = ial.in_text
7248     else:
7249       ial.Run(self.op.allocator, validate=False)
7250       result = ial.out_text
7251     return result