Convert the file storage rpcs to new style result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     msg = result.RemoteFailMsg()
660     if msg:
661       raise errors.OpExecError("Could not disable the master role: %s" % msg)
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           msg = res.RemoteFailMsg()
1246           if msg:
1247             if res.offline:
1248               # no need to warn or set fail return value
1249               continue
1250             feedback_fn("    Communication failure in hooks execution: %s" %
1251                         msg)
1252             lu_result = 1
1253             continue
1254           for script, hkr, output in res.payload:
1255             if hkr == constants.HKR_FAIL:
1256               # The node header is only shown once, if there are
1257               # failing hooks on that node
1258               if show_node_header:
1259                 feedback_fn("  Node %s:" % node_name)
1260                 show_node_header = False
1261               feedback_fn("    ERROR: Script %s failed, output:" % script)
1262               output = indent_re.sub('      ', output)
1263               feedback_fn("%s" % output)
1264               lu_result = 1
1265
1266       return lu_result
1267
1268
1269 class LUVerifyDisks(NoHooksLU):
1270   """Verifies the cluster disks status.
1271
1272   """
1273   _OP_REQP = []
1274   REQ_BGL = False
1275
1276   def ExpandNames(self):
1277     self.needed_locks = {
1278       locking.LEVEL_NODE: locking.ALL_SET,
1279       locking.LEVEL_INSTANCE: locking.ALL_SET,
1280     }
1281     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1282
1283   def CheckPrereq(self):
1284     """Check prerequisites.
1285
1286     This has no prerequisites.
1287
1288     """
1289     pass
1290
1291   def Exec(self, feedback_fn):
1292     """Verify integrity of cluster disks.
1293
1294     @rtype: tuple of three items
1295     @return: a tuple of (dict of node-to-node_error, list of instances
1296         which need activate-disks, dict of instance: (node, volume) for
1297         missing volumes
1298
1299     """
1300     result = res_nodes, res_instances, res_missing = {}, [], {}
1301
1302     vg_name = self.cfg.GetVGName()
1303     nodes = utils.NiceSort(self.cfg.GetNodeList())
1304     instances = [self.cfg.GetInstanceInfo(name)
1305                  for name in self.cfg.GetInstanceList()]
1306
1307     nv_dict = {}
1308     for inst in instances:
1309       inst_lvs = {}
1310       if (not inst.admin_up or
1311           inst.disk_template not in constants.DTS_NET_MIRROR):
1312         continue
1313       inst.MapLVsByNode(inst_lvs)
1314       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1315       for node, vol_list in inst_lvs.iteritems():
1316         for vol in vol_list:
1317           nv_dict[(node, vol)] = inst
1318
1319     if not nv_dict:
1320       return result
1321
1322     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1323
1324     to_act = set()
1325     for node in nodes:
1326       # node_volume
1327       node_res = node_lvs[node]
1328       if node_res.offline:
1329         continue
1330       msg = node_res.RemoteFailMsg()
1331       if msg:
1332         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1333         res_nodes[node] = msg
1334         continue
1335
1336       lvs = node_res.payload
1337       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1338         inst = nv_dict.pop((node, lv_name), None)
1339         if (not lv_online and inst is not None
1340             and inst.name not in res_instances):
1341           res_instances.append(inst.name)
1342
1343     # any leftover items in nv_dict are missing LVs, let's arrange the
1344     # data better
1345     for key, inst in nv_dict.iteritems():
1346       if inst.name not in res_missing:
1347         res_missing[inst.name] = []
1348       res_missing[inst.name].append(key)
1349
1350     return result
1351
1352
1353 class LURenameCluster(LogicalUnit):
1354   """Rename the cluster.
1355
1356   """
1357   HPATH = "cluster-rename"
1358   HTYPE = constants.HTYPE_CLUSTER
1359   _OP_REQP = ["name"]
1360
1361   def BuildHooksEnv(self):
1362     """Build hooks env.
1363
1364     """
1365     env = {
1366       "OP_TARGET": self.cfg.GetClusterName(),
1367       "NEW_NAME": self.op.name,
1368       }
1369     mn = self.cfg.GetMasterNode()
1370     return env, [mn], [mn]
1371
1372   def CheckPrereq(self):
1373     """Verify that the passed name is a valid one.
1374
1375     """
1376     hostname = utils.HostInfo(self.op.name)
1377
1378     new_name = hostname.name
1379     self.ip = new_ip = hostname.ip
1380     old_name = self.cfg.GetClusterName()
1381     old_ip = self.cfg.GetMasterIP()
1382     if new_name == old_name and new_ip == old_ip:
1383       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1384                                  " cluster has changed")
1385     if new_ip != old_ip:
1386       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1387         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1388                                    " reachable on the network. Aborting." %
1389                                    new_ip)
1390
1391     self.op.name = new_name
1392
1393   def Exec(self, feedback_fn):
1394     """Rename the cluster.
1395
1396     """
1397     clustername = self.op.name
1398     ip = self.ip
1399
1400     # shutdown the master IP
1401     master = self.cfg.GetMasterNode()
1402     result = self.rpc.call_node_stop_master(master, False)
1403     msg = result.RemoteFailMsg()
1404     if msg:
1405       raise errors.OpExecError("Could not disable the master role: %s" % msg)
1406
1407     try:
1408       cluster = self.cfg.GetClusterInfo()
1409       cluster.cluster_name = clustername
1410       cluster.master_ip = ip
1411       self.cfg.Update(cluster)
1412
1413       # update the known hosts file
1414       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1415       node_list = self.cfg.GetNodeList()
1416       try:
1417         node_list.remove(master)
1418       except ValueError:
1419         pass
1420       result = self.rpc.call_upload_file(node_list,
1421                                          constants.SSH_KNOWN_HOSTS_FILE)
1422       for to_node, to_result in result.iteritems():
1423          msg = to_result.RemoteFailMsg()
1424          if msg:
1425            msg = ("Copy of file %s to node %s failed: %s" %
1426                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1427            self.proc.LogWarning(msg)
1428
1429     finally:
1430       result = self.rpc.call_node_start_master(master, False)
1431       msg = result.RemoteFailMsg()
1432       if msg:
1433         self.LogWarning("Could not re-enable the master role on"
1434                         " the master, please restart manually: %s", msg)
1435
1436
1437 def _RecursiveCheckIfLVMBased(disk):
1438   """Check if the given disk or its children are lvm-based.
1439
1440   @type disk: L{objects.Disk}
1441   @param disk: the disk to check
1442   @rtype: booleean
1443   @return: boolean indicating whether a LD_LV dev_type was found or not
1444
1445   """
1446   if disk.children:
1447     for chdisk in disk.children:
1448       if _RecursiveCheckIfLVMBased(chdisk):
1449         return True
1450   return disk.dev_type == constants.LD_LV
1451
1452
1453 class LUSetClusterParams(LogicalUnit):
1454   """Change the parameters of the cluster.
1455
1456   """
1457   HPATH = "cluster-modify"
1458   HTYPE = constants.HTYPE_CLUSTER
1459   _OP_REQP = []
1460   REQ_BGL = False
1461
1462   def CheckArguments(self):
1463     """Check parameters
1464
1465     """
1466     if not hasattr(self.op, "candidate_pool_size"):
1467       self.op.candidate_pool_size = None
1468     if self.op.candidate_pool_size is not None:
1469       try:
1470         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1471       except (ValueError, TypeError), err:
1472         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1473                                    str(err))
1474       if self.op.candidate_pool_size < 1:
1475         raise errors.OpPrereqError("At least one master candidate needed")
1476
1477   def ExpandNames(self):
1478     # FIXME: in the future maybe other cluster params won't require checking on
1479     # all nodes to be modified.
1480     self.needed_locks = {
1481       locking.LEVEL_NODE: locking.ALL_SET,
1482     }
1483     self.share_locks[locking.LEVEL_NODE] = 1
1484
1485   def BuildHooksEnv(self):
1486     """Build hooks env.
1487
1488     """
1489     env = {
1490       "OP_TARGET": self.cfg.GetClusterName(),
1491       "NEW_VG_NAME": self.op.vg_name,
1492       }
1493     mn = self.cfg.GetMasterNode()
1494     return env, [mn], [mn]
1495
1496   def CheckPrereq(self):
1497     """Check prerequisites.
1498
1499     This checks whether the given params don't conflict and
1500     if the given volume group is valid.
1501
1502     """
1503     if self.op.vg_name is not None and not self.op.vg_name:
1504       instances = self.cfg.GetAllInstancesInfo().values()
1505       for inst in instances:
1506         for disk in inst.disks:
1507           if _RecursiveCheckIfLVMBased(disk):
1508             raise errors.OpPrereqError("Cannot disable lvm storage while"
1509                                        " lvm-based instances exist")
1510
1511     node_list = self.acquired_locks[locking.LEVEL_NODE]
1512
1513     # if vg_name not None, checks given volume group on all nodes
1514     if self.op.vg_name:
1515       vglist = self.rpc.call_vg_list(node_list)
1516       for node in node_list:
1517         msg = vglist[node].RemoteFailMsg()
1518         if msg:
1519           # ignoring down node
1520           self.LogWarning("Error while gathering data on node %s"
1521                           " (ignoring node): %s", node, msg)
1522           continue
1523         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1524                                               self.op.vg_name,
1525                                               constants.MIN_VG_SIZE)
1526         if vgstatus:
1527           raise errors.OpPrereqError("Error on node '%s': %s" %
1528                                      (node, vgstatus))
1529
1530     self.cluster = cluster = self.cfg.GetClusterInfo()
1531     # validate params changes
1532     if self.op.beparams:
1533       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1534       self.new_beparams = objects.FillDict(
1535         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1536
1537     if self.op.nicparams:
1538       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1539       self.new_nicparams = objects.FillDict(
1540         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1541       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1542
1543     # hypervisor list/parameters
1544     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1545     if self.op.hvparams:
1546       if not isinstance(self.op.hvparams, dict):
1547         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1548       for hv_name, hv_dict in self.op.hvparams.items():
1549         if hv_name not in self.new_hvparams:
1550           self.new_hvparams[hv_name] = hv_dict
1551         else:
1552           self.new_hvparams[hv_name].update(hv_dict)
1553
1554     if self.op.enabled_hypervisors is not None:
1555       self.hv_list = self.op.enabled_hypervisors
1556     else:
1557       self.hv_list = cluster.enabled_hypervisors
1558
1559     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1560       # either the enabled list has changed, or the parameters have, validate
1561       for hv_name, hv_params in self.new_hvparams.items():
1562         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1563             (self.op.enabled_hypervisors and
1564              hv_name in self.op.enabled_hypervisors)):
1565           # either this is a new hypervisor, or its parameters have changed
1566           hv_class = hypervisor.GetHypervisor(hv_name)
1567           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1568           hv_class.CheckParameterSyntax(hv_params)
1569           _CheckHVParams(self, node_list, hv_name, hv_params)
1570
1571   def Exec(self, feedback_fn):
1572     """Change the parameters of the cluster.
1573
1574     """
1575     if self.op.vg_name is not None:
1576       new_volume = self.op.vg_name
1577       if not new_volume:
1578         new_volume = None
1579       if new_volume != self.cfg.GetVGName():
1580         self.cfg.SetVGName(new_volume)
1581       else:
1582         feedback_fn("Cluster LVM configuration already in desired"
1583                     " state, not changing")
1584     if self.op.hvparams:
1585       self.cluster.hvparams = self.new_hvparams
1586     if self.op.enabled_hypervisors is not None:
1587       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1588     if self.op.beparams:
1589       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1590     if self.op.nicparams:
1591       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1592
1593     if self.op.candidate_pool_size is not None:
1594       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1595
1596     self.cfg.Update(self.cluster)
1597
1598     # we want to update nodes after the cluster so that if any errors
1599     # happen, we have recorded and saved the cluster info
1600     if self.op.candidate_pool_size is not None:
1601       _AdjustCandidatePool(self)
1602
1603
1604 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1605   """Distribute additional files which are part of the cluster configuration.
1606
1607   ConfigWriter takes care of distributing the config and ssconf files, but
1608   there are more files which should be distributed to all nodes. This function
1609   makes sure those are copied.
1610
1611   @param lu: calling logical unit
1612   @param additional_nodes: list of nodes not in the config to distribute to
1613
1614   """
1615   # 1. Gather target nodes
1616   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1617   dist_nodes = lu.cfg.GetNodeList()
1618   if additional_nodes is not None:
1619     dist_nodes.extend(additional_nodes)
1620   if myself.name in dist_nodes:
1621     dist_nodes.remove(myself.name)
1622   # 2. Gather files to distribute
1623   dist_files = set([constants.ETC_HOSTS,
1624                     constants.SSH_KNOWN_HOSTS_FILE,
1625                     constants.RAPI_CERT_FILE,
1626                     constants.RAPI_USERS_FILE,
1627                    ])
1628
1629   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1630   for hv_name in enabled_hypervisors:
1631     hv_class = hypervisor.GetHypervisor(hv_name)
1632     dist_files.update(hv_class.GetAncillaryFiles())
1633
1634   # 3. Perform the files upload
1635   for fname in dist_files:
1636     if os.path.exists(fname):
1637       result = lu.rpc.call_upload_file(dist_nodes, fname)
1638       for to_node, to_result in result.items():
1639          msg = to_result.RemoteFailMsg()
1640          if msg:
1641            msg = ("Copy of file %s to node %s failed: %s" %
1642                    (fname, to_node, msg))
1643            lu.proc.LogWarning(msg)
1644
1645
1646 class LURedistributeConfig(NoHooksLU):
1647   """Force the redistribution of cluster configuration.
1648
1649   This is a very simple LU.
1650
1651   """
1652   _OP_REQP = []
1653   REQ_BGL = False
1654
1655   def ExpandNames(self):
1656     self.needed_locks = {
1657       locking.LEVEL_NODE: locking.ALL_SET,
1658     }
1659     self.share_locks[locking.LEVEL_NODE] = 1
1660
1661   def CheckPrereq(self):
1662     """Check prerequisites.
1663
1664     """
1665
1666   def Exec(self, feedback_fn):
1667     """Redistribute the configuration.
1668
1669     """
1670     self.cfg.Update(self.cfg.GetClusterInfo())
1671     _RedistributeAncillaryFiles(self)
1672
1673
1674 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1675   """Sleep and poll for an instance's disk to sync.
1676
1677   """
1678   if not instance.disks:
1679     return True
1680
1681   if not oneshot:
1682     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1683
1684   node = instance.primary_node
1685
1686   for dev in instance.disks:
1687     lu.cfg.SetDiskID(dev, node)
1688
1689   retries = 0
1690   while True:
1691     max_time = 0
1692     done = True
1693     cumul_degraded = False
1694     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1695     msg = rstats.RemoteFailMsg()
1696     if msg:
1697       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1698       retries += 1
1699       if retries >= 10:
1700         raise errors.RemoteError("Can't contact node %s for mirror data,"
1701                                  " aborting." % node)
1702       time.sleep(6)
1703       continue
1704     rstats = rstats.payload
1705     retries = 0
1706     for i, mstat in enumerate(rstats):
1707       if mstat is None:
1708         lu.LogWarning("Can't compute data for node %s/%s",
1709                            node, instance.disks[i].iv_name)
1710         continue
1711       # we ignore the ldisk parameter
1712       perc_done, est_time, is_degraded, _ = mstat
1713       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1714       if perc_done is not None:
1715         done = False
1716         if est_time is not None:
1717           rem_time = "%d estimated seconds remaining" % est_time
1718           max_time = est_time
1719         else:
1720           rem_time = "no time estimate"
1721         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1722                         (instance.disks[i].iv_name, perc_done, rem_time))
1723     if done or oneshot:
1724       break
1725
1726     time.sleep(min(60, max_time))
1727
1728   if done:
1729     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1730   return not cumul_degraded
1731
1732
1733 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1734   """Check that mirrors are not degraded.
1735
1736   The ldisk parameter, if True, will change the test from the
1737   is_degraded attribute (which represents overall non-ok status for
1738   the device(s)) to the ldisk (representing the local storage status).
1739
1740   """
1741   lu.cfg.SetDiskID(dev, node)
1742   if ldisk:
1743     idx = 6
1744   else:
1745     idx = 5
1746
1747   result = True
1748   if on_primary or dev.AssembleOnSecondary():
1749     rstats = lu.rpc.call_blockdev_find(node, dev)
1750     msg = rstats.RemoteFailMsg()
1751     if msg:
1752       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1753       result = False
1754     elif not rstats.payload:
1755       lu.LogWarning("Can't find disk on node %s", node)
1756       result = False
1757     else:
1758       result = result and (not rstats.payload[idx])
1759   if dev.children:
1760     for child in dev.children:
1761       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1762
1763   return result
1764
1765
1766 class LUDiagnoseOS(NoHooksLU):
1767   """Logical unit for OS diagnose/query.
1768
1769   """
1770   _OP_REQP = ["output_fields", "names"]
1771   REQ_BGL = False
1772   _FIELDS_STATIC = utils.FieldSet()
1773   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1774
1775   def ExpandNames(self):
1776     if self.op.names:
1777       raise errors.OpPrereqError("Selective OS query not supported")
1778
1779     _CheckOutputFields(static=self._FIELDS_STATIC,
1780                        dynamic=self._FIELDS_DYNAMIC,
1781                        selected=self.op.output_fields)
1782
1783     # Lock all nodes, in shared mode
1784     # Temporary removal of locks, should be reverted later
1785     # TODO: reintroduce locks when they are lighter-weight
1786     self.needed_locks = {}
1787     #self.share_locks[locking.LEVEL_NODE] = 1
1788     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1789
1790   def CheckPrereq(self):
1791     """Check prerequisites.
1792
1793     """
1794
1795   @staticmethod
1796   def _DiagnoseByOS(node_list, rlist):
1797     """Remaps a per-node return list into an a per-os per-node dictionary
1798
1799     @param node_list: a list with the names of all nodes
1800     @param rlist: a map with node names as keys and OS objects as values
1801
1802     @rtype: dict
1803     @return: a dictionary with osnames as keys and as value another map, with
1804         nodes as keys and list of OS objects as values, eg::
1805
1806           {"debian-etch": {"node1": [<object>,...],
1807                            "node2": [<object>,]}
1808           }
1809
1810     """
1811     all_os = {}
1812     # we build here the list of nodes that didn't fail the RPC (at RPC
1813     # level), so that nodes with a non-responding node daemon don't
1814     # make all OSes invalid
1815     good_nodes = [node_name for node_name in rlist
1816                   if not rlist[node_name].RemoteFailMsg()]
1817     for node_name, nr in rlist.items():
1818       if nr.RemoteFailMsg() or not nr.payload:
1819         continue
1820       for os_serialized in nr.payload:
1821         os_obj = objects.OS.FromDict(os_serialized)
1822         if os_obj.name not in all_os:
1823           # build a list of nodes for this os containing empty lists
1824           # for each node in node_list
1825           all_os[os_obj.name] = {}
1826           for nname in good_nodes:
1827             all_os[os_obj.name][nname] = []
1828         all_os[os_obj.name][node_name].append(os_obj)
1829     return all_os
1830
1831   def Exec(self, feedback_fn):
1832     """Compute the list of OSes.
1833
1834     """
1835     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1836     node_data = self.rpc.call_os_diagnose(valid_nodes)
1837     pol = self._DiagnoseByOS(valid_nodes, node_data)
1838     output = []
1839     for os_name, os_data in pol.items():
1840       row = []
1841       for field in self.op.output_fields:
1842         if field == "name":
1843           val = os_name
1844         elif field == "valid":
1845           val = utils.all([osl and osl[0] for osl in os_data.values()])
1846         elif field == "node_status":
1847           val = {}
1848           for node_name, nos_list in os_data.iteritems():
1849             val[node_name] = [(v.status, v.path) for v in nos_list]
1850         else:
1851           raise errors.ParameterError(field)
1852         row.append(val)
1853       output.append(row)
1854
1855     return output
1856
1857
1858 class LURemoveNode(LogicalUnit):
1859   """Logical unit for removing a node.
1860
1861   """
1862   HPATH = "node-remove"
1863   HTYPE = constants.HTYPE_NODE
1864   _OP_REQP = ["node_name"]
1865
1866   def BuildHooksEnv(self):
1867     """Build hooks env.
1868
1869     This doesn't run on the target node in the pre phase as a failed
1870     node would then be impossible to remove.
1871
1872     """
1873     env = {
1874       "OP_TARGET": self.op.node_name,
1875       "NODE_NAME": self.op.node_name,
1876       }
1877     all_nodes = self.cfg.GetNodeList()
1878     all_nodes.remove(self.op.node_name)
1879     return env, all_nodes, all_nodes
1880
1881   def CheckPrereq(self):
1882     """Check prerequisites.
1883
1884     This checks:
1885      - the node exists in the configuration
1886      - it does not have primary or secondary instances
1887      - it's not the master
1888
1889     Any errors are signalled by raising errors.OpPrereqError.
1890
1891     """
1892     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1893     if node is None:
1894       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1895
1896     instance_list = self.cfg.GetInstanceList()
1897
1898     masternode = self.cfg.GetMasterNode()
1899     if node.name == masternode:
1900       raise errors.OpPrereqError("Node is the master node,"
1901                                  " you need to failover first.")
1902
1903     for instance_name in instance_list:
1904       instance = self.cfg.GetInstanceInfo(instance_name)
1905       if node.name in instance.all_nodes:
1906         raise errors.OpPrereqError("Instance %s is still running on the node,"
1907                                    " please remove first." % instance_name)
1908     self.op.node_name = node.name
1909     self.node = node
1910
1911   def Exec(self, feedback_fn):
1912     """Removes the node from the cluster.
1913
1914     """
1915     node = self.node
1916     logging.info("Stopping the node daemon and removing configs from node %s",
1917                  node.name)
1918
1919     self.context.RemoveNode(node.name)
1920
1921     result = self.rpc.call_node_leave_cluster(node.name)
1922     msg = result.RemoteFailMsg()
1923     if msg:
1924       self.LogWarning("Errors encountered on the remote node while leaving"
1925                       " the cluster: %s", msg)
1926
1927     # Promote nodes to master candidate as needed
1928     _AdjustCandidatePool(self)
1929
1930
1931 class LUQueryNodes(NoHooksLU):
1932   """Logical unit for querying nodes.
1933
1934   """
1935   _OP_REQP = ["output_fields", "names", "use_locking"]
1936   REQ_BGL = False
1937   _FIELDS_DYNAMIC = utils.FieldSet(
1938     "dtotal", "dfree",
1939     "mtotal", "mnode", "mfree",
1940     "bootid",
1941     "ctotal", "cnodes", "csockets",
1942     )
1943
1944   _FIELDS_STATIC = utils.FieldSet(
1945     "name", "pinst_cnt", "sinst_cnt",
1946     "pinst_list", "sinst_list",
1947     "pip", "sip", "tags",
1948     "serial_no",
1949     "master_candidate",
1950     "master",
1951     "offline",
1952     "drained",
1953     )
1954
1955   def ExpandNames(self):
1956     _CheckOutputFields(static=self._FIELDS_STATIC,
1957                        dynamic=self._FIELDS_DYNAMIC,
1958                        selected=self.op.output_fields)
1959
1960     self.needed_locks = {}
1961     self.share_locks[locking.LEVEL_NODE] = 1
1962
1963     if self.op.names:
1964       self.wanted = _GetWantedNodes(self, self.op.names)
1965     else:
1966       self.wanted = locking.ALL_SET
1967
1968     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1969     self.do_locking = self.do_node_query and self.op.use_locking
1970     if self.do_locking:
1971       # if we don't request only static fields, we need to lock the nodes
1972       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1973
1974
1975   def CheckPrereq(self):
1976     """Check prerequisites.
1977
1978     """
1979     # The validation of the node list is done in the _GetWantedNodes,
1980     # if non empty, and if empty, there's no validation to do
1981     pass
1982
1983   def Exec(self, feedback_fn):
1984     """Computes the list of nodes and their attributes.
1985
1986     """
1987     all_info = self.cfg.GetAllNodesInfo()
1988     if self.do_locking:
1989       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1990     elif self.wanted != locking.ALL_SET:
1991       nodenames = self.wanted
1992       missing = set(nodenames).difference(all_info.keys())
1993       if missing:
1994         raise errors.OpExecError(
1995           "Some nodes were removed before retrieving their data: %s" % missing)
1996     else:
1997       nodenames = all_info.keys()
1998
1999     nodenames = utils.NiceSort(nodenames)
2000     nodelist = [all_info[name] for name in nodenames]
2001
2002     # begin data gathering
2003
2004     if self.do_node_query:
2005       live_data = {}
2006       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2007                                           self.cfg.GetHypervisorType())
2008       for name in nodenames:
2009         nodeinfo = node_data[name]
2010         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2011           nodeinfo = nodeinfo.payload
2012           fn = utils.TryConvert
2013           live_data[name] = {
2014             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2015             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2016             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2017             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2018             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2019             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2020             "bootid": nodeinfo.get('bootid', None),
2021             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2022             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2023             }
2024         else:
2025           live_data[name] = {}
2026     else:
2027       live_data = dict.fromkeys(nodenames, {})
2028
2029     node_to_primary = dict([(name, set()) for name in nodenames])
2030     node_to_secondary = dict([(name, set()) for name in nodenames])
2031
2032     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2033                              "sinst_cnt", "sinst_list"))
2034     if inst_fields & frozenset(self.op.output_fields):
2035       instancelist = self.cfg.GetInstanceList()
2036
2037       for instance_name in instancelist:
2038         inst = self.cfg.GetInstanceInfo(instance_name)
2039         if inst.primary_node in node_to_primary:
2040           node_to_primary[inst.primary_node].add(inst.name)
2041         for secnode in inst.secondary_nodes:
2042           if secnode in node_to_secondary:
2043             node_to_secondary[secnode].add(inst.name)
2044
2045     master_node = self.cfg.GetMasterNode()
2046
2047     # end data gathering
2048
2049     output = []
2050     for node in nodelist:
2051       node_output = []
2052       for field in self.op.output_fields:
2053         if field == "name":
2054           val = node.name
2055         elif field == "pinst_list":
2056           val = list(node_to_primary[node.name])
2057         elif field == "sinst_list":
2058           val = list(node_to_secondary[node.name])
2059         elif field == "pinst_cnt":
2060           val = len(node_to_primary[node.name])
2061         elif field == "sinst_cnt":
2062           val = len(node_to_secondary[node.name])
2063         elif field == "pip":
2064           val = node.primary_ip
2065         elif field == "sip":
2066           val = node.secondary_ip
2067         elif field == "tags":
2068           val = list(node.GetTags())
2069         elif field == "serial_no":
2070           val = node.serial_no
2071         elif field == "master_candidate":
2072           val = node.master_candidate
2073         elif field == "master":
2074           val = node.name == master_node
2075         elif field == "offline":
2076           val = node.offline
2077         elif field == "drained":
2078           val = node.drained
2079         elif self._FIELDS_DYNAMIC.Matches(field):
2080           val = live_data[node.name].get(field, None)
2081         else:
2082           raise errors.ParameterError(field)
2083         node_output.append(val)
2084       output.append(node_output)
2085
2086     return output
2087
2088
2089 class LUQueryNodeVolumes(NoHooksLU):
2090   """Logical unit for getting volumes on node(s).
2091
2092   """
2093   _OP_REQP = ["nodes", "output_fields"]
2094   REQ_BGL = False
2095   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2096   _FIELDS_STATIC = utils.FieldSet("node")
2097
2098   def ExpandNames(self):
2099     _CheckOutputFields(static=self._FIELDS_STATIC,
2100                        dynamic=self._FIELDS_DYNAMIC,
2101                        selected=self.op.output_fields)
2102
2103     self.needed_locks = {}
2104     self.share_locks[locking.LEVEL_NODE] = 1
2105     if not self.op.nodes:
2106       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2107     else:
2108       self.needed_locks[locking.LEVEL_NODE] = \
2109         _GetWantedNodes(self, self.op.nodes)
2110
2111   def CheckPrereq(self):
2112     """Check prerequisites.
2113
2114     This checks that the fields required are valid output fields.
2115
2116     """
2117     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2118
2119   def Exec(self, feedback_fn):
2120     """Computes the list of nodes and their attributes.
2121
2122     """
2123     nodenames = self.nodes
2124     volumes = self.rpc.call_node_volumes(nodenames)
2125
2126     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2127              in self.cfg.GetInstanceList()]
2128
2129     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2130
2131     output = []
2132     for node in nodenames:
2133       nresult = volumes[node]
2134       if nresult.offline:
2135         continue
2136       msg = nresult.RemoteFailMsg()
2137       if msg:
2138         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2139         continue
2140
2141       node_vols = nresult.payload[:]
2142       node_vols.sort(key=lambda vol: vol['dev'])
2143
2144       for vol in node_vols:
2145         node_output = []
2146         for field in self.op.output_fields:
2147           if field == "node":
2148             val = node
2149           elif field == "phys":
2150             val = vol['dev']
2151           elif field == "vg":
2152             val = vol['vg']
2153           elif field == "name":
2154             val = vol['name']
2155           elif field == "size":
2156             val = int(float(vol['size']))
2157           elif field == "instance":
2158             for inst in ilist:
2159               if node not in lv_by_node[inst]:
2160                 continue
2161               if vol['name'] in lv_by_node[inst][node]:
2162                 val = inst.name
2163                 break
2164             else:
2165               val = '-'
2166           else:
2167             raise errors.ParameterError(field)
2168           node_output.append(str(val))
2169
2170         output.append(node_output)
2171
2172     return output
2173
2174
2175 class LUAddNode(LogicalUnit):
2176   """Logical unit for adding node to the cluster.
2177
2178   """
2179   HPATH = "node-add"
2180   HTYPE = constants.HTYPE_NODE
2181   _OP_REQP = ["node_name"]
2182
2183   def BuildHooksEnv(self):
2184     """Build hooks env.
2185
2186     This will run on all nodes before, and on all nodes + the new node after.
2187
2188     """
2189     env = {
2190       "OP_TARGET": self.op.node_name,
2191       "NODE_NAME": self.op.node_name,
2192       "NODE_PIP": self.op.primary_ip,
2193       "NODE_SIP": self.op.secondary_ip,
2194       }
2195     nodes_0 = self.cfg.GetNodeList()
2196     nodes_1 = nodes_0 + [self.op.node_name, ]
2197     return env, nodes_0, nodes_1
2198
2199   def CheckPrereq(self):
2200     """Check prerequisites.
2201
2202     This checks:
2203      - the new node is not already in the config
2204      - it is resolvable
2205      - its parameters (single/dual homed) matches the cluster
2206
2207     Any errors are signalled by raising errors.OpPrereqError.
2208
2209     """
2210     node_name = self.op.node_name
2211     cfg = self.cfg
2212
2213     dns_data = utils.HostInfo(node_name)
2214
2215     node = dns_data.name
2216     primary_ip = self.op.primary_ip = dns_data.ip
2217     secondary_ip = getattr(self.op, "secondary_ip", None)
2218     if secondary_ip is None:
2219       secondary_ip = primary_ip
2220     if not utils.IsValidIP(secondary_ip):
2221       raise errors.OpPrereqError("Invalid secondary IP given")
2222     self.op.secondary_ip = secondary_ip
2223
2224     node_list = cfg.GetNodeList()
2225     if not self.op.readd and node in node_list:
2226       raise errors.OpPrereqError("Node %s is already in the configuration" %
2227                                  node)
2228     elif self.op.readd and node not in node_list:
2229       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2230
2231     for existing_node_name in node_list:
2232       existing_node = cfg.GetNodeInfo(existing_node_name)
2233
2234       if self.op.readd and node == existing_node_name:
2235         if (existing_node.primary_ip != primary_ip or
2236             existing_node.secondary_ip != secondary_ip):
2237           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2238                                      " address configuration as before")
2239         continue
2240
2241       if (existing_node.primary_ip == primary_ip or
2242           existing_node.secondary_ip == primary_ip or
2243           existing_node.primary_ip == secondary_ip or
2244           existing_node.secondary_ip == secondary_ip):
2245         raise errors.OpPrereqError("New node ip address(es) conflict with"
2246                                    " existing node %s" % existing_node.name)
2247
2248     # check that the type of the node (single versus dual homed) is the
2249     # same as for the master
2250     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2251     master_singlehomed = myself.secondary_ip == myself.primary_ip
2252     newbie_singlehomed = secondary_ip == primary_ip
2253     if master_singlehomed != newbie_singlehomed:
2254       if master_singlehomed:
2255         raise errors.OpPrereqError("The master has no private ip but the"
2256                                    " new node has one")
2257       else:
2258         raise errors.OpPrereqError("The master has a private ip but the"
2259                                    " new node doesn't have one")
2260
2261     # checks reachablity
2262     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2263       raise errors.OpPrereqError("Node not reachable by ping")
2264
2265     if not newbie_singlehomed:
2266       # check reachability from my secondary ip to newbie's secondary ip
2267       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2268                            source=myself.secondary_ip):
2269         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2270                                    " based ping to noded port")
2271
2272     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2273     mc_now, _ = self.cfg.GetMasterCandidateStats()
2274     master_candidate = mc_now < cp_size
2275
2276     self.new_node = objects.Node(name=node,
2277                                  primary_ip=primary_ip,
2278                                  secondary_ip=secondary_ip,
2279                                  master_candidate=master_candidate,
2280                                  offline=False, drained=False)
2281
2282   def Exec(self, feedback_fn):
2283     """Adds the new node to the cluster.
2284
2285     """
2286     new_node = self.new_node
2287     node = new_node.name
2288
2289     # check connectivity
2290     result = self.rpc.call_version([node])[node]
2291     msg = result.RemoteFailMsg()
2292     if msg:
2293       raise errors.OpExecError("Can't get version information from"
2294                                " node %s: %s" % (node, msg))
2295     if constants.PROTOCOL_VERSION == result.payload:
2296       logging.info("Communication to node %s fine, sw version %s match",
2297                    node, result.payload)
2298     else:
2299       raise errors.OpExecError("Version mismatch master version %s,"
2300                                " node version %s" %
2301                                (constants.PROTOCOL_VERSION, result.payload))
2302
2303     # setup ssh on node
2304     logging.info("Copy ssh key to node %s", node)
2305     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2306     keyarray = []
2307     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2308                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2309                 priv_key, pub_key]
2310
2311     for i in keyfiles:
2312       f = open(i, 'r')
2313       try:
2314         keyarray.append(f.read())
2315       finally:
2316         f.close()
2317
2318     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2319                                     keyarray[2],
2320                                     keyarray[3], keyarray[4], keyarray[5])
2321
2322     msg = result.RemoteFailMsg()
2323     if msg:
2324       raise errors.OpExecError("Cannot transfer ssh keys to the"
2325                                " new node: %s" % msg)
2326
2327     # Add node to our /etc/hosts, and add key to known_hosts
2328     if self.cfg.GetClusterInfo().modify_etc_hosts:
2329       utils.AddHostToEtcHosts(new_node.name)
2330
2331     if new_node.secondary_ip != new_node.primary_ip:
2332       result = self.rpc.call_node_has_ip_address(new_node.name,
2333                                                  new_node.secondary_ip)
2334       msg = result.RemoteFailMsg()
2335       if msg:
2336         raise errors.OpPrereqError("Failure checking secondary ip"
2337                                    " on node %s: %s" % (new_node.name, msg))
2338       if not result.payload:
2339         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2340                                  " you gave (%s). Please fix and re-run this"
2341                                  " command." % new_node.secondary_ip)
2342
2343     node_verify_list = [self.cfg.GetMasterNode()]
2344     node_verify_param = {
2345       'nodelist': [node],
2346       # TODO: do a node-net-test as well?
2347     }
2348
2349     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2350                                        self.cfg.GetClusterName())
2351     for verifier in node_verify_list:
2352       msg = result[verifier].RemoteFailMsg()
2353       if msg:
2354         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2355                                  (verifier, msg))
2356       nl_payload = result[verifier].payload['nodelist']
2357       if nl_payload:
2358         for failed in nl_payload:
2359           feedback_fn("ssh/hostname verification failed %s -> %s" %
2360                       (verifier, nl_payload[failed]))
2361         raise errors.OpExecError("ssh/hostname verification failed.")
2362
2363     if self.op.readd:
2364       _RedistributeAncillaryFiles(self)
2365       self.context.ReaddNode(new_node)
2366     else:
2367       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2368       self.context.AddNode(new_node)
2369
2370
2371 class LUSetNodeParams(LogicalUnit):
2372   """Modifies the parameters of a node.
2373
2374   """
2375   HPATH = "node-modify"
2376   HTYPE = constants.HTYPE_NODE
2377   _OP_REQP = ["node_name"]
2378   REQ_BGL = False
2379
2380   def CheckArguments(self):
2381     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2382     if node_name is None:
2383       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2384     self.op.node_name = node_name
2385     _CheckBooleanOpField(self.op, 'master_candidate')
2386     _CheckBooleanOpField(self.op, 'offline')
2387     _CheckBooleanOpField(self.op, 'drained')
2388     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2389     if all_mods.count(None) == 3:
2390       raise errors.OpPrereqError("Please pass at least one modification")
2391     if all_mods.count(True) > 1:
2392       raise errors.OpPrereqError("Can't set the node into more than one"
2393                                  " state at the same time")
2394
2395   def ExpandNames(self):
2396     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2397
2398   def BuildHooksEnv(self):
2399     """Build hooks env.
2400
2401     This runs on the master node.
2402
2403     """
2404     env = {
2405       "OP_TARGET": self.op.node_name,
2406       "MASTER_CANDIDATE": str(self.op.master_candidate),
2407       "OFFLINE": str(self.op.offline),
2408       "DRAINED": str(self.op.drained),
2409       }
2410     nl = [self.cfg.GetMasterNode(),
2411           self.op.node_name]
2412     return env, nl, nl
2413
2414   def CheckPrereq(self):
2415     """Check prerequisites.
2416
2417     This only checks the instance list against the existing names.
2418
2419     """
2420     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2421
2422     if ((self.op.master_candidate == False or self.op.offline == True or
2423          self.op.drained == True) and node.master_candidate):
2424       # we will demote the node from master_candidate
2425       if self.op.node_name == self.cfg.GetMasterNode():
2426         raise errors.OpPrereqError("The master node has to be a"
2427                                    " master candidate, online and not drained")
2428       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2429       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2430       if num_candidates <= cp_size:
2431         msg = ("Not enough master candidates (desired"
2432                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2433         if self.op.force:
2434           self.LogWarning(msg)
2435         else:
2436           raise errors.OpPrereqError(msg)
2437
2438     if (self.op.master_candidate == True and
2439         ((node.offline and not self.op.offline == False) or
2440          (node.drained and not self.op.drained == False))):
2441       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2442                                  " to master_candidate" % node.name)
2443
2444     return
2445
2446   def Exec(self, feedback_fn):
2447     """Modifies a node.
2448
2449     """
2450     node = self.node
2451
2452     result = []
2453     changed_mc = False
2454
2455     if self.op.offline is not None:
2456       node.offline = self.op.offline
2457       result.append(("offline", str(self.op.offline)))
2458       if self.op.offline == True:
2459         if node.master_candidate:
2460           node.master_candidate = False
2461           changed_mc = True
2462           result.append(("master_candidate", "auto-demotion due to offline"))
2463         if node.drained:
2464           node.drained = False
2465           result.append(("drained", "clear drained status due to offline"))
2466
2467     if self.op.master_candidate is not None:
2468       node.master_candidate = self.op.master_candidate
2469       changed_mc = True
2470       result.append(("master_candidate", str(self.op.master_candidate)))
2471       if self.op.master_candidate == False:
2472         rrc = self.rpc.call_node_demote_from_mc(node.name)
2473         msg = rrc.RemoteFailMsg()
2474         if msg:
2475           self.LogWarning("Node failed to demote itself: %s" % msg)
2476
2477     if self.op.drained is not None:
2478       node.drained = self.op.drained
2479       result.append(("drained", str(self.op.drained)))
2480       if self.op.drained == True:
2481         if node.master_candidate:
2482           node.master_candidate = False
2483           changed_mc = True
2484           result.append(("master_candidate", "auto-demotion due to drain"))
2485         if node.offline:
2486           node.offline = False
2487           result.append(("offline", "clear offline status due to drain"))
2488
2489     # this will trigger configuration file update, if needed
2490     self.cfg.Update(node)
2491     # this will trigger job queue propagation or cleanup
2492     if changed_mc:
2493       self.context.ReaddNode(node)
2494
2495     return result
2496
2497
2498 class LUPowercycleNode(NoHooksLU):
2499   """Powercycles a node.
2500
2501   """
2502   _OP_REQP = ["node_name", "force"]
2503   REQ_BGL = False
2504
2505   def CheckArguments(self):
2506     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2507     if node_name is None:
2508       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2509     self.op.node_name = node_name
2510     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2511       raise errors.OpPrereqError("The node is the master and the force"
2512                                  " parameter was not set")
2513
2514   def ExpandNames(self):
2515     """Locking for PowercycleNode.
2516
2517     This is a last-resource option and shouldn't block on other
2518     jobs. Therefore, we grab no locks.
2519
2520     """
2521     self.needed_locks = {}
2522
2523   def CheckPrereq(self):
2524     """Check prerequisites.
2525
2526     This LU has no prereqs.
2527
2528     """
2529     pass
2530
2531   def Exec(self, feedback_fn):
2532     """Reboots a node.
2533
2534     """
2535     result = self.rpc.call_node_powercycle(self.op.node_name,
2536                                            self.cfg.GetHypervisorType())
2537     msg = result.RemoteFailMsg()
2538     if msg:
2539       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2540     return result.payload
2541
2542
2543 class LUQueryClusterInfo(NoHooksLU):
2544   """Query cluster configuration.
2545
2546   """
2547   _OP_REQP = []
2548   REQ_BGL = False
2549
2550   def ExpandNames(self):
2551     self.needed_locks = {}
2552
2553   def CheckPrereq(self):
2554     """No prerequsites needed for this LU.
2555
2556     """
2557     pass
2558
2559   def Exec(self, feedback_fn):
2560     """Return cluster config.
2561
2562     """
2563     cluster = self.cfg.GetClusterInfo()
2564     result = {
2565       "software_version": constants.RELEASE_VERSION,
2566       "protocol_version": constants.PROTOCOL_VERSION,
2567       "config_version": constants.CONFIG_VERSION,
2568       "os_api_version": constants.OS_API_VERSION,
2569       "export_version": constants.EXPORT_VERSION,
2570       "architecture": (platform.architecture()[0], platform.machine()),
2571       "name": cluster.cluster_name,
2572       "master": cluster.master_node,
2573       "default_hypervisor": cluster.default_hypervisor,
2574       "enabled_hypervisors": cluster.enabled_hypervisors,
2575       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2576                         for hypervisor in cluster.enabled_hypervisors]),
2577       "beparams": cluster.beparams,
2578       "nicparams": cluster.nicparams,
2579       "candidate_pool_size": cluster.candidate_pool_size,
2580       "master_netdev": cluster.master_netdev,
2581       "volume_group_name": cluster.volume_group_name,
2582       "file_storage_dir": cluster.file_storage_dir,
2583       }
2584
2585     return result
2586
2587
2588 class LUQueryConfigValues(NoHooksLU):
2589   """Return configuration values.
2590
2591   """
2592   _OP_REQP = []
2593   REQ_BGL = False
2594   _FIELDS_DYNAMIC = utils.FieldSet()
2595   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2596
2597   def ExpandNames(self):
2598     self.needed_locks = {}
2599
2600     _CheckOutputFields(static=self._FIELDS_STATIC,
2601                        dynamic=self._FIELDS_DYNAMIC,
2602                        selected=self.op.output_fields)
2603
2604   def CheckPrereq(self):
2605     """No prerequisites.
2606
2607     """
2608     pass
2609
2610   def Exec(self, feedback_fn):
2611     """Dump a representation of the cluster config to the standard output.
2612
2613     """
2614     values = []
2615     for field in self.op.output_fields:
2616       if field == "cluster_name":
2617         entry = self.cfg.GetClusterName()
2618       elif field == "master_node":
2619         entry = self.cfg.GetMasterNode()
2620       elif field == "drain_flag":
2621         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2622       else:
2623         raise errors.ParameterError(field)
2624       values.append(entry)
2625     return values
2626
2627
2628 class LUActivateInstanceDisks(NoHooksLU):
2629   """Bring up an instance's disks.
2630
2631   """
2632   _OP_REQP = ["instance_name"]
2633   REQ_BGL = False
2634
2635   def ExpandNames(self):
2636     self._ExpandAndLockInstance()
2637     self.needed_locks[locking.LEVEL_NODE] = []
2638     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2639
2640   def DeclareLocks(self, level):
2641     if level == locking.LEVEL_NODE:
2642       self._LockInstancesNodes()
2643
2644   def CheckPrereq(self):
2645     """Check prerequisites.
2646
2647     This checks that the instance is in the cluster.
2648
2649     """
2650     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2651     assert self.instance is not None, \
2652       "Cannot retrieve locked instance %s" % self.op.instance_name
2653     _CheckNodeOnline(self, self.instance.primary_node)
2654
2655   def Exec(self, feedback_fn):
2656     """Activate the disks.
2657
2658     """
2659     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2660     if not disks_ok:
2661       raise errors.OpExecError("Cannot activate block devices")
2662
2663     return disks_info
2664
2665
2666 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2667   """Prepare the block devices for an instance.
2668
2669   This sets up the block devices on all nodes.
2670
2671   @type lu: L{LogicalUnit}
2672   @param lu: the logical unit on whose behalf we execute
2673   @type instance: L{objects.Instance}
2674   @param instance: the instance for whose disks we assemble
2675   @type ignore_secondaries: boolean
2676   @param ignore_secondaries: if true, errors on secondary nodes
2677       won't result in an error return from the function
2678   @return: False if the operation failed, otherwise a list of
2679       (host, instance_visible_name, node_visible_name)
2680       with the mapping from node devices to instance devices
2681
2682   """
2683   device_info = []
2684   disks_ok = True
2685   iname = instance.name
2686   # With the two passes mechanism we try to reduce the window of
2687   # opportunity for the race condition of switching DRBD to primary
2688   # before handshaking occured, but we do not eliminate it
2689
2690   # The proper fix would be to wait (with some limits) until the
2691   # connection has been made and drbd transitions from WFConnection
2692   # into any other network-connected state (Connected, SyncTarget,
2693   # SyncSource, etc.)
2694
2695   # 1st pass, assemble on all nodes in secondary mode
2696   for inst_disk in instance.disks:
2697     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2698       lu.cfg.SetDiskID(node_disk, node)
2699       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2700       msg = result.RemoteFailMsg()
2701       if msg:
2702         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2703                            " (is_primary=False, pass=1): %s",
2704                            inst_disk.iv_name, node, msg)
2705         if not ignore_secondaries:
2706           disks_ok = False
2707
2708   # FIXME: race condition on drbd migration to primary
2709
2710   # 2nd pass, do only the primary node
2711   for inst_disk in instance.disks:
2712     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2713       if node != instance.primary_node:
2714         continue
2715       lu.cfg.SetDiskID(node_disk, node)
2716       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2717       msg = result.RemoteFailMsg()
2718       if msg:
2719         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2720                            " (is_primary=True, pass=2): %s",
2721                            inst_disk.iv_name, node, msg)
2722         disks_ok = False
2723     device_info.append((instance.primary_node, inst_disk.iv_name,
2724                         result.payload))
2725
2726   # leave the disks configured for the primary node
2727   # this is a workaround that would be fixed better by
2728   # improving the logical/physical id handling
2729   for disk in instance.disks:
2730     lu.cfg.SetDiskID(disk, instance.primary_node)
2731
2732   return disks_ok, device_info
2733
2734
2735 def _StartInstanceDisks(lu, instance, force):
2736   """Start the disks of an instance.
2737
2738   """
2739   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2740                                            ignore_secondaries=force)
2741   if not disks_ok:
2742     _ShutdownInstanceDisks(lu, instance)
2743     if force is not None and not force:
2744       lu.proc.LogWarning("", hint="If the message above refers to a"
2745                          " secondary node,"
2746                          " you can retry the operation using '--force'.")
2747     raise errors.OpExecError("Disk consistency error")
2748
2749
2750 class LUDeactivateInstanceDisks(NoHooksLU):
2751   """Shutdown an instance's disks.
2752
2753   """
2754   _OP_REQP = ["instance_name"]
2755   REQ_BGL = False
2756
2757   def ExpandNames(self):
2758     self._ExpandAndLockInstance()
2759     self.needed_locks[locking.LEVEL_NODE] = []
2760     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2761
2762   def DeclareLocks(self, level):
2763     if level == locking.LEVEL_NODE:
2764       self._LockInstancesNodes()
2765
2766   def CheckPrereq(self):
2767     """Check prerequisites.
2768
2769     This checks that the instance is in the cluster.
2770
2771     """
2772     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2773     assert self.instance is not None, \
2774       "Cannot retrieve locked instance %s" % self.op.instance_name
2775
2776   def Exec(self, feedback_fn):
2777     """Deactivate the disks
2778
2779     """
2780     instance = self.instance
2781     _SafeShutdownInstanceDisks(self, instance)
2782
2783
2784 def _SafeShutdownInstanceDisks(lu, instance):
2785   """Shutdown block devices of an instance.
2786
2787   This function checks if an instance is running, before calling
2788   _ShutdownInstanceDisks.
2789
2790   """
2791   pnode = instance.primary_node
2792   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2793   ins_l = ins_l[pnode]
2794   msg = ins_l.RemoteFailMsg()
2795   if msg:
2796     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2797
2798   if instance.name in ins_l.payload:
2799     raise errors.OpExecError("Instance is running, can't shutdown"
2800                              " block devices.")
2801
2802   _ShutdownInstanceDisks(lu, instance)
2803
2804
2805 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2806   """Shutdown block devices of an instance.
2807
2808   This does the shutdown on all nodes of the instance.
2809
2810   If the ignore_primary is false, errors on the primary node are
2811   ignored.
2812
2813   """
2814   all_result = True
2815   for disk in instance.disks:
2816     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2817       lu.cfg.SetDiskID(top_disk, node)
2818       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2819       msg = result.RemoteFailMsg()
2820       if msg:
2821         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2822                       disk.iv_name, node, msg)
2823         if not ignore_primary or node != instance.primary_node:
2824           all_result = False
2825   return all_result
2826
2827
2828 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2829   """Checks if a node has enough free memory.
2830
2831   This function check if a given node has the needed amount of free
2832   memory. In case the node has less memory or we cannot get the
2833   information from the node, this function raise an OpPrereqError
2834   exception.
2835
2836   @type lu: C{LogicalUnit}
2837   @param lu: a logical unit from which we get configuration data
2838   @type node: C{str}
2839   @param node: the node to check
2840   @type reason: C{str}
2841   @param reason: string to use in the error message
2842   @type requested: C{int}
2843   @param requested: the amount of memory in MiB to check for
2844   @type hypervisor_name: C{str}
2845   @param hypervisor_name: the hypervisor to ask for memory stats
2846   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2847       we cannot check the node
2848
2849   """
2850   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2851   msg = nodeinfo[node].RemoteFailMsg()
2852   if msg:
2853     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2854   free_mem = nodeinfo[node].payload.get('memory_free', None)
2855   if not isinstance(free_mem, int):
2856     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2857                                " was '%s'" % (node, free_mem))
2858   if requested > free_mem:
2859     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2860                                " needed %s MiB, available %s MiB" %
2861                                (node, reason, requested, free_mem))
2862
2863
2864 class LUStartupInstance(LogicalUnit):
2865   """Starts an instance.
2866
2867   """
2868   HPATH = "instance-start"
2869   HTYPE = constants.HTYPE_INSTANCE
2870   _OP_REQP = ["instance_name", "force"]
2871   REQ_BGL = False
2872
2873   def ExpandNames(self):
2874     self._ExpandAndLockInstance()
2875
2876   def BuildHooksEnv(self):
2877     """Build hooks env.
2878
2879     This runs on master, primary and secondary nodes of the instance.
2880
2881     """
2882     env = {
2883       "FORCE": self.op.force,
2884       }
2885     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2886     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2887     return env, nl, nl
2888
2889   def CheckPrereq(self):
2890     """Check prerequisites.
2891
2892     This checks that the instance is in the cluster.
2893
2894     """
2895     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2896     assert self.instance is not None, \
2897       "Cannot retrieve locked instance %s" % self.op.instance_name
2898
2899     # extra beparams
2900     self.beparams = getattr(self.op, "beparams", {})
2901     if self.beparams:
2902       if not isinstance(self.beparams, dict):
2903         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2904                                    " dict" % (type(self.beparams), ))
2905       # fill the beparams dict
2906       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2907       self.op.beparams = self.beparams
2908
2909     # extra hvparams
2910     self.hvparams = getattr(self.op, "hvparams", {})
2911     if self.hvparams:
2912       if not isinstance(self.hvparams, dict):
2913         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2914                                    " dict" % (type(self.hvparams), ))
2915
2916       # check hypervisor parameter syntax (locally)
2917       cluster = self.cfg.GetClusterInfo()
2918       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2919       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2920                                     instance.hvparams)
2921       filled_hvp.update(self.hvparams)
2922       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2923       hv_type.CheckParameterSyntax(filled_hvp)
2924       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2925       self.op.hvparams = self.hvparams
2926
2927     _CheckNodeOnline(self, instance.primary_node)
2928
2929     bep = self.cfg.GetClusterInfo().FillBE(instance)
2930     # check bridges existance
2931     _CheckInstanceBridgesExist(self, instance)
2932
2933     remote_info = self.rpc.call_instance_info(instance.primary_node,
2934                                               instance.name,
2935                                               instance.hypervisor)
2936     msg = remote_info.RemoteFailMsg()
2937     if msg:
2938       raise errors.OpPrereqError("Error checking node %s: %s" %
2939                                  (instance.primary_node, msg))
2940     if not remote_info.payload: # not running already
2941       _CheckNodeFreeMemory(self, instance.primary_node,
2942                            "starting instance %s" % instance.name,
2943                            bep[constants.BE_MEMORY], instance.hypervisor)
2944
2945   def Exec(self, feedback_fn):
2946     """Start the instance.
2947
2948     """
2949     instance = self.instance
2950     force = self.op.force
2951
2952     self.cfg.MarkInstanceUp(instance.name)
2953
2954     node_current = instance.primary_node
2955
2956     _StartInstanceDisks(self, instance, force)
2957
2958     result = self.rpc.call_instance_start(node_current, instance,
2959                                           self.hvparams, self.beparams)
2960     msg = result.RemoteFailMsg()
2961     if msg:
2962       _ShutdownInstanceDisks(self, instance)
2963       raise errors.OpExecError("Could not start instance: %s" % msg)
2964
2965
2966 class LURebootInstance(LogicalUnit):
2967   """Reboot an instance.
2968
2969   """
2970   HPATH = "instance-reboot"
2971   HTYPE = constants.HTYPE_INSTANCE
2972   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2973   REQ_BGL = False
2974
2975   def ExpandNames(self):
2976     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2977                                    constants.INSTANCE_REBOOT_HARD,
2978                                    constants.INSTANCE_REBOOT_FULL]:
2979       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2980                                   (constants.INSTANCE_REBOOT_SOFT,
2981                                    constants.INSTANCE_REBOOT_HARD,
2982                                    constants.INSTANCE_REBOOT_FULL))
2983     self._ExpandAndLockInstance()
2984
2985   def BuildHooksEnv(self):
2986     """Build hooks env.
2987
2988     This runs on master, primary and secondary nodes of the instance.
2989
2990     """
2991     env = {
2992       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2993       "REBOOT_TYPE": self.op.reboot_type,
2994       }
2995     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2996     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2997     return env, nl, nl
2998
2999   def CheckPrereq(self):
3000     """Check prerequisites.
3001
3002     This checks that the instance is in the cluster.
3003
3004     """
3005     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3006     assert self.instance is not None, \
3007       "Cannot retrieve locked instance %s" % self.op.instance_name
3008
3009     _CheckNodeOnline(self, instance.primary_node)
3010
3011     # check bridges existance
3012     _CheckInstanceBridgesExist(self, instance)
3013
3014   def Exec(self, feedback_fn):
3015     """Reboot the instance.
3016
3017     """
3018     instance = self.instance
3019     ignore_secondaries = self.op.ignore_secondaries
3020     reboot_type = self.op.reboot_type
3021
3022     node_current = instance.primary_node
3023
3024     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3025                        constants.INSTANCE_REBOOT_HARD]:
3026       for disk in instance.disks:
3027         self.cfg.SetDiskID(disk, node_current)
3028       result = self.rpc.call_instance_reboot(node_current, instance,
3029                                              reboot_type)
3030       msg = result.RemoteFailMsg()
3031       if msg:
3032         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3033     else:
3034       result = self.rpc.call_instance_shutdown(node_current, instance)
3035       msg = result.RemoteFailMsg()
3036       if msg:
3037         raise errors.OpExecError("Could not shutdown instance for"
3038                                  " full reboot: %s" % msg)
3039       _ShutdownInstanceDisks(self, instance)
3040       _StartInstanceDisks(self, instance, ignore_secondaries)
3041       result = self.rpc.call_instance_start(node_current, instance, None, None)
3042       msg = result.RemoteFailMsg()
3043       if msg:
3044         _ShutdownInstanceDisks(self, instance)
3045         raise errors.OpExecError("Could not start instance for"
3046                                  " full reboot: %s" % msg)
3047
3048     self.cfg.MarkInstanceUp(instance.name)
3049
3050
3051 class LUShutdownInstance(LogicalUnit):
3052   """Shutdown an instance.
3053
3054   """
3055   HPATH = "instance-stop"
3056   HTYPE = constants.HTYPE_INSTANCE
3057   _OP_REQP = ["instance_name"]
3058   REQ_BGL = False
3059
3060   def ExpandNames(self):
3061     self._ExpandAndLockInstance()
3062
3063   def BuildHooksEnv(self):
3064     """Build hooks env.
3065
3066     This runs on master, primary and secondary nodes of the instance.
3067
3068     """
3069     env = _BuildInstanceHookEnvByObject(self, self.instance)
3070     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3071     return env, nl, nl
3072
3073   def CheckPrereq(self):
3074     """Check prerequisites.
3075
3076     This checks that the instance is in the cluster.
3077
3078     """
3079     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3080     assert self.instance is not None, \
3081       "Cannot retrieve locked instance %s" % self.op.instance_name
3082     _CheckNodeOnline(self, self.instance.primary_node)
3083
3084   def Exec(self, feedback_fn):
3085     """Shutdown the instance.
3086
3087     """
3088     instance = self.instance
3089     node_current = instance.primary_node
3090     self.cfg.MarkInstanceDown(instance.name)
3091     result = self.rpc.call_instance_shutdown(node_current, instance)
3092     msg = result.RemoteFailMsg()
3093     if msg:
3094       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3095
3096     _ShutdownInstanceDisks(self, instance)
3097
3098
3099 class LUReinstallInstance(LogicalUnit):
3100   """Reinstall an instance.
3101
3102   """
3103   HPATH = "instance-reinstall"
3104   HTYPE = constants.HTYPE_INSTANCE
3105   _OP_REQP = ["instance_name"]
3106   REQ_BGL = False
3107
3108   def ExpandNames(self):
3109     self._ExpandAndLockInstance()
3110
3111   def BuildHooksEnv(self):
3112     """Build hooks env.
3113
3114     This runs on master, primary and secondary nodes of the instance.
3115
3116     """
3117     env = _BuildInstanceHookEnvByObject(self, self.instance)
3118     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3119     return env, nl, nl
3120
3121   def CheckPrereq(self):
3122     """Check prerequisites.
3123
3124     This checks that the instance is in the cluster and is not running.
3125
3126     """
3127     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3128     assert instance is not None, \
3129       "Cannot retrieve locked instance %s" % self.op.instance_name
3130     _CheckNodeOnline(self, instance.primary_node)
3131
3132     if instance.disk_template == constants.DT_DISKLESS:
3133       raise errors.OpPrereqError("Instance '%s' has no disks" %
3134                                  self.op.instance_name)
3135     if instance.admin_up:
3136       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3137                                  self.op.instance_name)
3138     remote_info = self.rpc.call_instance_info(instance.primary_node,
3139                                               instance.name,
3140                                               instance.hypervisor)
3141     msg = remote_info.RemoteFailMsg()
3142     if msg:
3143       raise errors.OpPrereqError("Error checking node %s: %s" %
3144                                  (instance.primary_node, msg))
3145     if remote_info.payload:
3146       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3147                                  (self.op.instance_name,
3148                                   instance.primary_node))
3149
3150     self.op.os_type = getattr(self.op, "os_type", None)
3151     if self.op.os_type is not None:
3152       # OS verification
3153       pnode = self.cfg.GetNodeInfo(
3154         self.cfg.ExpandNodeName(instance.primary_node))
3155       if pnode is None:
3156         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3157                                    self.op.pnode)
3158       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3159       result.Raise()
3160       if not isinstance(result.data, objects.OS):
3161         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3162                                    " primary node"  % self.op.os_type)
3163
3164     self.instance = instance
3165
3166   def Exec(self, feedback_fn):
3167     """Reinstall the instance.
3168
3169     """
3170     inst = self.instance
3171
3172     if self.op.os_type is not None:
3173       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3174       inst.os = self.op.os_type
3175       self.cfg.Update(inst)
3176
3177     _StartInstanceDisks(self, inst, None)
3178     try:
3179       feedback_fn("Running the instance OS create scripts...")
3180       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3181       msg = result.RemoteFailMsg()
3182       if msg:
3183         raise errors.OpExecError("Could not install OS for instance %s"
3184                                  " on node %s: %s" %
3185                                  (inst.name, inst.primary_node, msg))
3186     finally:
3187       _ShutdownInstanceDisks(self, inst)
3188
3189
3190 class LURenameInstance(LogicalUnit):
3191   """Rename an instance.
3192
3193   """
3194   HPATH = "instance-rename"
3195   HTYPE = constants.HTYPE_INSTANCE
3196   _OP_REQP = ["instance_name", "new_name"]
3197
3198   def BuildHooksEnv(self):
3199     """Build hooks env.
3200
3201     This runs on master, primary and secondary nodes of the instance.
3202
3203     """
3204     env = _BuildInstanceHookEnvByObject(self, self.instance)
3205     env["INSTANCE_NEW_NAME"] = self.op.new_name
3206     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3207     return env, nl, nl
3208
3209   def CheckPrereq(self):
3210     """Check prerequisites.
3211
3212     This checks that the instance is in the cluster and is not running.
3213
3214     """
3215     instance = self.cfg.GetInstanceInfo(
3216       self.cfg.ExpandInstanceName(self.op.instance_name))
3217     if instance is None:
3218       raise errors.OpPrereqError("Instance '%s' not known" %
3219                                  self.op.instance_name)
3220     _CheckNodeOnline(self, instance.primary_node)
3221
3222     if instance.admin_up:
3223       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3224                                  self.op.instance_name)
3225     remote_info = self.rpc.call_instance_info(instance.primary_node,
3226                                               instance.name,
3227                                               instance.hypervisor)
3228     msg = remote_info.RemoteFailMsg()
3229     if msg:
3230       raise errors.OpPrereqError("Error checking node %s: %s" %
3231                                  (instance.primary_node, msg))
3232     if remote_info.payload:
3233       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3234                                  (self.op.instance_name,
3235                                   instance.primary_node))
3236     self.instance = instance
3237
3238     # new name verification
3239     name_info = utils.HostInfo(self.op.new_name)
3240
3241     self.op.new_name = new_name = name_info.name
3242     instance_list = self.cfg.GetInstanceList()
3243     if new_name in instance_list:
3244       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3245                                  new_name)
3246
3247     if not getattr(self.op, "ignore_ip", False):
3248       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3249         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3250                                    (name_info.ip, new_name))
3251
3252
3253   def Exec(self, feedback_fn):
3254     """Reinstall the instance.
3255
3256     """
3257     inst = self.instance
3258     old_name = inst.name
3259
3260     if inst.disk_template == constants.DT_FILE:
3261       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3262
3263     self.cfg.RenameInstance(inst.name, self.op.new_name)
3264     # Change the instance lock. This is definitely safe while we hold the BGL
3265     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3266     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3267
3268     # re-read the instance from the configuration after rename
3269     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3270
3271     if inst.disk_template == constants.DT_FILE:
3272       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3273       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3274                                                      old_file_storage_dir,
3275                                                      new_file_storage_dir)
3276       msg = result.RemoteFailMsg()
3277       if msg:
3278         raise errors.OpExecError("Could not rename on node %s"
3279                                  " directory '%s' to '%s' (but the instance"
3280                                  " has been renamed in Ganeti): %s" %
3281                                  (inst.primary_node, old_file_storage_dir,
3282                                   new_file_storage_dir, msg))
3283
3284     _StartInstanceDisks(self, inst, None)
3285     try:
3286       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3287                                                  old_name)
3288       msg = result.RemoteFailMsg()
3289       if msg:
3290         msg = ("Could not run OS rename script for instance %s on node %s"
3291                " (but the instance has been renamed in Ganeti): %s" %
3292                (inst.name, inst.primary_node, msg))
3293         self.proc.LogWarning(msg)
3294     finally:
3295       _ShutdownInstanceDisks(self, inst)
3296
3297
3298 class LURemoveInstance(LogicalUnit):
3299   """Remove an instance.
3300
3301   """
3302   HPATH = "instance-remove"
3303   HTYPE = constants.HTYPE_INSTANCE
3304   _OP_REQP = ["instance_name", "ignore_failures"]
3305   REQ_BGL = False
3306
3307   def ExpandNames(self):
3308     self._ExpandAndLockInstance()
3309     self.needed_locks[locking.LEVEL_NODE] = []
3310     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3311
3312   def DeclareLocks(self, level):
3313     if level == locking.LEVEL_NODE:
3314       self._LockInstancesNodes()
3315
3316   def BuildHooksEnv(self):
3317     """Build hooks env.
3318
3319     This runs on master, primary and secondary nodes of the instance.
3320
3321     """
3322     env = _BuildInstanceHookEnvByObject(self, self.instance)
3323     nl = [self.cfg.GetMasterNode()]
3324     return env, nl, nl
3325
3326   def CheckPrereq(self):
3327     """Check prerequisites.
3328
3329     This checks that the instance is in the cluster.
3330
3331     """
3332     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3333     assert self.instance is not None, \
3334       "Cannot retrieve locked instance %s" % self.op.instance_name
3335
3336   def Exec(self, feedback_fn):
3337     """Remove the instance.
3338
3339     """
3340     instance = self.instance
3341     logging.info("Shutting down instance %s on node %s",
3342                  instance.name, instance.primary_node)
3343
3344     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3345     msg = result.RemoteFailMsg()
3346     if msg:
3347       if self.op.ignore_failures:
3348         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3349       else:
3350         raise errors.OpExecError("Could not shutdown instance %s on"
3351                                  " node %s: %s" %
3352                                  (instance.name, instance.primary_node, msg))
3353
3354     logging.info("Removing block devices for instance %s", instance.name)
3355
3356     if not _RemoveDisks(self, instance):
3357       if self.op.ignore_failures:
3358         feedback_fn("Warning: can't remove instance's disks")
3359       else:
3360         raise errors.OpExecError("Can't remove instance's disks")
3361
3362     logging.info("Removing instance %s out of cluster config", instance.name)
3363
3364     self.cfg.RemoveInstance(instance.name)
3365     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3366
3367
3368 class LUQueryInstances(NoHooksLU):
3369   """Logical unit for querying instances.
3370
3371   """
3372   _OP_REQP = ["output_fields", "names", "use_locking"]
3373   REQ_BGL = False
3374   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3375                                     "admin_state",
3376                                     "disk_template", "ip", "mac", "bridge",
3377                                     "sda_size", "sdb_size", "vcpus", "tags",
3378                                     "network_port", "beparams",
3379                                     r"(disk)\.(size)/([0-9]+)",
3380                                     r"(disk)\.(sizes)", "disk_usage",
3381                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3382                                     r"(nic)\.(macs|ips|bridges)",
3383                                     r"(disk|nic)\.(count)",
3384                                     "serial_no", "hypervisor", "hvparams",] +
3385                                   ["hv/%s" % name
3386                                    for name in constants.HVS_PARAMETERS] +
3387                                   ["be/%s" % name
3388                                    for name in constants.BES_PARAMETERS])
3389   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3390
3391
3392   def ExpandNames(self):
3393     _CheckOutputFields(static=self._FIELDS_STATIC,
3394                        dynamic=self._FIELDS_DYNAMIC,
3395                        selected=self.op.output_fields)
3396
3397     self.needed_locks = {}
3398     self.share_locks[locking.LEVEL_INSTANCE] = 1
3399     self.share_locks[locking.LEVEL_NODE] = 1
3400
3401     if self.op.names:
3402       self.wanted = _GetWantedInstances(self, self.op.names)
3403     else:
3404       self.wanted = locking.ALL_SET
3405
3406     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3407     self.do_locking = self.do_node_query and self.op.use_locking
3408     if self.do_locking:
3409       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3410       self.needed_locks[locking.LEVEL_NODE] = []
3411       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3412
3413   def DeclareLocks(self, level):
3414     if level == locking.LEVEL_NODE and self.do_locking:
3415       self._LockInstancesNodes()
3416
3417   def CheckPrereq(self):
3418     """Check prerequisites.
3419
3420     """
3421     pass
3422
3423   def Exec(self, feedback_fn):
3424     """Computes the list of nodes and their attributes.
3425
3426     """
3427     all_info = self.cfg.GetAllInstancesInfo()
3428     if self.wanted == locking.ALL_SET:
3429       # caller didn't specify instance names, so ordering is not important
3430       if self.do_locking:
3431         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3432       else:
3433         instance_names = all_info.keys()
3434       instance_names = utils.NiceSort(instance_names)
3435     else:
3436       # caller did specify names, so we must keep the ordering
3437       if self.do_locking:
3438         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3439       else:
3440         tgt_set = all_info.keys()
3441       missing = set(self.wanted).difference(tgt_set)
3442       if missing:
3443         raise errors.OpExecError("Some instances were removed before"
3444                                  " retrieving their data: %s" % missing)
3445       instance_names = self.wanted
3446
3447     instance_list = [all_info[iname] for iname in instance_names]
3448
3449     # begin data gathering
3450
3451     nodes = frozenset([inst.primary_node for inst in instance_list])
3452     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3453
3454     bad_nodes = []
3455     off_nodes = []
3456     if self.do_node_query:
3457       live_data = {}
3458       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3459       for name in nodes:
3460         result = node_data[name]
3461         if result.offline:
3462           # offline nodes will be in both lists
3463           off_nodes.append(name)
3464         if result.failed or result.RemoteFailMsg():
3465           bad_nodes.append(name)
3466         else:
3467           if result.payload:
3468             live_data.update(result.payload)
3469           # else no instance is alive
3470     else:
3471       live_data = dict([(name, {}) for name in instance_names])
3472
3473     # end data gathering
3474
3475     HVPREFIX = "hv/"
3476     BEPREFIX = "be/"
3477     output = []
3478     for instance in instance_list:
3479       iout = []
3480       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3481       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3482       for field in self.op.output_fields:
3483         st_match = self._FIELDS_STATIC.Matches(field)
3484         if field == "name":
3485           val = instance.name
3486         elif field == "os":
3487           val = instance.os
3488         elif field == "pnode":
3489           val = instance.primary_node
3490         elif field == "snodes":
3491           val = list(instance.secondary_nodes)
3492         elif field == "admin_state":
3493           val = instance.admin_up
3494         elif field == "oper_state":
3495           if instance.primary_node in bad_nodes:
3496             val = None
3497           else:
3498             val = bool(live_data.get(instance.name))
3499         elif field == "status":
3500           if instance.primary_node in off_nodes:
3501             val = "ERROR_nodeoffline"
3502           elif instance.primary_node in bad_nodes:
3503             val = "ERROR_nodedown"
3504           else:
3505             running = bool(live_data.get(instance.name))
3506             if running:
3507               if instance.admin_up:
3508                 val = "running"
3509               else:
3510                 val = "ERROR_up"
3511             else:
3512               if instance.admin_up:
3513                 val = "ERROR_down"
3514               else:
3515                 val = "ADMIN_down"
3516         elif field == "oper_ram":
3517           if instance.primary_node in bad_nodes:
3518             val = None
3519           elif instance.name in live_data:
3520             val = live_data[instance.name].get("memory", "?")
3521           else:
3522             val = "-"
3523         elif field == "disk_template":
3524           val = instance.disk_template
3525         elif field == "ip":
3526           val = instance.nics[0].ip
3527         elif field == "bridge":
3528           val = instance.nics[0].bridge
3529         elif field == "mac":
3530           val = instance.nics[0].mac
3531         elif field == "sda_size" or field == "sdb_size":
3532           idx = ord(field[2]) - ord('a')
3533           try:
3534             val = instance.FindDisk(idx).size
3535           except errors.OpPrereqError:
3536             val = None
3537         elif field == "disk_usage": # total disk usage per node
3538           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3539           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3540         elif field == "tags":
3541           val = list(instance.GetTags())
3542         elif field == "serial_no":
3543           val = instance.serial_no
3544         elif field == "network_port":
3545           val = instance.network_port
3546         elif field == "hypervisor":
3547           val = instance.hypervisor
3548         elif field == "hvparams":
3549           val = i_hv
3550         elif (field.startswith(HVPREFIX) and
3551               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3552           val = i_hv.get(field[len(HVPREFIX):], None)
3553         elif field == "beparams":
3554           val = i_be
3555         elif (field.startswith(BEPREFIX) and
3556               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3557           val = i_be.get(field[len(BEPREFIX):], None)
3558         elif st_match and st_match.groups():
3559           # matches a variable list
3560           st_groups = st_match.groups()
3561           if st_groups and st_groups[0] == "disk":
3562             if st_groups[1] == "count":
3563               val = len(instance.disks)
3564             elif st_groups[1] == "sizes":
3565               val = [disk.size for disk in instance.disks]
3566             elif st_groups[1] == "size":
3567               try:
3568                 val = instance.FindDisk(st_groups[2]).size
3569               except errors.OpPrereqError:
3570                 val = None
3571             else:
3572               assert False, "Unhandled disk parameter"
3573           elif st_groups[0] == "nic":
3574             if st_groups[1] == "count":
3575               val = len(instance.nics)
3576             elif st_groups[1] == "macs":
3577               val = [nic.mac for nic in instance.nics]
3578             elif st_groups[1] == "ips":
3579               val = [nic.ip for nic in instance.nics]
3580             elif st_groups[1] == "bridges":
3581               val = [nic.bridge for nic in instance.nics]
3582             else:
3583               # index-based item
3584               nic_idx = int(st_groups[2])
3585               if nic_idx >= len(instance.nics):
3586                 val = None
3587               else:
3588                 if st_groups[1] == "mac":
3589                   val = instance.nics[nic_idx].mac
3590                 elif st_groups[1] == "ip":
3591                   val = instance.nics[nic_idx].ip
3592                 elif st_groups[1] == "bridge":
3593                   val = instance.nics[nic_idx].bridge
3594                 else:
3595                   assert False, "Unhandled NIC parameter"
3596           else:
3597             assert False, "Unhandled variable parameter"
3598         else:
3599           raise errors.ParameterError(field)
3600         iout.append(val)
3601       output.append(iout)
3602
3603     return output
3604
3605
3606 class LUFailoverInstance(LogicalUnit):
3607   """Failover an instance.
3608
3609   """
3610   HPATH = "instance-failover"
3611   HTYPE = constants.HTYPE_INSTANCE
3612   _OP_REQP = ["instance_name", "ignore_consistency"]
3613   REQ_BGL = False
3614
3615   def ExpandNames(self):
3616     self._ExpandAndLockInstance()
3617     self.needed_locks[locking.LEVEL_NODE] = []
3618     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3619
3620   def DeclareLocks(self, level):
3621     if level == locking.LEVEL_NODE:
3622       self._LockInstancesNodes()
3623
3624   def BuildHooksEnv(self):
3625     """Build hooks env.
3626
3627     This runs on master, primary and secondary nodes of the instance.
3628
3629     """
3630     env = {
3631       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3632       }
3633     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3634     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3635     return env, nl, nl
3636
3637   def CheckPrereq(self):
3638     """Check prerequisites.
3639
3640     This checks that the instance is in the cluster.
3641
3642     """
3643     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3644     assert self.instance is not None, \
3645       "Cannot retrieve locked instance %s" % self.op.instance_name
3646
3647     bep = self.cfg.GetClusterInfo().FillBE(instance)
3648     if instance.disk_template not in constants.DTS_NET_MIRROR:
3649       raise errors.OpPrereqError("Instance's disk layout is not"
3650                                  " network mirrored, cannot failover.")
3651
3652     secondary_nodes = instance.secondary_nodes
3653     if not secondary_nodes:
3654       raise errors.ProgrammerError("no secondary node but using "
3655                                    "a mirrored disk template")
3656
3657     target_node = secondary_nodes[0]
3658     _CheckNodeOnline(self, target_node)
3659     _CheckNodeNotDrained(self, target_node)
3660     # check memory requirements on the secondary node
3661     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3662                          instance.name, bep[constants.BE_MEMORY],
3663                          instance.hypervisor)
3664     # check bridge existance
3665     _CheckInstanceBridgesExist(self, instance, node=target_node)
3666
3667   def Exec(self, feedback_fn):
3668     """Failover an instance.
3669
3670     The failover is done by shutting it down on its present node and
3671     starting it on the secondary.
3672
3673     """
3674     instance = self.instance
3675
3676     source_node = instance.primary_node
3677     target_node = instance.secondary_nodes[0]
3678
3679     feedback_fn("* checking disk consistency between source and target")
3680     for dev in instance.disks:
3681       # for drbd, these are drbd over lvm
3682       if not _CheckDiskConsistency(self, dev, target_node, False):
3683         if instance.admin_up and not self.op.ignore_consistency:
3684           raise errors.OpExecError("Disk %s is degraded on target node,"
3685                                    " aborting failover." % dev.iv_name)
3686
3687     feedback_fn("* shutting down instance on source node")
3688     logging.info("Shutting down instance %s on node %s",
3689                  instance.name, source_node)
3690
3691     result = self.rpc.call_instance_shutdown(source_node, instance)
3692     msg = result.RemoteFailMsg()
3693     if msg:
3694       if self.op.ignore_consistency:
3695         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3696                              " Proceeding anyway. Please make sure node"
3697                              " %s is down. Error details: %s",
3698                              instance.name, source_node, source_node, msg)
3699       else:
3700         raise errors.OpExecError("Could not shutdown instance %s on"
3701                                  " node %s: %s" %
3702                                  (instance.name, source_node, msg))
3703
3704     feedback_fn("* deactivating the instance's disks on source node")
3705     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3706       raise errors.OpExecError("Can't shut down the instance's disks.")
3707
3708     instance.primary_node = target_node
3709     # distribute new instance config to the other nodes
3710     self.cfg.Update(instance)
3711
3712     # Only start the instance if it's marked as up
3713     if instance.admin_up:
3714       feedback_fn("* activating the instance's disks on target node")
3715       logging.info("Starting instance %s on node %s",
3716                    instance.name, target_node)
3717
3718       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3719                                                ignore_secondaries=True)
3720       if not disks_ok:
3721         _ShutdownInstanceDisks(self, instance)
3722         raise errors.OpExecError("Can't activate the instance's disks")
3723
3724       feedback_fn("* starting the instance on the target node")
3725       result = self.rpc.call_instance_start(target_node, instance, None, None)
3726       msg = result.RemoteFailMsg()
3727       if msg:
3728         _ShutdownInstanceDisks(self, instance)
3729         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3730                                  (instance.name, target_node, msg))
3731
3732
3733 class LUMigrateInstance(LogicalUnit):
3734   """Migrate an instance.
3735
3736   This is migration without shutting down, compared to the failover,
3737   which is done with shutdown.
3738
3739   """
3740   HPATH = "instance-migrate"
3741   HTYPE = constants.HTYPE_INSTANCE
3742   _OP_REQP = ["instance_name", "live", "cleanup"]
3743
3744   REQ_BGL = False
3745
3746   def ExpandNames(self):
3747     self._ExpandAndLockInstance()
3748     self.needed_locks[locking.LEVEL_NODE] = []
3749     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3750
3751   def DeclareLocks(self, level):
3752     if level == locking.LEVEL_NODE:
3753       self._LockInstancesNodes()
3754
3755   def BuildHooksEnv(self):
3756     """Build hooks env.
3757
3758     This runs on master, primary and secondary nodes of the instance.
3759
3760     """
3761     env = _BuildInstanceHookEnvByObject(self, self.instance)
3762     env["MIGRATE_LIVE"] = self.op.live
3763     env["MIGRATE_CLEANUP"] = self.op.cleanup
3764     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3765     return env, nl, nl
3766
3767   def CheckPrereq(self):
3768     """Check prerequisites.
3769
3770     This checks that the instance is in the cluster.
3771
3772     """
3773     instance = self.cfg.GetInstanceInfo(
3774       self.cfg.ExpandInstanceName(self.op.instance_name))
3775     if instance is None:
3776       raise errors.OpPrereqError("Instance '%s' not known" %
3777                                  self.op.instance_name)
3778
3779     if instance.disk_template != constants.DT_DRBD8:
3780       raise errors.OpPrereqError("Instance's disk layout is not"
3781                                  " drbd8, cannot migrate.")
3782
3783     secondary_nodes = instance.secondary_nodes
3784     if not secondary_nodes:
3785       raise errors.ConfigurationError("No secondary node but using"
3786                                       " drbd8 disk template")
3787
3788     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3789
3790     target_node = secondary_nodes[0]
3791     # check memory requirements on the secondary node
3792     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3793                          instance.name, i_be[constants.BE_MEMORY],
3794                          instance.hypervisor)
3795
3796     # check bridge existance
3797     _CheckInstanceBridgesExist(self, instance, node=target_node)
3798
3799     if not self.op.cleanup:
3800       _CheckNodeNotDrained(self, target_node)
3801       result = self.rpc.call_instance_migratable(instance.primary_node,
3802                                                  instance)
3803       msg = result.RemoteFailMsg()
3804       if msg:
3805         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3806                                    msg)
3807
3808     self.instance = instance
3809
3810   def _WaitUntilSync(self):
3811     """Poll with custom rpc for disk sync.
3812
3813     This uses our own step-based rpc call.
3814
3815     """
3816     self.feedback_fn("* wait until resync is done")
3817     all_done = False
3818     while not all_done:
3819       all_done = True
3820       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3821                                             self.nodes_ip,
3822                                             self.instance.disks)
3823       min_percent = 100
3824       for node, nres in result.items():
3825         msg = nres.RemoteFailMsg()
3826         if msg:
3827           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3828                                    (node, msg))
3829         node_done, node_percent = nres.payload
3830         all_done = all_done and node_done
3831         if node_percent is not None:
3832           min_percent = min(min_percent, node_percent)
3833       if not all_done:
3834         if min_percent < 100:
3835           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3836         time.sleep(2)
3837
3838   def _EnsureSecondary(self, node):
3839     """Demote a node to secondary.
3840
3841     """
3842     self.feedback_fn("* switching node %s to secondary mode" % node)
3843
3844     for dev in self.instance.disks:
3845       self.cfg.SetDiskID(dev, node)
3846
3847     result = self.rpc.call_blockdev_close(node, self.instance.name,
3848                                           self.instance.disks)
3849     msg = result.RemoteFailMsg()
3850     if msg:
3851       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3852                                " error %s" % (node, msg))
3853
3854   def _GoStandalone(self):
3855     """Disconnect from the network.
3856
3857     """
3858     self.feedback_fn("* changing into standalone mode")
3859     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3860                                                self.instance.disks)
3861     for node, nres in result.items():
3862       msg = nres.RemoteFailMsg()
3863       if msg:
3864         raise errors.OpExecError("Cannot disconnect disks node %s,"
3865                                  " error %s" % (node, msg))
3866
3867   def _GoReconnect(self, multimaster):
3868     """Reconnect to the network.
3869
3870     """
3871     if multimaster:
3872       msg = "dual-master"
3873     else:
3874       msg = "single-master"
3875     self.feedback_fn("* changing disks into %s mode" % msg)
3876     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3877                                            self.instance.disks,
3878                                            self.instance.name, multimaster)
3879     for node, nres in result.items():
3880       msg = nres.RemoteFailMsg()
3881       if msg:
3882         raise errors.OpExecError("Cannot change disks config on node %s,"
3883                                  " error: %s" % (node, msg))
3884
3885   def _ExecCleanup(self):
3886     """Try to cleanup after a failed migration.
3887
3888     The cleanup is done by:
3889       - check that the instance is running only on one node
3890         (and update the config if needed)
3891       - change disks on its secondary node to secondary
3892       - wait until disks are fully synchronized
3893       - disconnect from the network
3894       - change disks into single-master mode
3895       - wait again until disks are fully synchronized
3896
3897     """
3898     instance = self.instance
3899     target_node = self.target_node
3900     source_node = self.source_node
3901
3902     # check running on only one node
3903     self.feedback_fn("* checking where the instance actually runs"
3904                      " (if this hangs, the hypervisor might be in"
3905                      " a bad state)")
3906     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3907     for node, result in ins_l.items():
3908       msg = result.RemoteFailMsg()
3909       if msg:
3910         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3911
3912     runningon_source = instance.name in ins_l[source_node].payload
3913     runningon_target = instance.name in ins_l[target_node].payload
3914
3915     if runningon_source and runningon_target:
3916       raise errors.OpExecError("Instance seems to be running on two nodes,"
3917                                " or the hypervisor is confused. You will have"
3918                                " to ensure manually that it runs only on one"
3919                                " and restart this operation.")
3920
3921     if not (runningon_source or runningon_target):
3922       raise errors.OpExecError("Instance does not seem to be running at all."
3923                                " In this case, it's safer to repair by"
3924                                " running 'gnt-instance stop' to ensure disk"
3925                                " shutdown, and then restarting it.")
3926
3927     if runningon_target:
3928       # the migration has actually succeeded, we need to update the config
3929       self.feedback_fn("* instance running on secondary node (%s),"
3930                        " updating config" % target_node)
3931       instance.primary_node = target_node
3932       self.cfg.Update(instance)
3933       demoted_node = source_node
3934     else:
3935       self.feedback_fn("* instance confirmed to be running on its"
3936                        " primary node (%s)" % source_node)
3937       demoted_node = target_node
3938
3939     self._EnsureSecondary(demoted_node)
3940     try:
3941       self._WaitUntilSync()
3942     except errors.OpExecError:
3943       # we ignore here errors, since if the device is standalone, it
3944       # won't be able to sync
3945       pass
3946     self._GoStandalone()
3947     self._GoReconnect(False)
3948     self._WaitUntilSync()
3949
3950     self.feedback_fn("* done")
3951
3952   def _RevertDiskStatus(self):
3953     """Try to revert the disk status after a failed migration.
3954
3955     """
3956     target_node = self.target_node
3957     try:
3958       self._EnsureSecondary(target_node)
3959       self._GoStandalone()
3960       self._GoReconnect(False)
3961       self._WaitUntilSync()
3962     except errors.OpExecError, err:
3963       self.LogWarning("Migration failed and I can't reconnect the"
3964                       " drives: error '%s'\n"
3965                       "Please look and recover the instance status" %
3966                       str(err))
3967
3968   def _AbortMigration(self):
3969     """Call the hypervisor code to abort a started migration.
3970
3971     """
3972     instance = self.instance
3973     target_node = self.target_node
3974     migration_info = self.migration_info
3975
3976     abort_result = self.rpc.call_finalize_migration(target_node,
3977                                                     instance,
3978                                                     migration_info,
3979                                                     False)
3980     abort_msg = abort_result.RemoteFailMsg()
3981     if abort_msg:
3982       logging.error("Aborting migration failed on target node %s: %s" %
3983                     (target_node, abort_msg))
3984       # Don't raise an exception here, as we stil have to try to revert the
3985       # disk status, even if this step failed.
3986
3987   def _ExecMigration(self):
3988     """Migrate an instance.
3989
3990     The migrate is done by:
3991       - change the disks into dual-master mode
3992       - wait until disks are fully synchronized again
3993       - migrate the instance
3994       - change disks on the new secondary node (the old primary) to secondary
3995       - wait until disks are fully synchronized
3996       - change disks into single-master mode
3997
3998     """
3999     instance = self.instance
4000     target_node = self.target_node
4001     source_node = self.source_node
4002
4003     self.feedback_fn("* checking disk consistency between source and target")
4004     for dev in instance.disks:
4005       if not _CheckDiskConsistency(self, dev, target_node, False):
4006         raise errors.OpExecError("Disk %s is degraded or not fully"
4007                                  " synchronized on target node,"
4008                                  " aborting migrate." % dev.iv_name)
4009
4010     # First get the migration information from the remote node
4011     result = self.rpc.call_migration_info(source_node, instance)
4012     msg = result.RemoteFailMsg()
4013     if msg:
4014       log_err = ("Failed fetching source migration information from %s: %s" %
4015                  (source_node, msg))
4016       logging.error(log_err)
4017       raise errors.OpExecError(log_err)
4018
4019     self.migration_info = migration_info = result.payload
4020
4021     # Then switch the disks to master/master mode
4022     self._EnsureSecondary(target_node)
4023     self._GoStandalone()
4024     self._GoReconnect(True)
4025     self._WaitUntilSync()
4026
4027     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4028     result = self.rpc.call_accept_instance(target_node,
4029                                            instance,
4030                                            migration_info,
4031                                            self.nodes_ip[target_node])
4032
4033     msg = result.RemoteFailMsg()
4034     if msg:
4035       logging.error("Instance pre-migration failed, trying to revert"
4036                     " disk status: %s", msg)
4037       self._AbortMigration()
4038       self._RevertDiskStatus()
4039       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4040                                (instance.name, msg))
4041
4042     self.feedback_fn("* migrating instance to %s" % target_node)
4043     time.sleep(10)
4044     result = self.rpc.call_instance_migrate(source_node, instance,
4045                                             self.nodes_ip[target_node],
4046                                             self.op.live)
4047     msg = result.RemoteFailMsg()
4048     if msg:
4049       logging.error("Instance migration failed, trying to revert"
4050                     " disk status: %s", msg)
4051       self._AbortMigration()
4052       self._RevertDiskStatus()
4053       raise errors.OpExecError("Could not migrate instance %s: %s" %
4054                                (instance.name, msg))
4055     time.sleep(10)
4056
4057     instance.primary_node = target_node
4058     # distribute new instance config to the other nodes
4059     self.cfg.Update(instance)
4060
4061     result = self.rpc.call_finalize_migration(target_node,
4062                                               instance,
4063                                               migration_info,
4064                                               True)
4065     msg = result.RemoteFailMsg()
4066     if msg:
4067       logging.error("Instance migration succeeded, but finalization failed:"
4068                     " %s" % msg)
4069       raise errors.OpExecError("Could not finalize instance migration: %s" %
4070                                msg)
4071
4072     self._EnsureSecondary(source_node)
4073     self._WaitUntilSync()
4074     self._GoStandalone()
4075     self._GoReconnect(False)
4076     self._WaitUntilSync()
4077
4078     self.feedback_fn("* done")
4079
4080   def Exec(self, feedback_fn):
4081     """Perform the migration.
4082
4083     """
4084     self.feedback_fn = feedback_fn
4085
4086     self.source_node = self.instance.primary_node
4087     self.target_node = self.instance.secondary_nodes[0]
4088     self.all_nodes = [self.source_node, self.target_node]
4089     self.nodes_ip = {
4090       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4091       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4092       }
4093     if self.op.cleanup:
4094       return self._ExecCleanup()
4095     else:
4096       return self._ExecMigration()
4097
4098
4099 def _CreateBlockDev(lu, node, instance, device, force_create,
4100                     info, force_open):
4101   """Create a tree of block devices on a given node.
4102
4103   If this device type has to be created on secondaries, create it and
4104   all its children.
4105
4106   If not, just recurse to children keeping the same 'force' value.
4107
4108   @param lu: the lu on whose behalf we execute
4109   @param node: the node on which to create the device
4110   @type instance: L{objects.Instance}
4111   @param instance: the instance which owns the device
4112   @type device: L{objects.Disk}
4113   @param device: the device to create
4114   @type force_create: boolean
4115   @param force_create: whether to force creation of this device; this
4116       will be change to True whenever we find a device which has
4117       CreateOnSecondary() attribute
4118   @param info: the extra 'metadata' we should attach to the device
4119       (this will be represented as a LVM tag)
4120   @type force_open: boolean
4121   @param force_open: this parameter will be passes to the
4122       L{backend.BlockdevCreate} function where it specifies
4123       whether we run on primary or not, and it affects both
4124       the child assembly and the device own Open() execution
4125
4126   """
4127   if device.CreateOnSecondary():
4128     force_create = True
4129
4130   if device.children:
4131     for child in device.children:
4132       _CreateBlockDev(lu, node, instance, child, force_create,
4133                       info, force_open)
4134
4135   if not force_create:
4136     return
4137
4138   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4139
4140
4141 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4142   """Create a single block device on a given node.
4143
4144   This will not recurse over children of the device, so they must be
4145   created in advance.
4146
4147   @param lu: the lu on whose behalf we execute
4148   @param node: the node on which to create the device
4149   @type instance: L{objects.Instance}
4150   @param instance: the instance which owns the device
4151   @type device: L{objects.Disk}
4152   @param device: the device to create
4153   @param info: the extra 'metadata' we should attach to the device
4154       (this will be represented as a LVM tag)
4155   @type force_open: boolean
4156   @param force_open: this parameter will be passes to the
4157       L{backend.BlockdevCreate} function where it specifies
4158       whether we run on primary or not, and it affects both
4159       the child assembly and the device own Open() execution
4160
4161   """
4162   lu.cfg.SetDiskID(device, node)
4163   result = lu.rpc.call_blockdev_create(node, device, device.size,
4164                                        instance.name, force_open, info)
4165   msg = result.RemoteFailMsg()
4166   if msg:
4167     raise errors.OpExecError("Can't create block device %s on"
4168                              " node %s for instance %s: %s" %
4169                              (device, node, instance.name, msg))
4170   if device.physical_id is None:
4171     device.physical_id = result.payload
4172
4173
4174 def _GenerateUniqueNames(lu, exts):
4175   """Generate a suitable LV name.
4176
4177   This will generate a logical volume name for the given instance.
4178
4179   """
4180   results = []
4181   for val in exts:
4182     new_id = lu.cfg.GenerateUniqueID()
4183     results.append("%s%s" % (new_id, val))
4184   return results
4185
4186
4187 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4188                          p_minor, s_minor):
4189   """Generate a drbd8 device complete with its children.
4190
4191   """
4192   port = lu.cfg.AllocatePort()
4193   vgname = lu.cfg.GetVGName()
4194   shared_secret = lu.cfg.GenerateDRBDSecret()
4195   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4196                           logical_id=(vgname, names[0]))
4197   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4198                           logical_id=(vgname, names[1]))
4199   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4200                           logical_id=(primary, secondary, port,
4201                                       p_minor, s_minor,
4202                                       shared_secret),
4203                           children=[dev_data, dev_meta],
4204                           iv_name=iv_name)
4205   return drbd_dev
4206
4207
4208 def _GenerateDiskTemplate(lu, template_name,
4209                           instance_name, primary_node,
4210                           secondary_nodes, disk_info,
4211                           file_storage_dir, file_driver,
4212                           base_index):
4213   """Generate the entire disk layout for a given template type.
4214
4215   """
4216   #TODO: compute space requirements
4217
4218   vgname = lu.cfg.GetVGName()
4219   disk_count = len(disk_info)
4220   disks = []
4221   if template_name == constants.DT_DISKLESS:
4222     pass
4223   elif template_name == constants.DT_PLAIN:
4224     if len(secondary_nodes) != 0:
4225       raise errors.ProgrammerError("Wrong template configuration")
4226
4227     names = _GenerateUniqueNames(lu, [".disk%d" % i
4228                                       for i in range(disk_count)])
4229     for idx, disk in enumerate(disk_info):
4230       disk_index = idx + base_index
4231       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4232                               logical_id=(vgname, names[idx]),
4233                               iv_name="disk/%d" % disk_index,
4234                               mode=disk["mode"])
4235       disks.append(disk_dev)
4236   elif template_name == constants.DT_DRBD8:
4237     if len(secondary_nodes) != 1:
4238       raise errors.ProgrammerError("Wrong template configuration")
4239     remote_node = secondary_nodes[0]
4240     minors = lu.cfg.AllocateDRBDMinor(
4241       [primary_node, remote_node] * len(disk_info), instance_name)
4242
4243     names = []
4244     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4245                                                for i in range(disk_count)]):
4246       names.append(lv_prefix + "_data")
4247       names.append(lv_prefix + "_meta")
4248     for idx, disk in enumerate(disk_info):
4249       disk_index = idx + base_index
4250       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4251                                       disk["size"], names[idx*2:idx*2+2],
4252                                       "disk/%d" % disk_index,
4253                                       minors[idx*2], minors[idx*2+1])
4254       disk_dev.mode = disk["mode"]
4255       disks.append(disk_dev)
4256   elif template_name == constants.DT_FILE:
4257     if len(secondary_nodes) != 0:
4258       raise errors.ProgrammerError("Wrong template configuration")
4259
4260     for idx, disk in enumerate(disk_info):
4261       disk_index = idx + base_index
4262       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4263                               iv_name="disk/%d" % disk_index,
4264                               logical_id=(file_driver,
4265                                           "%s/disk%d" % (file_storage_dir,
4266                                                          disk_index)),
4267                               mode=disk["mode"])
4268       disks.append(disk_dev)
4269   else:
4270     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4271   return disks
4272
4273
4274 def _GetInstanceInfoText(instance):
4275   """Compute that text that should be added to the disk's metadata.
4276
4277   """
4278   return "originstname+%s" % instance.name
4279
4280
4281 def _CreateDisks(lu, instance):
4282   """Create all disks for an instance.
4283
4284   This abstracts away some work from AddInstance.
4285
4286   @type lu: L{LogicalUnit}
4287   @param lu: the logical unit on whose behalf we execute
4288   @type instance: L{objects.Instance}
4289   @param instance: the instance whose disks we should create
4290   @rtype: boolean
4291   @return: the success of the creation
4292
4293   """
4294   info = _GetInstanceInfoText(instance)
4295   pnode = instance.primary_node
4296
4297   if instance.disk_template == constants.DT_FILE:
4298     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4299     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4300
4301     msg = result.RemoteFailMsg()
4302
4303     if msg:
4304       raise errors.OpExecError("Failed to create directory '%s' on"
4305                                " node %s: %s" % (file_storage_dir, msg))
4306
4307   # Note: this needs to be kept in sync with adding of disks in
4308   # LUSetInstanceParams
4309   for device in instance.disks:
4310     logging.info("Creating volume %s for instance %s",
4311                  device.iv_name, instance.name)
4312     #HARDCODE
4313     for node in instance.all_nodes:
4314       f_create = node == pnode
4315       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4316
4317
4318 def _RemoveDisks(lu, instance):
4319   """Remove all disks for an instance.
4320
4321   This abstracts away some work from `AddInstance()` and
4322   `RemoveInstance()`. Note that in case some of the devices couldn't
4323   be removed, the removal will continue with the other ones (compare
4324   with `_CreateDisks()`).
4325
4326   @type lu: L{LogicalUnit}
4327   @param lu: the logical unit on whose behalf we execute
4328   @type instance: L{objects.Instance}
4329   @param instance: the instance whose disks we should remove
4330   @rtype: boolean
4331   @return: the success of the removal
4332
4333   """
4334   logging.info("Removing block devices for instance %s", instance.name)
4335
4336   all_result = True
4337   for device in instance.disks:
4338     for node, disk in device.ComputeNodeTree(instance.primary_node):
4339       lu.cfg.SetDiskID(disk, node)
4340       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4341       if msg:
4342         lu.LogWarning("Could not remove block device %s on node %s,"
4343                       " continuing anyway: %s", device.iv_name, node, msg)
4344         all_result = False
4345
4346   if instance.disk_template == constants.DT_FILE:
4347     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4348     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4349                                                  file_storage_dir)
4350     msg = result.RemoteFailMsg()
4351     if msg:
4352       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4353                     file_storage_dir, instance.primary_node, msg)
4354       all_result = False
4355
4356   return all_result
4357
4358
4359 def _ComputeDiskSize(disk_template, disks):
4360   """Compute disk size requirements in the volume group
4361
4362   """
4363   # Required free disk space as a function of disk and swap space
4364   req_size_dict = {
4365     constants.DT_DISKLESS: None,
4366     constants.DT_PLAIN: sum(d["size"] for d in disks),
4367     # 128 MB are added for drbd metadata for each disk
4368     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4369     constants.DT_FILE: None,
4370   }
4371
4372   if disk_template not in req_size_dict:
4373     raise errors.ProgrammerError("Disk template '%s' size requirement"
4374                                  " is unknown" %  disk_template)
4375
4376   return req_size_dict[disk_template]
4377
4378
4379 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4380   """Hypervisor parameter validation.
4381
4382   This function abstract the hypervisor parameter validation to be
4383   used in both instance create and instance modify.
4384
4385   @type lu: L{LogicalUnit}
4386   @param lu: the logical unit for which we check
4387   @type nodenames: list
4388   @param nodenames: the list of nodes on which we should check
4389   @type hvname: string
4390   @param hvname: the name of the hypervisor we should use
4391   @type hvparams: dict
4392   @param hvparams: the parameters which we need to check
4393   @raise errors.OpPrereqError: if the parameters are not valid
4394
4395   """
4396   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4397                                                   hvname,
4398                                                   hvparams)
4399   for node in nodenames:
4400     info = hvinfo[node]
4401     if info.offline:
4402       continue
4403     msg = info.RemoteFailMsg()
4404     if msg:
4405       raise errors.OpPrereqError("Hypervisor parameter validation"
4406                                  " failed on node %s: %s" % (node, msg))
4407
4408
4409 class LUCreateInstance(LogicalUnit):
4410   """Create an instance.
4411
4412   """
4413   HPATH = "instance-add"
4414   HTYPE = constants.HTYPE_INSTANCE
4415   _OP_REQP = ["instance_name", "disks", "disk_template",
4416               "mode", "start",
4417               "wait_for_sync", "ip_check", "nics",
4418               "hvparams", "beparams"]
4419   REQ_BGL = False
4420
4421   def _ExpandNode(self, node):
4422     """Expands and checks one node name.
4423
4424     """
4425     node_full = self.cfg.ExpandNodeName(node)
4426     if node_full is None:
4427       raise errors.OpPrereqError("Unknown node %s" % node)
4428     return node_full
4429
4430   def ExpandNames(self):
4431     """ExpandNames for CreateInstance.
4432
4433     Figure out the right locks for instance creation.
4434
4435     """
4436     self.needed_locks = {}
4437
4438     # set optional parameters to none if they don't exist
4439     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4440       if not hasattr(self.op, attr):
4441         setattr(self.op, attr, None)
4442
4443     # cheap checks, mostly valid constants given
4444
4445     # verify creation mode
4446     if self.op.mode not in (constants.INSTANCE_CREATE,
4447                             constants.INSTANCE_IMPORT):
4448       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4449                                  self.op.mode)
4450
4451     # disk template and mirror node verification
4452     if self.op.disk_template not in constants.DISK_TEMPLATES:
4453       raise errors.OpPrereqError("Invalid disk template name")
4454
4455     if self.op.hypervisor is None:
4456       self.op.hypervisor = self.cfg.GetHypervisorType()
4457
4458     cluster = self.cfg.GetClusterInfo()
4459     enabled_hvs = cluster.enabled_hypervisors
4460     if self.op.hypervisor not in enabled_hvs:
4461       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4462                                  " cluster (%s)" % (self.op.hypervisor,
4463                                   ",".join(enabled_hvs)))
4464
4465     # check hypervisor parameter syntax (locally)
4466     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4467     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4468                                   self.op.hvparams)
4469     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4470     hv_type.CheckParameterSyntax(filled_hvp)
4471
4472     # fill and remember the beparams dict
4473     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4474     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4475                                     self.op.beparams)
4476
4477     #### instance parameters check
4478
4479     # instance name verification
4480     hostname1 = utils.HostInfo(self.op.instance_name)
4481     self.op.instance_name = instance_name = hostname1.name
4482
4483     # this is just a preventive check, but someone might still add this
4484     # instance in the meantime, and creation will fail at lock-add time
4485     if instance_name in self.cfg.GetInstanceList():
4486       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4487                                  instance_name)
4488
4489     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4490
4491     # NIC buildup
4492     self.nics = []
4493     for idx, nic in enumerate(self.op.nics):
4494       nic_mode_req = nic.get("mode", None)
4495       nic_mode = nic_mode_req
4496       if nic_mode is None:
4497         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4498
4499       # in routed mode, for the first nic, the default ip is 'auto'
4500       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4501         default_ip_mode = constants.VALUE_AUTO
4502       else:
4503         default_ip_mode = constants.VALUE_NONE
4504
4505       # ip validity checks
4506       ip = nic.get("ip", default_ip_mode)
4507       if ip is None or ip.lower() == constants.VALUE_NONE:
4508         nic_ip = None
4509       elif ip.lower() == constants.VALUE_AUTO:
4510         nic_ip = hostname1.ip
4511       else:
4512         if not utils.IsValidIP(ip):
4513           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4514                                      " like a valid IP" % ip)
4515         nic_ip = ip
4516
4517       # TODO: check the ip for uniqueness !!
4518       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4519         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4520
4521       # MAC address verification
4522       mac = nic.get("mac", constants.VALUE_AUTO)
4523       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4524         if not utils.IsValidMac(mac.lower()):
4525           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4526                                      mac)
4527       # bridge verification
4528       bridge = nic.get("bridge", None)
4529       link = nic.get("link", None)
4530       if bridge and link:
4531         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4532       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4533         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4534       elif bridge:
4535         link = bridge
4536
4537       nicparams = {}
4538       if nic_mode_req:
4539         nicparams[constants.NIC_MODE] = nic_mode_req
4540       if link:
4541         nicparams[constants.NIC_LINK] = link
4542
4543       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4544                                       nicparams)
4545       objects.NIC.CheckParameterSyntax(check_params)
4546       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4547
4548     # disk checks/pre-build
4549     self.disks = []
4550     for disk in self.op.disks:
4551       mode = disk.get("mode", constants.DISK_RDWR)
4552       if mode not in constants.DISK_ACCESS_SET:
4553         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4554                                    mode)
4555       size = disk.get("size", None)
4556       if size is None:
4557         raise errors.OpPrereqError("Missing disk size")
4558       try:
4559         size = int(size)
4560       except ValueError:
4561         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4562       self.disks.append({"size": size, "mode": mode})
4563
4564     # used in CheckPrereq for ip ping check
4565     self.check_ip = hostname1.ip
4566
4567     # file storage checks
4568     if (self.op.file_driver and
4569         not self.op.file_driver in constants.FILE_DRIVER):
4570       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4571                                  self.op.file_driver)
4572
4573     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4574       raise errors.OpPrereqError("File storage directory path not absolute")
4575
4576     ### Node/iallocator related checks
4577     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4578       raise errors.OpPrereqError("One and only one of iallocator and primary"
4579                                  " node must be given")
4580
4581     if self.op.iallocator:
4582       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4583     else:
4584       self.op.pnode = self._ExpandNode(self.op.pnode)
4585       nodelist = [self.op.pnode]
4586       if self.op.snode is not None:
4587         self.op.snode = self._ExpandNode(self.op.snode)
4588         nodelist.append(self.op.snode)
4589       self.needed_locks[locking.LEVEL_NODE] = nodelist
4590
4591     # in case of import lock the source node too
4592     if self.op.mode == constants.INSTANCE_IMPORT:
4593       src_node = getattr(self.op, "src_node", None)
4594       src_path = getattr(self.op, "src_path", None)
4595
4596       if src_path is None:
4597         self.op.src_path = src_path = self.op.instance_name
4598
4599       if src_node is None:
4600         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4601         self.op.src_node = None
4602         if os.path.isabs(src_path):
4603           raise errors.OpPrereqError("Importing an instance from an absolute"
4604                                      " path requires a source node option.")
4605       else:
4606         self.op.src_node = src_node = self._ExpandNode(src_node)
4607         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4608           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4609         if not os.path.isabs(src_path):
4610           self.op.src_path = src_path = \
4611             os.path.join(constants.EXPORT_DIR, src_path)
4612
4613     else: # INSTANCE_CREATE
4614       if getattr(self.op, "os_type", None) is None:
4615         raise errors.OpPrereqError("No guest OS specified")
4616
4617   def _RunAllocator(self):
4618     """Run the allocator based on input opcode.
4619
4620     """
4621     nics = [n.ToDict() for n in self.nics]
4622     ial = IAllocator(self,
4623                      mode=constants.IALLOCATOR_MODE_ALLOC,
4624                      name=self.op.instance_name,
4625                      disk_template=self.op.disk_template,
4626                      tags=[],
4627                      os=self.op.os_type,
4628                      vcpus=self.be_full[constants.BE_VCPUS],
4629                      mem_size=self.be_full[constants.BE_MEMORY],
4630                      disks=self.disks,
4631                      nics=nics,
4632                      hypervisor=self.op.hypervisor,
4633                      )
4634
4635     ial.Run(self.op.iallocator)
4636
4637     if not ial.success:
4638       raise errors.OpPrereqError("Can't compute nodes using"
4639                                  " iallocator '%s': %s" % (self.op.iallocator,
4640                                                            ial.info))
4641     if len(ial.nodes) != ial.required_nodes:
4642       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4643                                  " of nodes (%s), required %s" %
4644                                  (self.op.iallocator, len(ial.nodes),
4645                                   ial.required_nodes))
4646     self.op.pnode = ial.nodes[0]
4647     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4648                  self.op.instance_name, self.op.iallocator,
4649                  ", ".join(ial.nodes))
4650     if ial.required_nodes == 2:
4651       self.op.snode = ial.nodes[1]
4652
4653   def BuildHooksEnv(self):
4654     """Build hooks env.
4655
4656     This runs on master, primary and secondary nodes of the instance.
4657
4658     """
4659     env = {
4660       "ADD_MODE": self.op.mode,
4661       }
4662     if self.op.mode == constants.INSTANCE_IMPORT:
4663       env["SRC_NODE"] = self.op.src_node
4664       env["SRC_PATH"] = self.op.src_path
4665       env["SRC_IMAGES"] = self.src_images
4666
4667     env.update(_BuildInstanceHookEnv(
4668       name=self.op.instance_name,
4669       primary_node=self.op.pnode,
4670       secondary_nodes=self.secondaries,
4671       status=self.op.start,
4672       os_type=self.op.os_type,
4673       memory=self.be_full[constants.BE_MEMORY],
4674       vcpus=self.be_full[constants.BE_VCPUS],
4675       nics=_PreBuildNICHooksList(self, self.nics),
4676       disk_template=self.op.disk_template,
4677       disks=[(d["size"], d["mode"]) for d in self.disks],
4678     ))
4679
4680     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4681           self.secondaries)
4682     return env, nl, nl
4683
4684
4685   def CheckPrereq(self):
4686     """Check prerequisites.
4687
4688     """
4689     if (not self.cfg.GetVGName() and
4690         self.op.disk_template not in constants.DTS_NOT_LVM):
4691       raise errors.OpPrereqError("Cluster does not support lvm-based"
4692                                  " instances")
4693
4694     if self.op.mode == constants.INSTANCE_IMPORT:
4695       src_node = self.op.src_node
4696       src_path = self.op.src_path
4697
4698       if src_node is None:
4699         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4700         exp_list = self.rpc.call_export_list(locked_nodes)
4701         found = False
4702         for node in exp_list:
4703           if exp_list[node].RemoteFailMsg():
4704             continue
4705           if src_path in exp_list[node].payload:
4706             found = True
4707             self.op.src_node = src_node = node
4708             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4709                                                        src_path)
4710             break
4711         if not found:
4712           raise errors.OpPrereqError("No export found for relative path %s" %
4713                                       src_path)
4714
4715       _CheckNodeOnline(self, src_node)
4716       result = self.rpc.call_export_info(src_node, src_path)
4717       msg = result.RemoteFailMsg()
4718       if msg:
4719         raise errors.OpPrereqError("No export or invalid export found in"
4720                                    " dir %s: %s" % (src_path, msg))
4721
4722       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4723       if not export_info.has_section(constants.INISECT_EXP):
4724         raise errors.ProgrammerError("Corrupted export config")
4725
4726       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4727       if (int(ei_version) != constants.EXPORT_VERSION):
4728         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4729                                    (ei_version, constants.EXPORT_VERSION))
4730
4731       # Check that the new instance doesn't have less disks than the export
4732       instance_disks = len(self.disks)
4733       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4734       if instance_disks < export_disks:
4735         raise errors.OpPrereqError("Not enough disks to import."
4736                                    " (instance: %d, export: %d)" %
4737                                    (instance_disks, export_disks))
4738
4739       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4740       disk_images = []
4741       for idx in range(export_disks):
4742         option = 'disk%d_dump' % idx
4743         if export_info.has_option(constants.INISECT_INS, option):
4744           # FIXME: are the old os-es, disk sizes, etc. useful?
4745           export_name = export_info.get(constants.INISECT_INS, option)
4746           image = os.path.join(src_path, export_name)
4747           disk_images.append(image)
4748         else:
4749           disk_images.append(False)
4750
4751       self.src_images = disk_images
4752
4753       old_name = export_info.get(constants.INISECT_INS, 'name')
4754       # FIXME: int() here could throw a ValueError on broken exports
4755       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4756       if self.op.instance_name == old_name:
4757         for idx, nic in enumerate(self.nics):
4758           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4759             nic_mac_ini = 'nic%d_mac' % idx
4760             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4761
4762     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4763     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4764     if self.op.start and not self.op.ip_check:
4765       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4766                                  " adding an instance in start mode")
4767
4768     if self.op.ip_check:
4769       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4770         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4771                                    (self.check_ip, self.op.instance_name))
4772
4773     #### mac address generation
4774     # By generating here the mac address both the allocator and the hooks get
4775     # the real final mac address rather than the 'auto' or 'generate' value.
4776     # There is a race condition between the generation and the instance object
4777     # creation, which means that we know the mac is valid now, but we're not
4778     # sure it will be when we actually add the instance. If things go bad
4779     # adding the instance will abort because of a duplicate mac, and the
4780     # creation job will fail.
4781     for nic in self.nics:
4782       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4783         nic.mac = self.cfg.GenerateMAC()
4784
4785     #### allocator run
4786
4787     if self.op.iallocator is not None:
4788       self._RunAllocator()
4789
4790     #### node related checks
4791
4792     # check primary node
4793     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4794     assert self.pnode is not None, \
4795       "Cannot retrieve locked node %s" % self.op.pnode
4796     if pnode.offline:
4797       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4798                                  pnode.name)
4799     if pnode.drained:
4800       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4801                                  pnode.name)
4802
4803     self.secondaries = []
4804
4805     # mirror node verification
4806     if self.op.disk_template in constants.DTS_NET_MIRROR:
4807       if self.op.snode is None:
4808         raise errors.OpPrereqError("The networked disk templates need"
4809                                    " a mirror node")
4810       if self.op.snode == pnode.name:
4811         raise errors.OpPrereqError("The secondary node cannot be"
4812                                    " the primary node.")
4813       _CheckNodeOnline(self, self.op.snode)
4814       _CheckNodeNotDrained(self, self.op.snode)
4815       self.secondaries.append(self.op.snode)
4816
4817     nodenames = [pnode.name] + self.secondaries
4818
4819     req_size = _ComputeDiskSize(self.op.disk_template,
4820                                 self.disks)
4821
4822     # Check lv size requirements
4823     if req_size is not None:
4824       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4825                                          self.op.hypervisor)
4826       for node in nodenames:
4827         info = nodeinfo[node]
4828         msg = info.RemoteFailMsg()
4829         if msg:
4830           raise errors.OpPrereqError("Cannot get current information"
4831                                      " from node %s: %s" % (node, msg))
4832         info = info.payload
4833         vg_free = info.get('vg_free', None)
4834         if not isinstance(vg_free, int):
4835           raise errors.OpPrereqError("Can't compute free disk space on"
4836                                      " node %s" % node)
4837         if req_size > vg_free:
4838           raise errors.OpPrereqError("Not enough disk space on target node %s."
4839                                      " %d MB available, %d MB required" %
4840                                      (node, vg_free, req_size))
4841
4842     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4843
4844     # os verification
4845     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4846     result.Raise()
4847     if not isinstance(result.data, objects.OS):
4848       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4849                                  " primary node"  % self.op.os_type)
4850
4851     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4852
4853     # memory check on primary node
4854     if self.op.start:
4855       _CheckNodeFreeMemory(self, self.pnode.name,
4856                            "creating instance %s" % self.op.instance_name,
4857                            self.be_full[constants.BE_MEMORY],
4858                            self.op.hypervisor)
4859
4860   def Exec(self, feedback_fn):
4861     """Create and add the instance to the cluster.
4862
4863     """
4864     instance = self.op.instance_name
4865     pnode_name = self.pnode.name
4866
4867     ht_kind = self.op.hypervisor
4868     if ht_kind in constants.HTS_REQ_PORT:
4869       network_port = self.cfg.AllocatePort()
4870     else:
4871       network_port = None
4872
4873     ##if self.op.vnc_bind_address is None:
4874     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4875
4876     # this is needed because os.path.join does not accept None arguments
4877     if self.op.file_storage_dir is None:
4878       string_file_storage_dir = ""
4879     else:
4880       string_file_storage_dir = self.op.file_storage_dir
4881
4882     # build the full file storage dir path
4883     file_storage_dir = os.path.normpath(os.path.join(
4884                                         self.cfg.GetFileStorageDir(),
4885                                         string_file_storage_dir, instance))
4886
4887
4888     disks = _GenerateDiskTemplate(self,
4889                                   self.op.disk_template,
4890                                   instance, pnode_name,
4891                                   self.secondaries,
4892                                   self.disks,
4893                                   file_storage_dir,
4894                                   self.op.file_driver,
4895                                   0)
4896
4897     iobj = objects.Instance(name=instance, os=self.op.os_type,
4898                             primary_node=pnode_name,
4899                             nics=self.nics, disks=disks,
4900                             disk_template=self.op.disk_template,
4901                             admin_up=False,
4902                             network_port=network_port,
4903                             beparams=self.op.beparams,
4904                             hvparams=self.op.hvparams,
4905                             hypervisor=self.op.hypervisor,
4906                             )
4907
4908     feedback_fn("* creating instance disks...")
4909     try:
4910       _CreateDisks(self, iobj)
4911     except errors.OpExecError:
4912       self.LogWarning("Device creation failed, reverting...")
4913       try:
4914         _RemoveDisks(self, iobj)
4915       finally:
4916         self.cfg.ReleaseDRBDMinors(instance)
4917         raise
4918
4919     feedback_fn("adding instance %s to cluster config" % instance)
4920
4921     self.cfg.AddInstance(iobj)
4922     # Declare that we don't want to remove the instance lock anymore, as we've
4923     # added the instance to the config
4924     del self.remove_locks[locking.LEVEL_INSTANCE]
4925     # Unlock all the nodes
4926     if self.op.mode == constants.INSTANCE_IMPORT:
4927       nodes_keep = [self.op.src_node]
4928       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4929                        if node != self.op.src_node]
4930       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4931       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4932     else:
4933       self.context.glm.release(locking.LEVEL_NODE)
4934       del self.acquired_locks[locking.LEVEL_NODE]
4935
4936     if self.op.wait_for_sync:
4937       disk_abort = not _WaitForSync(self, iobj)
4938     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4939       # make sure the disks are not degraded (still sync-ing is ok)
4940       time.sleep(15)
4941       feedback_fn("* checking mirrors status")
4942       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4943     else:
4944       disk_abort = False
4945
4946     if disk_abort:
4947       _RemoveDisks(self, iobj)
4948       self.cfg.RemoveInstance(iobj.name)
4949       # Make sure the instance lock gets removed
4950       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4951       raise errors.OpExecError("There are some degraded disks for"
4952                                " this instance")
4953
4954     feedback_fn("creating os for instance %s on node %s" %
4955                 (instance, pnode_name))
4956
4957     if iobj.disk_template != constants.DT_DISKLESS:
4958       if self.op.mode == constants.INSTANCE_CREATE:
4959         feedback_fn("* running the instance OS create scripts...")
4960         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4961         msg = result.RemoteFailMsg()
4962         if msg:
4963           raise errors.OpExecError("Could not add os for instance %s"
4964                                    " on node %s: %s" %
4965                                    (instance, pnode_name, msg))
4966
4967       elif self.op.mode == constants.INSTANCE_IMPORT:
4968         feedback_fn("* running the instance OS import scripts...")
4969         src_node = self.op.src_node
4970         src_images = self.src_images
4971         cluster_name = self.cfg.GetClusterName()
4972         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4973                                                          src_node, src_images,
4974                                                          cluster_name)
4975         msg = import_result.RemoteFailMsg()
4976         if msg:
4977           self.LogWarning("Error while importing the disk images for instance"
4978                           " %s on node %s: %s" % (instance, pnode_name, msg))
4979       else:
4980         # also checked in the prereq part
4981         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4982                                      % self.op.mode)
4983
4984     if self.op.start:
4985       iobj.admin_up = True
4986       self.cfg.Update(iobj)
4987       logging.info("Starting instance %s on node %s", instance, pnode_name)
4988       feedback_fn("* starting instance...")
4989       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4990       msg = result.RemoteFailMsg()
4991       if msg:
4992         raise errors.OpExecError("Could not start instance: %s" % msg)
4993
4994
4995 class LUConnectConsole(NoHooksLU):
4996   """Connect to an instance's console.
4997
4998   This is somewhat special in that it returns the command line that
4999   you need to run on the master node in order to connect to the
5000   console.
5001
5002   """
5003   _OP_REQP = ["instance_name"]
5004   REQ_BGL = False
5005
5006   def ExpandNames(self):
5007     self._ExpandAndLockInstance()
5008
5009   def CheckPrereq(self):
5010     """Check prerequisites.
5011
5012     This checks that the instance is in the cluster.
5013
5014     """
5015     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5016     assert self.instance is not None, \
5017       "Cannot retrieve locked instance %s" % self.op.instance_name
5018     _CheckNodeOnline(self, self.instance.primary_node)
5019
5020   def Exec(self, feedback_fn):
5021     """Connect to the console of an instance
5022
5023     """
5024     instance = self.instance
5025     node = instance.primary_node
5026
5027     node_insts = self.rpc.call_instance_list([node],
5028                                              [instance.hypervisor])[node]
5029     msg = node_insts.RemoteFailMsg()
5030     if msg:
5031       raise errors.OpExecError("Can't get node information from %s: %s" %
5032                                (node, msg))
5033
5034     if instance.name not in node_insts.payload:
5035       raise errors.OpExecError("Instance %s is not running." % instance.name)
5036
5037     logging.debug("Connecting to console of %s on %s", instance.name, node)
5038
5039     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5040     cluster = self.cfg.GetClusterInfo()
5041     # beparams and hvparams are passed separately, to avoid editing the
5042     # instance and then saving the defaults in the instance itself.
5043     hvparams = cluster.FillHV(instance)
5044     beparams = cluster.FillBE(instance)
5045     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5046
5047     # build ssh cmdline
5048     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5049
5050
5051 class LUReplaceDisks(LogicalUnit):
5052   """Replace the disks of an instance.
5053
5054   """
5055   HPATH = "mirrors-replace"
5056   HTYPE = constants.HTYPE_INSTANCE
5057   _OP_REQP = ["instance_name", "mode", "disks"]
5058   REQ_BGL = False
5059
5060   def CheckArguments(self):
5061     if not hasattr(self.op, "remote_node"):
5062       self.op.remote_node = None
5063     if not hasattr(self.op, "iallocator"):
5064       self.op.iallocator = None
5065
5066     # check for valid parameter combination
5067     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5068     if self.op.mode == constants.REPLACE_DISK_CHG:
5069       if cnt == 2:
5070         raise errors.OpPrereqError("When changing the secondary either an"
5071                                    " iallocator script must be used or the"
5072                                    " new node given")
5073       elif cnt == 0:
5074         raise errors.OpPrereqError("Give either the iallocator or the new"
5075                                    " secondary, not both")
5076     else: # not replacing the secondary
5077       if cnt != 2:
5078         raise errors.OpPrereqError("The iallocator and new node options can"
5079                                    " be used only when changing the"
5080                                    " secondary node")
5081
5082   def ExpandNames(self):
5083     self._ExpandAndLockInstance()
5084
5085     if self.op.iallocator is not None:
5086       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5087     elif self.op.remote_node is not None:
5088       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5089       if remote_node is None:
5090         raise errors.OpPrereqError("Node '%s' not known" %
5091                                    self.op.remote_node)
5092       self.op.remote_node = remote_node
5093       # Warning: do not remove the locking of the new secondary here
5094       # unless DRBD8.AddChildren is changed to work in parallel;
5095       # currently it doesn't since parallel invocations of
5096       # FindUnusedMinor will conflict
5097       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5098       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5099     else:
5100       self.needed_locks[locking.LEVEL_NODE] = []
5101       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5102
5103   def DeclareLocks(self, level):
5104     # If we're not already locking all nodes in the set we have to declare the
5105     # instance's primary/secondary nodes.
5106     if (level == locking.LEVEL_NODE and
5107         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5108       self._LockInstancesNodes()
5109
5110   def _RunAllocator(self):
5111     """Compute a new secondary node using an IAllocator.
5112
5113     """
5114     ial = IAllocator(self,
5115                      mode=constants.IALLOCATOR_MODE_RELOC,
5116                      name=self.op.instance_name,
5117                      relocate_from=[self.sec_node])
5118
5119     ial.Run(self.op.iallocator)
5120
5121     if not ial.success:
5122       raise errors.OpPrereqError("Can't compute nodes using"
5123                                  " iallocator '%s': %s" % (self.op.iallocator,
5124                                                            ial.info))
5125     if len(ial.nodes) != ial.required_nodes:
5126       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5127                                  " of nodes (%s), required %s" %
5128                                  (len(ial.nodes), ial.required_nodes))
5129     self.op.remote_node = ial.nodes[0]
5130     self.LogInfo("Selected new secondary for the instance: %s",
5131                  self.op.remote_node)
5132
5133   def BuildHooksEnv(self):
5134     """Build hooks env.
5135
5136     This runs on the master, the primary and all the secondaries.
5137
5138     """
5139     env = {
5140       "MODE": self.op.mode,
5141       "NEW_SECONDARY": self.op.remote_node,
5142       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5143       }
5144     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5145     nl = [
5146       self.cfg.GetMasterNode(),
5147       self.instance.primary_node,
5148       ]
5149     if self.op.remote_node is not None:
5150       nl.append(self.op.remote_node)
5151     return env, nl, nl
5152
5153   def CheckPrereq(self):
5154     """Check prerequisites.
5155
5156     This checks that the instance is in the cluster.
5157
5158     """
5159     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5160     assert instance is not None, \
5161       "Cannot retrieve locked instance %s" % self.op.instance_name
5162     self.instance = instance
5163
5164     if instance.disk_template != constants.DT_DRBD8:
5165       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5166                                  " instances")
5167
5168     if len(instance.secondary_nodes) != 1:
5169       raise errors.OpPrereqError("The instance has a strange layout,"
5170                                  " expected one secondary but found %d" %
5171                                  len(instance.secondary_nodes))
5172
5173     self.sec_node = instance.secondary_nodes[0]
5174
5175     if self.op.iallocator is not None:
5176       self._RunAllocator()
5177
5178     remote_node = self.op.remote_node
5179     if remote_node is not None:
5180       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5181       assert self.remote_node_info is not None, \
5182         "Cannot retrieve locked node %s" % remote_node
5183     else:
5184       self.remote_node_info = None
5185     if remote_node == instance.primary_node:
5186       raise errors.OpPrereqError("The specified node is the primary node of"
5187                                  " the instance.")
5188     elif remote_node == self.sec_node:
5189       raise errors.OpPrereqError("The specified node is already the"
5190                                  " secondary node of the instance.")
5191
5192     if self.op.mode == constants.REPLACE_DISK_PRI:
5193       n1 = self.tgt_node = instance.primary_node
5194       n2 = self.oth_node = self.sec_node
5195     elif self.op.mode == constants.REPLACE_DISK_SEC:
5196       n1 = self.tgt_node = self.sec_node
5197       n2 = self.oth_node = instance.primary_node
5198     elif self.op.mode == constants.REPLACE_DISK_CHG:
5199       n1 = self.new_node = remote_node
5200       n2 = self.oth_node = instance.primary_node
5201       self.tgt_node = self.sec_node
5202       _CheckNodeNotDrained(self, remote_node)
5203     else:
5204       raise errors.ProgrammerError("Unhandled disk replace mode")
5205
5206     _CheckNodeOnline(self, n1)
5207     _CheckNodeOnline(self, n2)
5208
5209     if not self.op.disks:
5210       self.op.disks = range(len(instance.disks))
5211
5212     for disk_idx in self.op.disks:
5213       instance.FindDisk(disk_idx)
5214
5215   def _ExecD8DiskOnly(self, feedback_fn):
5216     """Replace a disk on the primary or secondary for dbrd8.
5217
5218     The algorithm for replace is quite complicated:
5219
5220       1. for each disk to be replaced:
5221
5222         1. create new LVs on the target node with unique names
5223         1. detach old LVs from the drbd device
5224         1. rename old LVs to name_replaced.<time_t>
5225         1. rename new LVs to old LVs
5226         1. attach the new LVs (with the old names now) to the drbd device
5227
5228       1. wait for sync across all devices
5229
5230       1. for each modified disk:
5231
5232         1. remove old LVs (which have the name name_replaces.<time_t>)
5233
5234     Failures are not very well handled.
5235
5236     """
5237     steps_total = 6
5238     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5239     instance = self.instance
5240     iv_names = {}
5241     vgname = self.cfg.GetVGName()
5242     # start of work
5243     cfg = self.cfg
5244     tgt_node = self.tgt_node
5245     oth_node = self.oth_node
5246
5247     # Step: check device activation
5248     self.proc.LogStep(1, steps_total, "check device existence")
5249     info("checking volume groups")
5250     my_vg = cfg.GetVGName()
5251     results = self.rpc.call_vg_list([oth_node, tgt_node])
5252     if not results:
5253       raise errors.OpExecError("Can't list volume groups on the nodes")
5254     for node in oth_node, tgt_node:
5255       res = results[node]
5256       msg = res.RemoteFailMsg()
5257       if msg:
5258         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5259       if my_vg not in res.payload:
5260         raise errors.OpExecError("Volume group '%s' not found on %s" %
5261                                  (my_vg, node))
5262     for idx, dev in enumerate(instance.disks):
5263       if idx not in self.op.disks:
5264         continue
5265       for node in tgt_node, oth_node:
5266         info("checking disk/%d on %s" % (idx, node))
5267         cfg.SetDiskID(dev, node)
5268         result = self.rpc.call_blockdev_find(node, dev)
5269         msg = result.RemoteFailMsg()
5270         if not msg and not result.payload:
5271           msg = "disk not found"
5272         if msg:
5273           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5274                                    (idx, node, msg))
5275
5276     # Step: check other node consistency
5277     self.proc.LogStep(2, steps_total, "check peer consistency")
5278     for idx, dev in enumerate(instance.disks):
5279       if idx not in self.op.disks:
5280         continue
5281       info("checking disk/%d consistency on %s" % (idx, oth_node))
5282       if not _CheckDiskConsistency(self, dev, oth_node,
5283                                    oth_node==instance.primary_node):
5284         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5285                                  " to replace disks on this node (%s)" %
5286                                  (oth_node, tgt_node))
5287
5288     # Step: create new storage
5289     self.proc.LogStep(3, steps_total, "allocate new storage")
5290     for idx, dev in enumerate(instance.disks):
5291       if idx not in self.op.disks:
5292         continue
5293       size = dev.size
5294       cfg.SetDiskID(dev, tgt_node)
5295       lv_names = [".disk%d_%s" % (idx, suf)
5296                   for suf in ["data", "meta"]]
5297       names = _GenerateUniqueNames(self, lv_names)
5298       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5299                              logical_id=(vgname, names[0]))
5300       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5301                              logical_id=(vgname, names[1]))
5302       new_lvs = [lv_data, lv_meta]
5303       old_lvs = dev.children
5304       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5305       info("creating new local storage on %s for %s" %
5306            (tgt_node, dev.iv_name))
5307       # we pass force_create=True to force the LVM creation
5308       for new_lv in new_lvs:
5309         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5310                         _GetInstanceInfoText(instance), False)
5311
5312     # Step: for each lv, detach+rename*2+attach
5313     self.proc.LogStep(4, steps_total, "change drbd configuration")
5314     for dev, old_lvs, new_lvs in iv_names.itervalues():
5315       info("detaching %s drbd from local storage" % dev.iv_name)
5316       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5317       msg = result.RemoteFailMsg()
5318       if msg:
5319         raise errors.OpExecError("Can't detach drbd from local storage on node"
5320                                  " %s for device %s: %s" %
5321                                  (tgt_node, dev.iv_name, msg))
5322       #dev.children = []
5323       #cfg.Update(instance)
5324
5325       # ok, we created the new LVs, so now we know we have the needed
5326       # storage; as such, we proceed on the target node to rename
5327       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5328       # using the assumption that logical_id == physical_id (which in
5329       # turn is the unique_id on that node)
5330
5331       # FIXME(iustin): use a better name for the replaced LVs
5332       temp_suffix = int(time.time())
5333       ren_fn = lambda d, suff: (d.physical_id[0],
5334                                 d.physical_id[1] + "_replaced-%s" % suff)
5335       # build the rename list based on what LVs exist on the node
5336       rlist = []
5337       for to_ren in old_lvs:
5338         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5339         if not result.RemoteFailMsg() and result.payload:
5340           # device exists
5341           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5342
5343       info("renaming the old LVs on the target node")
5344       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5345       msg = result.RemoteFailMsg()
5346       if msg:
5347         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5348                                  (tgt_node, msg))
5349       # now we rename the new LVs to the old LVs
5350       info("renaming the new LVs on the target node")
5351       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5352       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5353       msg = result.RemoteFailMsg()
5354       if msg:
5355         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5356                                  (tgt_node, msg))
5357
5358       for old, new in zip(old_lvs, new_lvs):
5359         new.logical_id = old.logical_id
5360         cfg.SetDiskID(new, tgt_node)
5361
5362       for disk in old_lvs:
5363         disk.logical_id = ren_fn(disk, temp_suffix)
5364         cfg.SetDiskID(disk, tgt_node)
5365
5366       # now that the new lvs have the old name, we can add them to the device
5367       info("adding new mirror component on %s" % tgt_node)
5368       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5369       msg = result.RemoteFailMsg()
5370       if msg:
5371         for new_lv in new_lvs:
5372           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5373           if msg:
5374             warning("Can't rollback device %s: %s", dev, msg,
5375                     hint="cleanup manually the unused logical volumes")
5376         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5377
5378       dev.children = new_lvs
5379       cfg.Update(instance)
5380
5381     # Step: wait for sync
5382
5383     # this can fail as the old devices are degraded and _WaitForSync
5384     # does a combined result over all disks, so we don't check its
5385     # return value
5386     self.proc.LogStep(5, steps_total, "sync devices")
5387     _WaitForSync(self, instance, unlock=True)
5388
5389     # so check manually all the devices
5390     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5391       cfg.SetDiskID(dev, instance.primary_node)
5392       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5393       msg = result.RemoteFailMsg()
5394       if not msg and not result.payload:
5395         msg = "disk not found"
5396       if msg:
5397         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5398                                  (name, msg))
5399       if result.payload[5]:
5400         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5401
5402     # Step: remove old storage
5403     self.proc.LogStep(6, steps_total, "removing old storage")
5404     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5405       info("remove logical volumes for %s" % name)
5406       for lv in old_lvs:
5407         cfg.SetDiskID(lv, tgt_node)
5408         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5409         if msg:
5410           warning("Can't remove old LV: %s" % msg,
5411                   hint="manually remove unused LVs")
5412           continue
5413
5414   def _ExecD8Secondary(self, feedback_fn):
5415     """Replace the secondary node for drbd8.
5416
5417     The algorithm for replace is quite complicated:
5418       - for all disks of the instance:
5419         - create new LVs on the new node with same names
5420         - shutdown the drbd device on the old secondary
5421         - disconnect the drbd network on the primary
5422         - create the drbd device on the new secondary
5423         - network attach the drbd on the primary, using an artifice:
5424           the drbd code for Attach() will connect to the network if it
5425           finds a device which is connected to the good local disks but
5426           not network enabled
5427       - wait for sync across all devices
5428       - remove all disks from the old secondary
5429
5430     Failures are not very well handled.
5431
5432     """
5433     steps_total = 6
5434     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5435     instance = self.instance
5436     iv_names = {}
5437     # start of work
5438     cfg = self.cfg
5439     old_node = self.tgt_node
5440     new_node = self.new_node
5441     pri_node = instance.primary_node
5442     nodes_ip = {
5443       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5444       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5445       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5446       }
5447
5448     # Step: check device activation
5449     self.proc.LogStep(1, steps_total, "check device existence")
5450     info("checking volume groups")
5451     my_vg = cfg.GetVGName()
5452     results = self.rpc.call_vg_list([pri_node, new_node])
5453     for node in pri_node, new_node:
5454       res = results[node]
5455       msg = res.RemoteFailMsg()
5456       if msg:
5457         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5458       if my_vg not in res.payload:
5459         raise errors.OpExecError("Volume group '%s' not found on %s" %
5460                                  (my_vg, node))
5461     for idx, dev in enumerate(instance.disks):
5462       if idx not in self.op.disks:
5463         continue
5464       info("checking disk/%d on %s" % (idx, pri_node))
5465       cfg.SetDiskID(dev, pri_node)
5466       result = self.rpc.call_blockdev_find(pri_node, dev)
5467       msg = result.RemoteFailMsg()
5468       if not msg and not result.payload:
5469         msg = "disk not found"
5470       if msg:
5471         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5472                                  (idx, pri_node, msg))
5473
5474     # Step: check other node consistency
5475     self.proc.LogStep(2, steps_total, "check peer consistency")
5476     for idx, dev in enumerate(instance.disks):
5477       if idx not in self.op.disks:
5478         continue
5479       info("checking disk/%d consistency on %s" % (idx, pri_node))
5480       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5481         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5482                                  " unsafe to replace the secondary" %
5483                                  pri_node)
5484
5485     # Step: create new storage
5486     self.proc.LogStep(3, steps_total, "allocate new storage")
5487     for idx, dev in enumerate(instance.disks):
5488       info("adding new local storage on %s for disk/%d" %
5489            (new_node, idx))
5490       # we pass force_create=True to force LVM creation
5491       for new_lv in dev.children:
5492         _CreateBlockDev(self, new_node, instance, new_lv, True,
5493                         _GetInstanceInfoText(instance), False)
5494
5495     # Step 4: dbrd minors and drbd setups changes
5496     # after this, we must manually remove the drbd minors on both the
5497     # error and the success paths
5498     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5499                                    instance.name)
5500     logging.debug("Allocated minors %s" % (minors,))
5501     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5502     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5503       size = dev.size
5504       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5505       # create new devices on new_node; note that we create two IDs:
5506       # one without port, so the drbd will be activated without
5507       # networking information on the new node at this stage, and one
5508       # with network, for the latter activation in step 4
5509       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5510       if pri_node == o_node1:
5511         p_minor = o_minor1
5512       else:
5513         p_minor = o_minor2
5514
5515       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5516       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5517
5518       iv_names[idx] = (dev, dev.children, new_net_id)
5519       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5520                     new_net_id)
5521       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5522                               logical_id=new_alone_id,
5523                               children=dev.children)
5524       try:
5525         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5526                               _GetInstanceInfoText(instance), False)
5527       except errors.GenericError:
5528         self.cfg.ReleaseDRBDMinors(instance.name)
5529         raise
5530
5531     for idx, dev in enumerate(instance.disks):
5532       # we have new devices, shutdown the drbd on the old secondary
5533       info("shutting down drbd for disk/%d on old node" % idx)
5534       cfg.SetDiskID(dev, old_node)
5535       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5536       if msg:
5537         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5538                 (idx, msg),
5539                 hint="Please cleanup this device manually as soon as possible")
5540
5541     info("detaching primary drbds from the network (=> standalone)")
5542     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5543                                                instance.disks)[pri_node]
5544
5545     msg = result.RemoteFailMsg()
5546     if msg:
5547       # detaches didn't succeed (unlikely)
5548       self.cfg.ReleaseDRBDMinors(instance.name)
5549       raise errors.OpExecError("Can't detach the disks from the network on"
5550                                " old node: %s" % (msg,))
5551
5552     # if we managed to detach at least one, we update all the disks of
5553     # the instance to point to the new secondary
5554     info("updating instance configuration")
5555     for dev, _, new_logical_id in iv_names.itervalues():
5556       dev.logical_id = new_logical_id
5557       cfg.SetDiskID(dev, pri_node)
5558     cfg.Update(instance)
5559
5560     # and now perform the drbd attach
5561     info("attaching primary drbds to new secondary (standalone => connected)")
5562     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5563                                            instance.disks, instance.name,
5564                                            False)
5565     for to_node, to_result in result.items():
5566       msg = to_result.RemoteFailMsg()
5567       if msg:
5568         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5569                 hint="please do a gnt-instance info to see the"
5570                 " status of disks")
5571
5572     # this can fail as the old devices are degraded and _WaitForSync
5573     # does a combined result over all disks, so we don't check its
5574     # return value
5575     self.proc.LogStep(5, steps_total, "sync devices")
5576     _WaitForSync(self, instance, unlock=True)
5577
5578     # so check manually all the devices
5579     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5580       cfg.SetDiskID(dev, pri_node)
5581       result = self.rpc.call_blockdev_find(pri_node, dev)
5582       msg = result.RemoteFailMsg()
5583       if not msg and not result.payload:
5584         msg = "disk not found"
5585       if msg:
5586         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5587                                  (idx, msg))
5588       if result.payload[5]:
5589         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5590
5591     self.proc.LogStep(6, steps_total, "removing old storage")
5592     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5593       info("remove logical volumes for disk/%d" % idx)
5594       for lv in old_lvs:
5595         cfg.SetDiskID(lv, old_node)
5596         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5597         if msg:
5598           warning("Can't remove LV on old secondary: %s", msg,
5599                   hint="Cleanup stale volumes by hand")
5600
5601   def Exec(self, feedback_fn):
5602     """Execute disk replacement.
5603
5604     This dispatches the disk replacement to the appropriate handler.
5605
5606     """
5607     instance = self.instance
5608
5609     # Activate the instance disks if we're replacing them on a down instance
5610     if not instance.admin_up:
5611       _StartInstanceDisks(self, instance, True)
5612
5613     if self.op.mode == constants.REPLACE_DISK_CHG:
5614       fn = self._ExecD8Secondary
5615     else:
5616       fn = self._ExecD8DiskOnly
5617
5618     ret = fn(feedback_fn)
5619
5620     # Deactivate the instance disks if we're replacing them on a down instance
5621     if not instance.admin_up:
5622       _SafeShutdownInstanceDisks(self, instance)
5623
5624     return ret
5625
5626
5627 class LUGrowDisk(LogicalUnit):
5628   """Grow a disk of an instance.
5629
5630   """
5631   HPATH = "disk-grow"
5632   HTYPE = constants.HTYPE_INSTANCE
5633   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5634   REQ_BGL = False
5635
5636   def ExpandNames(self):
5637     self._ExpandAndLockInstance()
5638     self.needed_locks[locking.LEVEL_NODE] = []
5639     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5640
5641   def DeclareLocks(self, level):
5642     if level == locking.LEVEL_NODE:
5643       self._LockInstancesNodes()
5644
5645   def BuildHooksEnv(self):
5646     """Build hooks env.
5647
5648     This runs on the master, the primary and all the secondaries.
5649
5650     """
5651     env = {
5652       "DISK": self.op.disk,
5653       "AMOUNT": self.op.amount,
5654       }
5655     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5656     nl = [
5657       self.cfg.GetMasterNode(),
5658       self.instance.primary_node,
5659       ]
5660     return env, nl, nl
5661
5662   def CheckPrereq(self):
5663     """Check prerequisites.
5664
5665     This checks that the instance is in the cluster.
5666
5667     """
5668     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5669     assert instance is not None, \
5670       "Cannot retrieve locked instance %s" % self.op.instance_name
5671     nodenames = list(instance.all_nodes)
5672     for node in nodenames:
5673       _CheckNodeOnline(self, node)
5674
5675
5676     self.instance = instance
5677
5678     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5679       raise errors.OpPrereqError("Instance's disk layout does not support"
5680                                  " growing.")
5681
5682     self.disk = instance.FindDisk(self.op.disk)
5683
5684     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5685                                        instance.hypervisor)
5686     for node in nodenames:
5687       info = nodeinfo[node]
5688       msg = info.RemoteFailMsg()
5689       if msg:
5690         raise errors.OpPrereqError("Cannot get current information"
5691                                    " from node %s:" % (node, msg))
5692       vg_free = info.payload.get('vg_free', None)
5693       if not isinstance(vg_free, int):
5694         raise errors.OpPrereqError("Can't compute free disk space on"
5695                                    " node %s" % node)
5696       if self.op.amount > vg_free:
5697         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5698                                    " %d MiB available, %d MiB required" %
5699                                    (node, vg_free, self.op.amount))
5700
5701   def Exec(self, feedback_fn):
5702     """Execute disk grow.
5703
5704     """
5705     instance = self.instance
5706     disk = self.disk
5707     for node in instance.all_nodes:
5708       self.cfg.SetDiskID(disk, node)
5709       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5710       msg = result.RemoteFailMsg()
5711       if msg:
5712         raise errors.OpExecError("Grow request failed to node %s: %s" %
5713                                  (node, msg))
5714     disk.RecordGrow(self.op.amount)
5715     self.cfg.Update(instance)
5716     if self.op.wait_for_sync:
5717       disk_abort = not _WaitForSync(self, instance)
5718       if disk_abort:
5719         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5720                              " status.\nPlease check the instance.")
5721
5722
5723 class LUQueryInstanceData(NoHooksLU):
5724   """Query runtime instance data.
5725
5726   """
5727   _OP_REQP = ["instances", "static"]
5728   REQ_BGL = False
5729
5730   def ExpandNames(self):
5731     self.needed_locks = {}
5732     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5733
5734     if not isinstance(self.op.instances, list):
5735       raise errors.OpPrereqError("Invalid argument type 'instances'")
5736
5737     if self.op.instances:
5738       self.wanted_names = []
5739       for name in self.op.instances:
5740         full_name = self.cfg.ExpandInstanceName(name)
5741         if full_name is None:
5742           raise errors.OpPrereqError("Instance '%s' not known" % name)
5743         self.wanted_names.append(full_name)
5744       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5745     else:
5746       self.wanted_names = None
5747       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5748
5749     self.needed_locks[locking.LEVEL_NODE] = []
5750     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5751
5752   def DeclareLocks(self, level):
5753     if level == locking.LEVEL_NODE:
5754       self._LockInstancesNodes()
5755
5756   def CheckPrereq(self):
5757     """Check prerequisites.
5758
5759     This only checks the optional instance list against the existing names.
5760
5761     """
5762     if self.wanted_names is None:
5763       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5764
5765     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5766                              in self.wanted_names]
5767     return
5768
5769   def _ComputeDiskStatus(self, instance, snode, dev):
5770     """Compute block device status.
5771
5772     """
5773     static = self.op.static
5774     if not static:
5775       self.cfg.SetDiskID(dev, instance.primary_node)
5776       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5777       if dev_pstatus.offline:
5778         dev_pstatus = None
5779       else:
5780         msg = dev_pstatus.RemoteFailMsg()
5781         if msg:
5782           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5783                                    (instance.name, msg))
5784         dev_pstatus = dev_pstatus.payload
5785     else:
5786       dev_pstatus = None
5787
5788     if dev.dev_type in constants.LDS_DRBD:
5789       # we change the snode then (otherwise we use the one passed in)
5790       if dev.logical_id[0] == instance.primary_node:
5791         snode = dev.logical_id[1]
5792       else:
5793         snode = dev.logical_id[0]
5794
5795     if snode and not static:
5796       self.cfg.SetDiskID(dev, snode)
5797       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5798       if dev_sstatus.offline:
5799         dev_sstatus = None
5800       else:
5801         msg = dev_sstatus.RemoteFailMsg()
5802         if msg:
5803           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5804                                    (instance.name, msg))
5805         dev_sstatus = dev_sstatus.payload
5806     else:
5807       dev_sstatus = None
5808
5809     if dev.children:
5810       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5811                       for child in dev.children]
5812     else:
5813       dev_children = []
5814
5815     data = {
5816       "iv_name": dev.iv_name,
5817       "dev_type": dev.dev_type,
5818       "logical_id": dev.logical_id,
5819       "physical_id": dev.physical_id,
5820       "pstatus": dev_pstatus,
5821       "sstatus": dev_sstatus,
5822       "children": dev_children,
5823       "mode": dev.mode,
5824       }
5825
5826     return data
5827
5828   def Exec(self, feedback_fn):
5829     """Gather and return data"""
5830     result = {}
5831
5832     cluster = self.cfg.GetClusterInfo()
5833
5834     for instance in self.wanted_instances:
5835       if not self.op.static:
5836         remote_info = self.rpc.call_instance_info(instance.primary_node,
5837                                                   instance.name,
5838                                                   instance.hypervisor)
5839         msg = remote_info.RemoteFailMsg()
5840         if msg:
5841           raise errors.OpExecError("Error checking node %s: %s" %
5842                                    (instance.primary_node, msg))
5843         remote_info = remote_info.payload
5844         if remote_info and "state" in remote_info:
5845           remote_state = "up"
5846         else:
5847           remote_state = "down"
5848       else:
5849         remote_state = None
5850       if instance.admin_up:
5851         config_state = "up"
5852       else:
5853         config_state = "down"
5854
5855       disks = [self._ComputeDiskStatus(instance, None, device)
5856                for device in instance.disks]
5857
5858       idict = {
5859         "name": instance.name,
5860         "config_state": config_state,
5861         "run_state": remote_state,
5862         "pnode": instance.primary_node,
5863         "snodes": instance.secondary_nodes,
5864         "os": instance.os,
5865         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5866         "disks": disks,
5867         "hypervisor": instance.hypervisor,
5868         "network_port": instance.network_port,
5869         "hv_instance": instance.hvparams,
5870         "hv_actual": cluster.FillHV(instance),
5871         "be_instance": instance.beparams,
5872         "be_actual": cluster.FillBE(instance),
5873         }
5874
5875       result[instance.name] = idict
5876
5877     return result
5878
5879
5880 class LUSetInstanceParams(LogicalUnit):
5881   """Modifies an instances's parameters.
5882
5883   """
5884   HPATH = "instance-modify"
5885   HTYPE = constants.HTYPE_INSTANCE
5886   _OP_REQP = ["instance_name"]
5887   REQ_BGL = False
5888
5889   def CheckArguments(self):
5890     if not hasattr(self.op, 'nics'):
5891       self.op.nics = []
5892     if not hasattr(self.op, 'disks'):
5893       self.op.disks = []
5894     if not hasattr(self.op, 'beparams'):
5895       self.op.beparams = {}
5896     if not hasattr(self.op, 'hvparams'):
5897       self.op.hvparams = {}
5898     self.op.force = getattr(self.op, "force", False)
5899     if not (self.op.nics or self.op.disks or
5900             self.op.hvparams or self.op.beparams):
5901       raise errors.OpPrereqError("No changes submitted")
5902
5903     # Disk validation
5904     disk_addremove = 0
5905     for disk_op, disk_dict in self.op.disks:
5906       if disk_op == constants.DDM_REMOVE:
5907         disk_addremove += 1
5908         continue
5909       elif disk_op == constants.DDM_ADD:
5910         disk_addremove += 1
5911       else:
5912         if not isinstance(disk_op, int):
5913           raise errors.OpPrereqError("Invalid disk index")
5914       if disk_op == constants.DDM_ADD:
5915         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5916         if mode not in constants.DISK_ACCESS_SET:
5917           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5918         size = disk_dict.get('size', None)
5919         if size is None:
5920           raise errors.OpPrereqError("Required disk parameter size missing")
5921         try:
5922           size = int(size)
5923         except ValueError, err:
5924           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5925                                      str(err))
5926         disk_dict['size'] = size
5927       else:
5928         # modification of disk
5929         if 'size' in disk_dict:
5930           raise errors.OpPrereqError("Disk size change not possible, use"
5931                                      " grow-disk")
5932
5933     if disk_addremove > 1:
5934       raise errors.OpPrereqError("Only one disk add or remove operation"
5935                                  " supported at a time")
5936
5937     # NIC validation
5938     nic_addremove = 0
5939     for nic_op, nic_dict in self.op.nics:
5940       if nic_op == constants.DDM_REMOVE:
5941         nic_addremove += 1
5942         continue
5943       elif nic_op == constants.DDM_ADD:
5944         nic_addremove += 1
5945       else:
5946         if not isinstance(nic_op, int):
5947           raise errors.OpPrereqError("Invalid nic index")
5948
5949       # nic_dict should be a dict
5950       nic_ip = nic_dict.get('ip', None)
5951       if nic_ip is not None:
5952         if nic_ip.lower() == constants.VALUE_NONE:
5953           nic_dict['ip'] = None
5954         else:
5955           if not utils.IsValidIP(nic_ip):
5956             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5957
5958       nic_bridge = nic_dict.get('bridge', None)
5959       nic_link = nic_dict.get('link', None)
5960       if nic_bridge and nic_link:
5961         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5962       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5963         nic_dict['bridge'] = None
5964       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5965         nic_dict['link'] = None
5966
5967       if nic_op == constants.DDM_ADD:
5968         nic_mac = nic_dict.get('mac', None)
5969         if nic_mac is None:
5970           nic_dict['mac'] = constants.VALUE_AUTO
5971
5972       if 'mac' in nic_dict:
5973         nic_mac = nic_dict['mac']
5974         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5975           if not utils.IsValidMac(nic_mac):
5976             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5977         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5978           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5979                                      " modifying an existing nic")
5980
5981     if nic_addremove > 1:
5982       raise errors.OpPrereqError("Only one NIC add or remove operation"
5983                                  " supported at a time")
5984
5985   def ExpandNames(self):
5986     self._ExpandAndLockInstance()
5987     self.needed_locks[locking.LEVEL_NODE] = []
5988     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5989
5990   def DeclareLocks(self, level):
5991     if level == locking.LEVEL_NODE:
5992       self._LockInstancesNodes()
5993
5994   def BuildHooksEnv(self):
5995     """Build hooks env.
5996
5997     This runs on the master, primary and secondaries.
5998
5999     """
6000     args = dict()
6001     if constants.BE_MEMORY in self.be_new:
6002       args['memory'] = self.be_new[constants.BE_MEMORY]
6003     if constants.BE_VCPUS in self.be_new:
6004       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6005     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6006     # information at all.
6007     if self.op.nics:
6008       args['nics'] = []
6009       nic_override = dict(self.op.nics)
6010       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6011       for idx, nic in enumerate(self.instance.nics):
6012         if idx in nic_override:
6013           this_nic_override = nic_override[idx]
6014         else:
6015           this_nic_override = {}
6016         if 'ip' in this_nic_override:
6017           ip = this_nic_override['ip']
6018         else:
6019           ip = nic.ip
6020         if 'mac' in this_nic_override:
6021           mac = this_nic_override['mac']
6022         else:
6023           mac = nic.mac
6024         if idx in self.nic_pnew:
6025           nicparams = self.nic_pnew[idx]
6026         else:
6027           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6028         mode = nicparams[constants.NIC_MODE]
6029         link = nicparams[constants.NIC_LINK]
6030         args['nics'].append((ip, mac, mode, link))
6031       if constants.DDM_ADD in nic_override:
6032         ip = nic_override[constants.DDM_ADD].get('ip', None)
6033         mac = nic_override[constants.DDM_ADD]['mac']
6034         nicparams = self.nic_pnew[constants.DDM_ADD]
6035         mode = nicparams[constants.NIC_MODE]
6036         link = nicparams[constants.NIC_LINK]
6037         args['nics'].append((ip, mac, mode, link))
6038       elif constants.DDM_REMOVE in nic_override:
6039         del args['nics'][-1]
6040
6041     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6042     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6043     return env, nl, nl
6044
6045   def _GetUpdatedParams(self, old_params, update_dict,
6046                         default_values, parameter_types):
6047     """Return the new params dict for the given params.
6048
6049     @type old_params: dict
6050     @type old_params: old parameters
6051     @type update_dict: dict
6052     @type update_dict: dict containing new parameter values,
6053                        or constants.VALUE_DEFAULT to reset the
6054                        parameter to its default value
6055     @type default_values: dict
6056     @param default_values: default values for the filled parameters
6057     @type parameter_types: dict
6058     @param parameter_types: dict mapping target dict keys to types
6059                             in constants.ENFORCEABLE_TYPES
6060     @rtype: (dict, dict)
6061     @return: (new_parameters, filled_parameters)
6062
6063     """
6064     params_copy = copy.deepcopy(old_params)
6065     for key, val in update_dict.iteritems():
6066       if val == constants.VALUE_DEFAULT:
6067         try:
6068           del params_copy[key]
6069         except KeyError:
6070           pass
6071       else:
6072         params_copy[key] = val
6073     utils.ForceDictType(params_copy, parameter_types)
6074     params_filled = objects.FillDict(default_values, params_copy)
6075     return (params_copy, params_filled)
6076
6077   def CheckPrereq(self):
6078     """Check prerequisites.
6079
6080     This only checks the instance list against the existing names.
6081
6082     """
6083     force = self.force = self.op.force
6084
6085     # checking the new params on the primary/secondary nodes
6086
6087     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6088     cluster = self.cluster = self.cfg.GetClusterInfo()
6089     assert self.instance is not None, \
6090       "Cannot retrieve locked instance %s" % self.op.instance_name
6091     pnode = instance.primary_node
6092     nodelist = list(instance.all_nodes)
6093
6094     # hvparams processing
6095     if self.op.hvparams:
6096       i_hvdict, hv_new = self._GetUpdatedParams(
6097                              instance.hvparams, self.op.hvparams,
6098                              cluster.hvparams[instance.hypervisor],
6099                              constants.HVS_PARAMETER_TYPES)
6100       # local check
6101       hypervisor.GetHypervisor(
6102         instance.hypervisor).CheckParameterSyntax(hv_new)
6103       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6104       self.hv_new = hv_new # the new actual values
6105       self.hv_inst = i_hvdict # the new dict (without defaults)
6106     else:
6107       self.hv_new = self.hv_inst = {}
6108
6109     # beparams processing
6110     if self.op.beparams:
6111       i_bedict, be_new = self._GetUpdatedParams(
6112                              instance.beparams, self.op.beparams,
6113                              cluster.beparams[constants.PP_DEFAULT],
6114                              constants.BES_PARAMETER_TYPES)
6115       self.be_new = be_new # the new actual values
6116       self.be_inst = i_bedict # the new dict (without defaults)
6117     else:
6118       self.be_new = self.be_inst = {}
6119
6120     self.warn = []
6121
6122     if constants.BE_MEMORY in self.op.beparams and not self.force:
6123       mem_check_list = [pnode]
6124       if be_new[constants.BE_AUTO_BALANCE]:
6125         # either we changed auto_balance to yes or it was from before
6126         mem_check_list.extend(instance.secondary_nodes)
6127       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6128                                                   instance.hypervisor)
6129       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6130                                          instance.hypervisor)
6131       pninfo = nodeinfo[pnode]
6132       msg = pninfo.RemoteFailMsg()
6133       if msg:
6134         # Assume the primary node is unreachable and go ahead
6135         self.warn.append("Can't get info from primary node %s: %s" %
6136                          (pnode,  msg))
6137       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6138         self.warn.append("Node data from primary node %s doesn't contain"
6139                          " free memory information" % pnode)
6140       elif instance_info.RemoteFailMsg():
6141         self.warn.append("Can't get instance runtime information: %s" %
6142                         instance_info.RemoteFailMsg())
6143       else:
6144         if instance_info.payload:
6145           current_mem = int(instance_info.payload['memory'])
6146         else:
6147           # Assume instance not running
6148           # (there is a slight race condition here, but it's not very probable,
6149           # and we have no other way to check)
6150           current_mem = 0
6151         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6152                     pninfo.payload['memory_free'])
6153         if miss_mem > 0:
6154           raise errors.OpPrereqError("This change will prevent the instance"
6155                                      " from starting, due to %d MB of memory"
6156                                      " missing on its primary node" % miss_mem)
6157
6158       if be_new[constants.BE_AUTO_BALANCE]:
6159         for node, nres in nodeinfo.items():
6160           if node not in instance.secondary_nodes:
6161             continue
6162           msg = nres.RemoteFailMsg()
6163           if msg:
6164             self.warn.append("Can't get info from secondary node %s: %s" %
6165                              (node, msg))
6166           elif not isinstance(nres.payload.get('memory_free', None), int):
6167             self.warn.append("Secondary node %s didn't return free"
6168                              " memory information" % node)
6169           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6170             self.warn.append("Not enough memory to failover instance to"
6171                              " secondary node %s" % node)
6172
6173     # NIC processing
6174     self.nic_pnew = {}
6175     self.nic_pinst = {}
6176     for nic_op, nic_dict in self.op.nics:
6177       if nic_op == constants.DDM_REMOVE:
6178         if not instance.nics:
6179           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6180         continue
6181       if nic_op != constants.DDM_ADD:
6182         # an existing nic
6183         if nic_op < 0 or nic_op >= len(instance.nics):
6184           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6185                                      " are 0 to %d" %
6186                                      (nic_op, len(instance.nics)))
6187         old_nic_params = instance.nics[nic_op].nicparams
6188         old_nic_ip = instance.nics[nic_op].ip
6189       else:
6190         old_nic_params = {}
6191         old_nic_ip = None
6192
6193       update_params_dict = dict([(key, nic_dict[key])
6194                                  for key in constants.NICS_PARAMETERS
6195                                  if key in nic_dict])
6196
6197       if 'bridge' in nic_dict:
6198         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6199
6200       new_nic_params, new_filled_nic_params = \
6201           self._GetUpdatedParams(old_nic_params, update_params_dict,
6202                                  cluster.nicparams[constants.PP_DEFAULT],
6203                                  constants.NICS_PARAMETER_TYPES)
6204       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6205       self.nic_pinst[nic_op] = new_nic_params
6206       self.nic_pnew[nic_op] = new_filled_nic_params
6207       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6208
6209       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6210         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6211         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6212         msg = result.RemoteFailMsg()
6213         if msg:
6214           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6215           if self.force:
6216             self.warn.append(msg)
6217           else:
6218             raise errors.OpPrereqError(msg)
6219       if new_nic_mode == constants.NIC_MODE_ROUTED:
6220         if 'ip' in nic_dict:
6221           nic_ip = nic_dict['ip']
6222         else:
6223           nic_ip = old_nic_ip
6224         if nic_ip is None:
6225           raise errors.OpPrereqError('Cannot set the nic ip to None'
6226                                      ' on a routed nic')
6227       if 'mac' in nic_dict:
6228         nic_mac = nic_dict['mac']
6229         if nic_mac is None:
6230           raise errors.OpPrereqError('Cannot set the nic mac to None')
6231         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6232           # otherwise generate the mac
6233           nic_dict['mac'] = self.cfg.GenerateMAC()
6234         else:
6235           # or validate/reserve the current one
6236           if self.cfg.IsMacInUse(nic_mac):
6237             raise errors.OpPrereqError("MAC address %s already in use"
6238                                        " in cluster" % nic_mac)
6239
6240     # DISK processing
6241     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6242       raise errors.OpPrereqError("Disk operations not supported for"
6243                                  " diskless instances")
6244     for disk_op, disk_dict in self.op.disks:
6245       if disk_op == constants.DDM_REMOVE:
6246         if len(instance.disks) == 1:
6247           raise errors.OpPrereqError("Cannot remove the last disk of"
6248                                      " an instance")
6249         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6250         ins_l = ins_l[pnode]
6251         msg = ins_l.RemoteFailMsg()
6252         if msg:
6253           raise errors.OpPrereqError("Can't contact node %s: %s" %
6254                                      (pnode, msg))
6255         if instance.name in ins_l.payload:
6256           raise errors.OpPrereqError("Instance is running, can't remove"
6257                                      " disks.")
6258
6259       if (disk_op == constants.DDM_ADD and
6260           len(instance.nics) >= constants.MAX_DISKS):
6261         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6262                                    " add more" % constants.MAX_DISKS)
6263       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6264         # an existing disk
6265         if disk_op < 0 or disk_op >= len(instance.disks):
6266           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6267                                      " are 0 to %d" %
6268                                      (disk_op, len(instance.disks)))
6269
6270     return
6271
6272   def Exec(self, feedback_fn):
6273     """Modifies an instance.
6274
6275     All parameters take effect only at the next restart of the instance.
6276
6277     """
6278     # Process here the warnings from CheckPrereq, as we don't have a
6279     # feedback_fn there.
6280     for warn in self.warn:
6281       feedback_fn("WARNING: %s" % warn)
6282
6283     result = []
6284     instance = self.instance
6285     cluster = self.cluster
6286     # disk changes
6287     for disk_op, disk_dict in self.op.disks:
6288       if disk_op == constants.DDM_REMOVE:
6289         # remove the last disk
6290         device = instance.disks.pop()
6291         device_idx = len(instance.disks)
6292         for node, disk in device.ComputeNodeTree(instance.primary_node):
6293           self.cfg.SetDiskID(disk, node)
6294           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6295           if msg:
6296             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6297                             " continuing anyway", device_idx, node, msg)
6298         result.append(("disk/%d" % device_idx, "remove"))
6299       elif disk_op == constants.DDM_ADD:
6300         # add a new disk
6301         if instance.disk_template == constants.DT_FILE:
6302           file_driver, file_path = instance.disks[0].logical_id
6303           file_path = os.path.dirname(file_path)
6304         else:
6305           file_driver = file_path = None
6306         disk_idx_base = len(instance.disks)
6307         new_disk = _GenerateDiskTemplate(self,
6308                                          instance.disk_template,
6309                                          instance.name, instance.primary_node,
6310                                          instance.secondary_nodes,
6311                                          [disk_dict],
6312                                          file_path,
6313                                          file_driver,
6314                                          disk_idx_base)[0]
6315         instance.disks.append(new_disk)
6316         info = _GetInstanceInfoText(instance)
6317
6318         logging.info("Creating volume %s for instance %s",
6319                      new_disk.iv_name, instance.name)
6320         # Note: this needs to be kept in sync with _CreateDisks
6321         #HARDCODE
6322         for node in instance.all_nodes:
6323           f_create = node == instance.primary_node
6324           try:
6325             _CreateBlockDev(self, node, instance, new_disk,
6326                             f_create, info, f_create)
6327           except errors.OpExecError, err:
6328             self.LogWarning("Failed to create volume %s (%s) on"
6329                             " node %s: %s",
6330                             new_disk.iv_name, new_disk, node, err)
6331         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6332                        (new_disk.size, new_disk.mode)))
6333       else:
6334         # change a given disk
6335         instance.disks[disk_op].mode = disk_dict['mode']
6336         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6337     # NIC changes
6338     for nic_op, nic_dict in self.op.nics:
6339       if nic_op == constants.DDM_REMOVE:
6340         # remove the last nic
6341         del instance.nics[-1]
6342         result.append(("nic.%d" % len(instance.nics), "remove"))
6343       elif nic_op == constants.DDM_ADD:
6344         # mac and bridge should be set, by now
6345         mac = nic_dict['mac']
6346         ip = nic_dict.get('ip', None)
6347         nicparams = self.nic_pinst[constants.DDM_ADD]
6348         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6349         instance.nics.append(new_nic)
6350         result.append(("nic.%d" % (len(instance.nics) - 1),
6351                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6352                        (new_nic.mac, new_nic.ip,
6353                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6354                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6355                        )))
6356       else:
6357         for key in 'mac', 'ip':
6358           if key in nic_dict:
6359             setattr(instance.nics[nic_op], key, nic_dict[key])
6360         if nic_op in self.nic_pnew:
6361           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6362         for key, val in nic_dict.iteritems():
6363           result.append(("nic.%s/%d" % (key, nic_op), val))
6364
6365     # hvparams changes
6366     if self.op.hvparams:
6367       instance.hvparams = self.hv_inst
6368       for key, val in self.op.hvparams.iteritems():
6369         result.append(("hv/%s" % key, val))
6370
6371     # beparams changes
6372     if self.op.beparams:
6373       instance.beparams = self.be_inst
6374       for key, val in self.op.beparams.iteritems():
6375         result.append(("be/%s" % key, val))
6376
6377     self.cfg.Update(instance)
6378
6379     return result
6380
6381
6382 class LUQueryExports(NoHooksLU):
6383   """Query the exports list
6384
6385   """
6386   _OP_REQP = ['nodes']
6387   REQ_BGL = False
6388
6389   def ExpandNames(self):
6390     self.needed_locks = {}
6391     self.share_locks[locking.LEVEL_NODE] = 1
6392     if not self.op.nodes:
6393       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6394     else:
6395       self.needed_locks[locking.LEVEL_NODE] = \
6396         _GetWantedNodes(self, self.op.nodes)
6397
6398   def CheckPrereq(self):
6399     """Check prerequisites.
6400
6401     """
6402     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6403
6404   def Exec(self, feedback_fn):
6405     """Compute the list of all the exported system images.
6406
6407     @rtype: dict
6408     @return: a dictionary with the structure node->(export-list)
6409         where export-list is a list of the instances exported on
6410         that node.
6411
6412     """
6413     rpcresult = self.rpc.call_export_list(self.nodes)
6414     result = {}
6415     for node in rpcresult:
6416       if rpcresult[node].RemoteFailMsg():
6417         result[node] = False
6418       else:
6419         result[node] = rpcresult[node].payload
6420
6421     return result
6422
6423
6424 class LUExportInstance(LogicalUnit):
6425   """Export an instance to an image in the cluster.
6426
6427   """
6428   HPATH = "instance-export"
6429   HTYPE = constants.HTYPE_INSTANCE
6430   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6431   REQ_BGL = False
6432
6433   def ExpandNames(self):
6434     self._ExpandAndLockInstance()
6435     # FIXME: lock only instance primary and destination node
6436     #
6437     # Sad but true, for now we have do lock all nodes, as we don't know where
6438     # the previous export might be, and and in this LU we search for it and
6439     # remove it from its current node. In the future we could fix this by:
6440     #  - making a tasklet to search (share-lock all), then create the new one,
6441     #    then one to remove, after
6442     #  - removing the removal operation altoghether
6443     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6444
6445   def DeclareLocks(self, level):
6446     """Last minute lock declaration."""
6447     # All nodes are locked anyway, so nothing to do here.
6448
6449   def BuildHooksEnv(self):
6450     """Build hooks env.
6451
6452     This will run on the master, primary node and target node.
6453
6454     """
6455     env = {
6456       "EXPORT_NODE": self.op.target_node,
6457       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6458       }
6459     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6460     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6461           self.op.target_node]
6462     return env, nl, nl
6463
6464   def CheckPrereq(self):
6465     """Check prerequisites.
6466
6467     This checks that the instance and node names are valid.
6468
6469     """
6470     instance_name = self.op.instance_name
6471     self.instance = self.cfg.GetInstanceInfo(instance_name)
6472     assert self.instance is not None, \
6473           "Cannot retrieve locked instance %s" % self.op.instance_name
6474     _CheckNodeOnline(self, self.instance.primary_node)
6475
6476     self.dst_node = self.cfg.GetNodeInfo(
6477       self.cfg.ExpandNodeName(self.op.target_node))
6478
6479     if self.dst_node is None:
6480       # This is wrong node name, not a non-locked node
6481       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6482     _CheckNodeOnline(self, self.dst_node.name)
6483     _CheckNodeNotDrained(self, self.dst_node.name)
6484
6485     # instance disk type verification
6486     for disk in self.instance.disks:
6487       if disk.dev_type == constants.LD_FILE:
6488         raise errors.OpPrereqError("Export not supported for instances with"
6489                                    " file-based disks")
6490
6491   def Exec(self, feedback_fn):
6492     """Export an instance to an image in the cluster.
6493
6494     """
6495     instance = self.instance
6496     dst_node = self.dst_node
6497     src_node = instance.primary_node
6498     if self.op.shutdown:
6499       # shutdown the instance, but not the disks
6500       result = self.rpc.call_instance_shutdown(src_node, instance)
6501       msg = result.RemoteFailMsg()
6502       if msg:
6503         raise errors.OpExecError("Could not shutdown instance %s on"
6504                                  " node %s: %s" %
6505                                  (instance.name, src_node, msg))
6506
6507     vgname = self.cfg.GetVGName()
6508
6509     snap_disks = []
6510
6511     # set the disks ID correctly since call_instance_start needs the
6512     # correct drbd minor to create the symlinks
6513     for disk in instance.disks:
6514       self.cfg.SetDiskID(disk, src_node)
6515
6516     try:
6517       for disk in instance.disks:
6518         # result.payload will be a snapshot of an lvm leaf of the one we passed
6519         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6520         msg = result.RemoteFailMsg()
6521         if msg:
6522           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6523                           disk.logical_id[1], src_node, msg)
6524           snap_disks.append(False)
6525         else:
6526           disk_id = (vgname, result.payload)
6527           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6528                                  logical_id=disk_id, physical_id=disk_id,
6529                                  iv_name=disk.iv_name)
6530           snap_disks.append(new_dev)
6531
6532     finally:
6533       if self.op.shutdown and instance.admin_up:
6534         result = self.rpc.call_instance_start(src_node, instance, None, None)
6535         msg = result.RemoteFailMsg()
6536         if msg:
6537           _ShutdownInstanceDisks(self, instance)
6538           raise errors.OpExecError("Could not start instance: %s" % msg)
6539
6540     # TODO: check for size
6541
6542     cluster_name = self.cfg.GetClusterName()
6543     for idx, dev in enumerate(snap_disks):
6544       if dev:
6545         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6546                                                instance, cluster_name, idx)
6547         msg = result.RemoteFailMsg()
6548         if msg:
6549           self.LogWarning("Could not export block device %s from node %s to"
6550                           " node %s: %s", dev.logical_id[1], src_node,
6551                           dst_node.name, msg)
6552         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6553         if msg:
6554           self.LogWarning("Could not remove snapshot block device %s from node"
6555                           " %s: %s", dev.logical_id[1], src_node, msg)
6556
6557     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6558     msg = result.RemoteFailMsg()
6559     if msg:
6560       self.LogWarning("Could not finalize export for instance %s"
6561                       " on node %s: %s", instance.name, dst_node.name, msg)
6562
6563     nodelist = self.cfg.GetNodeList()
6564     nodelist.remove(dst_node.name)
6565
6566     # on one-node clusters nodelist will be empty after the removal
6567     # if we proceed the backup would be removed because OpQueryExports
6568     # substitutes an empty list with the full cluster node list.
6569     iname = instance.name
6570     if nodelist:
6571       exportlist = self.rpc.call_export_list(nodelist)
6572       for node in exportlist:
6573         if exportlist[node].RemoteFailMsg():
6574           continue
6575         if iname in exportlist[node].payload:
6576           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6577           if msg:
6578             self.LogWarning("Could not remove older export for instance %s"
6579                             " on node %s: %s", iname, node, msg)
6580
6581
6582 class LURemoveExport(NoHooksLU):
6583   """Remove exports related to the named instance.
6584
6585   """
6586   _OP_REQP = ["instance_name"]
6587   REQ_BGL = False
6588
6589   def ExpandNames(self):
6590     self.needed_locks = {}
6591     # We need all nodes to be locked in order for RemoveExport to work, but we
6592     # don't need to lock the instance itself, as nothing will happen to it (and
6593     # we can remove exports also for a removed instance)
6594     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6595
6596   def CheckPrereq(self):
6597     """Check prerequisites.
6598     """
6599     pass
6600
6601   def Exec(self, feedback_fn):
6602     """Remove any export.
6603
6604     """
6605     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6606     # If the instance was not found we'll try with the name that was passed in.
6607     # This will only work if it was an FQDN, though.
6608     fqdn_warn = False
6609     if not instance_name:
6610       fqdn_warn = True
6611       instance_name = self.op.instance_name
6612
6613     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6614     exportlist = self.rpc.call_export_list(locked_nodes)
6615     found = False
6616     for node in exportlist:
6617       msg = exportlist[node].RemoteFailMsg()
6618       if msg:
6619         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6620         continue
6621       if instance_name in exportlist[node].payload:
6622         found = True
6623         result = self.rpc.call_export_remove(node, instance_name)
6624         msg = result.RemoteFailMsg()
6625         if msg:
6626           logging.error("Could not remove export for instance %s"
6627                         " on node %s: %s", instance_name, node, msg)
6628
6629     if fqdn_warn and not found:
6630       feedback_fn("Export not found. If trying to remove an export belonging"
6631                   " to a deleted instance please use its Fully Qualified"
6632                   " Domain Name.")
6633
6634
6635 class TagsLU(NoHooksLU):
6636   """Generic tags LU.
6637
6638   This is an abstract class which is the parent of all the other tags LUs.
6639
6640   """
6641
6642   def ExpandNames(self):
6643     self.needed_locks = {}
6644     if self.op.kind == constants.TAG_NODE:
6645       name = self.cfg.ExpandNodeName(self.op.name)
6646       if name is None:
6647         raise errors.OpPrereqError("Invalid node name (%s)" %
6648                                    (self.op.name,))
6649       self.op.name = name
6650       self.needed_locks[locking.LEVEL_NODE] = name
6651     elif self.op.kind == constants.TAG_INSTANCE:
6652       name = self.cfg.ExpandInstanceName(self.op.name)
6653       if name is None:
6654         raise errors.OpPrereqError("Invalid instance name (%s)" %
6655                                    (self.op.name,))
6656       self.op.name = name
6657       self.needed_locks[locking.LEVEL_INSTANCE] = name
6658
6659   def CheckPrereq(self):
6660     """Check prerequisites.
6661
6662     """
6663     if self.op.kind == constants.TAG_CLUSTER:
6664       self.target = self.cfg.GetClusterInfo()
6665     elif self.op.kind == constants.TAG_NODE:
6666       self.target = self.cfg.GetNodeInfo(self.op.name)
6667     elif self.op.kind == constants.TAG_INSTANCE:
6668       self.target = self.cfg.GetInstanceInfo(self.op.name)
6669     else:
6670       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6671                                  str(self.op.kind))
6672
6673
6674 class LUGetTags(TagsLU):
6675   """Returns the tags of a given object.
6676
6677   """
6678   _OP_REQP = ["kind", "name"]
6679   REQ_BGL = False
6680
6681   def Exec(self, feedback_fn):
6682     """Returns the tag list.
6683
6684     """
6685     return list(self.target.GetTags())
6686
6687
6688 class LUSearchTags(NoHooksLU):
6689   """Searches the tags for a given pattern.
6690
6691   """
6692   _OP_REQP = ["pattern"]
6693   REQ_BGL = False
6694
6695   def ExpandNames(self):
6696     self.needed_locks = {}
6697
6698   def CheckPrereq(self):
6699     """Check prerequisites.
6700
6701     This checks the pattern passed for validity by compiling it.
6702
6703     """
6704     try:
6705       self.re = re.compile(self.op.pattern)
6706     except re.error, err:
6707       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6708                                  (self.op.pattern, err))
6709
6710   def Exec(self, feedback_fn):
6711     """Returns the tag list.
6712
6713     """
6714     cfg = self.cfg
6715     tgts = [("/cluster", cfg.GetClusterInfo())]
6716     ilist = cfg.GetAllInstancesInfo().values()
6717     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6718     nlist = cfg.GetAllNodesInfo().values()
6719     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6720     results = []
6721     for path, target in tgts:
6722       for tag in target.GetTags():
6723         if self.re.search(tag):
6724           results.append((path, tag))
6725     return results
6726
6727
6728 class LUAddTags(TagsLU):
6729   """Sets a tag on a given object.
6730
6731   """
6732   _OP_REQP = ["kind", "name", "tags"]
6733   REQ_BGL = False
6734
6735   def CheckPrereq(self):
6736     """Check prerequisites.
6737
6738     This checks the type and length of the tag name and value.
6739
6740     """
6741     TagsLU.CheckPrereq(self)
6742     for tag in self.op.tags:
6743       objects.TaggableObject.ValidateTag(tag)
6744
6745   def Exec(self, feedback_fn):
6746     """Sets the tag.
6747
6748     """
6749     try:
6750       for tag in self.op.tags:
6751         self.target.AddTag(tag)
6752     except errors.TagError, err:
6753       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6754     try:
6755       self.cfg.Update(self.target)
6756     except errors.ConfigurationError:
6757       raise errors.OpRetryError("There has been a modification to the"
6758                                 " config file and the operation has been"
6759                                 " aborted. Please retry.")
6760
6761
6762 class LUDelTags(TagsLU):
6763   """Delete a list of tags from a given object.
6764
6765   """
6766   _OP_REQP = ["kind", "name", "tags"]
6767   REQ_BGL = False
6768
6769   def CheckPrereq(self):
6770     """Check prerequisites.
6771
6772     This checks that we have the given tag.
6773
6774     """
6775     TagsLU.CheckPrereq(self)
6776     for tag in self.op.tags:
6777       objects.TaggableObject.ValidateTag(tag)
6778     del_tags = frozenset(self.op.tags)
6779     cur_tags = self.target.GetTags()
6780     if not del_tags <= cur_tags:
6781       diff_tags = del_tags - cur_tags
6782       diff_names = ["'%s'" % tag for tag in diff_tags]
6783       diff_names.sort()
6784       raise errors.OpPrereqError("Tag(s) %s not found" %
6785                                  (",".join(diff_names)))
6786
6787   def Exec(self, feedback_fn):
6788     """Remove the tag from the object.
6789
6790     """
6791     for tag in self.op.tags:
6792       self.target.RemoveTag(tag)
6793     try:
6794       self.cfg.Update(self.target)
6795     except errors.ConfigurationError:
6796       raise errors.OpRetryError("There has been a modification to the"
6797                                 " config file and the operation has been"
6798                                 " aborted. Please retry.")
6799
6800
6801 class LUTestDelay(NoHooksLU):
6802   """Sleep for a specified amount of time.
6803
6804   This LU sleeps on the master and/or nodes for a specified amount of
6805   time.
6806
6807   """
6808   _OP_REQP = ["duration", "on_master", "on_nodes"]
6809   REQ_BGL = False
6810
6811   def ExpandNames(self):
6812     """Expand names and set required locks.
6813
6814     This expands the node list, if any.
6815
6816     """
6817     self.needed_locks = {}
6818     if self.op.on_nodes:
6819       # _GetWantedNodes can be used here, but is not always appropriate to use
6820       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6821       # more information.
6822       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6823       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6824
6825   def CheckPrereq(self):
6826     """Check prerequisites.
6827
6828     """
6829
6830   def Exec(self, feedback_fn):
6831     """Do the actual sleep.
6832
6833     """
6834     if self.op.on_master:
6835       if not utils.TestDelay(self.op.duration):
6836         raise errors.OpExecError("Error during master delay test")
6837     if self.op.on_nodes:
6838       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6839       for node, node_result in result.items():
6840         msg = node_result.RemoteFailMsg()
6841         if msg:
6842           raise errors.OpExecError("Failure during rpc call to node %s: %s"
6843                                    % (node, msg))
6844
6845
6846 class IAllocator(object):
6847   """IAllocator framework.
6848
6849   An IAllocator instance has three sets of attributes:
6850     - cfg that is needed to query the cluster
6851     - input data (all members of the _KEYS class attribute are required)
6852     - four buffer attributes (in|out_data|text), that represent the
6853       input (to the external script) in text and data structure format,
6854       and the output from it, again in two formats
6855     - the result variables from the script (success, info, nodes) for
6856       easy usage
6857
6858   """
6859   _ALLO_KEYS = [
6860     "mem_size", "disks", "disk_template",
6861     "os", "tags", "nics", "vcpus", "hypervisor",
6862     ]
6863   _RELO_KEYS = [
6864     "relocate_from",
6865     ]
6866
6867   def __init__(self, lu, mode, name, **kwargs):
6868     self.lu = lu
6869     # init buffer variables
6870     self.in_text = self.out_text = self.in_data = self.out_data = None
6871     # init all input fields so that pylint is happy
6872     self.mode = mode
6873     self.name = name
6874     self.mem_size = self.disks = self.disk_template = None
6875     self.os = self.tags = self.nics = self.vcpus = None
6876     self.hypervisor = None
6877     self.relocate_from = None
6878     # computed fields
6879     self.required_nodes = None
6880     # init result fields
6881     self.success = self.info = self.nodes = None
6882     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6883       keyset = self._ALLO_KEYS
6884     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6885       keyset = self._RELO_KEYS
6886     else:
6887       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6888                                    " IAllocator" % self.mode)
6889     for key in kwargs:
6890       if key not in keyset:
6891         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6892                                      " IAllocator" % key)
6893       setattr(self, key, kwargs[key])
6894     for key in keyset:
6895       if key not in kwargs:
6896         raise errors.ProgrammerError("Missing input parameter '%s' to"
6897                                      " IAllocator" % key)
6898     self._BuildInputData()
6899
6900   def _ComputeClusterData(self):
6901     """Compute the generic allocator input data.
6902
6903     This is the data that is independent of the actual operation.
6904
6905     """
6906     cfg = self.lu.cfg
6907     cluster_info = cfg.GetClusterInfo()
6908     # cluster data
6909     data = {
6910       "version": constants.IALLOCATOR_VERSION,
6911       "cluster_name": cfg.GetClusterName(),
6912       "cluster_tags": list(cluster_info.GetTags()),
6913       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6914       # we don't have job IDs
6915       }
6916     iinfo = cfg.GetAllInstancesInfo().values()
6917     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6918
6919     # node data
6920     node_results = {}
6921     node_list = cfg.GetNodeList()
6922
6923     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6924       hypervisor_name = self.hypervisor
6925     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6926       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6927
6928     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6929                                            hypervisor_name)
6930     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6931                        cluster_info.enabled_hypervisors)
6932     for nname, nresult in node_data.items():
6933       # first fill in static (config-based) values
6934       ninfo = cfg.GetNodeInfo(nname)
6935       pnr = {
6936         "tags": list(ninfo.GetTags()),
6937         "primary_ip": ninfo.primary_ip,
6938         "secondary_ip": ninfo.secondary_ip,
6939         "offline": ninfo.offline,
6940         "drained": ninfo.drained,
6941         "master_candidate": ninfo.master_candidate,
6942         }
6943
6944       if not ninfo.offline:
6945         msg = nresult.RemoteFailMsg()
6946         if msg:
6947           raise errors.OpExecError("Can't get data for node %s: %s" %
6948                                    (nname, msg))
6949         msg = node_iinfo[nname].RemoteFailMsg()
6950         if msg:
6951           raise errors.OpExecError("Can't get node instance info"
6952                                    " from node %s: %s" % (nname, msg))
6953         remote_info = nresult.payload
6954         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6955                      'vg_size', 'vg_free', 'cpu_total']:
6956           if attr not in remote_info:
6957             raise errors.OpExecError("Node '%s' didn't return attribute"
6958                                      " '%s'" % (nname, attr))
6959           if not isinstance(remote_info[attr], int):
6960             raise errors.OpExecError("Node '%s' returned invalid value"
6961                                      " for '%s': %s" %
6962                                      (nname, attr, remote_info[attr]))
6963         # compute memory used by primary instances
6964         i_p_mem = i_p_up_mem = 0
6965         for iinfo, beinfo in i_list:
6966           if iinfo.primary_node == nname:
6967             i_p_mem += beinfo[constants.BE_MEMORY]
6968             if iinfo.name not in node_iinfo[nname].payload:
6969               i_used_mem = 0
6970             else:
6971               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6972             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6973             remote_info['memory_free'] -= max(0, i_mem_diff)
6974
6975             if iinfo.admin_up:
6976               i_p_up_mem += beinfo[constants.BE_MEMORY]
6977
6978         # compute memory used by instances
6979         pnr_dyn = {
6980           "total_memory": remote_info['memory_total'],
6981           "reserved_memory": remote_info['memory_dom0'],
6982           "free_memory": remote_info['memory_free'],
6983           "total_disk": remote_info['vg_size'],
6984           "free_disk": remote_info['vg_free'],
6985           "total_cpus": remote_info['cpu_total'],
6986           "i_pri_memory": i_p_mem,
6987           "i_pri_up_memory": i_p_up_mem,
6988           }
6989         pnr.update(pnr_dyn)
6990
6991       node_results[nname] = pnr
6992     data["nodes"] = node_results
6993
6994     # instance data
6995     instance_data = {}
6996     for iinfo, beinfo in i_list:
6997       nic_data = []
6998       for nic in iinfo.nics:
6999         filled_params = objects.FillDict(
7000             cluster_info.nicparams[constants.PP_DEFAULT],
7001             nic.nicparams)
7002         nic_dict = {"mac": nic.mac,
7003                     "ip": nic.ip,
7004                     "mode": filled_params[constants.NIC_MODE],
7005                     "link": filled_params[constants.NIC_LINK],
7006                    }
7007         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7008           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7009         nic_data.append(nic_dict)
7010       pir = {
7011         "tags": list(iinfo.GetTags()),
7012         "admin_up": iinfo.admin_up,
7013         "vcpus": beinfo[constants.BE_VCPUS],
7014         "memory": beinfo[constants.BE_MEMORY],
7015         "os": iinfo.os,
7016         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7017         "nics": nic_data,
7018         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7019         "disk_template": iinfo.disk_template,
7020         "hypervisor": iinfo.hypervisor,
7021         }
7022       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7023                                                  pir["disks"])
7024       instance_data[iinfo.name] = pir
7025
7026     data["instances"] = instance_data
7027
7028     self.in_data = data
7029
7030   def _AddNewInstance(self):
7031     """Add new instance data to allocator structure.
7032
7033     This in combination with _AllocatorGetClusterData will create the
7034     correct structure needed as input for the allocator.
7035
7036     The checks for the completeness of the opcode must have already been
7037     done.
7038
7039     """
7040     data = self.in_data
7041
7042     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7043
7044     if self.disk_template in constants.DTS_NET_MIRROR:
7045       self.required_nodes = 2
7046     else:
7047       self.required_nodes = 1
7048     request = {
7049       "type": "allocate",
7050       "name": self.name,
7051       "disk_template": self.disk_template,
7052       "tags": self.tags,
7053       "os": self.os,
7054       "vcpus": self.vcpus,
7055       "memory": self.mem_size,
7056       "disks": self.disks,
7057       "disk_space_total": disk_space,
7058       "nics": self.nics,
7059       "required_nodes": self.required_nodes,
7060       }
7061     data["request"] = request
7062
7063   def _AddRelocateInstance(self):
7064     """Add relocate instance data to allocator structure.
7065
7066     This in combination with _IAllocatorGetClusterData will create the
7067     correct structure needed as input for the allocator.
7068
7069     The checks for the completeness of the opcode must have already been
7070     done.
7071
7072     """
7073     instance = self.lu.cfg.GetInstanceInfo(self.name)
7074     if instance is None:
7075       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7076                                    " IAllocator" % self.name)
7077
7078     if instance.disk_template not in constants.DTS_NET_MIRROR:
7079       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7080
7081     if len(instance.secondary_nodes) != 1:
7082       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7083
7084     self.required_nodes = 1
7085     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7086     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7087
7088     request = {
7089       "type": "relocate",
7090       "name": self.name,
7091       "disk_space_total": disk_space,
7092       "required_nodes": self.required_nodes,
7093       "relocate_from": self.relocate_from,
7094       }
7095     self.in_data["request"] = request
7096
7097   def _BuildInputData(self):
7098     """Build input data structures.
7099
7100     """
7101     self._ComputeClusterData()
7102
7103     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7104       self._AddNewInstance()
7105     else:
7106       self._AddRelocateInstance()
7107
7108     self.in_text = serializer.Dump(self.in_data)
7109
7110   def Run(self, name, validate=True, call_fn=None):
7111     """Run an instance allocator and return the results.
7112
7113     """
7114     if call_fn is None:
7115       call_fn = self.lu.rpc.call_iallocator_runner
7116     data = self.in_text
7117
7118     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7119     msg = result.RemoteFailMsg()
7120     if msg:
7121       raise errors.OpExecError("Failure while running the iallocator"
7122                                " script: %s" % msg)
7123
7124     self.out_text = result.payload
7125     if validate:
7126       self._ValidateResult()
7127
7128   def _ValidateResult(self):
7129     """Process the allocator results.
7130
7131     This will process and if successful save the result in
7132     self.out_data and the other parameters.
7133
7134     """
7135     try:
7136       rdict = serializer.Load(self.out_text)
7137     except Exception, err:
7138       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7139
7140     if not isinstance(rdict, dict):
7141       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7142
7143     for key in "success", "info", "nodes":
7144       if key not in rdict:
7145         raise errors.OpExecError("Can't parse iallocator results:"
7146                                  " missing key '%s'" % key)
7147       setattr(self, key, rdict[key])
7148
7149     if not isinstance(rdict["nodes"], list):
7150       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7151                                " is not a list")
7152     self.out_data = rdict
7153
7154
7155 class LUTestAllocator(NoHooksLU):
7156   """Run allocator tests.
7157
7158   This LU runs the allocator tests
7159
7160   """
7161   _OP_REQP = ["direction", "mode", "name"]
7162
7163   def CheckPrereq(self):
7164     """Check prerequisites.
7165
7166     This checks the opcode parameters depending on the director and mode test.
7167
7168     """
7169     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7170       for attr in ["name", "mem_size", "disks", "disk_template",
7171                    "os", "tags", "nics", "vcpus"]:
7172         if not hasattr(self.op, attr):
7173           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7174                                      attr)
7175       iname = self.cfg.ExpandInstanceName(self.op.name)
7176       if iname is not None:
7177         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7178                                    iname)
7179       if not isinstance(self.op.nics, list):
7180         raise errors.OpPrereqError("Invalid parameter 'nics'")
7181       for row in self.op.nics:
7182         if (not isinstance(row, dict) or
7183             "mac" not in row or
7184             "ip" not in row or
7185             "bridge" not in row):
7186           raise errors.OpPrereqError("Invalid contents of the"
7187                                      " 'nics' parameter")
7188       if not isinstance(self.op.disks, list):
7189         raise errors.OpPrereqError("Invalid parameter 'disks'")
7190       for row in self.op.disks:
7191         if (not isinstance(row, dict) or
7192             "size" not in row or
7193             not isinstance(row["size"], int) or
7194             "mode" not in row or
7195             row["mode"] not in ['r', 'w']):
7196           raise errors.OpPrereqError("Invalid contents of the"
7197                                      " 'disks' parameter")
7198       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7199         self.op.hypervisor = self.cfg.GetHypervisorType()
7200     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7201       if not hasattr(self.op, "name"):
7202         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7203       fname = self.cfg.ExpandInstanceName(self.op.name)
7204       if fname is None:
7205         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7206                                    self.op.name)
7207       self.op.name = fname
7208       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7209     else:
7210       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7211                                  self.op.mode)
7212
7213     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7214       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7215         raise errors.OpPrereqError("Missing allocator name")
7216     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7217       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7218                                  self.op.direction)
7219
7220   def Exec(self, feedback_fn):
7221     """Run the allocator test.
7222
7223     """
7224     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7225       ial = IAllocator(self,
7226                        mode=self.op.mode,
7227                        name=self.op.name,
7228                        mem_size=self.op.mem_size,
7229                        disks=self.op.disks,
7230                        disk_template=self.op.disk_template,
7231                        os=self.op.os,
7232                        tags=self.op.tags,
7233                        nics=self.op.nics,
7234                        vcpus=self.op.vcpus,
7235                        hypervisor=self.op.hypervisor,
7236                        )
7237     else:
7238       ial = IAllocator(self,
7239                        mode=self.op.mode,
7240                        name=self.op.name,
7241                        relocate_from=list(self.relocate_from),
7242                        )
7243
7244     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7245       result = ial.in_text
7246     else:
7247       ial.Run(self.op.allocator, validate=False)
7248       result = ial.out_text
7249     return result