Convert os_diagnose rpc to new style result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     msg = result.RemoteFailMsg()
660     if msg:
661       raise errors.OpExecError("Could not disable the master role: %s" % msg)
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           if res.failed or res.data is False or not isinstance(res.data, list):
1246             if res.offline:
1247               # no need to warn or set fail return value
1248               continue
1249             feedback_fn("    Communication failure in hooks execution")
1250             lu_result = 1
1251             continue
1252           for script, hkr, output in res.data:
1253             if hkr == constants.HKR_FAIL:
1254               # The node header is only shown once, if there are
1255               # failing hooks on that node
1256               if show_node_header:
1257                 feedback_fn("  Node %s:" % node_name)
1258                 show_node_header = False
1259               feedback_fn("    ERROR: Script %s failed, output:" % script)
1260               output = indent_re.sub('      ', output)
1261               feedback_fn("%s" % output)
1262               lu_result = 1
1263
1264       return lu_result
1265
1266
1267 class LUVerifyDisks(NoHooksLU):
1268   """Verifies the cluster disks status.
1269
1270   """
1271   _OP_REQP = []
1272   REQ_BGL = False
1273
1274   def ExpandNames(self):
1275     self.needed_locks = {
1276       locking.LEVEL_NODE: locking.ALL_SET,
1277       locking.LEVEL_INSTANCE: locking.ALL_SET,
1278     }
1279     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280
1281   def CheckPrereq(self):
1282     """Check prerequisites.
1283
1284     This has no prerequisites.
1285
1286     """
1287     pass
1288
1289   def Exec(self, feedback_fn):
1290     """Verify integrity of cluster disks.
1291
1292     @rtype: tuple of three items
1293     @return: a tuple of (dict of node-to-node_error, list of instances
1294         which need activate-disks, dict of instance: (node, volume) for
1295         missing volumes
1296
1297     """
1298     result = res_nodes, res_instances, res_missing = {}, [], {}
1299
1300     vg_name = self.cfg.GetVGName()
1301     nodes = utils.NiceSort(self.cfg.GetNodeList())
1302     instances = [self.cfg.GetInstanceInfo(name)
1303                  for name in self.cfg.GetInstanceList()]
1304
1305     nv_dict = {}
1306     for inst in instances:
1307       inst_lvs = {}
1308       if (not inst.admin_up or
1309           inst.disk_template not in constants.DTS_NET_MIRROR):
1310         continue
1311       inst.MapLVsByNode(inst_lvs)
1312       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1313       for node, vol_list in inst_lvs.iteritems():
1314         for vol in vol_list:
1315           nv_dict[(node, vol)] = inst
1316
1317     if not nv_dict:
1318       return result
1319
1320     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1321
1322     to_act = set()
1323     for node in nodes:
1324       # node_volume
1325       node_res = node_lvs[node]
1326       if node_res.offline:
1327         continue
1328       msg = node_res.RemoteFailMsg()
1329       if msg:
1330         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1331         res_nodes[node] = msg
1332         continue
1333
1334       lvs = node_res.payload
1335       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1336         inst = nv_dict.pop((node, lv_name), None)
1337         if (not lv_online and inst is not None
1338             and inst.name not in res_instances):
1339           res_instances.append(inst.name)
1340
1341     # any leftover items in nv_dict are missing LVs, let's arrange the
1342     # data better
1343     for key, inst in nv_dict.iteritems():
1344       if inst.name not in res_missing:
1345         res_missing[inst.name] = []
1346       res_missing[inst.name].append(key)
1347
1348     return result
1349
1350
1351 class LURenameCluster(LogicalUnit):
1352   """Rename the cluster.
1353
1354   """
1355   HPATH = "cluster-rename"
1356   HTYPE = constants.HTYPE_CLUSTER
1357   _OP_REQP = ["name"]
1358
1359   def BuildHooksEnv(self):
1360     """Build hooks env.
1361
1362     """
1363     env = {
1364       "OP_TARGET": self.cfg.GetClusterName(),
1365       "NEW_NAME": self.op.name,
1366       }
1367     mn = self.cfg.GetMasterNode()
1368     return env, [mn], [mn]
1369
1370   def CheckPrereq(self):
1371     """Verify that the passed name is a valid one.
1372
1373     """
1374     hostname = utils.HostInfo(self.op.name)
1375
1376     new_name = hostname.name
1377     self.ip = new_ip = hostname.ip
1378     old_name = self.cfg.GetClusterName()
1379     old_ip = self.cfg.GetMasterIP()
1380     if new_name == old_name and new_ip == old_ip:
1381       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1382                                  " cluster has changed")
1383     if new_ip != old_ip:
1384       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1385         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1386                                    " reachable on the network. Aborting." %
1387                                    new_ip)
1388
1389     self.op.name = new_name
1390
1391   def Exec(self, feedback_fn):
1392     """Rename the cluster.
1393
1394     """
1395     clustername = self.op.name
1396     ip = self.ip
1397
1398     # shutdown the master IP
1399     master = self.cfg.GetMasterNode()
1400     result = self.rpc.call_node_stop_master(master, False)
1401     msg = result.RemoteFailMsg()
1402     if msg:
1403       raise errors.OpExecError("Could not disable the master role: %s" % msg)
1404
1405     try:
1406       cluster = self.cfg.GetClusterInfo()
1407       cluster.cluster_name = clustername
1408       cluster.master_ip = ip
1409       self.cfg.Update(cluster)
1410
1411       # update the known hosts file
1412       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1413       node_list = self.cfg.GetNodeList()
1414       try:
1415         node_list.remove(master)
1416       except ValueError:
1417         pass
1418       result = self.rpc.call_upload_file(node_list,
1419                                          constants.SSH_KNOWN_HOSTS_FILE)
1420       for to_node, to_result in result.iteritems():
1421          msg = to_result.RemoteFailMsg()
1422          if msg:
1423            msg = ("Copy of file %s to node %s failed: %s" %
1424                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1425            self.proc.LogWarning(msg)
1426
1427     finally:
1428       result = self.rpc.call_node_start_master(master, False)
1429       msg = result.RemoteFailMsg()
1430       if msg:
1431         self.LogWarning("Could not re-enable the master role on"
1432                         " the master, please restart manually: %s", msg)
1433
1434
1435 def _RecursiveCheckIfLVMBased(disk):
1436   """Check if the given disk or its children are lvm-based.
1437
1438   @type disk: L{objects.Disk}
1439   @param disk: the disk to check
1440   @rtype: booleean
1441   @return: boolean indicating whether a LD_LV dev_type was found or not
1442
1443   """
1444   if disk.children:
1445     for chdisk in disk.children:
1446       if _RecursiveCheckIfLVMBased(chdisk):
1447         return True
1448   return disk.dev_type == constants.LD_LV
1449
1450
1451 class LUSetClusterParams(LogicalUnit):
1452   """Change the parameters of the cluster.
1453
1454   """
1455   HPATH = "cluster-modify"
1456   HTYPE = constants.HTYPE_CLUSTER
1457   _OP_REQP = []
1458   REQ_BGL = False
1459
1460   def CheckArguments(self):
1461     """Check parameters
1462
1463     """
1464     if not hasattr(self.op, "candidate_pool_size"):
1465       self.op.candidate_pool_size = None
1466     if self.op.candidate_pool_size is not None:
1467       try:
1468         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1469       except (ValueError, TypeError), err:
1470         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1471                                    str(err))
1472       if self.op.candidate_pool_size < 1:
1473         raise errors.OpPrereqError("At least one master candidate needed")
1474
1475   def ExpandNames(self):
1476     # FIXME: in the future maybe other cluster params won't require checking on
1477     # all nodes to be modified.
1478     self.needed_locks = {
1479       locking.LEVEL_NODE: locking.ALL_SET,
1480     }
1481     self.share_locks[locking.LEVEL_NODE] = 1
1482
1483   def BuildHooksEnv(self):
1484     """Build hooks env.
1485
1486     """
1487     env = {
1488       "OP_TARGET": self.cfg.GetClusterName(),
1489       "NEW_VG_NAME": self.op.vg_name,
1490       }
1491     mn = self.cfg.GetMasterNode()
1492     return env, [mn], [mn]
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     This checks whether the given params don't conflict and
1498     if the given volume group is valid.
1499
1500     """
1501     if self.op.vg_name is not None and not self.op.vg_name:
1502       instances = self.cfg.GetAllInstancesInfo().values()
1503       for inst in instances:
1504         for disk in inst.disks:
1505           if _RecursiveCheckIfLVMBased(disk):
1506             raise errors.OpPrereqError("Cannot disable lvm storage while"
1507                                        " lvm-based instances exist")
1508
1509     node_list = self.acquired_locks[locking.LEVEL_NODE]
1510
1511     # if vg_name not None, checks given volume group on all nodes
1512     if self.op.vg_name:
1513       vglist = self.rpc.call_vg_list(node_list)
1514       for node in node_list:
1515         msg = vglist[node].RemoteFailMsg()
1516         if msg:
1517           # ignoring down node
1518           self.LogWarning("Error while gathering data on node %s"
1519                           " (ignoring node): %s", node, msg)
1520           continue
1521         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1522                                               self.op.vg_name,
1523                                               constants.MIN_VG_SIZE)
1524         if vgstatus:
1525           raise errors.OpPrereqError("Error on node '%s': %s" %
1526                                      (node, vgstatus))
1527
1528     self.cluster = cluster = self.cfg.GetClusterInfo()
1529     # validate params changes
1530     if self.op.beparams:
1531       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1532       self.new_beparams = objects.FillDict(
1533         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1534
1535     if self.op.nicparams:
1536       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1537       self.new_nicparams = objects.FillDict(
1538         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1539       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1540
1541     # hypervisor list/parameters
1542     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1543     if self.op.hvparams:
1544       if not isinstance(self.op.hvparams, dict):
1545         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1546       for hv_name, hv_dict in self.op.hvparams.items():
1547         if hv_name not in self.new_hvparams:
1548           self.new_hvparams[hv_name] = hv_dict
1549         else:
1550           self.new_hvparams[hv_name].update(hv_dict)
1551
1552     if self.op.enabled_hypervisors is not None:
1553       self.hv_list = self.op.enabled_hypervisors
1554     else:
1555       self.hv_list = cluster.enabled_hypervisors
1556
1557     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1558       # either the enabled list has changed, or the parameters have, validate
1559       for hv_name, hv_params in self.new_hvparams.items():
1560         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1561             (self.op.enabled_hypervisors and
1562              hv_name in self.op.enabled_hypervisors)):
1563           # either this is a new hypervisor, or its parameters have changed
1564           hv_class = hypervisor.GetHypervisor(hv_name)
1565           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1566           hv_class.CheckParameterSyntax(hv_params)
1567           _CheckHVParams(self, node_list, hv_name, hv_params)
1568
1569   def Exec(self, feedback_fn):
1570     """Change the parameters of the cluster.
1571
1572     """
1573     if self.op.vg_name is not None:
1574       new_volume = self.op.vg_name
1575       if not new_volume:
1576         new_volume = None
1577       if new_volume != self.cfg.GetVGName():
1578         self.cfg.SetVGName(new_volume)
1579       else:
1580         feedback_fn("Cluster LVM configuration already in desired"
1581                     " state, not changing")
1582     if self.op.hvparams:
1583       self.cluster.hvparams = self.new_hvparams
1584     if self.op.enabled_hypervisors is not None:
1585       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1586     if self.op.beparams:
1587       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1588     if self.op.nicparams:
1589       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1590
1591     if self.op.candidate_pool_size is not None:
1592       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1593
1594     self.cfg.Update(self.cluster)
1595
1596     # we want to update nodes after the cluster so that if any errors
1597     # happen, we have recorded and saved the cluster info
1598     if self.op.candidate_pool_size is not None:
1599       _AdjustCandidatePool(self)
1600
1601
1602 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1603   """Distribute additional files which are part of the cluster configuration.
1604
1605   ConfigWriter takes care of distributing the config and ssconf files, but
1606   there are more files which should be distributed to all nodes. This function
1607   makes sure those are copied.
1608
1609   @param lu: calling logical unit
1610   @param additional_nodes: list of nodes not in the config to distribute to
1611
1612   """
1613   # 1. Gather target nodes
1614   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1615   dist_nodes = lu.cfg.GetNodeList()
1616   if additional_nodes is not None:
1617     dist_nodes.extend(additional_nodes)
1618   if myself.name in dist_nodes:
1619     dist_nodes.remove(myself.name)
1620   # 2. Gather files to distribute
1621   dist_files = set([constants.ETC_HOSTS,
1622                     constants.SSH_KNOWN_HOSTS_FILE,
1623                     constants.RAPI_CERT_FILE,
1624                     constants.RAPI_USERS_FILE,
1625                    ])
1626
1627   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1628   for hv_name in enabled_hypervisors:
1629     hv_class = hypervisor.GetHypervisor(hv_name)
1630     dist_files.update(hv_class.GetAncillaryFiles())
1631
1632   # 3. Perform the files upload
1633   for fname in dist_files:
1634     if os.path.exists(fname):
1635       result = lu.rpc.call_upload_file(dist_nodes, fname)
1636       for to_node, to_result in result.items():
1637          msg = to_result.RemoteFailMsg()
1638          if msg:
1639            msg = ("Copy of file %s to node %s failed: %s" %
1640                    (fname, to_node, msg))
1641            lu.proc.LogWarning(msg)
1642
1643
1644 class LURedistributeConfig(NoHooksLU):
1645   """Force the redistribution of cluster configuration.
1646
1647   This is a very simple LU.
1648
1649   """
1650   _OP_REQP = []
1651   REQ_BGL = False
1652
1653   def ExpandNames(self):
1654     self.needed_locks = {
1655       locking.LEVEL_NODE: locking.ALL_SET,
1656     }
1657     self.share_locks[locking.LEVEL_NODE] = 1
1658
1659   def CheckPrereq(self):
1660     """Check prerequisites.
1661
1662     """
1663
1664   def Exec(self, feedback_fn):
1665     """Redistribute the configuration.
1666
1667     """
1668     self.cfg.Update(self.cfg.GetClusterInfo())
1669     _RedistributeAncillaryFiles(self)
1670
1671
1672 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1673   """Sleep and poll for an instance's disk to sync.
1674
1675   """
1676   if not instance.disks:
1677     return True
1678
1679   if not oneshot:
1680     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1681
1682   node = instance.primary_node
1683
1684   for dev in instance.disks:
1685     lu.cfg.SetDiskID(dev, node)
1686
1687   retries = 0
1688   while True:
1689     max_time = 0
1690     done = True
1691     cumul_degraded = False
1692     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1693     msg = rstats.RemoteFailMsg()
1694     if msg:
1695       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1696       retries += 1
1697       if retries >= 10:
1698         raise errors.RemoteError("Can't contact node %s for mirror data,"
1699                                  " aborting." % node)
1700       time.sleep(6)
1701       continue
1702     rstats = rstats.payload
1703     retries = 0
1704     for i, mstat in enumerate(rstats):
1705       if mstat is None:
1706         lu.LogWarning("Can't compute data for node %s/%s",
1707                            node, instance.disks[i].iv_name)
1708         continue
1709       # we ignore the ldisk parameter
1710       perc_done, est_time, is_degraded, _ = mstat
1711       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1712       if perc_done is not None:
1713         done = False
1714         if est_time is not None:
1715           rem_time = "%d estimated seconds remaining" % est_time
1716           max_time = est_time
1717         else:
1718           rem_time = "no time estimate"
1719         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1720                         (instance.disks[i].iv_name, perc_done, rem_time))
1721     if done or oneshot:
1722       break
1723
1724     time.sleep(min(60, max_time))
1725
1726   if done:
1727     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1728   return not cumul_degraded
1729
1730
1731 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1732   """Check that mirrors are not degraded.
1733
1734   The ldisk parameter, if True, will change the test from the
1735   is_degraded attribute (which represents overall non-ok status for
1736   the device(s)) to the ldisk (representing the local storage status).
1737
1738   """
1739   lu.cfg.SetDiskID(dev, node)
1740   if ldisk:
1741     idx = 6
1742   else:
1743     idx = 5
1744
1745   result = True
1746   if on_primary or dev.AssembleOnSecondary():
1747     rstats = lu.rpc.call_blockdev_find(node, dev)
1748     msg = rstats.RemoteFailMsg()
1749     if msg:
1750       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1751       result = False
1752     elif not rstats.payload:
1753       lu.LogWarning("Can't find disk on node %s", node)
1754       result = False
1755     else:
1756       result = result and (not rstats.payload[idx])
1757   if dev.children:
1758     for child in dev.children:
1759       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1760
1761   return result
1762
1763
1764 class LUDiagnoseOS(NoHooksLU):
1765   """Logical unit for OS diagnose/query.
1766
1767   """
1768   _OP_REQP = ["output_fields", "names"]
1769   REQ_BGL = False
1770   _FIELDS_STATIC = utils.FieldSet()
1771   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1772
1773   def ExpandNames(self):
1774     if self.op.names:
1775       raise errors.OpPrereqError("Selective OS query not supported")
1776
1777     _CheckOutputFields(static=self._FIELDS_STATIC,
1778                        dynamic=self._FIELDS_DYNAMIC,
1779                        selected=self.op.output_fields)
1780
1781     # Lock all nodes, in shared mode
1782     # Temporary removal of locks, should be reverted later
1783     # TODO: reintroduce locks when they are lighter-weight
1784     self.needed_locks = {}
1785     #self.share_locks[locking.LEVEL_NODE] = 1
1786     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1787
1788   def CheckPrereq(self):
1789     """Check prerequisites.
1790
1791     """
1792
1793   @staticmethod
1794   def _DiagnoseByOS(node_list, rlist):
1795     """Remaps a per-node return list into an a per-os per-node dictionary
1796
1797     @param node_list: a list with the names of all nodes
1798     @param rlist: a map with node names as keys and OS objects as values
1799
1800     @rtype: dict
1801     @return: a dictionary with osnames as keys and as value another map, with
1802         nodes as keys and list of OS objects as values, eg::
1803
1804           {"debian-etch": {"node1": [<object>,...],
1805                            "node2": [<object>,]}
1806           }
1807
1808     """
1809     all_os = {}
1810     # we build here the list of nodes that didn't fail the RPC (at RPC
1811     # level), so that nodes with a non-responding node daemon don't
1812     # make all OSes invalid
1813     good_nodes = [node_name for node_name in rlist
1814                   if not rlist[node_name].RemoteFailMsg()]
1815     for node_name, nr in rlist.items():
1816       if nr.RemoteFailMsg() or not nr.payload:
1817         continue
1818       for os_serialized in nr.payload:
1819         os_obj = objects.OS.FromDict(os_serialized)
1820         if os_obj.name not in all_os:
1821           # build a list of nodes for this os containing empty lists
1822           # for each node in node_list
1823           all_os[os_obj.name] = {}
1824           for nname in good_nodes:
1825             all_os[os_obj.name][nname] = []
1826         all_os[os_obj.name][node_name].append(os_obj)
1827     return all_os
1828
1829   def Exec(self, feedback_fn):
1830     """Compute the list of OSes.
1831
1832     """
1833     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1834     node_data = self.rpc.call_os_diagnose(valid_nodes)
1835     pol = self._DiagnoseByOS(valid_nodes, node_data)
1836     output = []
1837     for os_name, os_data in pol.items():
1838       row = []
1839       for field in self.op.output_fields:
1840         if field == "name":
1841           val = os_name
1842         elif field == "valid":
1843           val = utils.all([osl and osl[0] for osl in os_data.values()])
1844         elif field == "node_status":
1845           val = {}
1846           for node_name, nos_list in os_data.iteritems():
1847             val[node_name] = [(v.status, v.path) for v in nos_list]
1848         else:
1849           raise errors.ParameterError(field)
1850         row.append(val)
1851       output.append(row)
1852
1853     return output
1854
1855
1856 class LURemoveNode(LogicalUnit):
1857   """Logical unit for removing a node.
1858
1859   """
1860   HPATH = "node-remove"
1861   HTYPE = constants.HTYPE_NODE
1862   _OP_REQP = ["node_name"]
1863
1864   def BuildHooksEnv(self):
1865     """Build hooks env.
1866
1867     This doesn't run on the target node in the pre phase as a failed
1868     node would then be impossible to remove.
1869
1870     """
1871     env = {
1872       "OP_TARGET": self.op.node_name,
1873       "NODE_NAME": self.op.node_name,
1874       }
1875     all_nodes = self.cfg.GetNodeList()
1876     all_nodes.remove(self.op.node_name)
1877     return env, all_nodes, all_nodes
1878
1879   def CheckPrereq(self):
1880     """Check prerequisites.
1881
1882     This checks:
1883      - the node exists in the configuration
1884      - it does not have primary or secondary instances
1885      - it's not the master
1886
1887     Any errors are signalled by raising errors.OpPrereqError.
1888
1889     """
1890     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1891     if node is None:
1892       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1893
1894     instance_list = self.cfg.GetInstanceList()
1895
1896     masternode = self.cfg.GetMasterNode()
1897     if node.name == masternode:
1898       raise errors.OpPrereqError("Node is the master node,"
1899                                  " you need to failover first.")
1900
1901     for instance_name in instance_list:
1902       instance = self.cfg.GetInstanceInfo(instance_name)
1903       if node.name in instance.all_nodes:
1904         raise errors.OpPrereqError("Instance %s is still running on the node,"
1905                                    " please remove first." % instance_name)
1906     self.op.node_name = node.name
1907     self.node = node
1908
1909   def Exec(self, feedback_fn):
1910     """Removes the node from the cluster.
1911
1912     """
1913     node = self.node
1914     logging.info("Stopping the node daemon and removing configs from node %s",
1915                  node.name)
1916
1917     self.context.RemoveNode(node.name)
1918
1919     result = self.rpc.call_node_leave_cluster(node.name)
1920     msg = result.RemoteFailMsg()
1921     if msg:
1922       self.LogWarning("Errors encountered on the remote node while leaving"
1923                       " the cluster: %s", msg)
1924
1925     # Promote nodes to master candidate as needed
1926     _AdjustCandidatePool(self)
1927
1928
1929 class LUQueryNodes(NoHooksLU):
1930   """Logical unit for querying nodes.
1931
1932   """
1933   _OP_REQP = ["output_fields", "names", "use_locking"]
1934   REQ_BGL = False
1935   _FIELDS_DYNAMIC = utils.FieldSet(
1936     "dtotal", "dfree",
1937     "mtotal", "mnode", "mfree",
1938     "bootid",
1939     "ctotal", "cnodes", "csockets",
1940     )
1941
1942   _FIELDS_STATIC = utils.FieldSet(
1943     "name", "pinst_cnt", "sinst_cnt",
1944     "pinst_list", "sinst_list",
1945     "pip", "sip", "tags",
1946     "serial_no",
1947     "master_candidate",
1948     "master",
1949     "offline",
1950     "drained",
1951     )
1952
1953   def ExpandNames(self):
1954     _CheckOutputFields(static=self._FIELDS_STATIC,
1955                        dynamic=self._FIELDS_DYNAMIC,
1956                        selected=self.op.output_fields)
1957
1958     self.needed_locks = {}
1959     self.share_locks[locking.LEVEL_NODE] = 1
1960
1961     if self.op.names:
1962       self.wanted = _GetWantedNodes(self, self.op.names)
1963     else:
1964       self.wanted = locking.ALL_SET
1965
1966     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1967     self.do_locking = self.do_node_query and self.op.use_locking
1968     if self.do_locking:
1969       # if we don't request only static fields, we need to lock the nodes
1970       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1971
1972
1973   def CheckPrereq(self):
1974     """Check prerequisites.
1975
1976     """
1977     # The validation of the node list is done in the _GetWantedNodes,
1978     # if non empty, and if empty, there's no validation to do
1979     pass
1980
1981   def Exec(self, feedback_fn):
1982     """Computes the list of nodes and their attributes.
1983
1984     """
1985     all_info = self.cfg.GetAllNodesInfo()
1986     if self.do_locking:
1987       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1988     elif self.wanted != locking.ALL_SET:
1989       nodenames = self.wanted
1990       missing = set(nodenames).difference(all_info.keys())
1991       if missing:
1992         raise errors.OpExecError(
1993           "Some nodes were removed before retrieving their data: %s" % missing)
1994     else:
1995       nodenames = all_info.keys()
1996
1997     nodenames = utils.NiceSort(nodenames)
1998     nodelist = [all_info[name] for name in nodenames]
1999
2000     # begin data gathering
2001
2002     if self.do_node_query:
2003       live_data = {}
2004       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2005                                           self.cfg.GetHypervisorType())
2006       for name in nodenames:
2007         nodeinfo = node_data[name]
2008         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2009           nodeinfo = nodeinfo.payload
2010           fn = utils.TryConvert
2011           live_data[name] = {
2012             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2013             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2014             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2015             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2016             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2017             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2018             "bootid": nodeinfo.get('bootid', None),
2019             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2020             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2021             }
2022         else:
2023           live_data[name] = {}
2024     else:
2025       live_data = dict.fromkeys(nodenames, {})
2026
2027     node_to_primary = dict([(name, set()) for name in nodenames])
2028     node_to_secondary = dict([(name, set()) for name in nodenames])
2029
2030     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2031                              "sinst_cnt", "sinst_list"))
2032     if inst_fields & frozenset(self.op.output_fields):
2033       instancelist = self.cfg.GetInstanceList()
2034
2035       for instance_name in instancelist:
2036         inst = self.cfg.GetInstanceInfo(instance_name)
2037         if inst.primary_node in node_to_primary:
2038           node_to_primary[inst.primary_node].add(inst.name)
2039         for secnode in inst.secondary_nodes:
2040           if secnode in node_to_secondary:
2041             node_to_secondary[secnode].add(inst.name)
2042
2043     master_node = self.cfg.GetMasterNode()
2044
2045     # end data gathering
2046
2047     output = []
2048     for node in nodelist:
2049       node_output = []
2050       for field in self.op.output_fields:
2051         if field == "name":
2052           val = node.name
2053         elif field == "pinst_list":
2054           val = list(node_to_primary[node.name])
2055         elif field == "sinst_list":
2056           val = list(node_to_secondary[node.name])
2057         elif field == "pinst_cnt":
2058           val = len(node_to_primary[node.name])
2059         elif field == "sinst_cnt":
2060           val = len(node_to_secondary[node.name])
2061         elif field == "pip":
2062           val = node.primary_ip
2063         elif field == "sip":
2064           val = node.secondary_ip
2065         elif field == "tags":
2066           val = list(node.GetTags())
2067         elif field == "serial_no":
2068           val = node.serial_no
2069         elif field == "master_candidate":
2070           val = node.master_candidate
2071         elif field == "master":
2072           val = node.name == master_node
2073         elif field == "offline":
2074           val = node.offline
2075         elif field == "drained":
2076           val = node.drained
2077         elif self._FIELDS_DYNAMIC.Matches(field):
2078           val = live_data[node.name].get(field, None)
2079         else:
2080           raise errors.ParameterError(field)
2081         node_output.append(val)
2082       output.append(node_output)
2083
2084     return output
2085
2086
2087 class LUQueryNodeVolumes(NoHooksLU):
2088   """Logical unit for getting volumes on node(s).
2089
2090   """
2091   _OP_REQP = ["nodes", "output_fields"]
2092   REQ_BGL = False
2093   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2094   _FIELDS_STATIC = utils.FieldSet("node")
2095
2096   def ExpandNames(self):
2097     _CheckOutputFields(static=self._FIELDS_STATIC,
2098                        dynamic=self._FIELDS_DYNAMIC,
2099                        selected=self.op.output_fields)
2100
2101     self.needed_locks = {}
2102     self.share_locks[locking.LEVEL_NODE] = 1
2103     if not self.op.nodes:
2104       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2105     else:
2106       self.needed_locks[locking.LEVEL_NODE] = \
2107         _GetWantedNodes(self, self.op.nodes)
2108
2109   def CheckPrereq(self):
2110     """Check prerequisites.
2111
2112     This checks that the fields required are valid output fields.
2113
2114     """
2115     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2116
2117   def Exec(self, feedback_fn):
2118     """Computes the list of nodes and their attributes.
2119
2120     """
2121     nodenames = self.nodes
2122     volumes = self.rpc.call_node_volumes(nodenames)
2123
2124     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2125              in self.cfg.GetInstanceList()]
2126
2127     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2128
2129     output = []
2130     for node in nodenames:
2131       nresult = volumes[node]
2132       if nresult.offline:
2133         continue
2134       msg = nresult.RemoteFailMsg()
2135       if msg:
2136         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2137         continue
2138
2139       node_vols = nresult.payload[:]
2140       node_vols.sort(key=lambda vol: vol['dev'])
2141
2142       for vol in node_vols:
2143         node_output = []
2144         for field in self.op.output_fields:
2145           if field == "node":
2146             val = node
2147           elif field == "phys":
2148             val = vol['dev']
2149           elif field == "vg":
2150             val = vol['vg']
2151           elif field == "name":
2152             val = vol['name']
2153           elif field == "size":
2154             val = int(float(vol['size']))
2155           elif field == "instance":
2156             for inst in ilist:
2157               if node not in lv_by_node[inst]:
2158                 continue
2159               if vol['name'] in lv_by_node[inst][node]:
2160                 val = inst.name
2161                 break
2162             else:
2163               val = '-'
2164           else:
2165             raise errors.ParameterError(field)
2166           node_output.append(str(val))
2167
2168         output.append(node_output)
2169
2170     return output
2171
2172
2173 class LUAddNode(LogicalUnit):
2174   """Logical unit for adding node to the cluster.
2175
2176   """
2177   HPATH = "node-add"
2178   HTYPE = constants.HTYPE_NODE
2179   _OP_REQP = ["node_name"]
2180
2181   def BuildHooksEnv(self):
2182     """Build hooks env.
2183
2184     This will run on all nodes before, and on all nodes + the new node after.
2185
2186     """
2187     env = {
2188       "OP_TARGET": self.op.node_name,
2189       "NODE_NAME": self.op.node_name,
2190       "NODE_PIP": self.op.primary_ip,
2191       "NODE_SIP": self.op.secondary_ip,
2192       }
2193     nodes_0 = self.cfg.GetNodeList()
2194     nodes_1 = nodes_0 + [self.op.node_name, ]
2195     return env, nodes_0, nodes_1
2196
2197   def CheckPrereq(self):
2198     """Check prerequisites.
2199
2200     This checks:
2201      - the new node is not already in the config
2202      - it is resolvable
2203      - its parameters (single/dual homed) matches the cluster
2204
2205     Any errors are signalled by raising errors.OpPrereqError.
2206
2207     """
2208     node_name = self.op.node_name
2209     cfg = self.cfg
2210
2211     dns_data = utils.HostInfo(node_name)
2212
2213     node = dns_data.name
2214     primary_ip = self.op.primary_ip = dns_data.ip
2215     secondary_ip = getattr(self.op, "secondary_ip", None)
2216     if secondary_ip is None:
2217       secondary_ip = primary_ip
2218     if not utils.IsValidIP(secondary_ip):
2219       raise errors.OpPrereqError("Invalid secondary IP given")
2220     self.op.secondary_ip = secondary_ip
2221
2222     node_list = cfg.GetNodeList()
2223     if not self.op.readd and node in node_list:
2224       raise errors.OpPrereqError("Node %s is already in the configuration" %
2225                                  node)
2226     elif self.op.readd and node not in node_list:
2227       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2228
2229     for existing_node_name in node_list:
2230       existing_node = cfg.GetNodeInfo(existing_node_name)
2231
2232       if self.op.readd and node == existing_node_name:
2233         if (existing_node.primary_ip != primary_ip or
2234             existing_node.secondary_ip != secondary_ip):
2235           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2236                                      " address configuration as before")
2237         continue
2238
2239       if (existing_node.primary_ip == primary_ip or
2240           existing_node.secondary_ip == primary_ip or
2241           existing_node.primary_ip == secondary_ip or
2242           existing_node.secondary_ip == secondary_ip):
2243         raise errors.OpPrereqError("New node ip address(es) conflict with"
2244                                    " existing node %s" % existing_node.name)
2245
2246     # check that the type of the node (single versus dual homed) is the
2247     # same as for the master
2248     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2249     master_singlehomed = myself.secondary_ip == myself.primary_ip
2250     newbie_singlehomed = secondary_ip == primary_ip
2251     if master_singlehomed != newbie_singlehomed:
2252       if master_singlehomed:
2253         raise errors.OpPrereqError("The master has no private ip but the"
2254                                    " new node has one")
2255       else:
2256         raise errors.OpPrereqError("The master has a private ip but the"
2257                                    " new node doesn't have one")
2258
2259     # checks reachablity
2260     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2261       raise errors.OpPrereqError("Node not reachable by ping")
2262
2263     if not newbie_singlehomed:
2264       # check reachability from my secondary ip to newbie's secondary ip
2265       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2266                            source=myself.secondary_ip):
2267         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2268                                    " based ping to noded port")
2269
2270     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2271     mc_now, _ = self.cfg.GetMasterCandidateStats()
2272     master_candidate = mc_now < cp_size
2273
2274     self.new_node = objects.Node(name=node,
2275                                  primary_ip=primary_ip,
2276                                  secondary_ip=secondary_ip,
2277                                  master_candidate=master_candidate,
2278                                  offline=False, drained=False)
2279
2280   def Exec(self, feedback_fn):
2281     """Adds the new node to the cluster.
2282
2283     """
2284     new_node = self.new_node
2285     node = new_node.name
2286
2287     # check connectivity
2288     result = self.rpc.call_version([node])[node]
2289     msg = result.RemoteFailMsg()
2290     if msg:
2291       raise errors.OpExecError("Can't get version information from"
2292                                " node %s: %s" % (node, msg))
2293     if constants.PROTOCOL_VERSION == result.payload:
2294       logging.info("Communication to node %s fine, sw version %s match",
2295                    node, result.payload)
2296     else:
2297       raise errors.OpExecError("Version mismatch master version %s,"
2298                                " node version %s" %
2299                                (constants.PROTOCOL_VERSION, result.payload))
2300
2301     # setup ssh on node
2302     logging.info("Copy ssh key to node %s", node)
2303     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2304     keyarray = []
2305     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2306                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2307                 priv_key, pub_key]
2308
2309     for i in keyfiles:
2310       f = open(i, 'r')
2311       try:
2312         keyarray.append(f.read())
2313       finally:
2314         f.close()
2315
2316     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2317                                     keyarray[2],
2318                                     keyarray[3], keyarray[4], keyarray[5])
2319
2320     msg = result.RemoteFailMsg()
2321     if msg:
2322       raise errors.OpExecError("Cannot transfer ssh keys to the"
2323                                " new node: %s" % msg)
2324
2325     # Add node to our /etc/hosts, and add key to known_hosts
2326     if self.cfg.GetClusterInfo().modify_etc_hosts:
2327       utils.AddHostToEtcHosts(new_node.name)
2328
2329     if new_node.secondary_ip != new_node.primary_ip:
2330       result = self.rpc.call_node_has_ip_address(new_node.name,
2331                                                  new_node.secondary_ip)
2332       msg = result.RemoteFailMsg()
2333       if msg:
2334         raise errors.OpPrereqError("Failure checking secondary ip"
2335                                    " on node %s: %s" % (new_node.name, msg))
2336       if not result.payload:
2337         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2338                                  " you gave (%s). Please fix and re-run this"
2339                                  " command." % new_node.secondary_ip)
2340
2341     node_verify_list = [self.cfg.GetMasterNode()]
2342     node_verify_param = {
2343       'nodelist': [node],
2344       # TODO: do a node-net-test as well?
2345     }
2346
2347     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2348                                        self.cfg.GetClusterName())
2349     for verifier in node_verify_list:
2350       msg = result[verifier].RemoteFailMsg()
2351       if msg:
2352         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2353                                  (verifier, msg))
2354       nl_payload = result[verifier].payload['nodelist']
2355       if nl_payload:
2356         for failed in nl_payload:
2357           feedback_fn("ssh/hostname verification failed %s -> %s" %
2358                       (verifier, nl_payload[failed]))
2359         raise errors.OpExecError("ssh/hostname verification failed.")
2360
2361     if self.op.readd:
2362       _RedistributeAncillaryFiles(self)
2363       self.context.ReaddNode(new_node)
2364     else:
2365       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2366       self.context.AddNode(new_node)
2367
2368
2369 class LUSetNodeParams(LogicalUnit):
2370   """Modifies the parameters of a node.
2371
2372   """
2373   HPATH = "node-modify"
2374   HTYPE = constants.HTYPE_NODE
2375   _OP_REQP = ["node_name"]
2376   REQ_BGL = False
2377
2378   def CheckArguments(self):
2379     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2380     if node_name is None:
2381       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2382     self.op.node_name = node_name
2383     _CheckBooleanOpField(self.op, 'master_candidate')
2384     _CheckBooleanOpField(self.op, 'offline')
2385     _CheckBooleanOpField(self.op, 'drained')
2386     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2387     if all_mods.count(None) == 3:
2388       raise errors.OpPrereqError("Please pass at least one modification")
2389     if all_mods.count(True) > 1:
2390       raise errors.OpPrereqError("Can't set the node into more than one"
2391                                  " state at the same time")
2392
2393   def ExpandNames(self):
2394     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2395
2396   def BuildHooksEnv(self):
2397     """Build hooks env.
2398
2399     This runs on the master node.
2400
2401     """
2402     env = {
2403       "OP_TARGET": self.op.node_name,
2404       "MASTER_CANDIDATE": str(self.op.master_candidate),
2405       "OFFLINE": str(self.op.offline),
2406       "DRAINED": str(self.op.drained),
2407       }
2408     nl = [self.cfg.GetMasterNode(),
2409           self.op.node_name]
2410     return env, nl, nl
2411
2412   def CheckPrereq(self):
2413     """Check prerequisites.
2414
2415     This only checks the instance list against the existing names.
2416
2417     """
2418     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2419
2420     if ((self.op.master_candidate == False or self.op.offline == True or
2421          self.op.drained == True) and node.master_candidate):
2422       # we will demote the node from master_candidate
2423       if self.op.node_name == self.cfg.GetMasterNode():
2424         raise errors.OpPrereqError("The master node has to be a"
2425                                    " master candidate, online and not drained")
2426       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2427       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2428       if num_candidates <= cp_size:
2429         msg = ("Not enough master candidates (desired"
2430                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2431         if self.op.force:
2432           self.LogWarning(msg)
2433         else:
2434           raise errors.OpPrereqError(msg)
2435
2436     if (self.op.master_candidate == True and
2437         ((node.offline and not self.op.offline == False) or
2438          (node.drained and not self.op.drained == False))):
2439       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2440                                  " to master_candidate" % node.name)
2441
2442     return
2443
2444   def Exec(self, feedback_fn):
2445     """Modifies a node.
2446
2447     """
2448     node = self.node
2449
2450     result = []
2451     changed_mc = False
2452
2453     if self.op.offline is not None:
2454       node.offline = self.op.offline
2455       result.append(("offline", str(self.op.offline)))
2456       if self.op.offline == True:
2457         if node.master_candidate:
2458           node.master_candidate = False
2459           changed_mc = True
2460           result.append(("master_candidate", "auto-demotion due to offline"))
2461         if node.drained:
2462           node.drained = False
2463           result.append(("drained", "clear drained status due to offline"))
2464
2465     if self.op.master_candidate is not None:
2466       node.master_candidate = self.op.master_candidate
2467       changed_mc = True
2468       result.append(("master_candidate", str(self.op.master_candidate)))
2469       if self.op.master_candidate == False:
2470         rrc = self.rpc.call_node_demote_from_mc(node.name)
2471         msg = rrc.RemoteFailMsg()
2472         if msg:
2473           self.LogWarning("Node failed to demote itself: %s" % msg)
2474
2475     if self.op.drained is not None:
2476       node.drained = self.op.drained
2477       result.append(("drained", str(self.op.drained)))
2478       if self.op.drained == True:
2479         if node.master_candidate:
2480           node.master_candidate = False
2481           changed_mc = True
2482           result.append(("master_candidate", "auto-demotion due to drain"))
2483         if node.offline:
2484           node.offline = False
2485           result.append(("offline", "clear offline status due to drain"))
2486
2487     # this will trigger configuration file update, if needed
2488     self.cfg.Update(node)
2489     # this will trigger job queue propagation or cleanup
2490     if changed_mc:
2491       self.context.ReaddNode(node)
2492
2493     return result
2494
2495
2496 class LUPowercycleNode(NoHooksLU):
2497   """Powercycles a node.
2498
2499   """
2500   _OP_REQP = ["node_name", "force"]
2501   REQ_BGL = False
2502
2503   def CheckArguments(self):
2504     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2505     if node_name is None:
2506       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2507     self.op.node_name = node_name
2508     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2509       raise errors.OpPrereqError("The node is the master and the force"
2510                                  " parameter was not set")
2511
2512   def ExpandNames(self):
2513     """Locking for PowercycleNode.
2514
2515     This is a last-resource option and shouldn't block on other
2516     jobs. Therefore, we grab no locks.
2517
2518     """
2519     self.needed_locks = {}
2520
2521   def CheckPrereq(self):
2522     """Check prerequisites.
2523
2524     This LU has no prereqs.
2525
2526     """
2527     pass
2528
2529   def Exec(self, feedback_fn):
2530     """Reboots a node.
2531
2532     """
2533     result = self.rpc.call_node_powercycle(self.op.node_name,
2534                                            self.cfg.GetHypervisorType())
2535     msg = result.RemoteFailMsg()
2536     if msg:
2537       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2538     return result.payload
2539
2540
2541 class LUQueryClusterInfo(NoHooksLU):
2542   """Query cluster configuration.
2543
2544   """
2545   _OP_REQP = []
2546   REQ_BGL = False
2547
2548   def ExpandNames(self):
2549     self.needed_locks = {}
2550
2551   def CheckPrereq(self):
2552     """No prerequsites needed for this LU.
2553
2554     """
2555     pass
2556
2557   def Exec(self, feedback_fn):
2558     """Return cluster config.
2559
2560     """
2561     cluster = self.cfg.GetClusterInfo()
2562     result = {
2563       "software_version": constants.RELEASE_VERSION,
2564       "protocol_version": constants.PROTOCOL_VERSION,
2565       "config_version": constants.CONFIG_VERSION,
2566       "os_api_version": constants.OS_API_VERSION,
2567       "export_version": constants.EXPORT_VERSION,
2568       "architecture": (platform.architecture()[0], platform.machine()),
2569       "name": cluster.cluster_name,
2570       "master": cluster.master_node,
2571       "default_hypervisor": cluster.default_hypervisor,
2572       "enabled_hypervisors": cluster.enabled_hypervisors,
2573       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2574                         for hypervisor in cluster.enabled_hypervisors]),
2575       "beparams": cluster.beparams,
2576       "nicparams": cluster.nicparams,
2577       "candidate_pool_size": cluster.candidate_pool_size,
2578       "master_netdev": cluster.master_netdev,
2579       "volume_group_name": cluster.volume_group_name,
2580       "file_storage_dir": cluster.file_storage_dir,
2581       }
2582
2583     return result
2584
2585
2586 class LUQueryConfigValues(NoHooksLU):
2587   """Return configuration values.
2588
2589   """
2590   _OP_REQP = []
2591   REQ_BGL = False
2592   _FIELDS_DYNAMIC = utils.FieldSet()
2593   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2594
2595   def ExpandNames(self):
2596     self.needed_locks = {}
2597
2598     _CheckOutputFields(static=self._FIELDS_STATIC,
2599                        dynamic=self._FIELDS_DYNAMIC,
2600                        selected=self.op.output_fields)
2601
2602   def CheckPrereq(self):
2603     """No prerequisites.
2604
2605     """
2606     pass
2607
2608   def Exec(self, feedback_fn):
2609     """Dump a representation of the cluster config to the standard output.
2610
2611     """
2612     values = []
2613     for field in self.op.output_fields:
2614       if field == "cluster_name":
2615         entry = self.cfg.GetClusterName()
2616       elif field == "master_node":
2617         entry = self.cfg.GetMasterNode()
2618       elif field == "drain_flag":
2619         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2620       else:
2621         raise errors.ParameterError(field)
2622       values.append(entry)
2623     return values
2624
2625
2626 class LUActivateInstanceDisks(NoHooksLU):
2627   """Bring up an instance's disks.
2628
2629   """
2630   _OP_REQP = ["instance_name"]
2631   REQ_BGL = False
2632
2633   def ExpandNames(self):
2634     self._ExpandAndLockInstance()
2635     self.needed_locks[locking.LEVEL_NODE] = []
2636     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2637
2638   def DeclareLocks(self, level):
2639     if level == locking.LEVEL_NODE:
2640       self._LockInstancesNodes()
2641
2642   def CheckPrereq(self):
2643     """Check prerequisites.
2644
2645     This checks that the instance is in the cluster.
2646
2647     """
2648     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2649     assert self.instance is not None, \
2650       "Cannot retrieve locked instance %s" % self.op.instance_name
2651     _CheckNodeOnline(self, self.instance.primary_node)
2652
2653   def Exec(self, feedback_fn):
2654     """Activate the disks.
2655
2656     """
2657     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2658     if not disks_ok:
2659       raise errors.OpExecError("Cannot activate block devices")
2660
2661     return disks_info
2662
2663
2664 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2665   """Prepare the block devices for an instance.
2666
2667   This sets up the block devices on all nodes.
2668
2669   @type lu: L{LogicalUnit}
2670   @param lu: the logical unit on whose behalf we execute
2671   @type instance: L{objects.Instance}
2672   @param instance: the instance for whose disks we assemble
2673   @type ignore_secondaries: boolean
2674   @param ignore_secondaries: if true, errors on secondary nodes
2675       won't result in an error return from the function
2676   @return: False if the operation failed, otherwise a list of
2677       (host, instance_visible_name, node_visible_name)
2678       with the mapping from node devices to instance devices
2679
2680   """
2681   device_info = []
2682   disks_ok = True
2683   iname = instance.name
2684   # With the two passes mechanism we try to reduce the window of
2685   # opportunity for the race condition of switching DRBD to primary
2686   # before handshaking occured, but we do not eliminate it
2687
2688   # The proper fix would be to wait (with some limits) until the
2689   # connection has been made and drbd transitions from WFConnection
2690   # into any other network-connected state (Connected, SyncTarget,
2691   # SyncSource, etc.)
2692
2693   # 1st pass, assemble on all nodes in secondary mode
2694   for inst_disk in instance.disks:
2695     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2696       lu.cfg.SetDiskID(node_disk, node)
2697       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2698       msg = result.RemoteFailMsg()
2699       if msg:
2700         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2701                            " (is_primary=False, pass=1): %s",
2702                            inst_disk.iv_name, node, msg)
2703         if not ignore_secondaries:
2704           disks_ok = False
2705
2706   # FIXME: race condition on drbd migration to primary
2707
2708   # 2nd pass, do only the primary node
2709   for inst_disk in instance.disks:
2710     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2711       if node != instance.primary_node:
2712         continue
2713       lu.cfg.SetDiskID(node_disk, node)
2714       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2715       msg = result.RemoteFailMsg()
2716       if msg:
2717         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2718                            " (is_primary=True, pass=2): %s",
2719                            inst_disk.iv_name, node, msg)
2720         disks_ok = False
2721     device_info.append((instance.primary_node, inst_disk.iv_name,
2722                         result.payload))
2723
2724   # leave the disks configured for the primary node
2725   # this is a workaround that would be fixed better by
2726   # improving the logical/physical id handling
2727   for disk in instance.disks:
2728     lu.cfg.SetDiskID(disk, instance.primary_node)
2729
2730   return disks_ok, device_info
2731
2732
2733 def _StartInstanceDisks(lu, instance, force):
2734   """Start the disks of an instance.
2735
2736   """
2737   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2738                                            ignore_secondaries=force)
2739   if not disks_ok:
2740     _ShutdownInstanceDisks(lu, instance)
2741     if force is not None and not force:
2742       lu.proc.LogWarning("", hint="If the message above refers to a"
2743                          " secondary node,"
2744                          " you can retry the operation using '--force'.")
2745     raise errors.OpExecError("Disk consistency error")
2746
2747
2748 class LUDeactivateInstanceDisks(NoHooksLU):
2749   """Shutdown an instance's disks.
2750
2751   """
2752   _OP_REQP = ["instance_name"]
2753   REQ_BGL = False
2754
2755   def ExpandNames(self):
2756     self._ExpandAndLockInstance()
2757     self.needed_locks[locking.LEVEL_NODE] = []
2758     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2759
2760   def DeclareLocks(self, level):
2761     if level == locking.LEVEL_NODE:
2762       self._LockInstancesNodes()
2763
2764   def CheckPrereq(self):
2765     """Check prerequisites.
2766
2767     This checks that the instance is in the cluster.
2768
2769     """
2770     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2771     assert self.instance is not None, \
2772       "Cannot retrieve locked instance %s" % self.op.instance_name
2773
2774   def Exec(self, feedback_fn):
2775     """Deactivate the disks
2776
2777     """
2778     instance = self.instance
2779     _SafeShutdownInstanceDisks(self, instance)
2780
2781
2782 def _SafeShutdownInstanceDisks(lu, instance):
2783   """Shutdown block devices of an instance.
2784
2785   This function checks if an instance is running, before calling
2786   _ShutdownInstanceDisks.
2787
2788   """
2789   pnode = instance.primary_node
2790   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2791   ins_l = ins_l[pnode]
2792   msg = ins_l.RemoteFailMsg()
2793   if msg:
2794     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2795
2796   if instance.name in ins_l.payload:
2797     raise errors.OpExecError("Instance is running, can't shutdown"
2798                              " block devices.")
2799
2800   _ShutdownInstanceDisks(lu, instance)
2801
2802
2803 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2804   """Shutdown block devices of an instance.
2805
2806   This does the shutdown on all nodes of the instance.
2807
2808   If the ignore_primary is false, errors on the primary node are
2809   ignored.
2810
2811   """
2812   all_result = True
2813   for disk in instance.disks:
2814     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2815       lu.cfg.SetDiskID(top_disk, node)
2816       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2817       msg = result.RemoteFailMsg()
2818       if msg:
2819         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2820                       disk.iv_name, node, msg)
2821         if not ignore_primary or node != instance.primary_node:
2822           all_result = False
2823   return all_result
2824
2825
2826 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2827   """Checks if a node has enough free memory.
2828
2829   This function check if a given node has the needed amount of free
2830   memory. In case the node has less memory or we cannot get the
2831   information from the node, this function raise an OpPrereqError
2832   exception.
2833
2834   @type lu: C{LogicalUnit}
2835   @param lu: a logical unit from which we get configuration data
2836   @type node: C{str}
2837   @param node: the node to check
2838   @type reason: C{str}
2839   @param reason: string to use in the error message
2840   @type requested: C{int}
2841   @param requested: the amount of memory in MiB to check for
2842   @type hypervisor_name: C{str}
2843   @param hypervisor_name: the hypervisor to ask for memory stats
2844   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2845       we cannot check the node
2846
2847   """
2848   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2849   msg = nodeinfo[node].RemoteFailMsg()
2850   if msg:
2851     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2852   free_mem = nodeinfo[node].payload.get('memory_free', None)
2853   if not isinstance(free_mem, int):
2854     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2855                                " was '%s'" % (node, free_mem))
2856   if requested > free_mem:
2857     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2858                                " needed %s MiB, available %s MiB" %
2859                                (node, reason, requested, free_mem))
2860
2861
2862 class LUStartupInstance(LogicalUnit):
2863   """Starts an instance.
2864
2865   """
2866   HPATH = "instance-start"
2867   HTYPE = constants.HTYPE_INSTANCE
2868   _OP_REQP = ["instance_name", "force"]
2869   REQ_BGL = False
2870
2871   def ExpandNames(self):
2872     self._ExpandAndLockInstance()
2873
2874   def BuildHooksEnv(self):
2875     """Build hooks env.
2876
2877     This runs on master, primary and secondary nodes of the instance.
2878
2879     """
2880     env = {
2881       "FORCE": self.op.force,
2882       }
2883     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2884     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2885     return env, nl, nl
2886
2887   def CheckPrereq(self):
2888     """Check prerequisites.
2889
2890     This checks that the instance is in the cluster.
2891
2892     """
2893     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2894     assert self.instance is not None, \
2895       "Cannot retrieve locked instance %s" % self.op.instance_name
2896
2897     # extra beparams
2898     self.beparams = getattr(self.op, "beparams", {})
2899     if self.beparams:
2900       if not isinstance(self.beparams, dict):
2901         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2902                                    " dict" % (type(self.beparams), ))
2903       # fill the beparams dict
2904       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2905       self.op.beparams = self.beparams
2906
2907     # extra hvparams
2908     self.hvparams = getattr(self.op, "hvparams", {})
2909     if self.hvparams:
2910       if not isinstance(self.hvparams, dict):
2911         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2912                                    " dict" % (type(self.hvparams), ))
2913
2914       # check hypervisor parameter syntax (locally)
2915       cluster = self.cfg.GetClusterInfo()
2916       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2917       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2918                                     instance.hvparams)
2919       filled_hvp.update(self.hvparams)
2920       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2921       hv_type.CheckParameterSyntax(filled_hvp)
2922       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2923       self.op.hvparams = self.hvparams
2924
2925     _CheckNodeOnline(self, instance.primary_node)
2926
2927     bep = self.cfg.GetClusterInfo().FillBE(instance)
2928     # check bridges existance
2929     _CheckInstanceBridgesExist(self, instance)
2930
2931     remote_info = self.rpc.call_instance_info(instance.primary_node,
2932                                               instance.name,
2933                                               instance.hypervisor)
2934     msg = remote_info.RemoteFailMsg()
2935     if msg:
2936       raise errors.OpPrereqError("Error checking node %s: %s" %
2937                                  (instance.primary_node, msg))
2938     if not remote_info.payload: # not running already
2939       _CheckNodeFreeMemory(self, instance.primary_node,
2940                            "starting instance %s" % instance.name,
2941                            bep[constants.BE_MEMORY], instance.hypervisor)
2942
2943   def Exec(self, feedback_fn):
2944     """Start the instance.
2945
2946     """
2947     instance = self.instance
2948     force = self.op.force
2949
2950     self.cfg.MarkInstanceUp(instance.name)
2951
2952     node_current = instance.primary_node
2953
2954     _StartInstanceDisks(self, instance, force)
2955
2956     result = self.rpc.call_instance_start(node_current, instance,
2957                                           self.hvparams, self.beparams)
2958     msg = result.RemoteFailMsg()
2959     if msg:
2960       _ShutdownInstanceDisks(self, instance)
2961       raise errors.OpExecError("Could not start instance: %s" % msg)
2962
2963
2964 class LURebootInstance(LogicalUnit):
2965   """Reboot an instance.
2966
2967   """
2968   HPATH = "instance-reboot"
2969   HTYPE = constants.HTYPE_INSTANCE
2970   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2971   REQ_BGL = False
2972
2973   def ExpandNames(self):
2974     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2975                                    constants.INSTANCE_REBOOT_HARD,
2976                                    constants.INSTANCE_REBOOT_FULL]:
2977       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2978                                   (constants.INSTANCE_REBOOT_SOFT,
2979                                    constants.INSTANCE_REBOOT_HARD,
2980                                    constants.INSTANCE_REBOOT_FULL))
2981     self._ExpandAndLockInstance()
2982
2983   def BuildHooksEnv(self):
2984     """Build hooks env.
2985
2986     This runs on master, primary and secondary nodes of the instance.
2987
2988     """
2989     env = {
2990       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2991       "REBOOT_TYPE": self.op.reboot_type,
2992       }
2993     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2994     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2995     return env, nl, nl
2996
2997   def CheckPrereq(self):
2998     """Check prerequisites.
2999
3000     This checks that the instance is in the cluster.
3001
3002     """
3003     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3004     assert self.instance is not None, \
3005       "Cannot retrieve locked instance %s" % self.op.instance_name
3006
3007     _CheckNodeOnline(self, instance.primary_node)
3008
3009     # check bridges existance
3010     _CheckInstanceBridgesExist(self, instance)
3011
3012   def Exec(self, feedback_fn):
3013     """Reboot the instance.
3014
3015     """
3016     instance = self.instance
3017     ignore_secondaries = self.op.ignore_secondaries
3018     reboot_type = self.op.reboot_type
3019
3020     node_current = instance.primary_node
3021
3022     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3023                        constants.INSTANCE_REBOOT_HARD]:
3024       for disk in instance.disks:
3025         self.cfg.SetDiskID(disk, node_current)
3026       result = self.rpc.call_instance_reboot(node_current, instance,
3027                                              reboot_type)
3028       msg = result.RemoteFailMsg()
3029       if msg:
3030         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3031     else:
3032       result = self.rpc.call_instance_shutdown(node_current, instance)
3033       msg = result.RemoteFailMsg()
3034       if msg:
3035         raise errors.OpExecError("Could not shutdown instance for"
3036                                  " full reboot: %s" % msg)
3037       _ShutdownInstanceDisks(self, instance)
3038       _StartInstanceDisks(self, instance, ignore_secondaries)
3039       result = self.rpc.call_instance_start(node_current, instance, None, None)
3040       msg = result.RemoteFailMsg()
3041       if msg:
3042         _ShutdownInstanceDisks(self, instance)
3043         raise errors.OpExecError("Could not start instance for"
3044                                  " full reboot: %s" % msg)
3045
3046     self.cfg.MarkInstanceUp(instance.name)
3047
3048
3049 class LUShutdownInstance(LogicalUnit):
3050   """Shutdown an instance.
3051
3052   """
3053   HPATH = "instance-stop"
3054   HTYPE = constants.HTYPE_INSTANCE
3055   _OP_REQP = ["instance_name"]
3056   REQ_BGL = False
3057
3058   def ExpandNames(self):
3059     self._ExpandAndLockInstance()
3060
3061   def BuildHooksEnv(self):
3062     """Build hooks env.
3063
3064     This runs on master, primary and secondary nodes of the instance.
3065
3066     """
3067     env = _BuildInstanceHookEnvByObject(self, self.instance)
3068     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3069     return env, nl, nl
3070
3071   def CheckPrereq(self):
3072     """Check prerequisites.
3073
3074     This checks that the instance is in the cluster.
3075
3076     """
3077     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3078     assert self.instance is not None, \
3079       "Cannot retrieve locked instance %s" % self.op.instance_name
3080     _CheckNodeOnline(self, self.instance.primary_node)
3081
3082   def Exec(self, feedback_fn):
3083     """Shutdown the instance.
3084
3085     """
3086     instance = self.instance
3087     node_current = instance.primary_node
3088     self.cfg.MarkInstanceDown(instance.name)
3089     result = self.rpc.call_instance_shutdown(node_current, instance)
3090     msg = result.RemoteFailMsg()
3091     if msg:
3092       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3093
3094     _ShutdownInstanceDisks(self, instance)
3095
3096
3097 class LUReinstallInstance(LogicalUnit):
3098   """Reinstall an instance.
3099
3100   """
3101   HPATH = "instance-reinstall"
3102   HTYPE = constants.HTYPE_INSTANCE
3103   _OP_REQP = ["instance_name"]
3104   REQ_BGL = False
3105
3106   def ExpandNames(self):
3107     self._ExpandAndLockInstance()
3108
3109   def BuildHooksEnv(self):
3110     """Build hooks env.
3111
3112     This runs on master, primary and secondary nodes of the instance.
3113
3114     """
3115     env = _BuildInstanceHookEnvByObject(self, self.instance)
3116     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3117     return env, nl, nl
3118
3119   def CheckPrereq(self):
3120     """Check prerequisites.
3121
3122     This checks that the instance is in the cluster and is not running.
3123
3124     """
3125     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3126     assert instance is not None, \
3127       "Cannot retrieve locked instance %s" % self.op.instance_name
3128     _CheckNodeOnline(self, instance.primary_node)
3129
3130     if instance.disk_template == constants.DT_DISKLESS:
3131       raise errors.OpPrereqError("Instance '%s' has no disks" %
3132                                  self.op.instance_name)
3133     if instance.admin_up:
3134       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3135                                  self.op.instance_name)
3136     remote_info = self.rpc.call_instance_info(instance.primary_node,
3137                                               instance.name,
3138                                               instance.hypervisor)
3139     msg = remote_info.RemoteFailMsg()
3140     if msg:
3141       raise errors.OpPrereqError("Error checking node %s: %s" %
3142                                  (instance.primary_node, msg))
3143     if remote_info.payload:
3144       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3145                                  (self.op.instance_name,
3146                                   instance.primary_node))
3147
3148     self.op.os_type = getattr(self.op, "os_type", None)
3149     if self.op.os_type is not None:
3150       # OS verification
3151       pnode = self.cfg.GetNodeInfo(
3152         self.cfg.ExpandNodeName(instance.primary_node))
3153       if pnode is None:
3154         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3155                                    self.op.pnode)
3156       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3157       result.Raise()
3158       if not isinstance(result.data, objects.OS):
3159         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3160                                    " primary node"  % self.op.os_type)
3161
3162     self.instance = instance
3163
3164   def Exec(self, feedback_fn):
3165     """Reinstall the instance.
3166
3167     """
3168     inst = self.instance
3169
3170     if self.op.os_type is not None:
3171       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3172       inst.os = self.op.os_type
3173       self.cfg.Update(inst)
3174
3175     _StartInstanceDisks(self, inst, None)
3176     try:
3177       feedback_fn("Running the instance OS create scripts...")
3178       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3179       msg = result.RemoteFailMsg()
3180       if msg:
3181         raise errors.OpExecError("Could not install OS for instance %s"
3182                                  " on node %s: %s" %
3183                                  (inst.name, inst.primary_node, msg))
3184     finally:
3185       _ShutdownInstanceDisks(self, inst)
3186
3187
3188 class LURenameInstance(LogicalUnit):
3189   """Rename an instance.
3190
3191   """
3192   HPATH = "instance-rename"
3193   HTYPE = constants.HTYPE_INSTANCE
3194   _OP_REQP = ["instance_name", "new_name"]
3195
3196   def BuildHooksEnv(self):
3197     """Build hooks env.
3198
3199     This runs on master, primary and secondary nodes of the instance.
3200
3201     """
3202     env = _BuildInstanceHookEnvByObject(self, self.instance)
3203     env["INSTANCE_NEW_NAME"] = self.op.new_name
3204     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3205     return env, nl, nl
3206
3207   def CheckPrereq(self):
3208     """Check prerequisites.
3209
3210     This checks that the instance is in the cluster and is not running.
3211
3212     """
3213     instance = self.cfg.GetInstanceInfo(
3214       self.cfg.ExpandInstanceName(self.op.instance_name))
3215     if instance is None:
3216       raise errors.OpPrereqError("Instance '%s' not known" %
3217                                  self.op.instance_name)
3218     _CheckNodeOnline(self, instance.primary_node)
3219
3220     if instance.admin_up:
3221       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3222                                  self.op.instance_name)
3223     remote_info = self.rpc.call_instance_info(instance.primary_node,
3224                                               instance.name,
3225                                               instance.hypervisor)
3226     msg = remote_info.RemoteFailMsg()
3227     if msg:
3228       raise errors.OpPrereqError("Error checking node %s: %s" %
3229                                  (instance.primary_node, msg))
3230     if remote_info.payload:
3231       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3232                                  (self.op.instance_name,
3233                                   instance.primary_node))
3234     self.instance = instance
3235
3236     # new name verification
3237     name_info = utils.HostInfo(self.op.new_name)
3238
3239     self.op.new_name = new_name = name_info.name
3240     instance_list = self.cfg.GetInstanceList()
3241     if new_name in instance_list:
3242       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3243                                  new_name)
3244
3245     if not getattr(self.op, "ignore_ip", False):
3246       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3247         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3248                                    (name_info.ip, new_name))
3249
3250
3251   def Exec(self, feedback_fn):
3252     """Reinstall the instance.
3253
3254     """
3255     inst = self.instance
3256     old_name = inst.name
3257
3258     if inst.disk_template == constants.DT_FILE:
3259       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3260
3261     self.cfg.RenameInstance(inst.name, self.op.new_name)
3262     # Change the instance lock. This is definitely safe while we hold the BGL
3263     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3264     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3265
3266     # re-read the instance from the configuration after rename
3267     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3268
3269     if inst.disk_template == constants.DT_FILE:
3270       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3271       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3272                                                      old_file_storage_dir,
3273                                                      new_file_storage_dir)
3274       result.Raise()
3275       if not result.data:
3276         raise errors.OpExecError("Could not connect to node '%s' to rename"
3277                                  " directory '%s' to '%s' (but the instance"
3278                                  " has been renamed in Ganeti)" % (
3279                                  inst.primary_node, old_file_storage_dir,
3280                                  new_file_storage_dir))
3281
3282       if not result.data[0]:
3283         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3284                                  " (but the instance has been renamed in"
3285                                  " Ganeti)" % (old_file_storage_dir,
3286                                                new_file_storage_dir))
3287
3288     _StartInstanceDisks(self, inst, None)
3289     try:
3290       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3291                                                  old_name)
3292       msg = result.RemoteFailMsg()
3293       if msg:
3294         msg = ("Could not run OS rename script for instance %s on node %s"
3295                " (but the instance has been renamed in Ganeti): %s" %
3296                (inst.name, inst.primary_node, msg))
3297         self.proc.LogWarning(msg)
3298     finally:
3299       _ShutdownInstanceDisks(self, inst)
3300
3301
3302 class LURemoveInstance(LogicalUnit):
3303   """Remove an instance.
3304
3305   """
3306   HPATH = "instance-remove"
3307   HTYPE = constants.HTYPE_INSTANCE
3308   _OP_REQP = ["instance_name", "ignore_failures"]
3309   REQ_BGL = False
3310
3311   def ExpandNames(self):
3312     self._ExpandAndLockInstance()
3313     self.needed_locks[locking.LEVEL_NODE] = []
3314     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3315
3316   def DeclareLocks(self, level):
3317     if level == locking.LEVEL_NODE:
3318       self._LockInstancesNodes()
3319
3320   def BuildHooksEnv(self):
3321     """Build hooks env.
3322
3323     This runs on master, primary and secondary nodes of the instance.
3324
3325     """
3326     env = _BuildInstanceHookEnvByObject(self, self.instance)
3327     nl = [self.cfg.GetMasterNode()]
3328     return env, nl, nl
3329
3330   def CheckPrereq(self):
3331     """Check prerequisites.
3332
3333     This checks that the instance is in the cluster.
3334
3335     """
3336     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3337     assert self.instance is not None, \
3338       "Cannot retrieve locked instance %s" % self.op.instance_name
3339
3340   def Exec(self, feedback_fn):
3341     """Remove the instance.
3342
3343     """
3344     instance = self.instance
3345     logging.info("Shutting down instance %s on node %s",
3346                  instance.name, instance.primary_node)
3347
3348     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3349     msg = result.RemoteFailMsg()
3350     if msg:
3351       if self.op.ignore_failures:
3352         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3353       else:
3354         raise errors.OpExecError("Could not shutdown instance %s on"
3355                                  " node %s: %s" %
3356                                  (instance.name, instance.primary_node, msg))
3357
3358     logging.info("Removing block devices for instance %s", instance.name)
3359
3360     if not _RemoveDisks(self, instance):
3361       if self.op.ignore_failures:
3362         feedback_fn("Warning: can't remove instance's disks")
3363       else:
3364         raise errors.OpExecError("Can't remove instance's disks")
3365
3366     logging.info("Removing instance %s out of cluster config", instance.name)
3367
3368     self.cfg.RemoveInstance(instance.name)
3369     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3370
3371
3372 class LUQueryInstances(NoHooksLU):
3373   """Logical unit for querying instances.
3374
3375   """
3376   _OP_REQP = ["output_fields", "names", "use_locking"]
3377   REQ_BGL = False
3378   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3379                                     "admin_state",
3380                                     "disk_template", "ip", "mac", "bridge",
3381                                     "sda_size", "sdb_size", "vcpus", "tags",
3382                                     "network_port", "beparams",
3383                                     r"(disk)\.(size)/([0-9]+)",
3384                                     r"(disk)\.(sizes)", "disk_usage",
3385                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3386                                     r"(nic)\.(macs|ips|bridges)",
3387                                     r"(disk|nic)\.(count)",
3388                                     "serial_no", "hypervisor", "hvparams",] +
3389                                   ["hv/%s" % name
3390                                    for name in constants.HVS_PARAMETERS] +
3391                                   ["be/%s" % name
3392                                    for name in constants.BES_PARAMETERS])
3393   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3394
3395
3396   def ExpandNames(self):
3397     _CheckOutputFields(static=self._FIELDS_STATIC,
3398                        dynamic=self._FIELDS_DYNAMIC,
3399                        selected=self.op.output_fields)
3400
3401     self.needed_locks = {}
3402     self.share_locks[locking.LEVEL_INSTANCE] = 1
3403     self.share_locks[locking.LEVEL_NODE] = 1
3404
3405     if self.op.names:
3406       self.wanted = _GetWantedInstances(self, self.op.names)
3407     else:
3408       self.wanted = locking.ALL_SET
3409
3410     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3411     self.do_locking = self.do_node_query and self.op.use_locking
3412     if self.do_locking:
3413       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3414       self.needed_locks[locking.LEVEL_NODE] = []
3415       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3416
3417   def DeclareLocks(self, level):
3418     if level == locking.LEVEL_NODE and self.do_locking:
3419       self._LockInstancesNodes()
3420
3421   def CheckPrereq(self):
3422     """Check prerequisites.
3423
3424     """
3425     pass
3426
3427   def Exec(self, feedback_fn):
3428     """Computes the list of nodes and their attributes.
3429
3430     """
3431     all_info = self.cfg.GetAllInstancesInfo()
3432     if self.wanted == locking.ALL_SET:
3433       # caller didn't specify instance names, so ordering is not important
3434       if self.do_locking:
3435         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3436       else:
3437         instance_names = all_info.keys()
3438       instance_names = utils.NiceSort(instance_names)
3439     else:
3440       # caller did specify names, so we must keep the ordering
3441       if self.do_locking:
3442         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3443       else:
3444         tgt_set = all_info.keys()
3445       missing = set(self.wanted).difference(tgt_set)
3446       if missing:
3447         raise errors.OpExecError("Some instances were removed before"
3448                                  " retrieving their data: %s" % missing)
3449       instance_names = self.wanted
3450
3451     instance_list = [all_info[iname] for iname in instance_names]
3452
3453     # begin data gathering
3454
3455     nodes = frozenset([inst.primary_node for inst in instance_list])
3456     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3457
3458     bad_nodes = []
3459     off_nodes = []
3460     if self.do_node_query:
3461       live_data = {}
3462       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3463       for name in nodes:
3464         result = node_data[name]
3465         if result.offline:
3466           # offline nodes will be in both lists
3467           off_nodes.append(name)
3468         if result.failed or result.RemoteFailMsg():
3469           bad_nodes.append(name)
3470         else:
3471           if result.payload:
3472             live_data.update(result.payload)
3473           # else no instance is alive
3474     else:
3475       live_data = dict([(name, {}) for name in instance_names])
3476
3477     # end data gathering
3478
3479     HVPREFIX = "hv/"
3480     BEPREFIX = "be/"
3481     output = []
3482     for instance in instance_list:
3483       iout = []
3484       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3485       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3486       for field in self.op.output_fields:
3487         st_match = self._FIELDS_STATIC.Matches(field)
3488         if field == "name":
3489           val = instance.name
3490         elif field == "os":
3491           val = instance.os
3492         elif field == "pnode":
3493           val = instance.primary_node
3494         elif field == "snodes":
3495           val = list(instance.secondary_nodes)
3496         elif field == "admin_state":
3497           val = instance.admin_up
3498         elif field == "oper_state":
3499           if instance.primary_node in bad_nodes:
3500             val = None
3501           else:
3502             val = bool(live_data.get(instance.name))
3503         elif field == "status":
3504           if instance.primary_node in off_nodes:
3505             val = "ERROR_nodeoffline"
3506           elif instance.primary_node in bad_nodes:
3507             val = "ERROR_nodedown"
3508           else:
3509             running = bool(live_data.get(instance.name))
3510             if running:
3511               if instance.admin_up:
3512                 val = "running"
3513               else:
3514                 val = "ERROR_up"
3515             else:
3516               if instance.admin_up:
3517                 val = "ERROR_down"
3518               else:
3519                 val = "ADMIN_down"
3520         elif field == "oper_ram":
3521           if instance.primary_node in bad_nodes:
3522             val = None
3523           elif instance.name in live_data:
3524             val = live_data[instance.name].get("memory", "?")
3525           else:
3526             val = "-"
3527         elif field == "disk_template":
3528           val = instance.disk_template
3529         elif field == "ip":
3530           val = instance.nics[0].ip
3531         elif field == "bridge":
3532           val = instance.nics[0].bridge
3533         elif field == "mac":
3534           val = instance.nics[0].mac
3535         elif field == "sda_size" or field == "sdb_size":
3536           idx = ord(field[2]) - ord('a')
3537           try:
3538             val = instance.FindDisk(idx).size
3539           except errors.OpPrereqError:
3540             val = None
3541         elif field == "disk_usage": # total disk usage per node
3542           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3543           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3544         elif field == "tags":
3545           val = list(instance.GetTags())
3546         elif field == "serial_no":
3547           val = instance.serial_no
3548         elif field == "network_port":
3549           val = instance.network_port
3550         elif field == "hypervisor":
3551           val = instance.hypervisor
3552         elif field == "hvparams":
3553           val = i_hv
3554         elif (field.startswith(HVPREFIX) and
3555               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3556           val = i_hv.get(field[len(HVPREFIX):], None)
3557         elif field == "beparams":
3558           val = i_be
3559         elif (field.startswith(BEPREFIX) and
3560               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3561           val = i_be.get(field[len(BEPREFIX):], None)
3562         elif st_match and st_match.groups():
3563           # matches a variable list
3564           st_groups = st_match.groups()
3565           if st_groups and st_groups[0] == "disk":
3566             if st_groups[1] == "count":
3567               val = len(instance.disks)
3568             elif st_groups[1] == "sizes":
3569               val = [disk.size for disk in instance.disks]
3570             elif st_groups[1] == "size":
3571               try:
3572                 val = instance.FindDisk(st_groups[2]).size
3573               except errors.OpPrereqError:
3574                 val = None
3575             else:
3576               assert False, "Unhandled disk parameter"
3577           elif st_groups[0] == "nic":
3578             if st_groups[1] == "count":
3579               val = len(instance.nics)
3580             elif st_groups[1] == "macs":
3581               val = [nic.mac for nic in instance.nics]
3582             elif st_groups[1] == "ips":
3583               val = [nic.ip for nic in instance.nics]
3584             elif st_groups[1] == "bridges":
3585               val = [nic.bridge for nic in instance.nics]
3586             else:
3587               # index-based item
3588               nic_idx = int(st_groups[2])
3589               if nic_idx >= len(instance.nics):
3590                 val = None
3591               else:
3592                 if st_groups[1] == "mac":
3593                   val = instance.nics[nic_idx].mac
3594                 elif st_groups[1] == "ip":
3595                   val = instance.nics[nic_idx].ip
3596                 elif st_groups[1] == "bridge":
3597                   val = instance.nics[nic_idx].bridge
3598                 else:
3599                   assert False, "Unhandled NIC parameter"
3600           else:
3601             assert False, "Unhandled variable parameter"
3602         else:
3603           raise errors.ParameterError(field)
3604         iout.append(val)
3605       output.append(iout)
3606
3607     return output
3608
3609
3610 class LUFailoverInstance(LogicalUnit):
3611   """Failover an instance.
3612
3613   """
3614   HPATH = "instance-failover"
3615   HTYPE = constants.HTYPE_INSTANCE
3616   _OP_REQP = ["instance_name", "ignore_consistency"]
3617   REQ_BGL = False
3618
3619   def ExpandNames(self):
3620     self._ExpandAndLockInstance()
3621     self.needed_locks[locking.LEVEL_NODE] = []
3622     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3623
3624   def DeclareLocks(self, level):
3625     if level == locking.LEVEL_NODE:
3626       self._LockInstancesNodes()
3627
3628   def BuildHooksEnv(self):
3629     """Build hooks env.
3630
3631     This runs on master, primary and secondary nodes of the instance.
3632
3633     """
3634     env = {
3635       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3636       }
3637     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3638     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3639     return env, nl, nl
3640
3641   def CheckPrereq(self):
3642     """Check prerequisites.
3643
3644     This checks that the instance is in the cluster.
3645
3646     """
3647     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3648     assert self.instance is not None, \
3649       "Cannot retrieve locked instance %s" % self.op.instance_name
3650
3651     bep = self.cfg.GetClusterInfo().FillBE(instance)
3652     if instance.disk_template not in constants.DTS_NET_MIRROR:
3653       raise errors.OpPrereqError("Instance's disk layout is not"
3654                                  " network mirrored, cannot failover.")
3655
3656     secondary_nodes = instance.secondary_nodes
3657     if not secondary_nodes:
3658       raise errors.ProgrammerError("no secondary node but using "
3659                                    "a mirrored disk template")
3660
3661     target_node = secondary_nodes[0]
3662     _CheckNodeOnline(self, target_node)
3663     _CheckNodeNotDrained(self, target_node)
3664     # check memory requirements on the secondary node
3665     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3666                          instance.name, bep[constants.BE_MEMORY],
3667                          instance.hypervisor)
3668     # check bridge existance
3669     _CheckInstanceBridgesExist(self, instance, node=target_node)
3670
3671   def Exec(self, feedback_fn):
3672     """Failover an instance.
3673
3674     The failover is done by shutting it down on its present node and
3675     starting it on the secondary.
3676
3677     """
3678     instance = self.instance
3679
3680     source_node = instance.primary_node
3681     target_node = instance.secondary_nodes[0]
3682
3683     feedback_fn("* checking disk consistency between source and target")
3684     for dev in instance.disks:
3685       # for drbd, these are drbd over lvm
3686       if not _CheckDiskConsistency(self, dev, target_node, False):
3687         if instance.admin_up and not self.op.ignore_consistency:
3688           raise errors.OpExecError("Disk %s is degraded on target node,"
3689                                    " aborting failover." % dev.iv_name)
3690
3691     feedback_fn("* shutting down instance on source node")
3692     logging.info("Shutting down instance %s on node %s",
3693                  instance.name, source_node)
3694
3695     result = self.rpc.call_instance_shutdown(source_node, instance)
3696     msg = result.RemoteFailMsg()
3697     if msg:
3698       if self.op.ignore_consistency:
3699         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3700                              " Proceeding anyway. Please make sure node"
3701                              " %s is down. Error details: %s",
3702                              instance.name, source_node, source_node, msg)
3703       else:
3704         raise errors.OpExecError("Could not shutdown instance %s on"
3705                                  " node %s: %s" %
3706                                  (instance.name, source_node, msg))
3707
3708     feedback_fn("* deactivating the instance's disks on source node")
3709     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3710       raise errors.OpExecError("Can't shut down the instance's disks.")
3711
3712     instance.primary_node = target_node
3713     # distribute new instance config to the other nodes
3714     self.cfg.Update(instance)
3715
3716     # Only start the instance if it's marked as up
3717     if instance.admin_up:
3718       feedback_fn("* activating the instance's disks on target node")
3719       logging.info("Starting instance %s on node %s",
3720                    instance.name, target_node)
3721
3722       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3723                                                ignore_secondaries=True)
3724       if not disks_ok:
3725         _ShutdownInstanceDisks(self, instance)
3726         raise errors.OpExecError("Can't activate the instance's disks")
3727
3728       feedback_fn("* starting the instance on the target node")
3729       result = self.rpc.call_instance_start(target_node, instance, None, None)
3730       msg = result.RemoteFailMsg()
3731       if msg:
3732         _ShutdownInstanceDisks(self, instance)
3733         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3734                                  (instance.name, target_node, msg))
3735
3736
3737 class LUMigrateInstance(LogicalUnit):
3738   """Migrate an instance.
3739
3740   This is migration without shutting down, compared to the failover,
3741   which is done with shutdown.
3742
3743   """
3744   HPATH = "instance-migrate"
3745   HTYPE = constants.HTYPE_INSTANCE
3746   _OP_REQP = ["instance_name", "live", "cleanup"]
3747
3748   REQ_BGL = False
3749
3750   def ExpandNames(self):
3751     self._ExpandAndLockInstance()
3752     self.needed_locks[locking.LEVEL_NODE] = []
3753     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3754
3755   def DeclareLocks(self, level):
3756     if level == locking.LEVEL_NODE:
3757       self._LockInstancesNodes()
3758
3759   def BuildHooksEnv(self):
3760     """Build hooks env.
3761
3762     This runs on master, primary and secondary nodes of the instance.
3763
3764     """
3765     env = _BuildInstanceHookEnvByObject(self, self.instance)
3766     env["MIGRATE_LIVE"] = self.op.live
3767     env["MIGRATE_CLEANUP"] = self.op.cleanup
3768     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3769     return env, nl, nl
3770
3771   def CheckPrereq(self):
3772     """Check prerequisites.
3773
3774     This checks that the instance is in the cluster.
3775
3776     """
3777     instance = self.cfg.GetInstanceInfo(
3778       self.cfg.ExpandInstanceName(self.op.instance_name))
3779     if instance is None:
3780       raise errors.OpPrereqError("Instance '%s' not known" %
3781                                  self.op.instance_name)
3782
3783     if instance.disk_template != constants.DT_DRBD8:
3784       raise errors.OpPrereqError("Instance's disk layout is not"
3785                                  " drbd8, cannot migrate.")
3786
3787     secondary_nodes = instance.secondary_nodes
3788     if not secondary_nodes:
3789       raise errors.ConfigurationError("No secondary node but using"
3790                                       " drbd8 disk template")
3791
3792     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3793
3794     target_node = secondary_nodes[0]
3795     # check memory requirements on the secondary node
3796     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3797                          instance.name, i_be[constants.BE_MEMORY],
3798                          instance.hypervisor)
3799
3800     # check bridge existance
3801     _CheckInstanceBridgesExist(self, instance, node=target_node)
3802
3803     if not self.op.cleanup:
3804       _CheckNodeNotDrained(self, target_node)
3805       result = self.rpc.call_instance_migratable(instance.primary_node,
3806                                                  instance)
3807       msg = result.RemoteFailMsg()
3808       if msg:
3809         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3810                                    msg)
3811
3812     self.instance = instance
3813
3814   def _WaitUntilSync(self):
3815     """Poll with custom rpc for disk sync.
3816
3817     This uses our own step-based rpc call.
3818
3819     """
3820     self.feedback_fn("* wait until resync is done")
3821     all_done = False
3822     while not all_done:
3823       all_done = True
3824       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3825                                             self.nodes_ip,
3826                                             self.instance.disks)
3827       min_percent = 100
3828       for node, nres in result.items():
3829         msg = nres.RemoteFailMsg()
3830         if msg:
3831           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3832                                    (node, msg))
3833         node_done, node_percent = nres.payload
3834         all_done = all_done and node_done
3835         if node_percent is not None:
3836           min_percent = min(min_percent, node_percent)
3837       if not all_done:
3838         if min_percent < 100:
3839           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3840         time.sleep(2)
3841
3842   def _EnsureSecondary(self, node):
3843     """Demote a node to secondary.
3844
3845     """
3846     self.feedback_fn("* switching node %s to secondary mode" % node)
3847
3848     for dev in self.instance.disks:
3849       self.cfg.SetDiskID(dev, node)
3850
3851     result = self.rpc.call_blockdev_close(node, self.instance.name,
3852                                           self.instance.disks)
3853     msg = result.RemoteFailMsg()
3854     if msg:
3855       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3856                                " error %s" % (node, msg))
3857
3858   def _GoStandalone(self):
3859     """Disconnect from the network.
3860
3861     """
3862     self.feedback_fn("* changing into standalone mode")
3863     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3864                                                self.instance.disks)
3865     for node, nres in result.items():
3866       msg = nres.RemoteFailMsg()
3867       if msg:
3868         raise errors.OpExecError("Cannot disconnect disks node %s,"
3869                                  " error %s" % (node, msg))
3870
3871   def _GoReconnect(self, multimaster):
3872     """Reconnect to the network.
3873
3874     """
3875     if multimaster:
3876       msg = "dual-master"
3877     else:
3878       msg = "single-master"
3879     self.feedback_fn("* changing disks into %s mode" % msg)
3880     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3881                                            self.instance.disks,
3882                                            self.instance.name, multimaster)
3883     for node, nres in result.items():
3884       msg = nres.RemoteFailMsg()
3885       if msg:
3886         raise errors.OpExecError("Cannot change disks config on node %s,"
3887                                  " error: %s" % (node, msg))
3888
3889   def _ExecCleanup(self):
3890     """Try to cleanup after a failed migration.
3891
3892     The cleanup is done by:
3893       - check that the instance is running only on one node
3894         (and update the config if needed)
3895       - change disks on its secondary node to secondary
3896       - wait until disks are fully synchronized
3897       - disconnect from the network
3898       - change disks into single-master mode
3899       - wait again until disks are fully synchronized
3900
3901     """
3902     instance = self.instance
3903     target_node = self.target_node
3904     source_node = self.source_node
3905
3906     # check running on only one node
3907     self.feedback_fn("* checking where the instance actually runs"
3908                      " (if this hangs, the hypervisor might be in"
3909                      " a bad state)")
3910     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3911     for node, result in ins_l.items():
3912       msg = result.RemoteFailMsg()
3913       if msg:
3914         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3915
3916     runningon_source = instance.name in ins_l[source_node].payload
3917     runningon_target = instance.name in ins_l[target_node].payload
3918
3919     if runningon_source and runningon_target:
3920       raise errors.OpExecError("Instance seems to be running on two nodes,"
3921                                " or the hypervisor is confused. You will have"
3922                                " to ensure manually that it runs only on one"
3923                                " and restart this operation.")
3924
3925     if not (runningon_source or runningon_target):
3926       raise errors.OpExecError("Instance does not seem to be running at all."
3927                                " In this case, it's safer to repair by"
3928                                " running 'gnt-instance stop' to ensure disk"
3929                                " shutdown, and then restarting it.")
3930
3931     if runningon_target:
3932       # the migration has actually succeeded, we need to update the config
3933       self.feedback_fn("* instance running on secondary node (%s),"
3934                        " updating config" % target_node)
3935       instance.primary_node = target_node
3936       self.cfg.Update(instance)
3937       demoted_node = source_node
3938     else:
3939       self.feedback_fn("* instance confirmed to be running on its"
3940                        " primary node (%s)" % source_node)
3941       demoted_node = target_node
3942
3943     self._EnsureSecondary(demoted_node)
3944     try:
3945       self._WaitUntilSync()
3946     except errors.OpExecError:
3947       # we ignore here errors, since if the device is standalone, it
3948       # won't be able to sync
3949       pass
3950     self._GoStandalone()
3951     self._GoReconnect(False)
3952     self._WaitUntilSync()
3953
3954     self.feedback_fn("* done")
3955
3956   def _RevertDiskStatus(self):
3957     """Try to revert the disk status after a failed migration.
3958
3959     """
3960     target_node = self.target_node
3961     try:
3962       self._EnsureSecondary(target_node)
3963       self._GoStandalone()
3964       self._GoReconnect(False)
3965       self._WaitUntilSync()
3966     except errors.OpExecError, err:
3967       self.LogWarning("Migration failed and I can't reconnect the"
3968                       " drives: error '%s'\n"
3969                       "Please look and recover the instance status" %
3970                       str(err))
3971
3972   def _AbortMigration(self):
3973     """Call the hypervisor code to abort a started migration.
3974
3975     """
3976     instance = self.instance
3977     target_node = self.target_node
3978     migration_info = self.migration_info
3979
3980     abort_result = self.rpc.call_finalize_migration(target_node,
3981                                                     instance,
3982                                                     migration_info,
3983                                                     False)
3984     abort_msg = abort_result.RemoteFailMsg()
3985     if abort_msg:
3986       logging.error("Aborting migration failed on target node %s: %s" %
3987                     (target_node, abort_msg))
3988       # Don't raise an exception here, as we stil have to try to revert the
3989       # disk status, even if this step failed.
3990
3991   def _ExecMigration(self):
3992     """Migrate an instance.
3993
3994     The migrate is done by:
3995       - change the disks into dual-master mode
3996       - wait until disks are fully synchronized again
3997       - migrate the instance
3998       - change disks on the new secondary node (the old primary) to secondary
3999       - wait until disks are fully synchronized
4000       - change disks into single-master mode
4001
4002     """
4003     instance = self.instance
4004     target_node = self.target_node
4005     source_node = self.source_node
4006
4007     self.feedback_fn("* checking disk consistency between source and target")
4008     for dev in instance.disks:
4009       if not _CheckDiskConsistency(self, dev, target_node, False):
4010         raise errors.OpExecError("Disk %s is degraded or not fully"
4011                                  " synchronized on target node,"
4012                                  " aborting migrate." % dev.iv_name)
4013
4014     # First get the migration information from the remote node
4015     result = self.rpc.call_migration_info(source_node, instance)
4016     msg = result.RemoteFailMsg()
4017     if msg:
4018       log_err = ("Failed fetching source migration information from %s: %s" %
4019                  (source_node, msg))
4020       logging.error(log_err)
4021       raise errors.OpExecError(log_err)
4022
4023     self.migration_info = migration_info = result.payload
4024
4025     # Then switch the disks to master/master mode
4026     self._EnsureSecondary(target_node)
4027     self._GoStandalone()
4028     self._GoReconnect(True)
4029     self._WaitUntilSync()
4030
4031     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4032     result = self.rpc.call_accept_instance(target_node,
4033                                            instance,
4034                                            migration_info,
4035                                            self.nodes_ip[target_node])
4036
4037     msg = result.RemoteFailMsg()
4038     if msg:
4039       logging.error("Instance pre-migration failed, trying to revert"
4040                     " disk status: %s", msg)
4041       self._AbortMigration()
4042       self._RevertDiskStatus()
4043       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4044                                (instance.name, msg))
4045
4046     self.feedback_fn("* migrating instance to %s" % target_node)
4047     time.sleep(10)
4048     result = self.rpc.call_instance_migrate(source_node, instance,
4049                                             self.nodes_ip[target_node],
4050                                             self.op.live)
4051     msg = result.RemoteFailMsg()
4052     if msg:
4053       logging.error("Instance migration failed, trying to revert"
4054                     " disk status: %s", msg)
4055       self._AbortMigration()
4056       self._RevertDiskStatus()
4057       raise errors.OpExecError("Could not migrate instance %s: %s" %
4058                                (instance.name, msg))
4059     time.sleep(10)
4060
4061     instance.primary_node = target_node
4062     # distribute new instance config to the other nodes
4063     self.cfg.Update(instance)
4064
4065     result = self.rpc.call_finalize_migration(target_node,
4066                                               instance,
4067                                               migration_info,
4068                                               True)
4069     msg = result.RemoteFailMsg()
4070     if msg:
4071       logging.error("Instance migration succeeded, but finalization failed:"
4072                     " %s" % msg)
4073       raise errors.OpExecError("Could not finalize instance migration: %s" %
4074                                msg)
4075
4076     self._EnsureSecondary(source_node)
4077     self._WaitUntilSync()
4078     self._GoStandalone()
4079     self._GoReconnect(False)
4080     self._WaitUntilSync()
4081
4082     self.feedback_fn("* done")
4083
4084   def Exec(self, feedback_fn):
4085     """Perform the migration.
4086
4087     """
4088     self.feedback_fn = feedback_fn
4089
4090     self.source_node = self.instance.primary_node
4091     self.target_node = self.instance.secondary_nodes[0]
4092     self.all_nodes = [self.source_node, self.target_node]
4093     self.nodes_ip = {
4094       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4095       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4096       }
4097     if self.op.cleanup:
4098       return self._ExecCleanup()
4099     else:
4100       return self._ExecMigration()
4101
4102
4103 def _CreateBlockDev(lu, node, instance, device, force_create,
4104                     info, force_open):
4105   """Create a tree of block devices on a given node.
4106
4107   If this device type has to be created on secondaries, create it and
4108   all its children.
4109
4110   If not, just recurse to children keeping the same 'force' value.
4111
4112   @param lu: the lu on whose behalf we execute
4113   @param node: the node on which to create the device
4114   @type instance: L{objects.Instance}
4115   @param instance: the instance which owns the device
4116   @type device: L{objects.Disk}
4117   @param device: the device to create
4118   @type force_create: boolean
4119   @param force_create: whether to force creation of this device; this
4120       will be change to True whenever we find a device which has
4121       CreateOnSecondary() attribute
4122   @param info: the extra 'metadata' we should attach to the device
4123       (this will be represented as a LVM tag)
4124   @type force_open: boolean
4125   @param force_open: this parameter will be passes to the
4126       L{backend.BlockdevCreate} function where it specifies
4127       whether we run on primary or not, and it affects both
4128       the child assembly and the device own Open() execution
4129
4130   """
4131   if device.CreateOnSecondary():
4132     force_create = True
4133
4134   if device.children:
4135     for child in device.children:
4136       _CreateBlockDev(lu, node, instance, child, force_create,
4137                       info, force_open)
4138
4139   if not force_create:
4140     return
4141
4142   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4143
4144
4145 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4146   """Create a single block device on a given node.
4147
4148   This will not recurse over children of the device, so they must be
4149   created in advance.
4150
4151   @param lu: the lu on whose behalf we execute
4152   @param node: the node on which to create the device
4153   @type instance: L{objects.Instance}
4154   @param instance: the instance which owns the device
4155   @type device: L{objects.Disk}
4156   @param device: the device to create
4157   @param info: the extra 'metadata' we should attach to the device
4158       (this will be represented as a LVM tag)
4159   @type force_open: boolean
4160   @param force_open: this parameter will be passes to the
4161       L{backend.BlockdevCreate} function where it specifies
4162       whether we run on primary or not, and it affects both
4163       the child assembly and the device own Open() execution
4164
4165   """
4166   lu.cfg.SetDiskID(device, node)
4167   result = lu.rpc.call_blockdev_create(node, device, device.size,
4168                                        instance.name, force_open, info)
4169   msg = result.RemoteFailMsg()
4170   if msg:
4171     raise errors.OpExecError("Can't create block device %s on"
4172                              " node %s for instance %s: %s" %
4173                              (device, node, instance.name, msg))
4174   if device.physical_id is None:
4175     device.physical_id = result.payload
4176
4177
4178 def _GenerateUniqueNames(lu, exts):
4179   """Generate a suitable LV name.
4180
4181   This will generate a logical volume name for the given instance.
4182
4183   """
4184   results = []
4185   for val in exts:
4186     new_id = lu.cfg.GenerateUniqueID()
4187     results.append("%s%s" % (new_id, val))
4188   return results
4189
4190
4191 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4192                          p_minor, s_minor):
4193   """Generate a drbd8 device complete with its children.
4194
4195   """
4196   port = lu.cfg.AllocatePort()
4197   vgname = lu.cfg.GetVGName()
4198   shared_secret = lu.cfg.GenerateDRBDSecret()
4199   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4200                           logical_id=(vgname, names[0]))
4201   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4202                           logical_id=(vgname, names[1]))
4203   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4204                           logical_id=(primary, secondary, port,
4205                                       p_minor, s_minor,
4206                                       shared_secret),
4207                           children=[dev_data, dev_meta],
4208                           iv_name=iv_name)
4209   return drbd_dev
4210
4211
4212 def _GenerateDiskTemplate(lu, template_name,
4213                           instance_name, primary_node,
4214                           secondary_nodes, disk_info,
4215                           file_storage_dir, file_driver,
4216                           base_index):
4217   """Generate the entire disk layout for a given template type.
4218
4219   """
4220   #TODO: compute space requirements
4221
4222   vgname = lu.cfg.GetVGName()
4223   disk_count = len(disk_info)
4224   disks = []
4225   if template_name == constants.DT_DISKLESS:
4226     pass
4227   elif template_name == constants.DT_PLAIN:
4228     if len(secondary_nodes) != 0:
4229       raise errors.ProgrammerError("Wrong template configuration")
4230
4231     names = _GenerateUniqueNames(lu, [".disk%d" % i
4232                                       for i in range(disk_count)])
4233     for idx, disk in enumerate(disk_info):
4234       disk_index = idx + base_index
4235       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4236                               logical_id=(vgname, names[idx]),
4237                               iv_name="disk/%d" % disk_index,
4238                               mode=disk["mode"])
4239       disks.append(disk_dev)
4240   elif template_name == constants.DT_DRBD8:
4241     if len(secondary_nodes) != 1:
4242       raise errors.ProgrammerError("Wrong template configuration")
4243     remote_node = secondary_nodes[0]
4244     minors = lu.cfg.AllocateDRBDMinor(
4245       [primary_node, remote_node] * len(disk_info), instance_name)
4246
4247     names = []
4248     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4249                                                for i in range(disk_count)]):
4250       names.append(lv_prefix + "_data")
4251       names.append(lv_prefix + "_meta")
4252     for idx, disk in enumerate(disk_info):
4253       disk_index = idx + base_index
4254       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4255                                       disk["size"], names[idx*2:idx*2+2],
4256                                       "disk/%d" % disk_index,
4257                                       minors[idx*2], minors[idx*2+1])
4258       disk_dev.mode = disk["mode"]
4259       disks.append(disk_dev)
4260   elif template_name == constants.DT_FILE:
4261     if len(secondary_nodes) != 0:
4262       raise errors.ProgrammerError("Wrong template configuration")
4263
4264     for idx, disk in enumerate(disk_info):
4265       disk_index = idx + base_index
4266       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4267                               iv_name="disk/%d" % disk_index,
4268                               logical_id=(file_driver,
4269                                           "%s/disk%d" % (file_storage_dir,
4270                                                          disk_index)),
4271                               mode=disk["mode"])
4272       disks.append(disk_dev)
4273   else:
4274     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4275   return disks
4276
4277
4278 def _GetInstanceInfoText(instance):
4279   """Compute that text that should be added to the disk's metadata.
4280
4281   """
4282   return "originstname+%s" % instance.name
4283
4284
4285 def _CreateDisks(lu, instance):
4286   """Create all disks for an instance.
4287
4288   This abstracts away some work from AddInstance.
4289
4290   @type lu: L{LogicalUnit}
4291   @param lu: the logical unit on whose behalf we execute
4292   @type instance: L{objects.Instance}
4293   @param instance: the instance whose disks we should create
4294   @rtype: boolean
4295   @return: the success of the creation
4296
4297   """
4298   info = _GetInstanceInfoText(instance)
4299   pnode = instance.primary_node
4300
4301   if instance.disk_template == constants.DT_FILE:
4302     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4303     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4304
4305     if result.failed or not result.data:
4306       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4307
4308     if not result.data[0]:
4309       raise errors.OpExecError("Failed to create directory '%s'" %
4310                                file_storage_dir)
4311
4312   # Note: this needs to be kept in sync with adding of disks in
4313   # LUSetInstanceParams
4314   for device in instance.disks:
4315     logging.info("Creating volume %s for instance %s",
4316                  device.iv_name, instance.name)
4317     #HARDCODE
4318     for node in instance.all_nodes:
4319       f_create = node == pnode
4320       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4321
4322
4323 def _RemoveDisks(lu, instance):
4324   """Remove all disks for an instance.
4325
4326   This abstracts away some work from `AddInstance()` and
4327   `RemoveInstance()`. Note that in case some of the devices couldn't
4328   be removed, the removal will continue with the other ones (compare
4329   with `_CreateDisks()`).
4330
4331   @type lu: L{LogicalUnit}
4332   @param lu: the logical unit on whose behalf we execute
4333   @type instance: L{objects.Instance}
4334   @param instance: the instance whose disks we should remove
4335   @rtype: boolean
4336   @return: the success of the removal
4337
4338   """
4339   logging.info("Removing block devices for instance %s", instance.name)
4340
4341   all_result = True
4342   for device in instance.disks:
4343     for node, disk in device.ComputeNodeTree(instance.primary_node):
4344       lu.cfg.SetDiskID(disk, node)
4345       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4346       if msg:
4347         lu.LogWarning("Could not remove block device %s on node %s,"
4348                       " continuing anyway: %s", device.iv_name, node, msg)
4349         all_result = False
4350
4351   if instance.disk_template == constants.DT_FILE:
4352     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4353     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4354                                                  file_storage_dir)
4355     if result.failed or not result.data:
4356       logging.error("Could not remove directory '%s'", file_storage_dir)
4357       all_result = False
4358
4359   return all_result
4360
4361
4362 def _ComputeDiskSize(disk_template, disks):
4363   """Compute disk size requirements in the volume group
4364
4365   """
4366   # Required free disk space as a function of disk and swap space
4367   req_size_dict = {
4368     constants.DT_DISKLESS: None,
4369     constants.DT_PLAIN: sum(d["size"] for d in disks),
4370     # 128 MB are added for drbd metadata for each disk
4371     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4372     constants.DT_FILE: None,
4373   }
4374
4375   if disk_template not in req_size_dict:
4376     raise errors.ProgrammerError("Disk template '%s' size requirement"
4377                                  " is unknown" %  disk_template)
4378
4379   return req_size_dict[disk_template]
4380
4381
4382 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4383   """Hypervisor parameter validation.
4384
4385   This function abstract the hypervisor parameter validation to be
4386   used in both instance create and instance modify.
4387
4388   @type lu: L{LogicalUnit}
4389   @param lu: the logical unit for which we check
4390   @type nodenames: list
4391   @param nodenames: the list of nodes on which we should check
4392   @type hvname: string
4393   @param hvname: the name of the hypervisor we should use
4394   @type hvparams: dict
4395   @param hvparams: the parameters which we need to check
4396   @raise errors.OpPrereqError: if the parameters are not valid
4397
4398   """
4399   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4400                                                   hvname,
4401                                                   hvparams)
4402   for node in nodenames:
4403     info = hvinfo[node]
4404     if info.offline:
4405       continue
4406     msg = info.RemoteFailMsg()
4407     if msg:
4408       raise errors.OpPrereqError("Hypervisor parameter validation"
4409                                  " failed on node %s: %s" % (node, msg))
4410
4411
4412 class LUCreateInstance(LogicalUnit):
4413   """Create an instance.
4414
4415   """
4416   HPATH = "instance-add"
4417   HTYPE = constants.HTYPE_INSTANCE
4418   _OP_REQP = ["instance_name", "disks", "disk_template",
4419               "mode", "start",
4420               "wait_for_sync", "ip_check", "nics",
4421               "hvparams", "beparams"]
4422   REQ_BGL = False
4423
4424   def _ExpandNode(self, node):
4425     """Expands and checks one node name.
4426
4427     """
4428     node_full = self.cfg.ExpandNodeName(node)
4429     if node_full is None:
4430       raise errors.OpPrereqError("Unknown node %s" % node)
4431     return node_full
4432
4433   def ExpandNames(self):
4434     """ExpandNames for CreateInstance.
4435
4436     Figure out the right locks for instance creation.
4437
4438     """
4439     self.needed_locks = {}
4440
4441     # set optional parameters to none if they don't exist
4442     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4443       if not hasattr(self.op, attr):
4444         setattr(self.op, attr, None)
4445
4446     # cheap checks, mostly valid constants given
4447
4448     # verify creation mode
4449     if self.op.mode not in (constants.INSTANCE_CREATE,
4450                             constants.INSTANCE_IMPORT):
4451       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4452                                  self.op.mode)
4453
4454     # disk template and mirror node verification
4455     if self.op.disk_template not in constants.DISK_TEMPLATES:
4456       raise errors.OpPrereqError("Invalid disk template name")
4457
4458     if self.op.hypervisor is None:
4459       self.op.hypervisor = self.cfg.GetHypervisorType()
4460
4461     cluster = self.cfg.GetClusterInfo()
4462     enabled_hvs = cluster.enabled_hypervisors
4463     if self.op.hypervisor not in enabled_hvs:
4464       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4465                                  " cluster (%s)" % (self.op.hypervisor,
4466                                   ",".join(enabled_hvs)))
4467
4468     # check hypervisor parameter syntax (locally)
4469     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4470     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4471                                   self.op.hvparams)
4472     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4473     hv_type.CheckParameterSyntax(filled_hvp)
4474
4475     # fill and remember the beparams dict
4476     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4477     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4478                                     self.op.beparams)
4479
4480     #### instance parameters check
4481
4482     # instance name verification
4483     hostname1 = utils.HostInfo(self.op.instance_name)
4484     self.op.instance_name = instance_name = hostname1.name
4485
4486     # this is just a preventive check, but someone might still add this
4487     # instance in the meantime, and creation will fail at lock-add time
4488     if instance_name in self.cfg.GetInstanceList():
4489       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4490                                  instance_name)
4491
4492     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4493
4494     # NIC buildup
4495     self.nics = []
4496     for idx, nic in enumerate(self.op.nics):
4497       nic_mode_req = nic.get("mode", None)
4498       nic_mode = nic_mode_req
4499       if nic_mode is None:
4500         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4501
4502       # in routed mode, for the first nic, the default ip is 'auto'
4503       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4504         default_ip_mode = constants.VALUE_AUTO
4505       else:
4506         default_ip_mode = constants.VALUE_NONE
4507
4508       # ip validity checks
4509       ip = nic.get("ip", default_ip_mode)
4510       if ip is None or ip.lower() == constants.VALUE_NONE:
4511         nic_ip = None
4512       elif ip.lower() == constants.VALUE_AUTO:
4513         nic_ip = hostname1.ip
4514       else:
4515         if not utils.IsValidIP(ip):
4516           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4517                                      " like a valid IP" % ip)
4518         nic_ip = ip
4519
4520       # TODO: check the ip for uniqueness !!
4521       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4522         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4523
4524       # MAC address verification
4525       mac = nic.get("mac", constants.VALUE_AUTO)
4526       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4527         if not utils.IsValidMac(mac.lower()):
4528           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4529                                      mac)
4530       # bridge verification
4531       bridge = nic.get("bridge", None)
4532       link = nic.get("link", None)
4533       if bridge and link:
4534         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4535       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4536         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4537       elif bridge:
4538         link = bridge
4539
4540       nicparams = {}
4541       if nic_mode_req:
4542         nicparams[constants.NIC_MODE] = nic_mode_req
4543       if link:
4544         nicparams[constants.NIC_LINK] = link
4545
4546       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4547                                       nicparams)
4548       objects.NIC.CheckParameterSyntax(check_params)
4549       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4550
4551     # disk checks/pre-build
4552     self.disks = []
4553     for disk in self.op.disks:
4554       mode = disk.get("mode", constants.DISK_RDWR)
4555       if mode not in constants.DISK_ACCESS_SET:
4556         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4557                                    mode)
4558       size = disk.get("size", None)
4559       if size is None:
4560         raise errors.OpPrereqError("Missing disk size")
4561       try:
4562         size = int(size)
4563       except ValueError:
4564         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4565       self.disks.append({"size": size, "mode": mode})
4566
4567     # used in CheckPrereq for ip ping check
4568     self.check_ip = hostname1.ip
4569
4570     # file storage checks
4571     if (self.op.file_driver and
4572         not self.op.file_driver in constants.FILE_DRIVER):
4573       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4574                                  self.op.file_driver)
4575
4576     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4577       raise errors.OpPrereqError("File storage directory path not absolute")
4578
4579     ### Node/iallocator related checks
4580     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4581       raise errors.OpPrereqError("One and only one of iallocator and primary"
4582                                  " node must be given")
4583
4584     if self.op.iallocator:
4585       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4586     else:
4587       self.op.pnode = self._ExpandNode(self.op.pnode)
4588       nodelist = [self.op.pnode]
4589       if self.op.snode is not None:
4590         self.op.snode = self._ExpandNode(self.op.snode)
4591         nodelist.append(self.op.snode)
4592       self.needed_locks[locking.LEVEL_NODE] = nodelist
4593
4594     # in case of import lock the source node too
4595     if self.op.mode == constants.INSTANCE_IMPORT:
4596       src_node = getattr(self.op, "src_node", None)
4597       src_path = getattr(self.op, "src_path", None)
4598
4599       if src_path is None:
4600         self.op.src_path = src_path = self.op.instance_name
4601
4602       if src_node is None:
4603         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4604         self.op.src_node = None
4605         if os.path.isabs(src_path):
4606           raise errors.OpPrereqError("Importing an instance from an absolute"
4607                                      " path requires a source node option.")
4608       else:
4609         self.op.src_node = src_node = self._ExpandNode(src_node)
4610         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4611           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4612         if not os.path.isabs(src_path):
4613           self.op.src_path = src_path = \
4614             os.path.join(constants.EXPORT_DIR, src_path)
4615
4616     else: # INSTANCE_CREATE
4617       if getattr(self.op, "os_type", None) is None:
4618         raise errors.OpPrereqError("No guest OS specified")
4619
4620   def _RunAllocator(self):
4621     """Run the allocator based on input opcode.
4622
4623     """
4624     nics = [n.ToDict() for n in self.nics]
4625     ial = IAllocator(self,
4626                      mode=constants.IALLOCATOR_MODE_ALLOC,
4627                      name=self.op.instance_name,
4628                      disk_template=self.op.disk_template,
4629                      tags=[],
4630                      os=self.op.os_type,
4631                      vcpus=self.be_full[constants.BE_VCPUS],
4632                      mem_size=self.be_full[constants.BE_MEMORY],
4633                      disks=self.disks,
4634                      nics=nics,
4635                      hypervisor=self.op.hypervisor,
4636                      )
4637
4638     ial.Run(self.op.iallocator)
4639
4640     if not ial.success:
4641       raise errors.OpPrereqError("Can't compute nodes using"
4642                                  " iallocator '%s': %s" % (self.op.iallocator,
4643                                                            ial.info))
4644     if len(ial.nodes) != ial.required_nodes:
4645       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4646                                  " of nodes (%s), required %s" %
4647                                  (self.op.iallocator, len(ial.nodes),
4648                                   ial.required_nodes))
4649     self.op.pnode = ial.nodes[0]
4650     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4651                  self.op.instance_name, self.op.iallocator,
4652                  ", ".join(ial.nodes))
4653     if ial.required_nodes == 2:
4654       self.op.snode = ial.nodes[1]
4655
4656   def BuildHooksEnv(self):
4657     """Build hooks env.
4658
4659     This runs on master, primary and secondary nodes of the instance.
4660
4661     """
4662     env = {
4663       "ADD_MODE": self.op.mode,
4664       }
4665     if self.op.mode == constants.INSTANCE_IMPORT:
4666       env["SRC_NODE"] = self.op.src_node
4667       env["SRC_PATH"] = self.op.src_path
4668       env["SRC_IMAGES"] = self.src_images
4669
4670     env.update(_BuildInstanceHookEnv(
4671       name=self.op.instance_name,
4672       primary_node=self.op.pnode,
4673       secondary_nodes=self.secondaries,
4674       status=self.op.start,
4675       os_type=self.op.os_type,
4676       memory=self.be_full[constants.BE_MEMORY],
4677       vcpus=self.be_full[constants.BE_VCPUS],
4678       nics=_PreBuildNICHooksList(self, self.nics),
4679       disk_template=self.op.disk_template,
4680       disks=[(d["size"], d["mode"]) for d in self.disks],
4681     ))
4682
4683     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4684           self.secondaries)
4685     return env, nl, nl
4686
4687
4688   def CheckPrereq(self):
4689     """Check prerequisites.
4690
4691     """
4692     if (not self.cfg.GetVGName() and
4693         self.op.disk_template not in constants.DTS_NOT_LVM):
4694       raise errors.OpPrereqError("Cluster does not support lvm-based"
4695                                  " instances")
4696
4697     if self.op.mode == constants.INSTANCE_IMPORT:
4698       src_node = self.op.src_node
4699       src_path = self.op.src_path
4700
4701       if src_node is None:
4702         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4703         exp_list = self.rpc.call_export_list(locked_nodes)
4704         found = False
4705         for node in exp_list:
4706           if exp_list[node].RemoteFailMsg():
4707             continue
4708           if src_path in exp_list[node].payload:
4709             found = True
4710             self.op.src_node = src_node = node
4711             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4712                                                        src_path)
4713             break
4714         if not found:
4715           raise errors.OpPrereqError("No export found for relative path %s" %
4716                                       src_path)
4717
4718       _CheckNodeOnline(self, src_node)
4719       result = self.rpc.call_export_info(src_node, src_path)
4720       msg = result.RemoteFailMsg()
4721       if msg:
4722         raise errors.OpPrereqError("No export or invalid export found in"
4723                                    " dir %s: %s" % (src_path, msg))
4724
4725       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4726       if not export_info.has_section(constants.INISECT_EXP):
4727         raise errors.ProgrammerError("Corrupted export config")
4728
4729       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4730       if (int(ei_version) != constants.EXPORT_VERSION):
4731         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4732                                    (ei_version, constants.EXPORT_VERSION))
4733
4734       # Check that the new instance doesn't have less disks than the export
4735       instance_disks = len(self.disks)
4736       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4737       if instance_disks < export_disks:
4738         raise errors.OpPrereqError("Not enough disks to import."
4739                                    " (instance: %d, export: %d)" %
4740                                    (instance_disks, export_disks))
4741
4742       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4743       disk_images = []
4744       for idx in range(export_disks):
4745         option = 'disk%d_dump' % idx
4746         if export_info.has_option(constants.INISECT_INS, option):
4747           # FIXME: are the old os-es, disk sizes, etc. useful?
4748           export_name = export_info.get(constants.INISECT_INS, option)
4749           image = os.path.join(src_path, export_name)
4750           disk_images.append(image)
4751         else:
4752           disk_images.append(False)
4753
4754       self.src_images = disk_images
4755
4756       old_name = export_info.get(constants.INISECT_INS, 'name')
4757       # FIXME: int() here could throw a ValueError on broken exports
4758       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4759       if self.op.instance_name == old_name:
4760         for idx, nic in enumerate(self.nics):
4761           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4762             nic_mac_ini = 'nic%d_mac' % idx
4763             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4764
4765     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4766     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4767     if self.op.start and not self.op.ip_check:
4768       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4769                                  " adding an instance in start mode")
4770
4771     if self.op.ip_check:
4772       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4773         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4774                                    (self.check_ip, self.op.instance_name))
4775
4776     #### mac address generation
4777     # By generating here the mac address both the allocator and the hooks get
4778     # the real final mac address rather than the 'auto' or 'generate' value.
4779     # There is a race condition between the generation and the instance object
4780     # creation, which means that we know the mac is valid now, but we're not
4781     # sure it will be when we actually add the instance. If things go bad
4782     # adding the instance will abort because of a duplicate mac, and the
4783     # creation job will fail.
4784     for nic in self.nics:
4785       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4786         nic.mac = self.cfg.GenerateMAC()
4787
4788     #### allocator run
4789
4790     if self.op.iallocator is not None:
4791       self._RunAllocator()
4792
4793     #### node related checks
4794
4795     # check primary node
4796     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4797     assert self.pnode is not None, \
4798       "Cannot retrieve locked node %s" % self.op.pnode
4799     if pnode.offline:
4800       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4801                                  pnode.name)
4802     if pnode.drained:
4803       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4804                                  pnode.name)
4805
4806     self.secondaries = []
4807
4808     # mirror node verification
4809     if self.op.disk_template in constants.DTS_NET_MIRROR:
4810       if self.op.snode is None:
4811         raise errors.OpPrereqError("The networked disk templates need"
4812                                    " a mirror node")
4813       if self.op.snode == pnode.name:
4814         raise errors.OpPrereqError("The secondary node cannot be"
4815                                    " the primary node.")
4816       _CheckNodeOnline(self, self.op.snode)
4817       _CheckNodeNotDrained(self, self.op.snode)
4818       self.secondaries.append(self.op.snode)
4819
4820     nodenames = [pnode.name] + self.secondaries
4821
4822     req_size = _ComputeDiskSize(self.op.disk_template,
4823                                 self.disks)
4824
4825     # Check lv size requirements
4826     if req_size is not None:
4827       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4828                                          self.op.hypervisor)
4829       for node in nodenames:
4830         info = nodeinfo[node]
4831         msg = info.RemoteFailMsg()
4832         if msg:
4833           raise errors.OpPrereqError("Cannot get current information"
4834                                      " from node %s: %s" % (node, msg))
4835         info = info.payload
4836         vg_free = info.get('vg_free', None)
4837         if not isinstance(vg_free, int):
4838           raise errors.OpPrereqError("Can't compute free disk space on"
4839                                      " node %s" % node)
4840         if req_size > vg_free:
4841           raise errors.OpPrereqError("Not enough disk space on target node %s."
4842                                      " %d MB available, %d MB required" %
4843                                      (node, vg_free, req_size))
4844
4845     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4846
4847     # os verification
4848     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4849     result.Raise()
4850     if not isinstance(result.data, objects.OS):
4851       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4852                                  " primary node"  % self.op.os_type)
4853
4854     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4855
4856     # memory check on primary node
4857     if self.op.start:
4858       _CheckNodeFreeMemory(self, self.pnode.name,
4859                            "creating instance %s" % self.op.instance_name,
4860                            self.be_full[constants.BE_MEMORY],
4861                            self.op.hypervisor)
4862
4863   def Exec(self, feedback_fn):
4864     """Create and add the instance to the cluster.
4865
4866     """
4867     instance = self.op.instance_name
4868     pnode_name = self.pnode.name
4869
4870     ht_kind = self.op.hypervisor
4871     if ht_kind in constants.HTS_REQ_PORT:
4872       network_port = self.cfg.AllocatePort()
4873     else:
4874       network_port = None
4875
4876     ##if self.op.vnc_bind_address is None:
4877     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4878
4879     # this is needed because os.path.join does not accept None arguments
4880     if self.op.file_storage_dir is None:
4881       string_file_storage_dir = ""
4882     else:
4883       string_file_storage_dir = self.op.file_storage_dir
4884
4885     # build the full file storage dir path
4886     file_storage_dir = os.path.normpath(os.path.join(
4887                                         self.cfg.GetFileStorageDir(),
4888                                         string_file_storage_dir, instance))
4889
4890
4891     disks = _GenerateDiskTemplate(self,
4892                                   self.op.disk_template,
4893                                   instance, pnode_name,
4894                                   self.secondaries,
4895                                   self.disks,
4896                                   file_storage_dir,
4897                                   self.op.file_driver,
4898                                   0)
4899
4900     iobj = objects.Instance(name=instance, os=self.op.os_type,
4901                             primary_node=pnode_name,
4902                             nics=self.nics, disks=disks,
4903                             disk_template=self.op.disk_template,
4904                             admin_up=False,
4905                             network_port=network_port,
4906                             beparams=self.op.beparams,
4907                             hvparams=self.op.hvparams,
4908                             hypervisor=self.op.hypervisor,
4909                             )
4910
4911     feedback_fn("* creating instance disks...")
4912     try:
4913       _CreateDisks(self, iobj)
4914     except errors.OpExecError:
4915       self.LogWarning("Device creation failed, reverting...")
4916       try:
4917         _RemoveDisks(self, iobj)
4918       finally:
4919         self.cfg.ReleaseDRBDMinors(instance)
4920         raise
4921
4922     feedback_fn("adding instance %s to cluster config" % instance)
4923
4924     self.cfg.AddInstance(iobj)
4925     # Declare that we don't want to remove the instance lock anymore, as we've
4926     # added the instance to the config
4927     del self.remove_locks[locking.LEVEL_INSTANCE]
4928     # Unlock all the nodes
4929     if self.op.mode == constants.INSTANCE_IMPORT:
4930       nodes_keep = [self.op.src_node]
4931       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4932                        if node != self.op.src_node]
4933       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4934       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4935     else:
4936       self.context.glm.release(locking.LEVEL_NODE)
4937       del self.acquired_locks[locking.LEVEL_NODE]
4938
4939     if self.op.wait_for_sync:
4940       disk_abort = not _WaitForSync(self, iobj)
4941     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4942       # make sure the disks are not degraded (still sync-ing is ok)
4943       time.sleep(15)
4944       feedback_fn("* checking mirrors status")
4945       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4946     else:
4947       disk_abort = False
4948
4949     if disk_abort:
4950       _RemoveDisks(self, iobj)
4951       self.cfg.RemoveInstance(iobj.name)
4952       # Make sure the instance lock gets removed
4953       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4954       raise errors.OpExecError("There are some degraded disks for"
4955                                " this instance")
4956
4957     feedback_fn("creating os for instance %s on node %s" %
4958                 (instance, pnode_name))
4959
4960     if iobj.disk_template != constants.DT_DISKLESS:
4961       if self.op.mode == constants.INSTANCE_CREATE:
4962         feedback_fn("* running the instance OS create scripts...")
4963         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4964         msg = result.RemoteFailMsg()
4965         if msg:
4966           raise errors.OpExecError("Could not add os for instance %s"
4967                                    " on node %s: %s" %
4968                                    (instance, pnode_name, msg))
4969
4970       elif self.op.mode == constants.INSTANCE_IMPORT:
4971         feedback_fn("* running the instance OS import scripts...")
4972         src_node = self.op.src_node
4973         src_images = self.src_images
4974         cluster_name = self.cfg.GetClusterName()
4975         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4976                                                          src_node, src_images,
4977                                                          cluster_name)
4978         msg = import_result.RemoteFailMsg()
4979         if msg:
4980           self.LogWarning("Error while importing the disk images for instance"
4981                           " %s on node %s: %s" % (instance, pnode_name, msg))
4982       else:
4983         # also checked in the prereq part
4984         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4985                                      % self.op.mode)
4986
4987     if self.op.start:
4988       iobj.admin_up = True
4989       self.cfg.Update(iobj)
4990       logging.info("Starting instance %s on node %s", instance, pnode_name)
4991       feedback_fn("* starting instance...")
4992       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4993       msg = result.RemoteFailMsg()
4994       if msg:
4995         raise errors.OpExecError("Could not start instance: %s" % msg)
4996
4997
4998 class LUConnectConsole(NoHooksLU):
4999   """Connect to an instance's console.
5000
5001   This is somewhat special in that it returns the command line that
5002   you need to run on the master node in order to connect to the
5003   console.
5004
5005   """
5006   _OP_REQP = ["instance_name"]
5007   REQ_BGL = False
5008
5009   def ExpandNames(self):
5010     self._ExpandAndLockInstance()
5011
5012   def CheckPrereq(self):
5013     """Check prerequisites.
5014
5015     This checks that the instance is in the cluster.
5016
5017     """
5018     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5019     assert self.instance is not None, \
5020       "Cannot retrieve locked instance %s" % self.op.instance_name
5021     _CheckNodeOnline(self, self.instance.primary_node)
5022
5023   def Exec(self, feedback_fn):
5024     """Connect to the console of an instance
5025
5026     """
5027     instance = self.instance
5028     node = instance.primary_node
5029
5030     node_insts = self.rpc.call_instance_list([node],
5031                                              [instance.hypervisor])[node]
5032     msg = node_insts.RemoteFailMsg()
5033     if msg:
5034       raise errors.OpExecError("Can't get node information from %s: %s" %
5035                                (node, msg))
5036
5037     if instance.name not in node_insts.payload:
5038       raise errors.OpExecError("Instance %s is not running." % instance.name)
5039
5040     logging.debug("Connecting to console of %s on %s", instance.name, node)
5041
5042     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5043     cluster = self.cfg.GetClusterInfo()
5044     # beparams and hvparams are passed separately, to avoid editing the
5045     # instance and then saving the defaults in the instance itself.
5046     hvparams = cluster.FillHV(instance)
5047     beparams = cluster.FillBE(instance)
5048     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5049
5050     # build ssh cmdline
5051     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5052
5053
5054 class LUReplaceDisks(LogicalUnit):
5055   """Replace the disks of an instance.
5056
5057   """
5058   HPATH = "mirrors-replace"
5059   HTYPE = constants.HTYPE_INSTANCE
5060   _OP_REQP = ["instance_name", "mode", "disks"]
5061   REQ_BGL = False
5062
5063   def CheckArguments(self):
5064     if not hasattr(self.op, "remote_node"):
5065       self.op.remote_node = None
5066     if not hasattr(self.op, "iallocator"):
5067       self.op.iallocator = None
5068
5069     # check for valid parameter combination
5070     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5071     if self.op.mode == constants.REPLACE_DISK_CHG:
5072       if cnt == 2:
5073         raise errors.OpPrereqError("When changing the secondary either an"
5074                                    " iallocator script must be used or the"
5075                                    " new node given")
5076       elif cnt == 0:
5077         raise errors.OpPrereqError("Give either the iallocator or the new"
5078                                    " secondary, not both")
5079     else: # not replacing the secondary
5080       if cnt != 2:
5081         raise errors.OpPrereqError("The iallocator and new node options can"
5082                                    " be used only when changing the"
5083                                    " secondary node")
5084
5085   def ExpandNames(self):
5086     self._ExpandAndLockInstance()
5087
5088     if self.op.iallocator is not None:
5089       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5090     elif self.op.remote_node is not None:
5091       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5092       if remote_node is None:
5093         raise errors.OpPrereqError("Node '%s' not known" %
5094                                    self.op.remote_node)
5095       self.op.remote_node = remote_node
5096       # Warning: do not remove the locking of the new secondary here
5097       # unless DRBD8.AddChildren is changed to work in parallel;
5098       # currently it doesn't since parallel invocations of
5099       # FindUnusedMinor will conflict
5100       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5101       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5102     else:
5103       self.needed_locks[locking.LEVEL_NODE] = []
5104       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5105
5106   def DeclareLocks(self, level):
5107     # If we're not already locking all nodes in the set we have to declare the
5108     # instance's primary/secondary nodes.
5109     if (level == locking.LEVEL_NODE and
5110         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5111       self._LockInstancesNodes()
5112
5113   def _RunAllocator(self):
5114     """Compute a new secondary node using an IAllocator.
5115
5116     """
5117     ial = IAllocator(self,
5118                      mode=constants.IALLOCATOR_MODE_RELOC,
5119                      name=self.op.instance_name,
5120                      relocate_from=[self.sec_node])
5121
5122     ial.Run(self.op.iallocator)
5123
5124     if not ial.success:
5125       raise errors.OpPrereqError("Can't compute nodes using"
5126                                  " iallocator '%s': %s" % (self.op.iallocator,
5127                                                            ial.info))
5128     if len(ial.nodes) != ial.required_nodes:
5129       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5130                                  " of nodes (%s), required %s" %
5131                                  (len(ial.nodes), ial.required_nodes))
5132     self.op.remote_node = ial.nodes[0]
5133     self.LogInfo("Selected new secondary for the instance: %s",
5134                  self.op.remote_node)
5135
5136   def BuildHooksEnv(self):
5137     """Build hooks env.
5138
5139     This runs on the master, the primary and all the secondaries.
5140
5141     """
5142     env = {
5143       "MODE": self.op.mode,
5144       "NEW_SECONDARY": self.op.remote_node,
5145       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5146       }
5147     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5148     nl = [
5149       self.cfg.GetMasterNode(),
5150       self.instance.primary_node,
5151       ]
5152     if self.op.remote_node is not None:
5153       nl.append(self.op.remote_node)
5154     return env, nl, nl
5155
5156   def CheckPrereq(self):
5157     """Check prerequisites.
5158
5159     This checks that the instance is in the cluster.
5160
5161     """
5162     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5163     assert instance is not None, \
5164       "Cannot retrieve locked instance %s" % self.op.instance_name
5165     self.instance = instance
5166
5167     if instance.disk_template != constants.DT_DRBD8:
5168       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5169                                  " instances")
5170
5171     if len(instance.secondary_nodes) != 1:
5172       raise errors.OpPrereqError("The instance has a strange layout,"
5173                                  " expected one secondary but found %d" %
5174                                  len(instance.secondary_nodes))
5175
5176     self.sec_node = instance.secondary_nodes[0]
5177
5178     if self.op.iallocator is not None:
5179       self._RunAllocator()
5180
5181     remote_node = self.op.remote_node
5182     if remote_node is not None:
5183       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5184       assert self.remote_node_info is not None, \
5185         "Cannot retrieve locked node %s" % remote_node
5186     else:
5187       self.remote_node_info = None
5188     if remote_node == instance.primary_node:
5189       raise errors.OpPrereqError("The specified node is the primary node of"
5190                                  " the instance.")
5191     elif remote_node == self.sec_node:
5192       raise errors.OpPrereqError("The specified node is already the"
5193                                  " secondary node of the instance.")
5194
5195     if self.op.mode == constants.REPLACE_DISK_PRI:
5196       n1 = self.tgt_node = instance.primary_node
5197       n2 = self.oth_node = self.sec_node
5198     elif self.op.mode == constants.REPLACE_DISK_SEC:
5199       n1 = self.tgt_node = self.sec_node
5200       n2 = self.oth_node = instance.primary_node
5201     elif self.op.mode == constants.REPLACE_DISK_CHG:
5202       n1 = self.new_node = remote_node
5203       n2 = self.oth_node = instance.primary_node
5204       self.tgt_node = self.sec_node
5205       _CheckNodeNotDrained(self, remote_node)
5206     else:
5207       raise errors.ProgrammerError("Unhandled disk replace mode")
5208
5209     _CheckNodeOnline(self, n1)
5210     _CheckNodeOnline(self, n2)
5211
5212     if not self.op.disks:
5213       self.op.disks = range(len(instance.disks))
5214
5215     for disk_idx in self.op.disks:
5216       instance.FindDisk(disk_idx)
5217
5218   def _ExecD8DiskOnly(self, feedback_fn):
5219     """Replace a disk on the primary or secondary for dbrd8.
5220
5221     The algorithm for replace is quite complicated:
5222
5223       1. for each disk to be replaced:
5224
5225         1. create new LVs on the target node with unique names
5226         1. detach old LVs from the drbd device
5227         1. rename old LVs to name_replaced.<time_t>
5228         1. rename new LVs to old LVs
5229         1. attach the new LVs (with the old names now) to the drbd device
5230
5231       1. wait for sync across all devices
5232
5233       1. for each modified disk:
5234
5235         1. remove old LVs (which have the name name_replaces.<time_t>)
5236
5237     Failures are not very well handled.
5238
5239     """
5240     steps_total = 6
5241     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5242     instance = self.instance
5243     iv_names = {}
5244     vgname = self.cfg.GetVGName()
5245     # start of work
5246     cfg = self.cfg
5247     tgt_node = self.tgt_node
5248     oth_node = self.oth_node
5249
5250     # Step: check device activation
5251     self.proc.LogStep(1, steps_total, "check device existence")
5252     info("checking volume groups")
5253     my_vg = cfg.GetVGName()
5254     results = self.rpc.call_vg_list([oth_node, tgt_node])
5255     if not results:
5256       raise errors.OpExecError("Can't list volume groups on the nodes")
5257     for node in oth_node, tgt_node:
5258       res = results[node]
5259       msg = res.RemoteFailMsg()
5260       if msg:
5261         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5262       if my_vg not in res.payload:
5263         raise errors.OpExecError("Volume group '%s' not found on %s" %
5264                                  (my_vg, node))
5265     for idx, dev in enumerate(instance.disks):
5266       if idx not in self.op.disks:
5267         continue
5268       for node in tgt_node, oth_node:
5269         info("checking disk/%d on %s" % (idx, node))
5270         cfg.SetDiskID(dev, node)
5271         result = self.rpc.call_blockdev_find(node, dev)
5272         msg = result.RemoteFailMsg()
5273         if not msg and not result.payload:
5274           msg = "disk not found"
5275         if msg:
5276           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5277                                    (idx, node, msg))
5278
5279     # Step: check other node consistency
5280     self.proc.LogStep(2, steps_total, "check peer consistency")
5281     for idx, dev in enumerate(instance.disks):
5282       if idx not in self.op.disks:
5283         continue
5284       info("checking disk/%d consistency on %s" % (idx, oth_node))
5285       if not _CheckDiskConsistency(self, dev, oth_node,
5286                                    oth_node==instance.primary_node):
5287         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5288                                  " to replace disks on this node (%s)" %
5289                                  (oth_node, tgt_node))
5290
5291     # Step: create new storage
5292     self.proc.LogStep(3, steps_total, "allocate new storage")
5293     for idx, dev in enumerate(instance.disks):
5294       if idx not in self.op.disks:
5295         continue
5296       size = dev.size
5297       cfg.SetDiskID(dev, tgt_node)
5298       lv_names = [".disk%d_%s" % (idx, suf)
5299                   for suf in ["data", "meta"]]
5300       names = _GenerateUniqueNames(self, lv_names)
5301       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5302                              logical_id=(vgname, names[0]))
5303       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5304                              logical_id=(vgname, names[1]))
5305       new_lvs = [lv_data, lv_meta]
5306       old_lvs = dev.children
5307       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5308       info("creating new local storage on %s for %s" %
5309            (tgt_node, dev.iv_name))
5310       # we pass force_create=True to force the LVM creation
5311       for new_lv in new_lvs:
5312         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5313                         _GetInstanceInfoText(instance), False)
5314
5315     # Step: for each lv, detach+rename*2+attach
5316     self.proc.LogStep(4, steps_total, "change drbd configuration")
5317     for dev, old_lvs, new_lvs in iv_names.itervalues():
5318       info("detaching %s drbd from local storage" % dev.iv_name)
5319       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5320       msg = result.RemoteFailMsg()
5321       if msg:
5322         raise errors.OpExecError("Can't detach drbd from local storage on node"
5323                                  " %s for device %s: %s" %
5324                                  (tgt_node, dev.iv_name, msg))
5325       #dev.children = []
5326       #cfg.Update(instance)
5327
5328       # ok, we created the new LVs, so now we know we have the needed
5329       # storage; as such, we proceed on the target node to rename
5330       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5331       # using the assumption that logical_id == physical_id (which in
5332       # turn is the unique_id on that node)
5333
5334       # FIXME(iustin): use a better name for the replaced LVs
5335       temp_suffix = int(time.time())
5336       ren_fn = lambda d, suff: (d.physical_id[0],
5337                                 d.physical_id[1] + "_replaced-%s" % suff)
5338       # build the rename list based on what LVs exist on the node
5339       rlist = []
5340       for to_ren in old_lvs:
5341         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5342         if not result.RemoteFailMsg() and result.payload:
5343           # device exists
5344           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5345
5346       info("renaming the old LVs on the target node")
5347       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5348       msg = result.RemoteFailMsg()
5349       if msg:
5350         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5351                                  (tgt_node, msg))
5352       # now we rename the new LVs to the old LVs
5353       info("renaming the new LVs on the target node")
5354       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5355       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5356       msg = result.RemoteFailMsg()
5357       if msg:
5358         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5359                                  (tgt_node, msg))
5360
5361       for old, new in zip(old_lvs, new_lvs):
5362         new.logical_id = old.logical_id
5363         cfg.SetDiskID(new, tgt_node)
5364
5365       for disk in old_lvs:
5366         disk.logical_id = ren_fn(disk, temp_suffix)
5367         cfg.SetDiskID(disk, tgt_node)
5368
5369       # now that the new lvs have the old name, we can add them to the device
5370       info("adding new mirror component on %s" % tgt_node)
5371       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5372       msg = result.RemoteFailMsg()
5373       if msg:
5374         for new_lv in new_lvs:
5375           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5376           if msg:
5377             warning("Can't rollback device %s: %s", dev, msg,
5378                     hint="cleanup manually the unused logical volumes")
5379         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5380
5381       dev.children = new_lvs
5382       cfg.Update(instance)
5383
5384     # Step: wait for sync
5385
5386     # this can fail as the old devices are degraded and _WaitForSync
5387     # does a combined result over all disks, so we don't check its
5388     # return value
5389     self.proc.LogStep(5, steps_total, "sync devices")
5390     _WaitForSync(self, instance, unlock=True)
5391
5392     # so check manually all the devices
5393     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5394       cfg.SetDiskID(dev, instance.primary_node)
5395       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5396       msg = result.RemoteFailMsg()
5397       if not msg and not result.payload:
5398         msg = "disk not found"
5399       if msg:
5400         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5401                                  (name, msg))
5402       if result.payload[5]:
5403         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5404
5405     # Step: remove old storage
5406     self.proc.LogStep(6, steps_total, "removing old storage")
5407     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5408       info("remove logical volumes for %s" % name)
5409       for lv in old_lvs:
5410         cfg.SetDiskID(lv, tgt_node)
5411         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5412         if msg:
5413           warning("Can't remove old LV: %s" % msg,
5414                   hint="manually remove unused LVs")
5415           continue
5416
5417   def _ExecD8Secondary(self, feedback_fn):
5418     """Replace the secondary node for drbd8.
5419
5420     The algorithm for replace is quite complicated:
5421       - for all disks of the instance:
5422         - create new LVs on the new node with same names
5423         - shutdown the drbd device on the old secondary
5424         - disconnect the drbd network on the primary
5425         - create the drbd device on the new secondary
5426         - network attach the drbd on the primary, using an artifice:
5427           the drbd code for Attach() will connect to the network if it
5428           finds a device which is connected to the good local disks but
5429           not network enabled
5430       - wait for sync across all devices
5431       - remove all disks from the old secondary
5432
5433     Failures are not very well handled.
5434
5435     """
5436     steps_total = 6
5437     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5438     instance = self.instance
5439     iv_names = {}
5440     # start of work
5441     cfg = self.cfg
5442     old_node = self.tgt_node
5443     new_node = self.new_node
5444     pri_node = instance.primary_node
5445     nodes_ip = {
5446       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5447       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5448       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5449       }
5450
5451     # Step: check device activation
5452     self.proc.LogStep(1, steps_total, "check device existence")
5453     info("checking volume groups")
5454     my_vg = cfg.GetVGName()
5455     results = self.rpc.call_vg_list([pri_node, new_node])
5456     for node in pri_node, new_node:
5457       res = results[node]
5458       msg = res.RemoteFailMsg()
5459       if msg:
5460         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5461       if my_vg not in res.payload:
5462         raise errors.OpExecError("Volume group '%s' not found on %s" %
5463                                  (my_vg, node))
5464     for idx, dev in enumerate(instance.disks):
5465       if idx not in self.op.disks:
5466         continue
5467       info("checking disk/%d on %s" % (idx, pri_node))
5468       cfg.SetDiskID(dev, pri_node)
5469       result = self.rpc.call_blockdev_find(pri_node, dev)
5470       msg = result.RemoteFailMsg()
5471       if not msg and not result.payload:
5472         msg = "disk not found"
5473       if msg:
5474         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5475                                  (idx, pri_node, msg))
5476
5477     # Step: check other node consistency
5478     self.proc.LogStep(2, steps_total, "check peer consistency")
5479     for idx, dev in enumerate(instance.disks):
5480       if idx not in self.op.disks:
5481         continue
5482       info("checking disk/%d consistency on %s" % (idx, pri_node))
5483       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5484         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5485                                  " unsafe to replace the secondary" %
5486                                  pri_node)
5487
5488     # Step: create new storage
5489     self.proc.LogStep(3, steps_total, "allocate new storage")
5490     for idx, dev in enumerate(instance.disks):
5491       info("adding new local storage on %s for disk/%d" %
5492            (new_node, idx))
5493       # we pass force_create=True to force LVM creation
5494       for new_lv in dev.children:
5495         _CreateBlockDev(self, new_node, instance, new_lv, True,
5496                         _GetInstanceInfoText(instance), False)
5497
5498     # Step 4: dbrd minors and drbd setups changes
5499     # after this, we must manually remove the drbd minors on both the
5500     # error and the success paths
5501     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5502                                    instance.name)
5503     logging.debug("Allocated minors %s" % (minors,))
5504     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5505     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5506       size = dev.size
5507       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5508       # create new devices on new_node; note that we create two IDs:
5509       # one without port, so the drbd will be activated without
5510       # networking information on the new node at this stage, and one
5511       # with network, for the latter activation in step 4
5512       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5513       if pri_node == o_node1:
5514         p_minor = o_minor1
5515       else:
5516         p_minor = o_minor2
5517
5518       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5519       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5520
5521       iv_names[idx] = (dev, dev.children, new_net_id)
5522       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5523                     new_net_id)
5524       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5525                               logical_id=new_alone_id,
5526                               children=dev.children)
5527       try:
5528         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5529                               _GetInstanceInfoText(instance), False)
5530       except errors.GenericError:
5531         self.cfg.ReleaseDRBDMinors(instance.name)
5532         raise
5533
5534     for idx, dev in enumerate(instance.disks):
5535       # we have new devices, shutdown the drbd on the old secondary
5536       info("shutting down drbd for disk/%d on old node" % idx)
5537       cfg.SetDiskID(dev, old_node)
5538       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5539       if msg:
5540         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5541                 (idx, msg),
5542                 hint="Please cleanup this device manually as soon as possible")
5543
5544     info("detaching primary drbds from the network (=> standalone)")
5545     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5546                                                instance.disks)[pri_node]
5547
5548     msg = result.RemoteFailMsg()
5549     if msg:
5550       # detaches didn't succeed (unlikely)
5551       self.cfg.ReleaseDRBDMinors(instance.name)
5552       raise errors.OpExecError("Can't detach the disks from the network on"
5553                                " old node: %s" % (msg,))
5554
5555     # if we managed to detach at least one, we update all the disks of
5556     # the instance to point to the new secondary
5557     info("updating instance configuration")
5558     for dev, _, new_logical_id in iv_names.itervalues():
5559       dev.logical_id = new_logical_id
5560       cfg.SetDiskID(dev, pri_node)
5561     cfg.Update(instance)
5562
5563     # and now perform the drbd attach
5564     info("attaching primary drbds to new secondary (standalone => connected)")
5565     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5566                                            instance.disks, instance.name,
5567                                            False)
5568     for to_node, to_result in result.items():
5569       msg = to_result.RemoteFailMsg()
5570       if msg:
5571         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5572                 hint="please do a gnt-instance info to see the"
5573                 " status of disks")
5574
5575     # this can fail as the old devices are degraded and _WaitForSync
5576     # does a combined result over all disks, so we don't check its
5577     # return value
5578     self.proc.LogStep(5, steps_total, "sync devices")
5579     _WaitForSync(self, instance, unlock=True)
5580
5581     # so check manually all the devices
5582     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5583       cfg.SetDiskID(dev, pri_node)
5584       result = self.rpc.call_blockdev_find(pri_node, dev)
5585       msg = result.RemoteFailMsg()
5586       if not msg and not result.payload:
5587         msg = "disk not found"
5588       if msg:
5589         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5590                                  (idx, msg))
5591       if result.payload[5]:
5592         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5593
5594     self.proc.LogStep(6, steps_total, "removing old storage")
5595     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5596       info("remove logical volumes for disk/%d" % idx)
5597       for lv in old_lvs:
5598         cfg.SetDiskID(lv, old_node)
5599         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5600         if msg:
5601           warning("Can't remove LV on old secondary: %s", msg,
5602                   hint="Cleanup stale volumes by hand")
5603
5604   def Exec(self, feedback_fn):
5605     """Execute disk replacement.
5606
5607     This dispatches the disk replacement to the appropriate handler.
5608
5609     """
5610     instance = self.instance
5611
5612     # Activate the instance disks if we're replacing them on a down instance
5613     if not instance.admin_up:
5614       _StartInstanceDisks(self, instance, True)
5615
5616     if self.op.mode == constants.REPLACE_DISK_CHG:
5617       fn = self._ExecD8Secondary
5618     else:
5619       fn = self._ExecD8DiskOnly
5620
5621     ret = fn(feedback_fn)
5622
5623     # Deactivate the instance disks if we're replacing them on a down instance
5624     if not instance.admin_up:
5625       _SafeShutdownInstanceDisks(self, instance)
5626
5627     return ret
5628
5629
5630 class LUGrowDisk(LogicalUnit):
5631   """Grow a disk of an instance.
5632
5633   """
5634   HPATH = "disk-grow"
5635   HTYPE = constants.HTYPE_INSTANCE
5636   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5637   REQ_BGL = False
5638
5639   def ExpandNames(self):
5640     self._ExpandAndLockInstance()
5641     self.needed_locks[locking.LEVEL_NODE] = []
5642     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5643
5644   def DeclareLocks(self, level):
5645     if level == locking.LEVEL_NODE:
5646       self._LockInstancesNodes()
5647
5648   def BuildHooksEnv(self):
5649     """Build hooks env.
5650
5651     This runs on the master, the primary and all the secondaries.
5652
5653     """
5654     env = {
5655       "DISK": self.op.disk,
5656       "AMOUNT": self.op.amount,
5657       }
5658     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5659     nl = [
5660       self.cfg.GetMasterNode(),
5661       self.instance.primary_node,
5662       ]
5663     return env, nl, nl
5664
5665   def CheckPrereq(self):
5666     """Check prerequisites.
5667
5668     This checks that the instance is in the cluster.
5669
5670     """
5671     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5672     assert instance is not None, \
5673       "Cannot retrieve locked instance %s" % self.op.instance_name
5674     nodenames = list(instance.all_nodes)
5675     for node in nodenames:
5676       _CheckNodeOnline(self, node)
5677
5678
5679     self.instance = instance
5680
5681     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5682       raise errors.OpPrereqError("Instance's disk layout does not support"
5683                                  " growing.")
5684
5685     self.disk = instance.FindDisk(self.op.disk)
5686
5687     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5688                                        instance.hypervisor)
5689     for node in nodenames:
5690       info = nodeinfo[node]
5691       msg = info.RemoteFailMsg()
5692       if msg:
5693         raise errors.OpPrereqError("Cannot get current information"
5694                                    " from node %s:" % (node, msg))
5695       vg_free = info.payload.get('vg_free', None)
5696       if not isinstance(vg_free, int):
5697         raise errors.OpPrereqError("Can't compute free disk space on"
5698                                    " node %s" % node)
5699       if self.op.amount > vg_free:
5700         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5701                                    " %d MiB available, %d MiB required" %
5702                                    (node, vg_free, self.op.amount))
5703
5704   def Exec(self, feedback_fn):
5705     """Execute disk grow.
5706
5707     """
5708     instance = self.instance
5709     disk = self.disk
5710     for node in instance.all_nodes:
5711       self.cfg.SetDiskID(disk, node)
5712       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5713       msg = result.RemoteFailMsg()
5714       if msg:
5715         raise errors.OpExecError("Grow request failed to node %s: %s" %
5716                                  (node, msg))
5717     disk.RecordGrow(self.op.amount)
5718     self.cfg.Update(instance)
5719     if self.op.wait_for_sync:
5720       disk_abort = not _WaitForSync(self, instance)
5721       if disk_abort:
5722         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5723                              " status.\nPlease check the instance.")
5724
5725
5726 class LUQueryInstanceData(NoHooksLU):
5727   """Query runtime instance data.
5728
5729   """
5730   _OP_REQP = ["instances", "static"]
5731   REQ_BGL = False
5732
5733   def ExpandNames(self):
5734     self.needed_locks = {}
5735     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5736
5737     if not isinstance(self.op.instances, list):
5738       raise errors.OpPrereqError("Invalid argument type 'instances'")
5739
5740     if self.op.instances:
5741       self.wanted_names = []
5742       for name in self.op.instances:
5743         full_name = self.cfg.ExpandInstanceName(name)
5744         if full_name is None:
5745           raise errors.OpPrereqError("Instance '%s' not known" % name)
5746         self.wanted_names.append(full_name)
5747       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5748     else:
5749       self.wanted_names = None
5750       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5751
5752     self.needed_locks[locking.LEVEL_NODE] = []
5753     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5754
5755   def DeclareLocks(self, level):
5756     if level == locking.LEVEL_NODE:
5757       self._LockInstancesNodes()
5758
5759   def CheckPrereq(self):
5760     """Check prerequisites.
5761
5762     This only checks the optional instance list against the existing names.
5763
5764     """
5765     if self.wanted_names is None:
5766       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5767
5768     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5769                              in self.wanted_names]
5770     return
5771
5772   def _ComputeDiskStatus(self, instance, snode, dev):
5773     """Compute block device status.
5774
5775     """
5776     static = self.op.static
5777     if not static:
5778       self.cfg.SetDiskID(dev, instance.primary_node)
5779       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5780       if dev_pstatus.offline:
5781         dev_pstatus = None
5782       else:
5783         msg = dev_pstatus.RemoteFailMsg()
5784         if msg:
5785           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5786                                    (instance.name, msg))
5787         dev_pstatus = dev_pstatus.payload
5788     else:
5789       dev_pstatus = None
5790
5791     if dev.dev_type in constants.LDS_DRBD:
5792       # we change the snode then (otherwise we use the one passed in)
5793       if dev.logical_id[0] == instance.primary_node:
5794         snode = dev.logical_id[1]
5795       else:
5796         snode = dev.logical_id[0]
5797
5798     if snode and not static:
5799       self.cfg.SetDiskID(dev, snode)
5800       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5801       if dev_sstatus.offline:
5802         dev_sstatus = None
5803       else:
5804         msg = dev_sstatus.RemoteFailMsg()
5805         if msg:
5806           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5807                                    (instance.name, msg))
5808         dev_sstatus = dev_sstatus.payload
5809     else:
5810       dev_sstatus = None
5811
5812     if dev.children:
5813       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5814                       for child in dev.children]
5815     else:
5816       dev_children = []
5817
5818     data = {
5819       "iv_name": dev.iv_name,
5820       "dev_type": dev.dev_type,
5821       "logical_id": dev.logical_id,
5822       "physical_id": dev.physical_id,
5823       "pstatus": dev_pstatus,
5824       "sstatus": dev_sstatus,
5825       "children": dev_children,
5826       "mode": dev.mode,
5827       }
5828
5829     return data
5830
5831   def Exec(self, feedback_fn):
5832     """Gather and return data"""
5833     result = {}
5834
5835     cluster = self.cfg.GetClusterInfo()
5836
5837     for instance in self.wanted_instances:
5838       if not self.op.static:
5839         remote_info = self.rpc.call_instance_info(instance.primary_node,
5840                                                   instance.name,
5841                                                   instance.hypervisor)
5842         msg = remote_info.RemoteFailMsg()
5843         if msg:
5844           raise errors.OpExecError("Error checking node %s: %s" %
5845                                    (instance.primary_node, msg))
5846         remote_info = remote_info.payload
5847         if remote_info and "state" in remote_info:
5848           remote_state = "up"
5849         else:
5850           remote_state = "down"
5851       else:
5852         remote_state = None
5853       if instance.admin_up:
5854         config_state = "up"
5855       else:
5856         config_state = "down"
5857
5858       disks = [self._ComputeDiskStatus(instance, None, device)
5859                for device in instance.disks]
5860
5861       idict = {
5862         "name": instance.name,
5863         "config_state": config_state,
5864         "run_state": remote_state,
5865         "pnode": instance.primary_node,
5866         "snodes": instance.secondary_nodes,
5867         "os": instance.os,
5868         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5869         "disks": disks,
5870         "hypervisor": instance.hypervisor,
5871         "network_port": instance.network_port,
5872         "hv_instance": instance.hvparams,
5873         "hv_actual": cluster.FillHV(instance),
5874         "be_instance": instance.beparams,
5875         "be_actual": cluster.FillBE(instance),
5876         }
5877
5878       result[instance.name] = idict
5879
5880     return result
5881
5882
5883 class LUSetInstanceParams(LogicalUnit):
5884   """Modifies an instances's parameters.
5885
5886   """
5887   HPATH = "instance-modify"
5888   HTYPE = constants.HTYPE_INSTANCE
5889   _OP_REQP = ["instance_name"]
5890   REQ_BGL = False
5891
5892   def CheckArguments(self):
5893     if not hasattr(self.op, 'nics'):
5894       self.op.nics = []
5895     if not hasattr(self.op, 'disks'):
5896       self.op.disks = []
5897     if not hasattr(self.op, 'beparams'):
5898       self.op.beparams = {}
5899     if not hasattr(self.op, 'hvparams'):
5900       self.op.hvparams = {}
5901     self.op.force = getattr(self.op, "force", False)
5902     if not (self.op.nics or self.op.disks or
5903             self.op.hvparams or self.op.beparams):
5904       raise errors.OpPrereqError("No changes submitted")
5905
5906     # Disk validation
5907     disk_addremove = 0
5908     for disk_op, disk_dict in self.op.disks:
5909       if disk_op == constants.DDM_REMOVE:
5910         disk_addremove += 1
5911         continue
5912       elif disk_op == constants.DDM_ADD:
5913         disk_addremove += 1
5914       else:
5915         if not isinstance(disk_op, int):
5916           raise errors.OpPrereqError("Invalid disk index")
5917       if disk_op == constants.DDM_ADD:
5918         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5919         if mode not in constants.DISK_ACCESS_SET:
5920           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5921         size = disk_dict.get('size', None)
5922         if size is None:
5923           raise errors.OpPrereqError("Required disk parameter size missing")
5924         try:
5925           size = int(size)
5926         except ValueError, err:
5927           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5928                                      str(err))
5929         disk_dict['size'] = size
5930       else:
5931         # modification of disk
5932         if 'size' in disk_dict:
5933           raise errors.OpPrereqError("Disk size change not possible, use"
5934                                      " grow-disk")
5935
5936     if disk_addremove > 1:
5937       raise errors.OpPrereqError("Only one disk add or remove operation"
5938                                  " supported at a time")
5939
5940     # NIC validation
5941     nic_addremove = 0
5942     for nic_op, nic_dict in self.op.nics:
5943       if nic_op == constants.DDM_REMOVE:
5944         nic_addremove += 1
5945         continue
5946       elif nic_op == constants.DDM_ADD:
5947         nic_addremove += 1
5948       else:
5949         if not isinstance(nic_op, int):
5950           raise errors.OpPrereqError("Invalid nic index")
5951
5952       # nic_dict should be a dict
5953       nic_ip = nic_dict.get('ip', None)
5954       if nic_ip is not None:
5955         if nic_ip.lower() == constants.VALUE_NONE:
5956           nic_dict['ip'] = None
5957         else:
5958           if not utils.IsValidIP(nic_ip):
5959             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5960
5961       nic_bridge = nic_dict.get('bridge', None)
5962       nic_link = nic_dict.get('link', None)
5963       if nic_bridge and nic_link:
5964         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5965       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5966         nic_dict['bridge'] = None
5967       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5968         nic_dict['link'] = None
5969
5970       if nic_op == constants.DDM_ADD:
5971         nic_mac = nic_dict.get('mac', None)
5972         if nic_mac is None:
5973           nic_dict['mac'] = constants.VALUE_AUTO
5974
5975       if 'mac' in nic_dict:
5976         nic_mac = nic_dict['mac']
5977         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5978           if not utils.IsValidMac(nic_mac):
5979             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5980         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5981           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5982                                      " modifying an existing nic")
5983
5984     if nic_addremove > 1:
5985       raise errors.OpPrereqError("Only one NIC add or remove operation"
5986                                  " supported at a time")
5987
5988   def ExpandNames(self):
5989     self._ExpandAndLockInstance()
5990     self.needed_locks[locking.LEVEL_NODE] = []
5991     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5992
5993   def DeclareLocks(self, level):
5994     if level == locking.LEVEL_NODE:
5995       self._LockInstancesNodes()
5996
5997   def BuildHooksEnv(self):
5998     """Build hooks env.
5999
6000     This runs on the master, primary and secondaries.
6001
6002     """
6003     args = dict()
6004     if constants.BE_MEMORY in self.be_new:
6005       args['memory'] = self.be_new[constants.BE_MEMORY]
6006     if constants.BE_VCPUS in self.be_new:
6007       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6008     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6009     # information at all.
6010     if self.op.nics:
6011       args['nics'] = []
6012       nic_override = dict(self.op.nics)
6013       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6014       for idx, nic in enumerate(self.instance.nics):
6015         if idx in nic_override:
6016           this_nic_override = nic_override[idx]
6017         else:
6018           this_nic_override = {}
6019         if 'ip' in this_nic_override:
6020           ip = this_nic_override['ip']
6021         else:
6022           ip = nic.ip
6023         if 'mac' in this_nic_override:
6024           mac = this_nic_override['mac']
6025         else:
6026           mac = nic.mac
6027         if idx in self.nic_pnew:
6028           nicparams = self.nic_pnew[idx]
6029         else:
6030           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6031         mode = nicparams[constants.NIC_MODE]
6032         link = nicparams[constants.NIC_LINK]
6033         args['nics'].append((ip, mac, mode, link))
6034       if constants.DDM_ADD in nic_override:
6035         ip = nic_override[constants.DDM_ADD].get('ip', None)
6036         mac = nic_override[constants.DDM_ADD]['mac']
6037         nicparams = self.nic_pnew[constants.DDM_ADD]
6038         mode = nicparams[constants.NIC_MODE]
6039         link = nicparams[constants.NIC_LINK]
6040         args['nics'].append((ip, mac, mode, link))
6041       elif constants.DDM_REMOVE in nic_override:
6042         del args['nics'][-1]
6043
6044     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6045     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6046     return env, nl, nl
6047
6048   def _GetUpdatedParams(self, old_params, update_dict,
6049                         default_values, parameter_types):
6050     """Return the new params dict for the given params.
6051
6052     @type old_params: dict
6053     @type old_params: old parameters
6054     @type update_dict: dict
6055     @type update_dict: dict containing new parameter values,
6056                        or constants.VALUE_DEFAULT to reset the
6057                        parameter to its default value
6058     @type default_values: dict
6059     @param default_values: default values for the filled parameters
6060     @type parameter_types: dict
6061     @param parameter_types: dict mapping target dict keys to types
6062                             in constants.ENFORCEABLE_TYPES
6063     @rtype: (dict, dict)
6064     @return: (new_parameters, filled_parameters)
6065
6066     """
6067     params_copy = copy.deepcopy(old_params)
6068     for key, val in update_dict.iteritems():
6069       if val == constants.VALUE_DEFAULT:
6070         try:
6071           del params_copy[key]
6072         except KeyError:
6073           pass
6074       else:
6075         params_copy[key] = val
6076     utils.ForceDictType(params_copy, parameter_types)
6077     params_filled = objects.FillDict(default_values, params_copy)
6078     return (params_copy, params_filled)
6079
6080   def CheckPrereq(self):
6081     """Check prerequisites.
6082
6083     This only checks the instance list against the existing names.
6084
6085     """
6086     force = self.force = self.op.force
6087
6088     # checking the new params on the primary/secondary nodes
6089
6090     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6091     cluster = self.cluster = self.cfg.GetClusterInfo()
6092     assert self.instance is not None, \
6093       "Cannot retrieve locked instance %s" % self.op.instance_name
6094     pnode = instance.primary_node
6095     nodelist = list(instance.all_nodes)
6096
6097     # hvparams processing
6098     if self.op.hvparams:
6099       i_hvdict, hv_new = self._GetUpdatedParams(
6100                              instance.hvparams, self.op.hvparams,
6101                              cluster.hvparams[instance.hypervisor],
6102                              constants.HVS_PARAMETER_TYPES)
6103       # local check
6104       hypervisor.GetHypervisor(
6105         instance.hypervisor).CheckParameterSyntax(hv_new)
6106       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6107       self.hv_new = hv_new # the new actual values
6108       self.hv_inst = i_hvdict # the new dict (without defaults)
6109     else:
6110       self.hv_new = self.hv_inst = {}
6111
6112     # beparams processing
6113     if self.op.beparams:
6114       i_bedict, be_new = self._GetUpdatedParams(
6115                              instance.beparams, self.op.beparams,
6116                              cluster.beparams[constants.PP_DEFAULT],
6117                              constants.BES_PARAMETER_TYPES)
6118       self.be_new = be_new # the new actual values
6119       self.be_inst = i_bedict # the new dict (without defaults)
6120     else:
6121       self.be_new = self.be_inst = {}
6122
6123     self.warn = []
6124
6125     if constants.BE_MEMORY in self.op.beparams and not self.force:
6126       mem_check_list = [pnode]
6127       if be_new[constants.BE_AUTO_BALANCE]:
6128         # either we changed auto_balance to yes or it was from before
6129         mem_check_list.extend(instance.secondary_nodes)
6130       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6131                                                   instance.hypervisor)
6132       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6133                                          instance.hypervisor)
6134       pninfo = nodeinfo[pnode]
6135       msg = pninfo.RemoteFailMsg()
6136       if msg:
6137         # Assume the primary node is unreachable and go ahead
6138         self.warn.append("Can't get info from primary node %s: %s" %
6139                          (pnode,  msg))
6140       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6141         self.warn.append("Node data from primary node %s doesn't contain"
6142                          " free memory information" % pnode)
6143       elif instance_info.RemoteFailMsg():
6144         self.warn.append("Can't get instance runtime information: %s" %
6145                         instance_info.RemoteFailMsg())
6146       else:
6147         if instance_info.payload:
6148           current_mem = int(instance_info.payload['memory'])
6149         else:
6150           # Assume instance not running
6151           # (there is a slight race condition here, but it's not very probable,
6152           # and we have no other way to check)
6153           current_mem = 0
6154         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6155                     pninfo.payload['memory_free'])
6156         if miss_mem > 0:
6157           raise errors.OpPrereqError("This change will prevent the instance"
6158                                      " from starting, due to %d MB of memory"
6159                                      " missing on its primary node" % miss_mem)
6160
6161       if be_new[constants.BE_AUTO_BALANCE]:
6162         for node, nres in nodeinfo.items():
6163           if node not in instance.secondary_nodes:
6164             continue
6165           msg = nres.RemoteFailMsg()
6166           if msg:
6167             self.warn.append("Can't get info from secondary node %s: %s" %
6168                              (node, msg))
6169           elif not isinstance(nres.payload.get('memory_free', None), int):
6170             self.warn.append("Secondary node %s didn't return free"
6171                              " memory information" % node)
6172           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6173             self.warn.append("Not enough memory to failover instance to"
6174                              " secondary node %s" % node)
6175
6176     # NIC processing
6177     self.nic_pnew = {}
6178     self.nic_pinst = {}
6179     for nic_op, nic_dict in self.op.nics:
6180       if nic_op == constants.DDM_REMOVE:
6181         if not instance.nics:
6182           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6183         continue
6184       if nic_op != constants.DDM_ADD:
6185         # an existing nic
6186         if nic_op < 0 or nic_op >= len(instance.nics):
6187           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6188                                      " are 0 to %d" %
6189                                      (nic_op, len(instance.nics)))
6190         old_nic_params = instance.nics[nic_op].nicparams
6191         old_nic_ip = instance.nics[nic_op].ip
6192       else:
6193         old_nic_params = {}
6194         old_nic_ip = None
6195
6196       update_params_dict = dict([(key, nic_dict[key])
6197                                  for key in constants.NICS_PARAMETERS
6198                                  if key in nic_dict])
6199
6200       if 'bridge' in nic_dict:
6201         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6202
6203       new_nic_params, new_filled_nic_params = \
6204           self._GetUpdatedParams(old_nic_params, update_params_dict,
6205                                  cluster.nicparams[constants.PP_DEFAULT],
6206                                  constants.NICS_PARAMETER_TYPES)
6207       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6208       self.nic_pinst[nic_op] = new_nic_params
6209       self.nic_pnew[nic_op] = new_filled_nic_params
6210       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6211
6212       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6213         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6214         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6215         msg = result.RemoteFailMsg()
6216         if msg:
6217           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6218           if self.force:
6219             self.warn.append(msg)
6220           else:
6221             raise errors.OpPrereqError(msg)
6222       if new_nic_mode == constants.NIC_MODE_ROUTED:
6223         if 'ip' in nic_dict:
6224           nic_ip = nic_dict['ip']
6225         else:
6226           nic_ip = old_nic_ip
6227         if nic_ip is None:
6228           raise errors.OpPrereqError('Cannot set the nic ip to None'
6229                                      ' on a routed nic')
6230       if 'mac' in nic_dict:
6231         nic_mac = nic_dict['mac']
6232         if nic_mac is None:
6233           raise errors.OpPrereqError('Cannot set the nic mac to None')
6234         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6235           # otherwise generate the mac
6236           nic_dict['mac'] = self.cfg.GenerateMAC()
6237         else:
6238           # or validate/reserve the current one
6239           if self.cfg.IsMacInUse(nic_mac):
6240             raise errors.OpPrereqError("MAC address %s already in use"
6241                                        " in cluster" % nic_mac)
6242
6243     # DISK processing
6244     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6245       raise errors.OpPrereqError("Disk operations not supported for"
6246                                  " diskless instances")
6247     for disk_op, disk_dict in self.op.disks:
6248       if disk_op == constants.DDM_REMOVE:
6249         if len(instance.disks) == 1:
6250           raise errors.OpPrereqError("Cannot remove the last disk of"
6251                                      " an instance")
6252         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6253         ins_l = ins_l[pnode]
6254         msg = ins_l.RemoteFailMsg()
6255         if msg:
6256           raise errors.OpPrereqError("Can't contact node %s: %s" %
6257                                      (pnode, msg))
6258         if instance.name in ins_l.payload:
6259           raise errors.OpPrereqError("Instance is running, can't remove"
6260                                      " disks.")
6261
6262       if (disk_op == constants.DDM_ADD and
6263           len(instance.nics) >= constants.MAX_DISKS):
6264         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6265                                    " add more" % constants.MAX_DISKS)
6266       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6267         # an existing disk
6268         if disk_op < 0 or disk_op >= len(instance.disks):
6269           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6270                                      " are 0 to %d" %
6271                                      (disk_op, len(instance.disks)))
6272
6273     return
6274
6275   def Exec(self, feedback_fn):
6276     """Modifies an instance.
6277
6278     All parameters take effect only at the next restart of the instance.
6279
6280     """
6281     # Process here the warnings from CheckPrereq, as we don't have a
6282     # feedback_fn there.
6283     for warn in self.warn:
6284       feedback_fn("WARNING: %s" % warn)
6285
6286     result = []
6287     instance = self.instance
6288     cluster = self.cluster
6289     # disk changes
6290     for disk_op, disk_dict in self.op.disks:
6291       if disk_op == constants.DDM_REMOVE:
6292         # remove the last disk
6293         device = instance.disks.pop()
6294         device_idx = len(instance.disks)
6295         for node, disk in device.ComputeNodeTree(instance.primary_node):
6296           self.cfg.SetDiskID(disk, node)
6297           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6298           if msg:
6299             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6300                             " continuing anyway", device_idx, node, msg)
6301         result.append(("disk/%d" % device_idx, "remove"))
6302       elif disk_op == constants.DDM_ADD:
6303         # add a new disk
6304         if instance.disk_template == constants.DT_FILE:
6305           file_driver, file_path = instance.disks[0].logical_id
6306           file_path = os.path.dirname(file_path)
6307         else:
6308           file_driver = file_path = None
6309         disk_idx_base = len(instance.disks)
6310         new_disk = _GenerateDiskTemplate(self,
6311                                          instance.disk_template,
6312                                          instance.name, instance.primary_node,
6313                                          instance.secondary_nodes,
6314                                          [disk_dict],
6315                                          file_path,
6316                                          file_driver,
6317                                          disk_idx_base)[0]
6318         instance.disks.append(new_disk)
6319         info = _GetInstanceInfoText(instance)
6320
6321         logging.info("Creating volume %s for instance %s",
6322                      new_disk.iv_name, instance.name)
6323         # Note: this needs to be kept in sync with _CreateDisks
6324         #HARDCODE
6325         for node in instance.all_nodes:
6326           f_create = node == instance.primary_node
6327           try:
6328             _CreateBlockDev(self, node, instance, new_disk,
6329                             f_create, info, f_create)
6330           except errors.OpExecError, err:
6331             self.LogWarning("Failed to create volume %s (%s) on"
6332                             " node %s: %s",
6333                             new_disk.iv_name, new_disk, node, err)
6334         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6335                        (new_disk.size, new_disk.mode)))
6336       else:
6337         # change a given disk
6338         instance.disks[disk_op].mode = disk_dict['mode']
6339         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6340     # NIC changes
6341     for nic_op, nic_dict in self.op.nics:
6342       if nic_op == constants.DDM_REMOVE:
6343         # remove the last nic
6344         del instance.nics[-1]
6345         result.append(("nic.%d" % len(instance.nics), "remove"))
6346       elif nic_op == constants.DDM_ADD:
6347         # mac and bridge should be set, by now
6348         mac = nic_dict['mac']
6349         ip = nic_dict.get('ip', None)
6350         nicparams = self.nic_pinst[constants.DDM_ADD]
6351         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6352         instance.nics.append(new_nic)
6353         result.append(("nic.%d" % (len(instance.nics) - 1),
6354                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6355                        (new_nic.mac, new_nic.ip,
6356                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6357                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6358                        )))
6359       else:
6360         for key in 'mac', 'ip':
6361           if key in nic_dict:
6362             setattr(instance.nics[nic_op], key, nic_dict[key])
6363         if nic_op in self.nic_pnew:
6364           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6365         for key, val in nic_dict.iteritems():
6366           result.append(("nic.%s/%d" % (key, nic_op), val))
6367
6368     # hvparams changes
6369     if self.op.hvparams:
6370       instance.hvparams = self.hv_inst
6371       for key, val in self.op.hvparams.iteritems():
6372         result.append(("hv/%s" % key, val))
6373
6374     # beparams changes
6375     if self.op.beparams:
6376       instance.beparams = self.be_inst
6377       for key, val in self.op.beparams.iteritems():
6378         result.append(("be/%s" % key, val))
6379
6380     self.cfg.Update(instance)
6381
6382     return result
6383
6384
6385 class LUQueryExports(NoHooksLU):
6386   """Query the exports list
6387
6388   """
6389   _OP_REQP = ['nodes']
6390   REQ_BGL = False
6391
6392   def ExpandNames(self):
6393     self.needed_locks = {}
6394     self.share_locks[locking.LEVEL_NODE] = 1
6395     if not self.op.nodes:
6396       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6397     else:
6398       self.needed_locks[locking.LEVEL_NODE] = \
6399         _GetWantedNodes(self, self.op.nodes)
6400
6401   def CheckPrereq(self):
6402     """Check prerequisites.
6403
6404     """
6405     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6406
6407   def Exec(self, feedback_fn):
6408     """Compute the list of all the exported system images.
6409
6410     @rtype: dict
6411     @return: a dictionary with the structure node->(export-list)
6412         where export-list is a list of the instances exported on
6413         that node.
6414
6415     """
6416     rpcresult = self.rpc.call_export_list(self.nodes)
6417     result = {}
6418     for node in rpcresult:
6419       if rpcresult[node].RemoteFailMsg():
6420         result[node] = False
6421       else:
6422         result[node] = rpcresult[node].payload
6423
6424     return result
6425
6426
6427 class LUExportInstance(LogicalUnit):
6428   """Export an instance to an image in the cluster.
6429
6430   """
6431   HPATH = "instance-export"
6432   HTYPE = constants.HTYPE_INSTANCE
6433   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6434   REQ_BGL = False
6435
6436   def ExpandNames(self):
6437     self._ExpandAndLockInstance()
6438     # FIXME: lock only instance primary and destination node
6439     #
6440     # Sad but true, for now we have do lock all nodes, as we don't know where
6441     # the previous export might be, and and in this LU we search for it and
6442     # remove it from its current node. In the future we could fix this by:
6443     #  - making a tasklet to search (share-lock all), then create the new one,
6444     #    then one to remove, after
6445     #  - removing the removal operation altoghether
6446     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6447
6448   def DeclareLocks(self, level):
6449     """Last minute lock declaration."""
6450     # All nodes are locked anyway, so nothing to do here.
6451
6452   def BuildHooksEnv(self):
6453     """Build hooks env.
6454
6455     This will run on the master, primary node and target node.
6456
6457     """
6458     env = {
6459       "EXPORT_NODE": self.op.target_node,
6460       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6461       }
6462     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6463     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6464           self.op.target_node]
6465     return env, nl, nl
6466
6467   def CheckPrereq(self):
6468     """Check prerequisites.
6469
6470     This checks that the instance and node names are valid.
6471
6472     """
6473     instance_name = self.op.instance_name
6474     self.instance = self.cfg.GetInstanceInfo(instance_name)
6475     assert self.instance is not None, \
6476           "Cannot retrieve locked instance %s" % self.op.instance_name
6477     _CheckNodeOnline(self, self.instance.primary_node)
6478
6479     self.dst_node = self.cfg.GetNodeInfo(
6480       self.cfg.ExpandNodeName(self.op.target_node))
6481
6482     if self.dst_node is None:
6483       # This is wrong node name, not a non-locked node
6484       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6485     _CheckNodeOnline(self, self.dst_node.name)
6486     _CheckNodeNotDrained(self, self.dst_node.name)
6487
6488     # instance disk type verification
6489     for disk in self.instance.disks:
6490       if disk.dev_type == constants.LD_FILE:
6491         raise errors.OpPrereqError("Export not supported for instances with"
6492                                    " file-based disks")
6493
6494   def Exec(self, feedback_fn):
6495     """Export an instance to an image in the cluster.
6496
6497     """
6498     instance = self.instance
6499     dst_node = self.dst_node
6500     src_node = instance.primary_node
6501     if self.op.shutdown:
6502       # shutdown the instance, but not the disks
6503       result = self.rpc.call_instance_shutdown(src_node, instance)
6504       msg = result.RemoteFailMsg()
6505       if msg:
6506         raise errors.OpExecError("Could not shutdown instance %s on"
6507                                  " node %s: %s" %
6508                                  (instance.name, src_node, msg))
6509
6510     vgname = self.cfg.GetVGName()
6511
6512     snap_disks = []
6513
6514     # set the disks ID correctly since call_instance_start needs the
6515     # correct drbd minor to create the symlinks
6516     for disk in instance.disks:
6517       self.cfg.SetDiskID(disk, src_node)
6518
6519     try:
6520       for disk in instance.disks:
6521         # result.payload will be a snapshot of an lvm leaf of the one we passed
6522         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6523         msg = result.RemoteFailMsg()
6524         if msg:
6525           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6526                           disk.logical_id[1], src_node, msg)
6527           snap_disks.append(False)
6528         else:
6529           disk_id = (vgname, result.payload)
6530           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6531                                  logical_id=disk_id, physical_id=disk_id,
6532                                  iv_name=disk.iv_name)
6533           snap_disks.append(new_dev)
6534
6535     finally:
6536       if self.op.shutdown and instance.admin_up:
6537         result = self.rpc.call_instance_start(src_node, instance, None, None)
6538         msg = result.RemoteFailMsg()
6539         if msg:
6540           _ShutdownInstanceDisks(self, instance)
6541           raise errors.OpExecError("Could not start instance: %s" % msg)
6542
6543     # TODO: check for size
6544
6545     cluster_name = self.cfg.GetClusterName()
6546     for idx, dev in enumerate(snap_disks):
6547       if dev:
6548         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6549                                                instance, cluster_name, idx)
6550         msg = result.RemoteFailMsg()
6551         if msg:
6552           self.LogWarning("Could not export block device %s from node %s to"
6553                           " node %s: %s", dev.logical_id[1], src_node,
6554                           dst_node.name, msg)
6555         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6556         if msg:
6557           self.LogWarning("Could not remove snapshot block device %s from node"
6558                           " %s: %s", dev.logical_id[1], src_node, msg)
6559
6560     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6561     msg = result.RemoteFailMsg()
6562     if msg:
6563       self.LogWarning("Could not finalize export for instance %s"
6564                       " on node %s: %s", instance.name, dst_node.name, msg)
6565
6566     nodelist = self.cfg.GetNodeList()
6567     nodelist.remove(dst_node.name)
6568
6569     # on one-node clusters nodelist will be empty after the removal
6570     # if we proceed the backup would be removed because OpQueryExports
6571     # substitutes an empty list with the full cluster node list.
6572     iname = instance.name
6573     if nodelist:
6574       exportlist = self.rpc.call_export_list(nodelist)
6575       for node in exportlist:
6576         if exportlist[node].RemoteFailMsg():
6577           continue
6578         if iname in exportlist[node].payload:
6579           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6580           if msg:
6581             self.LogWarning("Could not remove older export for instance %s"
6582                             " on node %s: %s", iname, node, msg)
6583
6584
6585 class LURemoveExport(NoHooksLU):
6586   """Remove exports related to the named instance.
6587
6588   """
6589   _OP_REQP = ["instance_name"]
6590   REQ_BGL = False
6591
6592   def ExpandNames(self):
6593     self.needed_locks = {}
6594     # We need all nodes to be locked in order for RemoveExport to work, but we
6595     # don't need to lock the instance itself, as nothing will happen to it (and
6596     # we can remove exports also for a removed instance)
6597     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6598
6599   def CheckPrereq(self):
6600     """Check prerequisites.
6601     """
6602     pass
6603
6604   def Exec(self, feedback_fn):
6605     """Remove any export.
6606
6607     """
6608     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6609     # If the instance was not found we'll try with the name that was passed in.
6610     # This will only work if it was an FQDN, though.
6611     fqdn_warn = False
6612     if not instance_name:
6613       fqdn_warn = True
6614       instance_name = self.op.instance_name
6615
6616     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6617     exportlist = self.rpc.call_export_list(locked_nodes)
6618     found = False
6619     for node in exportlist:
6620       msg = exportlist[node].RemoteFailMsg()
6621       if msg:
6622         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6623         continue
6624       if instance_name in exportlist[node].payload:
6625         found = True
6626         result = self.rpc.call_export_remove(node, instance_name)
6627         msg = result.RemoteFailMsg()
6628         if msg:
6629           logging.error("Could not remove export for instance %s"
6630                         " on node %s: %s", instance_name, node, msg)
6631
6632     if fqdn_warn and not found:
6633       feedback_fn("Export not found. If trying to remove an export belonging"
6634                   " to a deleted instance please use its Fully Qualified"
6635                   " Domain Name.")
6636
6637
6638 class TagsLU(NoHooksLU):
6639   """Generic tags LU.
6640
6641   This is an abstract class which is the parent of all the other tags LUs.
6642
6643   """
6644
6645   def ExpandNames(self):
6646     self.needed_locks = {}
6647     if self.op.kind == constants.TAG_NODE:
6648       name = self.cfg.ExpandNodeName(self.op.name)
6649       if name is None:
6650         raise errors.OpPrereqError("Invalid node name (%s)" %
6651                                    (self.op.name,))
6652       self.op.name = name
6653       self.needed_locks[locking.LEVEL_NODE] = name
6654     elif self.op.kind == constants.TAG_INSTANCE:
6655       name = self.cfg.ExpandInstanceName(self.op.name)
6656       if name is None:
6657         raise errors.OpPrereqError("Invalid instance name (%s)" %
6658                                    (self.op.name,))
6659       self.op.name = name
6660       self.needed_locks[locking.LEVEL_INSTANCE] = name
6661
6662   def CheckPrereq(self):
6663     """Check prerequisites.
6664
6665     """
6666     if self.op.kind == constants.TAG_CLUSTER:
6667       self.target = self.cfg.GetClusterInfo()
6668     elif self.op.kind == constants.TAG_NODE:
6669       self.target = self.cfg.GetNodeInfo(self.op.name)
6670     elif self.op.kind == constants.TAG_INSTANCE:
6671       self.target = self.cfg.GetInstanceInfo(self.op.name)
6672     else:
6673       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6674                                  str(self.op.kind))
6675
6676
6677 class LUGetTags(TagsLU):
6678   """Returns the tags of a given object.
6679
6680   """
6681   _OP_REQP = ["kind", "name"]
6682   REQ_BGL = False
6683
6684   def Exec(self, feedback_fn):
6685     """Returns the tag list.
6686
6687     """
6688     return list(self.target.GetTags())
6689
6690
6691 class LUSearchTags(NoHooksLU):
6692   """Searches the tags for a given pattern.
6693
6694   """
6695   _OP_REQP = ["pattern"]
6696   REQ_BGL = False
6697
6698   def ExpandNames(self):
6699     self.needed_locks = {}
6700
6701   def CheckPrereq(self):
6702     """Check prerequisites.
6703
6704     This checks the pattern passed for validity by compiling it.
6705
6706     """
6707     try:
6708       self.re = re.compile(self.op.pattern)
6709     except re.error, err:
6710       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6711                                  (self.op.pattern, err))
6712
6713   def Exec(self, feedback_fn):
6714     """Returns the tag list.
6715
6716     """
6717     cfg = self.cfg
6718     tgts = [("/cluster", cfg.GetClusterInfo())]
6719     ilist = cfg.GetAllInstancesInfo().values()
6720     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6721     nlist = cfg.GetAllNodesInfo().values()
6722     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6723     results = []
6724     for path, target in tgts:
6725       for tag in target.GetTags():
6726         if self.re.search(tag):
6727           results.append((path, tag))
6728     return results
6729
6730
6731 class LUAddTags(TagsLU):
6732   """Sets a tag on a given object.
6733
6734   """
6735   _OP_REQP = ["kind", "name", "tags"]
6736   REQ_BGL = False
6737
6738   def CheckPrereq(self):
6739     """Check prerequisites.
6740
6741     This checks the type and length of the tag name and value.
6742
6743     """
6744     TagsLU.CheckPrereq(self)
6745     for tag in self.op.tags:
6746       objects.TaggableObject.ValidateTag(tag)
6747
6748   def Exec(self, feedback_fn):
6749     """Sets the tag.
6750
6751     """
6752     try:
6753       for tag in self.op.tags:
6754         self.target.AddTag(tag)
6755     except errors.TagError, err:
6756       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6757     try:
6758       self.cfg.Update(self.target)
6759     except errors.ConfigurationError:
6760       raise errors.OpRetryError("There has been a modification to the"
6761                                 " config file and the operation has been"
6762                                 " aborted. Please retry.")
6763
6764
6765 class LUDelTags(TagsLU):
6766   """Delete a list of tags from a given object.
6767
6768   """
6769   _OP_REQP = ["kind", "name", "tags"]
6770   REQ_BGL = False
6771
6772   def CheckPrereq(self):
6773     """Check prerequisites.
6774
6775     This checks that we have the given tag.
6776
6777     """
6778     TagsLU.CheckPrereq(self)
6779     for tag in self.op.tags:
6780       objects.TaggableObject.ValidateTag(tag)
6781     del_tags = frozenset(self.op.tags)
6782     cur_tags = self.target.GetTags()
6783     if not del_tags <= cur_tags:
6784       diff_tags = del_tags - cur_tags
6785       diff_names = ["'%s'" % tag for tag in diff_tags]
6786       diff_names.sort()
6787       raise errors.OpPrereqError("Tag(s) %s not found" %
6788                                  (",".join(diff_names)))
6789
6790   def Exec(self, feedback_fn):
6791     """Remove the tag from the object.
6792
6793     """
6794     for tag in self.op.tags:
6795       self.target.RemoveTag(tag)
6796     try:
6797       self.cfg.Update(self.target)
6798     except errors.ConfigurationError:
6799       raise errors.OpRetryError("There has been a modification to the"
6800                                 " config file and the operation has been"
6801                                 " aborted. Please retry.")
6802
6803
6804 class LUTestDelay(NoHooksLU):
6805   """Sleep for a specified amount of time.
6806
6807   This LU sleeps on the master and/or nodes for a specified amount of
6808   time.
6809
6810   """
6811   _OP_REQP = ["duration", "on_master", "on_nodes"]
6812   REQ_BGL = False
6813
6814   def ExpandNames(self):
6815     """Expand names and set required locks.
6816
6817     This expands the node list, if any.
6818
6819     """
6820     self.needed_locks = {}
6821     if self.op.on_nodes:
6822       # _GetWantedNodes can be used here, but is not always appropriate to use
6823       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6824       # more information.
6825       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6826       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6827
6828   def CheckPrereq(self):
6829     """Check prerequisites.
6830
6831     """
6832
6833   def Exec(self, feedback_fn):
6834     """Do the actual sleep.
6835
6836     """
6837     if self.op.on_master:
6838       if not utils.TestDelay(self.op.duration):
6839         raise errors.OpExecError("Error during master delay test")
6840     if self.op.on_nodes:
6841       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6842       if not result:
6843         raise errors.OpExecError("Complete failure from rpc call")
6844       for node, node_result in result.items():
6845         node_result.Raise()
6846         if not node_result.data:
6847           raise errors.OpExecError("Failure during rpc call to node %s,"
6848                                    " result: %s" % (node, node_result.data))
6849
6850
6851 class IAllocator(object):
6852   """IAllocator framework.
6853
6854   An IAllocator instance has three sets of attributes:
6855     - cfg that is needed to query the cluster
6856     - input data (all members of the _KEYS class attribute are required)
6857     - four buffer attributes (in|out_data|text), that represent the
6858       input (to the external script) in text and data structure format,
6859       and the output from it, again in two formats
6860     - the result variables from the script (success, info, nodes) for
6861       easy usage
6862
6863   """
6864   _ALLO_KEYS = [
6865     "mem_size", "disks", "disk_template",
6866     "os", "tags", "nics", "vcpus", "hypervisor",
6867     ]
6868   _RELO_KEYS = [
6869     "relocate_from",
6870     ]
6871
6872   def __init__(self, lu, mode, name, **kwargs):
6873     self.lu = lu
6874     # init buffer variables
6875     self.in_text = self.out_text = self.in_data = self.out_data = None
6876     # init all input fields so that pylint is happy
6877     self.mode = mode
6878     self.name = name
6879     self.mem_size = self.disks = self.disk_template = None
6880     self.os = self.tags = self.nics = self.vcpus = None
6881     self.hypervisor = None
6882     self.relocate_from = None
6883     # computed fields
6884     self.required_nodes = None
6885     # init result fields
6886     self.success = self.info = self.nodes = None
6887     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6888       keyset = self._ALLO_KEYS
6889     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6890       keyset = self._RELO_KEYS
6891     else:
6892       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6893                                    " IAllocator" % self.mode)
6894     for key in kwargs:
6895       if key not in keyset:
6896         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6897                                      " IAllocator" % key)
6898       setattr(self, key, kwargs[key])
6899     for key in keyset:
6900       if key not in kwargs:
6901         raise errors.ProgrammerError("Missing input parameter '%s' to"
6902                                      " IAllocator" % key)
6903     self._BuildInputData()
6904
6905   def _ComputeClusterData(self):
6906     """Compute the generic allocator input data.
6907
6908     This is the data that is independent of the actual operation.
6909
6910     """
6911     cfg = self.lu.cfg
6912     cluster_info = cfg.GetClusterInfo()
6913     # cluster data
6914     data = {
6915       "version": constants.IALLOCATOR_VERSION,
6916       "cluster_name": cfg.GetClusterName(),
6917       "cluster_tags": list(cluster_info.GetTags()),
6918       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6919       # we don't have job IDs
6920       }
6921     iinfo = cfg.GetAllInstancesInfo().values()
6922     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6923
6924     # node data
6925     node_results = {}
6926     node_list = cfg.GetNodeList()
6927
6928     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6929       hypervisor_name = self.hypervisor
6930     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6931       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6932
6933     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6934                                            hypervisor_name)
6935     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6936                        cluster_info.enabled_hypervisors)
6937     for nname, nresult in node_data.items():
6938       # first fill in static (config-based) values
6939       ninfo = cfg.GetNodeInfo(nname)
6940       pnr = {
6941         "tags": list(ninfo.GetTags()),
6942         "primary_ip": ninfo.primary_ip,
6943         "secondary_ip": ninfo.secondary_ip,
6944         "offline": ninfo.offline,
6945         "drained": ninfo.drained,
6946         "master_candidate": ninfo.master_candidate,
6947         }
6948
6949       if not ninfo.offline:
6950         msg = nresult.RemoteFailMsg()
6951         if msg:
6952           raise errors.OpExecError("Can't get data for node %s: %s" %
6953                                    (nname, msg))
6954         msg = node_iinfo[nname].RemoteFailMsg()
6955         if msg:
6956           raise errors.OpExecError("Can't get node instance info"
6957                                    " from node %s: %s" % (nname, msg))
6958         remote_info = nresult.payload
6959         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6960                      'vg_size', 'vg_free', 'cpu_total']:
6961           if attr not in remote_info:
6962             raise errors.OpExecError("Node '%s' didn't return attribute"
6963                                      " '%s'" % (nname, attr))
6964           if not isinstance(remote_info[attr], int):
6965             raise errors.OpExecError("Node '%s' returned invalid value"
6966                                      " for '%s': %s" %
6967                                      (nname, attr, remote_info[attr]))
6968         # compute memory used by primary instances
6969         i_p_mem = i_p_up_mem = 0
6970         for iinfo, beinfo in i_list:
6971           if iinfo.primary_node == nname:
6972             i_p_mem += beinfo[constants.BE_MEMORY]
6973             if iinfo.name not in node_iinfo[nname].payload:
6974               i_used_mem = 0
6975             else:
6976               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6977             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6978             remote_info['memory_free'] -= max(0, i_mem_diff)
6979
6980             if iinfo.admin_up:
6981               i_p_up_mem += beinfo[constants.BE_MEMORY]
6982
6983         # compute memory used by instances
6984         pnr_dyn = {
6985           "total_memory": remote_info['memory_total'],
6986           "reserved_memory": remote_info['memory_dom0'],
6987           "free_memory": remote_info['memory_free'],
6988           "total_disk": remote_info['vg_size'],
6989           "free_disk": remote_info['vg_free'],
6990           "total_cpus": remote_info['cpu_total'],
6991           "i_pri_memory": i_p_mem,
6992           "i_pri_up_memory": i_p_up_mem,
6993           }
6994         pnr.update(pnr_dyn)
6995
6996       node_results[nname] = pnr
6997     data["nodes"] = node_results
6998
6999     # instance data
7000     instance_data = {}
7001     for iinfo, beinfo in i_list:
7002       nic_data = []
7003       for nic in iinfo.nics:
7004         filled_params = objects.FillDict(
7005             cluster_info.nicparams[constants.PP_DEFAULT],
7006             nic.nicparams)
7007         nic_dict = {"mac": nic.mac,
7008                     "ip": nic.ip,
7009                     "mode": filled_params[constants.NIC_MODE],
7010                     "link": filled_params[constants.NIC_LINK],
7011                    }
7012         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7013           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7014         nic_data.append(nic_dict)
7015       pir = {
7016         "tags": list(iinfo.GetTags()),
7017         "admin_up": iinfo.admin_up,
7018         "vcpus": beinfo[constants.BE_VCPUS],
7019         "memory": beinfo[constants.BE_MEMORY],
7020         "os": iinfo.os,
7021         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7022         "nics": nic_data,
7023         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7024         "disk_template": iinfo.disk_template,
7025         "hypervisor": iinfo.hypervisor,
7026         }
7027       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7028                                                  pir["disks"])
7029       instance_data[iinfo.name] = pir
7030
7031     data["instances"] = instance_data
7032
7033     self.in_data = data
7034
7035   def _AddNewInstance(self):
7036     """Add new instance data to allocator structure.
7037
7038     This in combination with _AllocatorGetClusterData will create the
7039     correct structure needed as input for the allocator.
7040
7041     The checks for the completeness of the opcode must have already been
7042     done.
7043
7044     """
7045     data = self.in_data
7046
7047     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7048
7049     if self.disk_template in constants.DTS_NET_MIRROR:
7050       self.required_nodes = 2
7051     else:
7052       self.required_nodes = 1
7053     request = {
7054       "type": "allocate",
7055       "name": self.name,
7056       "disk_template": self.disk_template,
7057       "tags": self.tags,
7058       "os": self.os,
7059       "vcpus": self.vcpus,
7060       "memory": self.mem_size,
7061       "disks": self.disks,
7062       "disk_space_total": disk_space,
7063       "nics": self.nics,
7064       "required_nodes": self.required_nodes,
7065       }
7066     data["request"] = request
7067
7068   def _AddRelocateInstance(self):
7069     """Add relocate instance data to allocator structure.
7070
7071     This in combination with _IAllocatorGetClusterData will create the
7072     correct structure needed as input for the allocator.
7073
7074     The checks for the completeness of the opcode must have already been
7075     done.
7076
7077     """
7078     instance = self.lu.cfg.GetInstanceInfo(self.name)
7079     if instance is None:
7080       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7081                                    " IAllocator" % self.name)
7082
7083     if instance.disk_template not in constants.DTS_NET_MIRROR:
7084       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7085
7086     if len(instance.secondary_nodes) != 1:
7087       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7088
7089     self.required_nodes = 1
7090     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7091     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7092
7093     request = {
7094       "type": "relocate",
7095       "name": self.name,
7096       "disk_space_total": disk_space,
7097       "required_nodes": self.required_nodes,
7098       "relocate_from": self.relocate_from,
7099       }
7100     self.in_data["request"] = request
7101
7102   def _BuildInputData(self):
7103     """Build input data structures.
7104
7105     """
7106     self._ComputeClusterData()
7107
7108     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7109       self._AddNewInstance()
7110     else:
7111       self._AddRelocateInstance()
7112
7113     self.in_text = serializer.Dump(self.in_data)
7114
7115   def Run(self, name, validate=True, call_fn=None):
7116     """Run an instance allocator and return the results.
7117
7118     """
7119     if call_fn is None:
7120       call_fn = self.lu.rpc.call_iallocator_runner
7121     data = self.in_text
7122
7123     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7124     result.Raise()
7125
7126     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7127       raise errors.OpExecError("Invalid result from master iallocator runner")
7128
7129     rcode, stdout, stderr, fail = result.data
7130
7131     if rcode == constants.IARUN_NOTFOUND:
7132       raise errors.OpExecError("Can't find allocator '%s'" % name)
7133     elif rcode == constants.IARUN_FAILURE:
7134       raise errors.OpExecError("Instance allocator call failed: %s,"
7135                                " output: %s" % (fail, stdout+stderr))
7136     self.out_text = stdout
7137     if validate:
7138       self._ValidateResult()
7139
7140   def _ValidateResult(self):
7141     """Process the allocator results.
7142
7143     This will process and if successful save the result in
7144     self.out_data and the other parameters.
7145
7146     """
7147     try:
7148       rdict = serializer.Load(self.out_text)
7149     except Exception, err:
7150       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7151
7152     if not isinstance(rdict, dict):
7153       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7154
7155     for key in "success", "info", "nodes":
7156       if key not in rdict:
7157         raise errors.OpExecError("Can't parse iallocator results:"
7158                                  " missing key '%s'" % key)
7159       setattr(self, key, rdict[key])
7160
7161     if not isinstance(rdict["nodes"], list):
7162       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7163                                " is not a list")
7164     self.out_data = rdict
7165
7166
7167 class LUTestAllocator(NoHooksLU):
7168   """Run allocator tests.
7169
7170   This LU runs the allocator tests
7171
7172   """
7173   _OP_REQP = ["direction", "mode", "name"]
7174
7175   def CheckPrereq(self):
7176     """Check prerequisites.
7177
7178     This checks the opcode parameters depending on the director and mode test.
7179
7180     """
7181     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7182       for attr in ["name", "mem_size", "disks", "disk_template",
7183                    "os", "tags", "nics", "vcpus"]:
7184         if not hasattr(self.op, attr):
7185           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7186                                      attr)
7187       iname = self.cfg.ExpandInstanceName(self.op.name)
7188       if iname is not None:
7189         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7190                                    iname)
7191       if not isinstance(self.op.nics, list):
7192         raise errors.OpPrereqError("Invalid parameter 'nics'")
7193       for row in self.op.nics:
7194         if (not isinstance(row, dict) or
7195             "mac" not in row or
7196             "ip" not in row or
7197             "bridge" not in row):
7198           raise errors.OpPrereqError("Invalid contents of the"
7199                                      " 'nics' parameter")
7200       if not isinstance(self.op.disks, list):
7201         raise errors.OpPrereqError("Invalid parameter 'disks'")
7202       for row in self.op.disks:
7203         if (not isinstance(row, dict) or
7204             "size" not in row or
7205             not isinstance(row["size"], int) or
7206             "mode" not in row or
7207             row["mode"] not in ['r', 'w']):
7208           raise errors.OpPrereqError("Invalid contents of the"
7209                                      " 'disks' parameter")
7210       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7211         self.op.hypervisor = self.cfg.GetHypervisorType()
7212     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7213       if not hasattr(self.op, "name"):
7214         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7215       fname = self.cfg.ExpandInstanceName(self.op.name)
7216       if fname is None:
7217         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7218                                    self.op.name)
7219       self.op.name = fname
7220       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7221     else:
7222       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7223                                  self.op.mode)
7224
7225     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7226       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7227         raise errors.OpPrereqError("Missing allocator name")
7228     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7229       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7230                                  self.op.direction)
7231
7232   def Exec(self, feedback_fn):
7233     """Run the allocator test.
7234
7235     """
7236     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7237       ial = IAllocator(self,
7238                        mode=self.op.mode,
7239                        name=self.op.name,
7240                        mem_size=self.op.mem_size,
7241                        disks=self.op.disks,
7242                        disk_template=self.op.disk_template,
7243                        os=self.op.os,
7244                        tags=self.op.tags,
7245                        nics=self.op.nics,
7246                        vcpus=self.op.vcpus,
7247                        hypervisor=self.op.hypervisor,
7248                        )
7249     else:
7250       ial = IAllocator(self,
7251                        mode=self.op.mode,
7252                        name=self.op.name,
7253                        relocate_from=list(self.relocate_from),
7254                        )
7255
7256     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7257       result = ial.in_text
7258     else:
7259       ial.Run(self.op.allocator, validate=False)
7260       result = ial.out_text
7261     return result