Conver node_leave_cluster rpc to new style result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     msg = result.RemoteFailMsg()
660     if msg:
661       raise errors.OpExecError("Could not disable the master role: %s" % msg)
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           if res.failed or res.data is False or not isinstance(res.data, list):
1246             if res.offline:
1247               # no need to warn or set fail return value
1248               continue
1249             feedback_fn("    Communication failure in hooks execution")
1250             lu_result = 1
1251             continue
1252           for script, hkr, output in res.data:
1253             if hkr == constants.HKR_FAIL:
1254               # The node header is only shown once, if there are
1255               # failing hooks on that node
1256               if show_node_header:
1257                 feedback_fn("  Node %s:" % node_name)
1258                 show_node_header = False
1259               feedback_fn("    ERROR: Script %s failed, output:" % script)
1260               output = indent_re.sub('      ', output)
1261               feedback_fn("%s" % output)
1262               lu_result = 1
1263
1264       return lu_result
1265
1266
1267 class LUVerifyDisks(NoHooksLU):
1268   """Verifies the cluster disks status.
1269
1270   """
1271   _OP_REQP = []
1272   REQ_BGL = False
1273
1274   def ExpandNames(self):
1275     self.needed_locks = {
1276       locking.LEVEL_NODE: locking.ALL_SET,
1277       locking.LEVEL_INSTANCE: locking.ALL_SET,
1278     }
1279     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280
1281   def CheckPrereq(self):
1282     """Check prerequisites.
1283
1284     This has no prerequisites.
1285
1286     """
1287     pass
1288
1289   def Exec(self, feedback_fn):
1290     """Verify integrity of cluster disks.
1291
1292     @rtype: tuple of three items
1293     @return: a tuple of (dict of node-to-node_error, list of instances
1294         which need activate-disks, dict of instance: (node, volume) for
1295         missing volumes
1296
1297     """
1298     result = res_nodes, res_instances, res_missing = {}, [], {}
1299
1300     vg_name = self.cfg.GetVGName()
1301     nodes = utils.NiceSort(self.cfg.GetNodeList())
1302     instances = [self.cfg.GetInstanceInfo(name)
1303                  for name in self.cfg.GetInstanceList()]
1304
1305     nv_dict = {}
1306     for inst in instances:
1307       inst_lvs = {}
1308       if (not inst.admin_up or
1309           inst.disk_template not in constants.DTS_NET_MIRROR):
1310         continue
1311       inst.MapLVsByNode(inst_lvs)
1312       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1313       for node, vol_list in inst_lvs.iteritems():
1314         for vol in vol_list:
1315           nv_dict[(node, vol)] = inst
1316
1317     if not nv_dict:
1318       return result
1319
1320     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1321
1322     to_act = set()
1323     for node in nodes:
1324       # node_volume
1325       node_res = node_lvs[node]
1326       if node_res.offline:
1327         continue
1328       msg = node_res.RemoteFailMsg()
1329       if msg:
1330         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1331         res_nodes[node] = msg
1332         continue
1333
1334       lvs = node_res.payload
1335       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1336         inst = nv_dict.pop((node, lv_name), None)
1337         if (not lv_online and inst is not None
1338             and inst.name not in res_instances):
1339           res_instances.append(inst.name)
1340
1341     # any leftover items in nv_dict are missing LVs, let's arrange the
1342     # data better
1343     for key, inst in nv_dict.iteritems():
1344       if inst.name not in res_missing:
1345         res_missing[inst.name] = []
1346       res_missing[inst.name].append(key)
1347
1348     return result
1349
1350
1351 class LURenameCluster(LogicalUnit):
1352   """Rename the cluster.
1353
1354   """
1355   HPATH = "cluster-rename"
1356   HTYPE = constants.HTYPE_CLUSTER
1357   _OP_REQP = ["name"]
1358
1359   def BuildHooksEnv(self):
1360     """Build hooks env.
1361
1362     """
1363     env = {
1364       "OP_TARGET": self.cfg.GetClusterName(),
1365       "NEW_NAME": self.op.name,
1366       }
1367     mn = self.cfg.GetMasterNode()
1368     return env, [mn], [mn]
1369
1370   def CheckPrereq(self):
1371     """Verify that the passed name is a valid one.
1372
1373     """
1374     hostname = utils.HostInfo(self.op.name)
1375
1376     new_name = hostname.name
1377     self.ip = new_ip = hostname.ip
1378     old_name = self.cfg.GetClusterName()
1379     old_ip = self.cfg.GetMasterIP()
1380     if new_name == old_name and new_ip == old_ip:
1381       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1382                                  " cluster has changed")
1383     if new_ip != old_ip:
1384       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1385         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1386                                    " reachable on the network. Aborting." %
1387                                    new_ip)
1388
1389     self.op.name = new_name
1390
1391   def Exec(self, feedback_fn):
1392     """Rename the cluster.
1393
1394     """
1395     clustername = self.op.name
1396     ip = self.ip
1397
1398     # shutdown the master IP
1399     master = self.cfg.GetMasterNode()
1400     result = self.rpc.call_node_stop_master(master, False)
1401     msg = result.RemoteFailMsg()
1402     if msg:
1403       raise errors.OpExecError("Could not disable the master role: %s" % msg)
1404
1405     try:
1406       cluster = self.cfg.GetClusterInfo()
1407       cluster.cluster_name = clustername
1408       cluster.master_ip = ip
1409       self.cfg.Update(cluster)
1410
1411       # update the known hosts file
1412       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1413       node_list = self.cfg.GetNodeList()
1414       try:
1415         node_list.remove(master)
1416       except ValueError:
1417         pass
1418       result = self.rpc.call_upload_file(node_list,
1419                                          constants.SSH_KNOWN_HOSTS_FILE)
1420       for to_node, to_result in result.iteritems():
1421          msg = to_result.RemoteFailMsg()
1422          if msg:
1423            msg = ("Copy of file %s to node %s failed: %s" %
1424                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1425            self.proc.LogWarning(msg)
1426
1427     finally:
1428       result = self.rpc.call_node_start_master(master, False)
1429       msg = result.RemoteFailMsg()
1430       if msg:
1431         self.LogWarning("Could not re-enable the master role on"
1432                         " the master, please restart manually: %s", msg)
1433
1434
1435 def _RecursiveCheckIfLVMBased(disk):
1436   """Check if the given disk or its children are lvm-based.
1437
1438   @type disk: L{objects.Disk}
1439   @param disk: the disk to check
1440   @rtype: booleean
1441   @return: boolean indicating whether a LD_LV dev_type was found or not
1442
1443   """
1444   if disk.children:
1445     for chdisk in disk.children:
1446       if _RecursiveCheckIfLVMBased(chdisk):
1447         return True
1448   return disk.dev_type == constants.LD_LV
1449
1450
1451 class LUSetClusterParams(LogicalUnit):
1452   """Change the parameters of the cluster.
1453
1454   """
1455   HPATH = "cluster-modify"
1456   HTYPE = constants.HTYPE_CLUSTER
1457   _OP_REQP = []
1458   REQ_BGL = False
1459
1460   def CheckArguments(self):
1461     """Check parameters
1462
1463     """
1464     if not hasattr(self.op, "candidate_pool_size"):
1465       self.op.candidate_pool_size = None
1466     if self.op.candidate_pool_size is not None:
1467       try:
1468         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1469       except (ValueError, TypeError), err:
1470         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1471                                    str(err))
1472       if self.op.candidate_pool_size < 1:
1473         raise errors.OpPrereqError("At least one master candidate needed")
1474
1475   def ExpandNames(self):
1476     # FIXME: in the future maybe other cluster params won't require checking on
1477     # all nodes to be modified.
1478     self.needed_locks = {
1479       locking.LEVEL_NODE: locking.ALL_SET,
1480     }
1481     self.share_locks[locking.LEVEL_NODE] = 1
1482
1483   def BuildHooksEnv(self):
1484     """Build hooks env.
1485
1486     """
1487     env = {
1488       "OP_TARGET": self.cfg.GetClusterName(),
1489       "NEW_VG_NAME": self.op.vg_name,
1490       }
1491     mn = self.cfg.GetMasterNode()
1492     return env, [mn], [mn]
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     This checks whether the given params don't conflict and
1498     if the given volume group is valid.
1499
1500     """
1501     if self.op.vg_name is not None and not self.op.vg_name:
1502       instances = self.cfg.GetAllInstancesInfo().values()
1503       for inst in instances:
1504         for disk in inst.disks:
1505           if _RecursiveCheckIfLVMBased(disk):
1506             raise errors.OpPrereqError("Cannot disable lvm storage while"
1507                                        " lvm-based instances exist")
1508
1509     node_list = self.acquired_locks[locking.LEVEL_NODE]
1510
1511     # if vg_name not None, checks given volume group on all nodes
1512     if self.op.vg_name:
1513       vglist = self.rpc.call_vg_list(node_list)
1514       for node in node_list:
1515         msg = vglist[node].RemoteFailMsg()
1516         if msg:
1517           # ignoring down node
1518           self.LogWarning("Error while gathering data on node %s"
1519                           " (ignoring node): %s", node, msg)
1520           continue
1521         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1522                                               self.op.vg_name,
1523                                               constants.MIN_VG_SIZE)
1524         if vgstatus:
1525           raise errors.OpPrereqError("Error on node '%s': %s" %
1526                                      (node, vgstatus))
1527
1528     self.cluster = cluster = self.cfg.GetClusterInfo()
1529     # validate params changes
1530     if self.op.beparams:
1531       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1532       self.new_beparams = objects.FillDict(
1533         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1534
1535     if self.op.nicparams:
1536       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1537       self.new_nicparams = objects.FillDict(
1538         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1539       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1540
1541     # hypervisor list/parameters
1542     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1543     if self.op.hvparams:
1544       if not isinstance(self.op.hvparams, dict):
1545         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1546       for hv_name, hv_dict in self.op.hvparams.items():
1547         if hv_name not in self.new_hvparams:
1548           self.new_hvparams[hv_name] = hv_dict
1549         else:
1550           self.new_hvparams[hv_name].update(hv_dict)
1551
1552     if self.op.enabled_hypervisors is not None:
1553       self.hv_list = self.op.enabled_hypervisors
1554     else:
1555       self.hv_list = cluster.enabled_hypervisors
1556
1557     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1558       # either the enabled list has changed, or the parameters have, validate
1559       for hv_name, hv_params in self.new_hvparams.items():
1560         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1561             (self.op.enabled_hypervisors and
1562              hv_name in self.op.enabled_hypervisors)):
1563           # either this is a new hypervisor, or its parameters have changed
1564           hv_class = hypervisor.GetHypervisor(hv_name)
1565           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1566           hv_class.CheckParameterSyntax(hv_params)
1567           _CheckHVParams(self, node_list, hv_name, hv_params)
1568
1569   def Exec(self, feedback_fn):
1570     """Change the parameters of the cluster.
1571
1572     """
1573     if self.op.vg_name is not None:
1574       new_volume = self.op.vg_name
1575       if not new_volume:
1576         new_volume = None
1577       if new_volume != self.cfg.GetVGName():
1578         self.cfg.SetVGName(new_volume)
1579       else:
1580         feedback_fn("Cluster LVM configuration already in desired"
1581                     " state, not changing")
1582     if self.op.hvparams:
1583       self.cluster.hvparams = self.new_hvparams
1584     if self.op.enabled_hypervisors is not None:
1585       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1586     if self.op.beparams:
1587       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1588     if self.op.nicparams:
1589       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1590
1591     if self.op.candidate_pool_size is not None:
1592       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1593
1594     self.cfg.Update(self.cluster)
1595
1596     # we want to update nodes after the cluster so that if any errors
1597     # happen, we have recorded and saved the cluster info
1598     if self.op.candidate_pool_size is not None:
1599       _AdjustCandidatePool(self)
1600
1601
1602 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1603   """Distribute additional files which are part of the cluster configuration.
1604
1605   ConfigWriter takes care of distributing the config and ssconf files, but
1606   there are more files which should be distributed to all nodes. This function
1607   makes sure those are copied.
1608
1609   @param lu: calling logical unit
1610   @param additional_nodes: list of nodes not in the config to distribute to
1611
1612   """
1613   # 1. Gather target nodes
1614   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1615   dist_nodes = lu.cfg.GetNodeList()
1616   if additional_nodes is not None:
1617     dist_nodes.extend(additional_nodes)
1618   if myself.name in dist_nodes:
1619     dist_nodes.remove(myself.name)
1620   # 2. Gather files to distribute
1621   dist_files = set([constants.ETC_HOSTS,
1622                     constants.SSH_KNOWN_HOSTS_FILE,
1623                     constants.RAPI_CERT_FILE,
1624                     constants.RAPI_USERS_FILE,
1625                    ])
1626
1627   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1628   for hv_name in enabled_hypervisors:
1629     hv_class = hypervisor.GetHypervisor(hv_name)
1630     dist_files.update(hv_class.GetAncillaryFiles())
1631
1632   # 3. Perform the files upload
1633   for fname in dist_files:
1634     if os.path.exists(fname):
1635       result = lu.rpc.call_upload_file(dist_nodes, fname)
1636       for to_node, to_result in result.items():
1637          msg = to_result.RemoteFailMsg()
1638          if msg:
1639            msg = ("Copy of file %s to node %s failed: %s" %
1640                    (fname, to_node, msg))
1641            lu.proc.LogWarning(msg)
1642
1643
1644 class LURedistributeConfig(NoHooksLU):
1645   """Force the redistribution of cluster configuration.
1646
1647   This is a very simple LU.
1648
1649   """
1650   _OP_REQP = []
1651   REQ_BGL = False
1652
1653   def ExpandNames(self):
1654     self.needed_locks = {
1655       locking.LEVEL_NODE: locking.ALL_SET,
1656     }
1657     self.share_locks[locking.LEVEL_NODE] = 1
1658
1659   def CheckPrereq(self):
1660     """Check prerequisites.
1661
1662     """
1663
1664   def Exec(self, feedback_fn):
1665     """Redistribute the configuration.
1666
1667     """
1668     self.cfg.Update(self.cfg.GetClusterInfo())
1669     _RedistributeAncillaryFiles(self)
1670
1671
1672 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1673   """Sleep and poll for an instance's disk to sync.
1674
1675   """
1676   if not instance.disks:
1677     return True
1678
1679   if not oneshot:
1680     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1681
1682   node = instance.primary_node
1683
1684   for dev in instance.disks:
1685     lu.cfg.SetDiskID(dev, node)
1686
1687   retries = 0
1688   while True:
1689     max_time = 0
1690     done = True
1691     cumul_degraded = False
1692     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1693     msg = rstats.RemoteFailMsg()
1694     if msg:
1695       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1696       retries += 1
1697       if retries >= 10:
1698         raise errors.RemoteError("Can't contact node %s for mirror data,"
1699                                  " aborting." % node)
1700       time.sleep(6)
1701       continue
1702     rstats = rstats.payload
1703     retries = 0
1704     for i, mstat in enumerate(rstats):
1705       if mstat is None:
1706         lu.LogWarning("Can't compute data for node %s/%s",
1707                            node, instance.disks[i].iv_name)
1708         continue
1709       # we ignore the ldisk parameter
1710       perc_done, est_time, is_degraded, _ = mstat
1711       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1712       if perc_done is not None:
1713         done = False
1714         if est_time is not None:
1715           rem_time = "%d estimated seconds remaining" % est_time
1716           max_time = est_time
1717         else:
1718           rem_time = "no time estimate"
1719         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1720                         (instance.disks[i].iv_name, perc_done, rem_time))
1721     if done or oneshot:
1722       break
1723
1724     time.sleep(min(60, max_time))
1725
1726   if done:
1727     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1728   return not cumul_degraded
1729
1730
1731 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1732   """Check that mirrors are not degraded.
1733
1734   The ldisk parameter, if True, will change the test from the
1735   is_degraded attribute (which represents overall non-ok status for
1736   the device(s)) to the ldisk (representing the local storage status).
1737
1738   """
1739   lu.cfg.SetDiskID(dev, node)
1740   if ldisk:
1741     idx = 6
1742   else:
1743     idx = 5
1744
1745   result = True
1746   if on_primary or dev.AssembleOnSecondary():
1747     rstats = lu.rpc.call_blockdev_find(node, dev)
1748     msg = rstats.RemoteFailMsg()
1749     if msg:
1750       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1751       result = False
1752     elif not rstats.payload:
1753       lu.LogWarning("Can't find disk on node %s", node)
1754       result = False
1755     else:
1756       result = result and (not rstats.payload[idx])
1757   if dev.children:
1758     for child in dev.children:
1759       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1760
1761   return result
1762
1763
1764 class LUDiagnoseOS(NoHooksLU):
1765   """Logical unit for OS diagnose/query.
1766
1767   """
1768   _OP_REQP = ["output_fields", "names"]
1769   REQ_BGL = False
1770   _FIELDS_STATIC = utils.FieldSet()
1771   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1772
1773   def ExpandNames(self):
1774     if self.op.names:
1775       raise errors.OpPrereqError("Selective OS query not supported")
1776
1777     _CheckOutputFields(static=self._FIELDS_STATIC,
1778                        dynamic=self._FIELDS_DYNAMIC,
1779                        selected=self.op.output_fields)
1780
1781     # Lock all nodes, in shared mode
1782     # Temporary removal of locks, should be reverted later
1783     # TODO: reintroduce locks when they are lighter-weight
1784     self.needed_locks = {}
1785     #self.share_locks[locking.LEVEL_NODE] = 1
1786     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1787
1788   def CheckPrereq(self):
1789     """Check prerequisites.
1790
1791     """
1792
1793   @staticmethod
1794   def _DiagnoseByOS(node_list, rlist):
1795     """Remaps a per-node return list into an a per-os per-node dictionary
1796
1797     @param node_list: a list with the names of all nodes
1798     @param rlist: a map with node names as keys and OS objects as values
1799
1800     @rtype: dict
1801     @return: a dictionary with osnames as keys and as value another map, with
1802         nodes as keys and list of OS objects as values, eg::
1803
1804           {"debian-etch": {"node1": [<object>,...],
1805                            "node2": [<object>,]}
1806           }
1807
1808     """
1809     all_os = {}
1810     # we build here the list of nodes that didn't fail the RPC (at RPC
1811     # level), so that nodes with a non-responding node daemon don't
1812     # make all OSes invalid
1813     good_nodes = [node_name for node_name in rlist
1814                   if not rlist[node_name].failed]
1815     for node_name, nr in rlist.iteritems():
1816       if nr.failed or not nr.data:
1817         continue
1818       for os_obj in nr.data:
1819         if os_obj.name not in all_os:
1820           # build a list of nodes for this os containing empty lists
1821           # for each node in node_list
1822           all_os[os_obj.name] = {}
1823           for nname in good_nodes:
1824             all_os[os_obj.name][nname] = []
1825         all_os[os_obj.name][node_name].append(os_obj)
1826     return all_os
1827
1828   def Exec(self, feedback_fn):
1829     """Compute the list of OSes.
1830
1831     """
1832     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1833     node_data = self.rpc.call_os_diagnose(valid_nodes)
1834     if node_data == False:
1835       raise errors.OpExecError("Can't gather the list of OSes")
1836     pol = self._DiagnoseByOS(valid_nodes, node_data)
1837     output = []
1838     for os_name, os_data in pol.iteritems():
1839       row = []
1840       for field in self.op.output_fields:
1841         if field == "name":
1842           val = os_name
1843         elif field == "valid":
1844           val = utils.all([osl and osl[0] for osl in os_data.values()])
1845         elif field == "node_status":
1846           val = {}
1847           for node_name, nos_list in os_data.iteritems():
1848             val[node_name] = [(v.status, v.path) for v in nos_list]
1849         else:
1850           raise errors.ParameterError(field)
1851         row.append(val)
1852       output.append(row)
1853
1854     return output
1855
1856
1857 class LURemoveNode(LogicalUnit):
1858   """Logical unit for removing a node.
1859
1860   """
1861   HPATH = "node-remove"
1862   HTYPE = constants.HTYPE_NODE
1863   _OP_REQP = ["node_name"]
1864
1865   def BuildHooksEnv(self):
1866     """Build hooks env.
1867
1868     This doesn't run on the target node in the pre phase as a failed
1869     node would then be impossible to remove.
1870
1871     """
1872     env = {
1873       "OP_TARGET": self.op.node_name,
1874       "NODE_NAME": self.op.node_name,
1875       }
1876     all_nodes = self.cfg.GetNodeList()
1877     all_nodes.remove(self.op.node_name)
1878     return env, all_nodes, all_nodes
1879
1880   def CheckPrereq(self):
1881     """Check prerequisites.
1882
1883     This checks:
1884      - the node exists in the configuration
1885      - it does not have primary or secondary instances
1886      - it's not the master
1887
1888     Any errors are signalled by raising errors.OpPrereqError.
1889
1890     """
1891     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1892     if node is None:
1893       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1894
1895     instance_list = self.cfg.GetInstanceList()
1896
1897     masternode = self.cfg.GetMasterNode()
1898     if node.name == masternode:
1899       raise errors.OpPrereqError("Node is the master node,"
1900                                  " you need to failover first.")
1901
1902     for instance_name in instance_list:
1903       instance = self.cfg.GetInstanceInfo(instance_name)
1904       if node.name in instance.all_nodes:
1905         raise errors.OpPrereqError("Instance %s is still running on the node,"
1906                                    " please remove first." % instance_name)
1907     self.op.node_name = node.name
1908     self.node = node
1909
1910   def Exec(self, feedback_fn):
1911     """Removes the node from the cluster.
1912
1913     """
1914     node = self.node
1915     logging.info("Stopping the node daemon and removing configs from node %s",
1916                  node.name)
1917
1918     self.context.RemoveNode(node.name)
1919
1920     result = self.rpc.call_node_leave_cluster(node.name)
1921     msg = result.RemoteFailMsg()
1922     if msg:
1923       self.LogWarning("Errors encountered on the remote node while leaving"
1924                       " the cluster: %s", msg)
1925
1926     # Promote nodes to master candidate as needed
1927     _AdjustCandidatePool(self)
1928
1929
1930 class LUQueryNodes(NoHooksLU):
1931   """Logical unit for querying nodes.
1932
1933   """
1934   _OP_REQP = ["output_fields", "names", "use_locking"]
1935   REQ_BGL = False
1936   _FIELDS_DYNAMIC = utils.FieldSet(
1937     "dtotal", "dfree",
1938     "mtotal", "mnode", "mfree",
1939     "bootid",
1940     "ctotal", "cnodes", "csockets",
1941     )
1942
1943   _FIELDS_STATIC = utils.FieldSet(
1944     "name", "pinst_cnt", "sinst_cnt",
1945     "pinst_list", "sinst_list",
1946     "pip", "sip", "tags",
1947     "serial_no",
1948     "master_candidate",
1949     "master",
1950     "offline",
1951     "drained",
1952     )
1953
1954   def ExpandNames(self):
1955     _CheckOutputFields(static=self._FIELDS_STATIC,
1956                        dynamic=self._FIELDS_DYNAMIC,
1957                        selected=self.op.output_fields)
1958
1959     self.needed_locks = {}
1960     self.share_locks[locking.LEVEL_NODE] = 1
1961
1962     if self.op.names:
1963       self.wanted = _GetWantedNodes(self, self.op.names)
1964     else:
1965       self.wanted = locking.ALL_SET
1966
1967     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1968     self.do_locking = self.do_node_query and self.op.use_locking
1969     if self.do_locking:
1970       # if we don't request only static fields, we need to lock the nodes
1971       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1972
1973
1974   def CheckPrereq(self):
1975     """Check prerequisites.
1976
1977     """
1978     # The validation of the node list is done in the _GetWantedNodes,
1979     # if non empty, and if empty, there's no validation to do
1980     pass
1981
1982   def Exec(self, feedback_fn):
1983     """Computes the list of nodes and their attributes.
1984
1985     """
1986     all_info = self.cfg.GetAllNodesInfo()
1987     if self.do_locking:
1988       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1989     elif self.wanted != locking.ALL_SET:
1990       nodenames = self.wanted
1991       missing = set(nodenames).difference(all_info.keys())
1992       if missing:
1993         raise errors.OpExecError(
1994           "Some nodes were removed before retrieving their data: %s" % missing)
1995     else:
1996       nodenames = all_info.keys()
1997
1998     nodenames = utils.NiceSort(nodenames)
1999     nodelist = [all_info[name] for name in nodenames]
2000
2001     # begin data gathering
2002
2003     if self.do_node_query:
2004       live_data = {}
2005       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2006                                           self.cfg.GetHypervisorType())
2007       for name in nodenames:
2008         nodeinfo = node_data[name]
2009         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2010           nodeinfo = nodeinfo.payload
2011           fn = utils.TryConvert
2012           live_data[name] = {
2013             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2014             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2015             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2016             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2017             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2018             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2019             "bootid": nodeinfo.get('bootid', None),
2020             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2021             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2022             }
2023         else:
2024           live_data[name] = {}
2025     else:
2026       live_data = dict.fromkeys(nodenames, {})
2027
2028     node_to_primary = dict([(name, set()) for name in nodenames])
2029     node_to_secondary = dict([(name, set()) for name in nodenames])
2030
2031     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2032                              "sinst_cnt", "sinst_list"))
2033     if inst_fields & frozenset(self.op.output_fields):
2034       instancelist = self.cfg.GetInstanceList()
2035
2036       for instance_name in instancelist:
2037         inst = self.cfg.GetInstanceInfo(instance_name)
2038         if inst.primary_node in node_to_primary:
2039           node_to_primary[inst.primary_node].add(inst.name)
2040         for secnode in inst.secondary_nodes:
2041           if secnode in node_to_secondary:
2042             node_to_secondary[secnode].add(inst.name)
2043
2044     master_node = self.cfg.GetMasterNode()
2045
2046     # end data gathering
2047
2048     output = []
2049     for node in nodelist:
2050       node_output = []
2051       for field in self.op.output_fields:
2052         if field == "name":
2053           val = node.name
2054         elif field == "pinst_list":
2055           val = list(node_to_primary[node.name])
2056         elif field == "sinst_list":
2057           val = list(node_to_secondary[node.name])
2058         elif field == "pinst_cnt":
2059           val = len(node_to_primary[node.name])
2060         elif field == "sinst_cnt":
2061           val = len(node_to_secondary[node.name])
2062         elif field == "pip":
2063           val = node.primary_ip
2064         elif field == "sip":
2065           val = node.secondary_ip
2066         elif field == "tags":
2067           val = list(node.GetTags())
2068         elif field == "serial_no":
2069           val = node.serial_no
2070         elif field == "master_candidate":
2071           val = node.master_candidate
2072         elif field == "master":
2073           val = node.name == master_node
2074         elif field == "offline":
2075           val = node.offline
2076         elif field == "drained":
2077           val = node.drained
2078         elif self._FIELDS_DYNAMIC.Matches(field):
2079           val = live_data[node.name].get(field, None)
2080         else:
2081           raise errors.ParameterError(field)
2082         node_output.append(val)
2083       output.append(node_output)
2084
2085     return output
2086
2087
2088 class LUQueryNodeVolumes(NoHooksLU):
2089   """Logical unit for getting volumes on node(s).
2090
2091   """
2092   _OP_REQP = ["nodes", "output_fields"]
2093   REQ_BGL = False
2094   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2095   _FIELDS_STATIC = utils.FieldSet("node")
2096
2097   def ExpandNames(self):
2098     _CheckOutputFields(static=self._FIELDS_STATIC,
2099                        dynamic=self._FIELDS_DYNAMIC,
2100                        selected=self.op.output_fields)
2101
2102     self.needed_locks = {}
2103     self.share_locks[locking.LEVEL_NODE] = 1
2104     if not self.op.nodes:
2105       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2106     else:
2107       self.needed_locks[locking.LEVEL_NODE] = \
2108         _GetWantedNodes(self, self.op.nodes)
2109
2110   def CheckPrereq(self):
2111     """Check prerequisites.
2112
2113     This checks that the fields required are valid output fields.
2114
2115     """
2116     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2117
2118   def Exec(self, feedback_fn):
2119     """Computes the list of nodes and their attributes.
2120
2121     """
2122     nodenames = self.nodes
2123     volumes = self.rpc.call_node_volumes(nodenames)
2124
2125     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2126              in self.cfg.GetInstanceList()]
2127
2128     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2129
2130     output = []
2131     for node in nodenames:
2132       if node not in volumes or volumes[node].failed or not volumes[node].data:
2133         continue
2134
2135       node_vols = volumes[node].data[:]
2136       node_vols.sort(key=lambda vol: vol['dev'])
2137
2138       for vol in node_vols:
2139         node_output = []
2140         for field in self.op.output_fields:
2141           if field == "node":
2142             val = node
2143           elif field == "phys":
2144             val = vol['dev']
2145           elif field == "vg":
2146             val = vol['vg']
2147           elif field == "name":
2148             val = vol['name']
2149           elif field == "size":
2150             val = int(float(vol['size']))
2151           elif field == "instance":
2152             for inst in ilist:
2153               if node not in lv_by_node[inst]:
2154                 continue
2155               if vol['name'] in lv_by_node[inst][node]:
2156                 val = inst.name
2157                 break
2158             else:
2159               val = '-'
2160           else:
2161             raise errors.ParameterError(field)
2162           node_output.append(str(val))
2163
2164         output.append(node_output)
2165
2166     return output
2167
2168
2169 class LUAddNode(LogicalUnit):
2170   """Logical unit for adding node to the cluster.
2171
2172   """
2173   HPATH = "node-add"
2174   HTYPE = constants.HTYPE_NODE
2175   _OP_REQP = ["node_name"]
2176
2177   def BuildHooksEnv(self):
2178     """Build hooks env.
2179
2180     This will run on all nodes before, and on all nodes + the new node after.
2181
2182     """
2183     env = {
2184       "OP_TARGET": self.op.node_name,
2185       "NODE_NAME": self.op.node_name,
2186       "NODE_PIP": self.op.primary_ip,
2187       "NODE_SIP": self.op.secondary_ip,
2188       }
2189     nodes_0 = self.cfg.GetNodeList()
2190     nodes_1 = nodes_0 + [self.op.node_name, ]
2191     return env, nodes_0, nodes_1
2192
2193   def CheckPrereq(self):
2194     """Check prerequisites.
2195
2196     This checks:
2197      - the new node is not already in the config
2198      - it is resolvable
2199      - its parameters (single/dual homed) matches the cluster
2200
2201     Any errors are signalled by raising errors.OpPrereqError.
2202
2203     """
2204     node_name = self.op.node_name
2205     cfg = self.cfg
2206
2207     dns_data = utils.HostInfo(node_name)
2208
2209     node = dns_data.name
2210     primary_ip = self.op.primary_ip = dns_data.ip
2211     secondary_ip = getattr(self.op, "secondary_ip", None)
2212     if secondary_ip is None:
2213       secondary_ip = primary_ip
2214     if not utils.IsValidIP(secondary_ip):
2215       raise errors.OpPrereqError("Invalid secondary IP given")
2216     self.op.secondary_ip = secondary_ip
2217
2218     node_list = cfg.GetNodeList()
2219     if not self.op.readd and node in node_list:
2220       raise errors.OpPrereqError("Node %s is already in the configuration" %
2221                                  node)
2222     elif self.op.readd and node not in node_list:
2223       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2224
2225     for existing_node_name in node_list:
2226       existing_node = cfg.GetNodeInfo(existing_node_name)
2227
2228       if self.op.readd and node == existing_node_name:
2229         if (existing_node.primary_ip != primary_ip or
2230             existing_node.secondary_ip != secondary_ip):
2231           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2232                                      " address configuration as before")
2233         continue
2234
2235       if (existing_node.primary_ip == primary_ip or
2236           existing_node.secondary_ip == primary_ip or
2237           existing_node.primary_ip == secondary_ip or
2238           existing_node.secondary_ip == secondary_ip):
2239         raise errors.OpPrereqError("New node ip address(es) conflict with"
2240                                    " existing node %s" % existing_node.name)
2241
2242     # check that the type of the node (single versus dual homed) is the
2243     # same as for the master
2244     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2245     master_singlehomed = myself.secondary_ip == myself.primary_ip
2246     newbie_singlehomed = secondary_ip == primary_ip
2247     if master_singlehomed != newbie_singlehomed:
2248       if master_singlehomed:
2249         raise errors.OpPrereqError("The master has no private ip but the"
2250                                    " new node has one")
2251       else:
2252         raise errors.OpPrereqError("The master has a private ip but the"
2253                                    " new node doesn't have one")
2254
2255     # checks reachablity
2256     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2257       raise errors.OpPrereqError("Node not reachable by ping")
2258
2259     if not newbie_singlehomed:
2260       # check reachability from my secondary ip to newbie's secondary ip
2261       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2262                            source=myself.secondary_ip):
2263         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2264                                    " based ping to noded port")
2265
2266     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2267     mc_now, _ = self.cfg.GetMasterCandidateStats()
2268     master_candidate = mc_now < cp_size
2269
2270     self.new_node = objects.Node(name=node,
2271                                  primary_ip=primary_ip,
2272                                  secondary_ip=secondary_ip,
2273                                  master_candidate=master_candidate,
2274                                  offline=False, drained=False)
2275
2276   def Exec(self, feedback_fn):
2277     """Adds the new node to the cluster.
2278
2279     """
2280     new_node = self.new_node
2281     node = new_node.name
2282
2283     # check connectivity
2284     result = self.rpc.call_version([node])[node]
2285     result.Raise()
2286     if result.data:
2287       if constants.PROTOCOL_VERSION == result.data:
2288         logging.info("Communication to node %s fine, sw version %s match",
2289                      node, result.data)
2290       else:
2291         raise errors.OpExecError("Version mismatch master version %s,"
2292                                  " node version %s" %
2293                                  (constants.PROTOCOL_VERSION, result.data))
2294     else:
2295       raise errors.OpExecError("Cannot get version from the new node")
2296
2297     # setup ssh on node
2298     logging.info("Copy ssh key to node %s", node)
2299     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2300     keyarray = []
2301     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2302                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2303                 priv_key, pub_key]
2304
2305     for i in keyfiles:
2306       f = open(i, 'r')
2307       try:
2308         keyarray.append(f.read())
2309       finally:
2310         f.close()
2311
2312     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2313                                     keyarray[2],
2314                                     keyarray[3], keyarray[4], keyarray[5])
2315
2316     msg = result.RemoteFailMsg()
2317     if msg:
2318       raise errors.OpExecError("Cannot transfer ssh keys to the"
2319                                " new node: %s" % msg)
2320
2321     # Add node to our /etc/hosts, and add key to known_hosts
2322     if self.cfg.GetClusterInfo().modify_etc_hosts:
2323       utils.AddHostToEtcHosts(new_node.name)
2324
2325     if new_node.secondary_ip != new_node.primary_ip:
2326       result = self.rpc.call_node_has_ip_address(new_node.name,
2327                                                  new_node.secondary_ip)
2328       msg = result.RemoteFailMsg()
2329       if msg:
2330         raise errors.OpPrereqError("Failure checking secondary ip"
2331                                    " on node %s: %s" % (new_node.name, msg))
2332       if not result.payload:
2333         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2334                                  " you gave (%s). Please fix and re-run this"
2335                                  " command." % new_node.secondary_ip)
2336
2337     node_verify_list = [self.cfg.GetMasterNode()]
2338     node_verify_param = {
2339       'nodelist': [node],
2340       # TODO: do a node-net-test as well?
2341     }
2342
2343     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2344                                        self.cfg.GetClusterName())
2345     for verifier in node_verify_list:
2346       msg = result[verifier].RemoteFailMsg()
2347       if msg:
2348         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2349                                  (verifier, msg))
2350       nl_payload = result[verifier].payload['nodelist']
2351       if nl_payload:
2352         for failed in nl_payload:
2353           feedback_fn("ssh/hostname verification failed %s -> %s" %
2354                       (verifier, nl_payload[failed]))
2355         raise errors.OpExecError("ssh/hostname verification failed.")
2356
2357     if self.op.readd:
2358       _RedistributeAncillaryFiles(self)
2359       self.context.ReaddNode(new_node)
2360     else:
2361       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2362       self.context.AddNode(new_node)
2363
2364
2365 class LUSetNodeParams(LogicalUnit):
2366   """Modifies the parameters of a node.
2367
2368   """
2369   HPATH = "node-modify"
2370   HTYPE = constants.HTYPE_NODE
2371   _OP_REQP = ["node_name"]
2372   REQ_BGL = False
2373
2374   def CheckArguments(self):
2375     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2376     if node_name is None:
2377       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2378     self.op.node_name = node_name
2379     _CheckBooleanOpField(self.op, 'master_candidate')
2380     _CheckBooleanOpField(self.op, 'offline')
2381     _CheckBooleanOpField(self.op, 'drained')
2382     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2383     if all_mods.count(None) == 3:
2384       raise errors.OpPrereqError("Please pass at least one modification")
2385     if all_mods.count(True) > 1:
2386       raise errors.OpPrereqError("Can't set the node into more than one"
2387                                  " state at the same time")
2388
2389   def ExpandNames(self):
2390     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2391
2392   def BuildHooksEnv(self):
2393     """Build hooks env.
2394
2395     This runs on the master node.
2396
2397     """
2398     env = {
2399       "OP_TARGET": self.op.node_name,
2400       "MASTER_CANDIDATE": str(self.op.master_candidate),
2401       "OFFLINE": str(self.op.offline),
2402       "DRAINED": str(self.op.drained),
2403       }
2404     nl = [self.cfg.GetMasterNode(),
2405           self.op.node_name]
2406     return env, nl, nl
2407
2408   def CheckPrereq(self):
2409     """Check prerequisites.
2410
2411     This only checks the instance list against the existing names.
2412
2413     """
2414     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2415
2416     if ((self.op.master_candidate == False or self.op.offline == True or
2417          self.op.drained == True) and node.master_candidate):
2418       # we will demote the node from master_candidate
2419       if self.op.node_name == self.cfg.GetMasterNode():
2420         raise errors.OpPrereqError("The master node has to be a"
2421                                    " master candidate, online and not drained")
2422       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2423       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2424       if num_candidates <= cp_size:
2425         msg = ("Not enough master candidates (desired"
2426                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2427         if self.op.force:
2428           self.LogWarning(msg)
2429         else:
2430           raise errors.OpPrereqError(msg)
2431
2432     if (self.op.master_candidate == True and
2433         ((node.offline and not self.op.offline == False) or
2434          (node.drained and not self.op.drained == False))):
2435       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2436                                  " to master_candidate" % node.name)
2437
2438     return
2439
2440   def Exec(self, feedback_fn):
2441     """Modifies a node.
2442
2443     """
2444     node = self.node
2445
2446     result = []
2447     changed_mc = False
2448
2449     if self.op.offline is not None:
2450       node.offline = self.op.offline
2451       result.append(("offline", str(self.op.offline)))
2452       if self.op.offline == True:
2453         if node.master_candidate:
2454           node.master_candidate = False
2455           changed_mc = True
2456           result.append(("master_candidate", "auto-demotion due to offline"))
2457         if node.drained:
2458           node.drained = False
2459           result.append(("drained", "clear drained status due to offline"))
2460
2461     if self.op.master_candidate is not None:
2462       node.master_candidate = self.op.master_candidate
2463       changed_mc = True
2464       result.append(("master_candidate", str(self.op.master_candidate)))
2465       if self.op.master_candidate == False:
2466         rrc = self.rpc.call_node_demote_from_mc(node.name)
2467         msg = rrc.RemoteFailMsg()
2468         if msg:
2469           self.LogWarning("Node failed to demote itself: %s" % msg)
2470
2471     if self.op.drained is not None:
2472       node.drained = self.op.drained
2473       result.append(("drained", str(self.op.drained)))
2474       if self.op.drained == True:
2475         if node.master_candidate:
2476           node.master_candidate = False
2477           changed_mc = True
2478           result.append(("master_candidate", "auto-demotion due to drain"))
2479         if node.offline:
2480           node.offline = False
2481           result.append(("offline", "clear offline status due to drain"))
2482
2483     # this will trigger configuration file update, if needed
2484     self.cfg.Update(node)
2485     # this will trigger job queue propagation or cleanup
2486     if changed_mc:
2487       self.context.ReaddNode(node)
2488
2489     return result
2490
2491
2492 class LUPowercycleNode(NoHooksLU):
2493   """Powercycles a node.
2494
2495   """
2496   _OP_REQP = ["node_name", "force"]
2497   REQ_BGL = False
2498
2499   def CheckArguments(self):
2500     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2501     if node_name is None:
2502       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2503     self.op.node_name = node_name
2504     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2505       raise errors.OpPrereqError("The node is the master and the force"
2506                                  " parameter was not set")
2507
2508   def ExpandNames(self):
2509     """Locking for PowercycleNode.
2510
2511     This is a last-resource option and shouldn't block on other
2512     jobs. Therefore, we grab no locks.
2513
2514     """
2515     self.needed_locks = {}
2516
2517   def CheckPrereq(self):
2518     """Check prerequisites.
2519
2520     This LU has no prereqs.
2521
2522     """
2523     pass
2524
2525   def Exec(self, feedback_fn):
2526     """Reboots a node.
2527
2528     """
2529     result = self.rpc.call_node_powercycle(self.op.node_name,
2530                                            self.cfg.GetHypervisorType())
2531     msg = result.RemoteFailMsg()
2532     if msg:
2533       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2534     return result.payload
2535
2536
2537 class LUQueryClusterInfo(NoHooksLU):
2538   """Query cluster configuration.
2539
2540   """
2541   _OP_REQP = []
2542   REQ_BGL = False
2543
2544   def ExpandNames(self):
2545     self.needed_locks = {}
2546
2547   def CheckPrereq(self):
2548     """No prerequsites needed for this LU.
2549
2550     """
2551     pass
2552
2553   def Exec(self, feedback_fn):
2554     """Return cluster config.
2555
2556     """
2557     cluster = self.cfg.GetClusterInfo()
2558     result = {
2559       "software_version": constants.RELEASE_VERSION,
2560       "protocol_version": constants.PROTOCOL_VERSION,
2561       "config_version": constants.CONFIG_VERSION,
2562       "os_api_version": constants.OS_API_VERSION,
2563       "export_version": constants.EXPORT_VERSION,
2564       "architecture": (platform.architecture()[0], platform.machine()),
2565       "name": cluster.cluster_name,
2566       "master": cluster.master_node,
2567       "default_hypervisor": cluster.default_hypervisor,
2568       "enabled_hypervisors": cluster.enabled_hypervisors,
2569       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2570                         for hypervisor in cluster.enabled_hypervisors]),
2571       "beparams": cluster.beparams,
2572       "nicparams": cluster.nicparams,
2573       "candidate_pool_size": cluster.candidate_pool_size,
2574       "master_netdev": cluster.master_netdev,
2575       "volume_group_name": cluster.volume_group_name,
2576       "file_storage_dir": cluster.file_storage_dir,
2577       }
2578
2579     return result
2580
2581
2582 class LUQueryConfigValues(NoHooksLU):
2583   """Return configuration values.
2584
2585   """
2586   _OP_REQP = []
2587   REQ_BGL = False
2588   _FIELDS_DYNAMIC = utils.FieldSet()
2589   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2590
2591   def ExpandNames(self):
2592     self.needed_locks = {}
2593
2594     _CheckOutputFields(static=self._FIELDS_STATIC,
2595                        dynamic=self._FIELDS_DYNAMIC,
2596                        selected=self.op.output_fields)
2597
2598   def CheckPrereq(self):
2599     """No prerequisites.
2600
2601     """
2602     pass
2603
2604   def Exec(self, feedback_fn):
2605     """Dump a representation of the cluster config to the standard output.
2606
2607     """
2608     values = []
2609     for field in self.op.output_fields:
2610       if field == "cluster_name":
2611         entry = self.cfg.GetClusterName()
2612       elif field == "master_node":
2613         entry = self.cfg.GetMasterNode()
2614       elif field == "drain_flag":
2615         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2616       else:
2617         raise errors.ParameterError(field)
2618       values.append(entry)
2619     return values
2620
2621
2622 class LUActivateInstanceDisks(NoHooksLU):
2623   """Bring up an instance's disks.
2624
2625   """
2626   _OP_REQP = ["instance_name"]
2627   REQ_BGL = False
2628
2629   def ExpandNames(self):
2630     self._ExpandAndLockInstance()
2631     self.needed_locks[locking.LEVEL_NODE] = []
2632     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2633
2634   def DeclareLocks(self, level):
2635     if level == locking.LEVEL_NODE:
2636       self._LockInstancesNodes()
2637
2638   def CheckPrereq(self):
2639     """Check prerequisites.
2640
2641     This checks that the instance is in the cluster.
2642
2643     """
2644     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2645     assert self.instance is not None, \
2646       "Cannot retrieve locked instance %s" % self.op.instance_name
2647     _CheckNodeOnline(self, self.instance.primary_node)
2648
2649   def Exec(self, feedback_fn):
2650     """Activate the disks.
2651
2652     """
2653     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2654     if not disks_ok:
2655       raise errors.OpExecError("Cannot activate block devices")
2656
2657     return disks_info
2658
2659
2660 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2661   """Prepare the block devices for an instance.
2662
2663   This sets up the block devices on all nodes.
2664
2665   @type lu: L{LogicalUnit}
2666   @param lu: the logical unit on whose behalf we execute
2667   @type instance: L{objects.Instance}
2668   @param instance: the instance for whose disks we assemble
2669   @type ignore_secondaries: boolean
2670   @param ignore_secondaries: if true, errors on secondary nodes
2671       won't result in an error return from the function
2672   @return: False if the operation failed, otherwise a list of
2673       (host, instance_visible_name, node_visible_name)
2674       with the mapping from node devices to instance devices
2675
2676   """
2677   device_info = []
2678   disks_ok = True
2679   iname = instance.name
2680   # With the two passes mechanism we try to reduce the window of
2681   # opportunity for the race condition of switching DRBD to primary
2682   # before handshaking occured, but we do not eliminate it
2683
2684   # The proper fix would be to wait (with some limits) until the
2685   # connection has been made and drbd transitions from WFConnection
2686   # into any other network-connected state (Connected, SyncTarget,
2687   # SyncSource, etc.)
2688
2689   # 1st pass, assemble on all nodes in secondary mode
2690   for inst_disk in instance.disks:
2691     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2692       lu.cfg.SetDiskID(node_disk, node)
2693       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2694       msg = result.RemoteFailMsg()
2695       if msg:
2696         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2697                            " (is_primary=False, pass=1): %s",
2698                            inst_disk.iv_name, node, msg)
2699         if not ignore_secondaries:
2700           disks_ok = False
2701
2702   # FIXME: race condition on drbd migration to primary
2703
2704   # 2nd pass, do only the primary node
2705   for inst_disk in instance.disks:
2706     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2707       if node != instance.primary_node:
2708         continue
2709       lu.cfg.SetDiskID(node_disk, node)
2710       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2711       msg = result.RemoteFailMsg()
2712       if msg:
2713         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2714                            " (is_primary=True, pass=2): %s",
2715                            inst_disk.iv_name, node, msg)
2716         disks_ok = False
2717     device_info.append((instance.primary_node, inst_disk.iv_name,
2718                         result.payload))
2719
2720   # leave the disks configured for the primary node
2721   # this is a workaround that would be fixed better by
2722   # improving the logical/physical id handling
2723   for disk in instance.disks:
2724     lu.cfg.SetDiskID(disk, instance.primary_node)
2725
2726   return disks_ok, device_info
2727
2728
2729 def _StartInstanceDisks(lu, instance, force):
2730   """Start the disks of an instance.
2731
2732   """
2733   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2734                                            ignore_secondaries=force)
2735   if not disks_ok:
2736     _ShutdownInstanceDisks(lu, instance)
2737     if force is not None and not force:
2738       lu.proc.LogWarning("", hint="If the message above refers to a"
2739                          " secondary node,"
2740                          " you can retry the operation using '--force'.")
2741     raise errors.OpExecError("Disk consistency error")
2742
2743
2744 class LUDeactivateInstanceDisks(NoHooksLU):
2745   """Shutdown an instance's disks.
2746
2747   """
2748   _OP_REQP = ["instance_name"]
2749   REQ_BGL = False
2750
2751   def ExpandNames(self):
2752     self._ExpandAndLockInstance()
2753     self.needed_locks[locking.LEVEL_NODE] = []
2754     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2755
2756   def DeclareLocks(self, level):
2757     if level == locking.LEVEL_NODE:
2758       self._LockInstancesNodes()
2759
2760   def CheckPrereq(self):
2761     """Check prerequisites.
2762
2763     This checks that the instance is in the cluster.
2764
2765     """
2766     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2767     assert self.instance is not None, \
2768       "Cannot retrieve locked instance %s" % self.op.instance_name
2769
2770   def Exec(self, feedback_fn):
2771     """Deactivate the disks
2772
2773     """
2774     instance = self.instance
2775     _SafeShutdownInstanceDisks(self, instance)
2776
2777
2778 def _SafeShutdownInstanceDisks(lu, instance):
2779   """Shutdown block devices of an instance.
2780
2781   This function checks if an instance is running, before calling
2782   _ShutdownInstanceDisks.
2783
2784   """
2785   pnode = instance.primary_node
2786   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2787   ins_l = ins_l[pnode]
2788   msg = ins_l.RemoteFailMsg()
2789   if msg:
2790     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2791
2792   if instance.name in ins_l.payload:
2793     raise errors.OpExecError("Instance is running, can't shutdown"
2794                              " block devices.")
2795
2796   _ShutdownInstanceDisks(lu, instance)
2797
2798
2799 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2800   """Shutdown block devices of an instance.
2801
2802   This does the shutdown on all nodes of the instance.
2803
2804   If the ignore_primary is false, errors on the primary node are
2805   ignored.
2806
2807   """
2808   all_result = True
2809   for disk in instance.disks:
2810     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2811       lu.cfg.SetDiskID(top_disk, node)
2812       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2813       msg = result.RemoteFailMsg()
2814       if msg:
2815         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2816                       disk.iv_name, node, msg)
2817         if not ignore_primary or node != instance.primary_node:
2818           all_result = False
2819   return all_result
2820
2821
2822 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2823   """Checks if a node has enough free memory.
2824
2825   This function check if a given node has the needed amount of free
2826   memory. In case the node has less memory or we cannot get the
2827   information from the node, this function raise an OpPrereqError
2828   exception.
2829
2830   @type lu: C{LogicalUnit}
2831   @param lu: a logical unit from which we get configuration data
2832   @type node: C{str}
2833   @param node: the node to check
2834   @type reason: C{str}
2835   @param reason: string to use in the error message
2836   @type requested: C{int}
2837   @param requested: the amount of memory in MiB to check for
2838   @type hypervisor_name: C{str}
2839   @param hypervisor_name: the hypervisor to ask for memory stats
2840   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2841       we cannot check the node
2842
2843   """
2844   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2845   msg = nodeinfo[node].RemoteFailMsg()
2846   if msg:
2847     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2848   free_mem = nodeinfo[node].payload.get('memory_free', None)
2849   if not isinstance(free_mem, int):
2850     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2851                                " was '%s'" % (node, free_mem))
2852   if requested > free_mem:
2853     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2854                                " needed %s MiB, available %s MiB" %
2855                                (node, reason, requested, free_mem))
2856
2857
2858 class LUStartupInstance(LogicalUnit):
2859   """Starts an instance.
2860
2861   """
2862   HPATH = "instance-start"
2863   HTYPE = constants.HTYPE_INSTANCE
2864   _OP_REQP = ["instance_name", "force"]
2865   REQ_BGL = False
2866
2867   def ExpandNames(self):
2868     self._ExpandAndLockInstance()
2869
2870   def BuildHooksEnv(self):
2871     """Build hooks env.
2872
2873     This runs on master, primary and secondary nodes of the instance.
2874
2875     """
2876     env = {
2877       "FORCE": self.op.force,
2878       }
2879     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2880     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2881     return env, nl, nl
2882
2883   def CheckPrereq(self):
2884     """Check prerequisites.
2885
2886     This checks that the instance is in the cluster.
2887
2888     """
2889     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2890     assert self.instance is not None, \
2891       "Cannot retrieve locked instance %s" % self.op.instance_name
2892
2893     # extra beparams
2894     self.beparams = getattr(self.op, "beparams", {})
2895     if self.beparams:
2896       if not isinstance(self.beparams, dict):
2897         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2898                                    " dict" % (type(self.beparams), ))
2899       # fill the beparams dict
2900       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2901       self.op.beparams = self.beparams
2902
2903     # extra hvparams
2904     self.hvparams = getattr(self.op, "hvparams", {})
2905     if self.hvparams:
2906       if not isinstance(self.hvparams, dict):
2907         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2908                                    " dict" % (type(self.hvparams), ))
2909
2910       # check hypervisor parameter syntax (locally)
2911       cluster = self.cfg.GetClusterInfo()
2912       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2913       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2914                                     instance.hvparams)
2915       filled_hvp.update(self.hvparams)
2916       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2917       hv_type.CheckParameterSyntax(filled_hvp)
2918       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2919       self.op.hvparams = self.hvparams
2920
2921     _CheckNodeOnline(self, instance.primary_node)
2922
2923     bep = self.cfg.GetClusterInfo().FillBE(instance)
2924     # check bridges existance
2925     _CheckInstanceBridgesExist(self, instance)
2926
2927     remote_info = self.rpc.call_instance_info(instance.primary_node,
2928                                               instance.name,
2929                                               instance.hypervisor)
2930     msg = remote_info.RemoteFailMsg()
2931     if msg:
2932       raise errors.OpPrereqError("Error checking node %s: %s" %
2933                                  (instance.primary_node, msg))
2934     if not remote_info.payload: # not running already
2935       _CheckNodeFreeMemory(self, instance.primary_node,
2936                            "starting instance %s" % instance.name,
2937                            bep[constants.BE_MEMORY], instance.hypervisor)
2938
2939   def Exec(self, feedback_fn):
2940     """Start the instance.
2941
2942     """
2943     instance = self.instance
2944     force = self.op.force
2945
2946     self.cfg.MarkInstanceUp(instance.name)
2947
2948     node_current = instance.primary_node
2949
2950     _StartInstanceDisks(self, instance, force)
2951
2952     result = self.rpc.call_instance_start(node_current, instance,
2953                                           self.hvparams, self.beparams)
2954     msg = result.RemoteFailMsg()
2955     if msg:
2956       _ShutdownInstanceDisks(self, instance)
2957       raise errors.OpExecError("Could not start instance: %s" % msg)
2958
2959
2960 class LURebootInstance(LogicalUnit):
2961   """Reboot an instance.
2962
2963   """
2964   HPATH = "instance-reboot"
2965   HTYPE = constants.HTYPE_INSTANCE
2966   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2967   REQ_BGL = False
2968
2969   def ExpandNames(self):
2970     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2971                                    constants.INSTANCE_REBOOT_HARD,
2972                                    constants.INSTANCE_REBOOT_FULL]:
2973       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2974                                   (constants.INSTANCE_REBOOT_SOFT,
2975                                    constants.INSTANCE_REBOOT_HARD,
2976                                    constants.INSTANCE_REBOOT_FULL))
2977     self._ExpandAndLockInstance()
2978
2979   def BuildHooksEnv(self):
2980     """Build hooks env.
2981
2982     This runs on master, primary and secondary nodes of the instance.
2983
2984     """
2985     env = {
2986       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2987       "REBOOT_TYPE": self.op.reboot_type,
2988       }
2989     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2990     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2991     return env, nl, nl
2992
2993   def CheckPrereq(self):
2994     """Check prerequisites.
2995
2996     This checks that the instance is in the cluster.
2997
2998     """
2999     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3000     assert self.instance is not None, \
3001       "Cannot retrieve locked instance %s" % self.op.instance_name
3002
3003     _CheckNodeOnline(self, instance.primary_node)
3004
3005     # check bridges existance
3006     _CheckInstanceBridgesExist(self, instance)
3007
3008   def Exec(self, feedback_fn):
3009     """Reboot the instance.
3010
3011     """
3012     instance = self.instance
3013     ignore_secondaries = self.op.ignore_secondaries
3014     reboot_type = self.op.reboot_type
3015
3016     node_current = instance.primary_node
3017
3018     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3019                        constants.INSTANCE_REBOOT_HARD]:
3020       for disk in instance.disks:
3021         self.cfg.SetDiskID(disk, node_current)
3022       result = self.rpc.call_instance_reboot(node_current, instance,
3023                                              reboot_type)
3024       msg = result.RemoteFailMsg()
3025       if msg:
3026         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3027     else:
3028       result = self.rpc.call_instance_shutdown(node_current, instance)
3029       msg = result.RemoteFailMsg()
3030       if msg:
3031         raise errors.OpExecError("Could not shutdown instance for"
3032                                  " full reboot: %s" % msg)
3033       _ShutdownInstanceDisks(self, instance)
3034       _StartInstanceDisks(self, instance, ignore_secondaries)
3035       result = self.rpc.call_instance_start(node_current, instance, None, None)
3036       msg = result.RemoteFailMsg()
3037       if msg:
3038         _ShutdownInstanceDisks(self, instance)
3039         raise errors.OpExecError("Could not start instance for"
3040                                  " full reboot: %s" % msg)
3041
3042     self.cfg.MarkInstanceUp(instance.name)
3043
3044
3045 class LUShutdownInstance(LogicalUnit):
3046   """Shutdown an instance.
3047
3048   """
3049   HPATH = "instance-stop"
3050   HTYPE = constants.HTYPE_INSTANCE
3051   _OP_REQP = ["instance_name"]
3052   REQ_BGL = False
3053
3054   def ExpandNames(self):
3055     self._ExpandAndLockInstance()
3056
3057   def BuildHooksEnv(self):
3058     """Build hooks env.
3059
3060     This runs on master, primary and secondary nodes of the instance.
3061
3062     """
3063     env = _BuildInstanceHookEnvByObject(self, self.instance)
3064     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3065     return env, nl, nl
3066
3067   def CheckPrereq(self):
3068     """Check prerequisites.
3069
3070     This checks that the instance is in the cluster.
3071
3072     """
3073     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3074     assert self.instance is not None, \
3075       "Cannot retrieve locked instance %s" % self.op.instance_name
3076     _CheckNodeOnline(self, self.instance.primary_node)
3077
3078   def Exec(self, feedback_fn):
3079     """Shutdown the instance.
3080
3081     """
3082     instance = self.instance
3083     node_current = instance.primary_node
3084     self.cfg.MarkInstanceDown(instance.name)
3085     result = self.rpc.call_instance_shutdown(node_current, instance)
3086     msg = result.RemoteFailMsg()
3087     if msg:
3088       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3089
3090     _ShutdownInstanceDisks(self, instance)
3091
3092
3093 class LUReinstallInstance(LogicalUnit):
3094   """Reinstall an instance.
3095
3096   """
3097   HPATH = "instance-reinstall"
3098   HTYPE = constants.HTYPE_INSTANCE
3099   _OP_REQP = ["instance_name"]
3100   REQ_BGL = False
3101
3102   def ExpandNames(self):
3103     self._ExpandAndLockInstance()
3104
3105   def BuildHooksEnv(self):
3106     """Build hooks env.
3107
3108     This runs on master, primary and secondary nodes of the instance.
3109
3110     """
3111     env = _BuildInstanceHookEnvByObject(self, self.instance)
3112     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3113     return env, nl, nl
3114
3115   def CheckPrereq(self):
3116     """Check prerequisites.
3117
3118     This checks that the instance is in the cluster and is not running.
3119
3120     """
3121     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3122     assert instance is not None, \
3123       "Cannot retrieve locked instance %s" % self.op.instance_name
3124     _CheckNodeOnline(self, instance.primary_node)
3125
3126     if instance.disk_template == constants.DT_DISKLESS:
3127       raise errors.OpPrereqError("Instance '%s' has no disks" %
3128                                  self.op.instance_name)
3129     if instance.admin_up:
3130       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3131                                  self.op.instance_name)
3132     remote_info = self.rpc.call_instance_info(instance.primary_node,
3133                                               instance.name,
3134                                               instance.hypervisor)
3135     msg = remote_info.RemoteFailMsg()
3136     if msg:
3137       raise errors.OpPrereqError("Error checking node %s: %s" %
3138                                  (instance.primary_node, msg))
3139     if remote_info.payload:
3140       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3141                                  (self.op.instance_name,
3142                                   instance.primary_node))
3143
3144     self.op.os_type = getattr(self.op, "os_type", None)
3145     if self.op.os_type is not None:
3146       # OS verification
3147       pnode = self.cfg.GetNodeInfo(
3148         self.cfg.ExpandNodeName(instance.primary_node))
3149       if pnode is None:
3150         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3151                                    self.op.pnode)
3152       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3153       result.Raise()
3154       if not isinstance(result.data, objects.OS):
3155         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3156                                    " primary node"  % self.op.os_type)
3157
3158     self.instance = instance
3159
3160   def Exec(self, feedback_fn):
3161     """Reinstall the instance.
3162
3163     """
3164     inst = self.instance
3165
3166     if self.op.os_type is not None:
3167       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3168       inst.os = self.op.os_type
3169       self.cfg.Update(inst)
3170
3171     _StartInstanceDisks(self, inst, None)
3172     try:
3173       feedback_fn("Running the instance OS create scripts...")
3174       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3175       msg = result.RemoteFailMsg()
3176       if msg:
3177         raise errors.OpExecError("Could not install OS for instance %s"
3178                                  " on node %s: %s" %
3179                                  (inst.name, inst.primary_node, msg))
3180     finally:
3181       _ShutdownInstanceDisks(self, inst)
3182
3183
3184 class LURenameInstance(LogicalUnit):
3185   """Rename an instance.
3186
3187   """
3188   HPATH = "instance-rename"
3189   HTYPE = constants.HTYPE_INSTANCE
3190   _OP_REQP = ["instance_name", "new_name"]
3191
3192   def BuildHooksEnv(self):
3193     """Build hooks env.
3194
3195     This runs on master, primary and secondary nodes of the instance.
3196
3197     """
3198     env = _BuildInstanceHookEnvByObject(self, self.instance)
3199     env["INSTANCE_NEW_NAME"] = self.op.new_name
3200     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3201     return env, nl, nl
3202
3203   def CheckPrereq(self):
3204     """Check prerequisites.
3205
3206     This checks that the instance is in the cluster and is not running.
3207
3208     """
3209     instance = self.cfg.GetInstanceInfo(
3210       self.cfg.ExpandInstanceName(self.op.instance_name))
3211     if instance is None:
3212       raise errors.OpPrereqError("Instance '%s' not known" %
3213                                  self.op.instance_name)
3214     _CheckNodeOnline(self, instance.primary_node)
3215
3216     if instance.admin_up:
3217       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3218                                  self.op.instance_name)
3219     remote_info = self.rpc.call_instance_info(instance.primary_node,
3220                                               instance.name,
3221                                               instance.hypervisor)
3222     msg = remote_info.RemoteFailMsg()
3223     if msg:
3224       raise errors.OpPrereqError("Error checking node %s: %s" %
3225                                  (instance.primary_node, msg))
3226     if remote_info.payload:
3227       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3228                                  (self.op.instance_name,
3229                                   instance.primary_node))
3230     self.instance = instance
3231
3232     # new name verification
3233     name_info = utils.HostInfo(self.op.new_name)
3234
3235     self.op.new_name = new_name = name_info.name
3236     instance_list = self.cfg.GetInstanceList()
3237     if new_name in instance_list:
3238       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3239                                  new_name)
3240
3241     if not getattr(self.op, "ignore_ip", False):
3242       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3243         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3244                                    (name_info.ip, new_name))
3245
3246
3247   def Exec(self, feedback_fn):
3248     """Reinstall the instance.
3249
3250     """
3251     inst = self.instance
3252     old_name = inst.name
3253
3254     if inst.disk_template == constants.DT_FILE:
3255       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3256
3257     self.cfg.RenameInstance(inst.name, self.op.new_name)
3258     # Change the instance lock. This is definitely safe while we hold the BGL
3259     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3260     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3261
3262     # re-read the instance from the configuration after rename
3263     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3264
3265     if inst.disk_template == constants.DT_FILE:
3266       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3267       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3268                                                      old_file_storage_dir,
3269                                                      new_file_storage_dir)
3270       result.Raise()
3271       if not result.data:
3272         raise errors.OpExecError("Could not connect to node '%s' to rename"
3273                                  " directory '%s' to '%s' (but the instance"
3274                                  " has been renamed in Ganeti)" % (
3275                                  inst.primary_node, old_file_storage_dir,
3276                                  new_file_storage_dir))
3277
3278       if not result.data[0]:
3279         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3280                                  " (but the instance has been renamed in"
3281                                  " Ganeti)" % (old_file_storage_dir,
3282                                                new_file_storage_dir))
3283
3284     _StartInstanceDisks(self, inst, None)
3285     try:
3286       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3287                                                  old_name)
3288       msg = result.RemoteFailMsg()
3289       if msg:
3290         msg = ("Could not run OS rename script for instance %s on node %s"
3291                " (but the instance has been renamed in Ganeti): %s" %
3292                (inst.name, inst.primary_node, msg))
3293         self.proc.LogWarning(msg)
3294     finally:
3295       _ShutdownInstanceDisks(self, inst)
3296
3297
3298 class LURemoveInstance(LogicalUnit):
3299   """Remove an instance.
3300
3301   """
3302   HPATH = "instance-remove"
3303   HTYPE = constants.HTYPE_INSTANCE
3304   _OP_REQP = ["instance_name", "ignore_failures"]
3305   REQ_BGL = False
3306
3307   def ExpandNames(self):
3308     self._ExpandAndLockInstance()
3309     self.needed_locks[locking.LEVEL_NODE] = []
3310     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3311
3312   def DeclareLocks(self, level):
3313     if level == locking.LEVEL_NODE:
3314       self._LockInstancesNodes()
3315
3316   def BuildHooksEnv(self):
3317     """Build hooks env.
3318
3319     This runs on master, primary and secondary nodes of the instance.
3320
3321     """
3322     env = _BuildInstanceHookEnvByObject(self, self.instance)
3323     nl = [self.cfg.GetMasterNode()]
3324     return env, nl, nl
3325
3326   def CheckPrereq(self):
3327     """Check prerequisites.
3328
3329     This checks that the instance is in the cluster.
3330
3331     """
3332     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3333     assert self.instance is not None, \
3334       "Cannot retrieve locked instance %s" % self.op.instance_name
3335
3336   def Exec(self, feedback_fn):
3337     """Remove the instance.
3338
3339     """
3340     instance = self.instance
3341     logging.info("Shutting down instance %s on node %s",
3342                  instance.name, instance.primary_node)
3343
3344     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3345     msg = result.RemoteFailMsg()
3346     if msg:
3347       if self.op.ignore_failures:
3348         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3349       else:
3350         raise errors.OpExecError("Could not shutdown instance %s on"
3351                                  " node %s: %s" %
3352                                  (instance.name, instance.primary_node, msg))
3353
3354     logging.info("Removing block devices for instance %s", instance.name)
3355
3356     if not _RemoveDisks(self, instance):
3357       if self.op.ignore_failures:
3358         feedback_fn("Warning: can't remove instance's disks")
3359       else:
3360         raise errors.OpExecError("Can't remove instance's disks")
3361
3362     logging.info("Removing instance %s out of cluster config", instance.name)
3363
3364     self.cfg.RemoveInstance(instance.name)
3365     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3366
3367
3368 class LUQueryInstances(NoHooksLU):
3369   """Logical unit for querying instances.
3370
3371   """
3372   _OP_REQP = ["output_fields", "names", "use_locking"]
3373   REQ_BGL = False
3374   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3375                                     "admin_state",
3376                                     "disk_template", "ip", "mac", "bridge",
3377                                     "sda_size", "sdb_size", "vcpus", "tags",
3378                                     "network_port", "beparams",
3379                                     r"(disk)\.(size)/([0-9]+)",
3380                                     r"(disk)\.(sizes)", "disk_usage",
3381                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3382                                     r"(nic)\.(macs|ips|bridges)",
3383                                     r"(disk|nic)\.(count)",
3384                                     "serial_no", "hypervisor", "hvparams",] +
3385                                   ["hv/%s" % name
3386                                    for name in constants.HVS_PARAMETERS] +
3387                                   ["be/%s" % name
3388                                    for name in constants.BES_PARAMETERS])
3389   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3390
3391
3392   def ExpandNames(self):
3393     _CheckOutputFields(static=self._FIELDS_STATIC,
3394                        dynamic=self._FIELDS_DYNAMIC,
3395                        selected=self.op.output_fields)
3396
3397     self.needed_locks = {}
3398     self.share_locks[locking.LEVEL_INSTANCE] = 1
3399     self.share_locks[locking.LEVEL_NODE] = 1
3400
3401     if self.op.names:
3402       self.wanted = _GetWantedInstances(self, self.op.names)
3403     else:
3404       self.wanted = locking.ALL_SET
3405
3406     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3407     self.do_locking = self.do_node_query and self.op.use_locking
3408     if self.do_locking:
3409       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3410       self.needed_locks[locking.LEVEL_NODE] = []
3411       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3412
3413   def DeclareLocks(self, level):
3414     if level == locking.LEVEL_NODE and self.do_locking:
3415       self._LockInstancesNodes()
3416
3417   def CheckPrereq(self):
3418     """Check prerequisites.
3419
3420     """
3421     pass
3422
3423   def Exec(self, feedback_fn):
3424     """Computes the list of nodes and their attributes.
3425
3426     """
3427     all_info = self.cfg.GetAllInstancesInfo()
3428     if self.wanted == locking.ALL_SET:
3429       # caller didn't specify instance names, so ordering is not important
3430       if self.do_locking:
3431         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3432       else:
3433         instance_names = all_info.keys()
3434       instance_names = utils.NiceSort(instance_names)
3435     else:
3436       # caller did specify names, so we must keep the ordering
3437       if self.do_locking:
3438         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3439       else:
3440         tgt_set = all_info.keys()
3441       missing = set(self.wanted).difference(tgt_set)
3442       if missing:
3443         raise errors.OpExecError("Some instances were removed before"
3444                                  " retrieving their data: %s" % missing)
3445       instance_names = self.wanted
3446
3447     instance_list = [all_info[iname] for iname in instance_names]
3448
3449     # begin data gathering
3450
3451     nodes = frozenset([inst.primary_node for inst in instance_list])
3452     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3453
3454     bad_nodes = []
3455     off_nodes = []
3456     if self.do_node_query:
3457       live_data = {}
3458       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3459       for name in nodes:
3460         result = node_data[name]
3461         if result.offline:
3462           # offline nodes will be in both lists
3463           off_nodes.append(name)
3464         if result.failed or result.RemoteFailMsg():
3465           bad_nodes.append(name)
3466         else:
3467           if result.payload:
3468             live_data.update(result.payload)
3469           # else no instance is alive
3470     else:
3471       live_data = dict([(name, {}) for name in instance_names])
3472
3473     # end data gathering
3474
3475     HVPREFIX = "hv/"
3476     BEPREFIX = "be/"
3477     output = []
3478     for instance in instance_list:
3479       iout = []
3480       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3481       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3482       for field in self.op.output_fields:
3483         st_match = self._FIELDS_STATIC.Matches(field)
3484         if field == "name":
3485           val = instance.name
3486         elif field == "os":
3487           val = instance.os
3488         elif field == "pnode":
3489           val = instance.primary_node
3490         elif field == "snodes":
3491           val = list(instance.secondary_nodes)
3492         elif field == "admin_state":
3493           val = instance.admin_up
3494         elif field == "oper_state":
3495           if instance.primary_node in bad_nodes:
3496             val = None
3497           else:
3498             val = bool(live_data.get(instance.name))
3499         elif field == "status":
3500           if instance.primary_node in off_nodes:
3501             val = "ERROR_nodeoffline"
3502           elif instance.primary_node in bad_nodes:
3503             val = "ERROR_nodedown"
3504           else:
3505             running = bool(live_data.get(instance.name))
3506             if running:
3507               if instance.admin_up:
3508                 val = "running"
3509               else:
3510                 val = "ERROR_up"
3511             else:
3512               if instance.admin_up:
3513                 val = "ERROR_down"
3514               else:
3515                 val = "ADMIN_down"
3516         elif field == "oper_ram":
3517           if instance.primary_node in bad_nodes:
3518             val = None
3519           elif instance.name in live_data:
3520             val = live_data[instance.name].get("memory", "?")
3521           else:
3522             val = "-"
3523         elif field == "disk_template":
3524           val = instance.disk_template
3525         elif field == "ip":
3526           val = instance.nics[0].ip
3527         elif field == "bridge":
3528           val = instance.nics[0].bridge
3529         elif field == "mac":
3530           val = instance.nics[0].mac
3531         elif field == "sda_size" or field == "sdb_size":
3532           idx = ord(field[2]) - ord('a')
3533           try:
3534             val = instance.FindDisk(idx).size
3535           except errors.OpPrereqError:
3536             val = None
3537         elif field == "disk_usage": # total disk usage per node
3538           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3539           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3540         elif field == "tags":
3541           val = list(instance.GetTags())
3542         elif field == "serial_no":
3543           val = instance.serial_no
3544         elif field == "network_port":
3545           val = instance.network_port
3546         elif field == "hypervisor":
3547           val = instance.hypervisor
3548         elif field == "hvparams":
3549           val = i_hv
3550         elif (field.startswith(HVPREFIX) and
3551               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3552           val = i_hv.get(field[len(HVPREFIX):], None)
3553         elif field == "beparams":
3554           val = i_be
3555         elif (field.startswith(BEPREFIX) and
3556               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3557           val = i_be.get(field[len(BEPREFIX):], None)
3558         elif st_match and st_match.groups():
3559           # matches a variable list
3560           st_groups = st_match.groups()
3561           if st_groups and st_groups[0] == "disk":
3562             if st_groups[1] == "count":
3563               val = len(instance.disks)
3564             elif st_groups[1] == "sizes":
3565               val = [disk.size for disk in instance.disks]
3566             elif st_groups[1] == "size":
3567               try:
3568                 val = instance.FindDisk(st_groups[2]).size
3569               except errors.OpPrereqError:
3570                 val = None
3571             else:
3572               assert False, "Unhandled disk parameter"
3573           elif st_groups[0] == "nic":
3574             if st_groups[1] == "count":
3575               val = len(instance.nics)
3576             elif st_groups[1] == "macs":
3577               val = [nic.mac for nic in instance.nics]
3578             elif st_groups[1] == "ips":
3579               val = [nic.ip for nic in instance.nics]
3580             elif st_groups[1] == "bridges":
3581               val = [nic.bridge for nic in instance.nics]
3582             else:
3583               # index-based item
3584               nic_idx = int(st_groups[2])
3585               if nic_idx >= len(instance.nics):
3586                 val = None
3587               else:
3588                 if st_groups[1] == "mac":
3589                   val = instance.nics[nic_idx].mac
3590                 elif st_groups[1] == "ip":
3591                   val = instance.nics[nic_idx].ip
3592                 elif st_groups[1] == "bridge":
3593                   val = instance.nics[nic_idx].bridge
3594                 else:
3595                   assert False, "Unhandled NIC parameter"
3596           else:
3597             assert False, "Unhandled variable parameter"
3598         else:
3599           raise errors.ParameterError(field)
3600         iout.append(val)
3601       output.append(iout)
3602
3603     return output
3604
3605
3606 class LUFailoverInstance(LogicalUnit):
3607   """Failover an instance.
3608
3609   """
3610   HPATH = "instance-failover"
3611   HTYPE = constants.HTYPE_INSTANCE
3612   _OP_REQP = ["instance_name", "ignore_consistency"]
3613   REQ_BGL = False
3614
3615   def ExpandNames(self):
3616     self._ExpandAndLockInstance()
3617     self.needed_locks[locking.LEVEL_NODE] = []
3618     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3619
3620   def DeclareLocks(self, level):
3621     if level == locking.LEVEL_NODE:
3622       self._LockInstancesNodes()
3623
3624   def BuildHooksEnv(self):
3625     """Build hooks env.
3626
3627     This runs on master, primary and secondary nodes of the instance.
3628
3629     """
3630     env = {
3631       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3632       }
3633     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3634     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3635     return env, nl, nl
3636
3637   def CheckPrereq(self):
3638     """Check prerequisites.
3639
3640     This checks that the instance is in the cluster.
3641
3642     """
3643     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3644     assert self.instance is not None, \
3645       "Cannot retrieve locked instance %s" % self.op.instance_name
3646
3647     bep = self.cfg.GetClusterInfo().FillBE(instance)
3648     if instance.disk_template not in constants.DTS_NET_MIRROR:
3649       raise errors.OpPrereqError("Instance's disk layout is not"
3650                                  " network mirrored, cannot failover.")
3651
3652     secondary_nodes = instance.secondary_nodes
3653     if not secondary_nodes:
3654       raise errors.ProgrammerError("no secondary node but using "
3655                                    "a mirrored disk template")
3656
3657     target_node = secondary_nodes[0]
3658     _CheckNodeOnline(self, target_node)
3659     _CheckNodeNotDrained(self, target_node)
3660     # check memory requirements on the secondary node
3661     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3662                          instance.name, bep[constants.BE_MEMORY],
3663                          instance.hypervisor)
3664     # check bridge existance
3665     _CheckInstanceBridgesExist(self, instance, node=target_node)
3666
3667   def Exec(self, feedback_fn):
3668     """Failover an instance.
3669
3670     The failover is done by shutting it down on its present node and
3671     starting it on the secondary.
3672
3673     """
3674     instance = self.instance
3675
3676     source_node = instance.primary_node
3677     target_node = instance.secondary_nodes[0]
3678
3679     feedback_fn("* checking disk consistency between source and target")
3680     for dev in instance.disks:
3681       # for drbd, these are drbd over lvm
3682       if not _CheckDiskConsistency(self, dev, target_node, False):
3683         if instance.admin_up and not self.op.ignore_consistency:
3684           raise errors.OpExecError("Disk %s is degraded on target node,"
3685                                    " aborting failover." % dev.iv_name)
3686
3687     feedback_fn("* shutting down instance on source node")
3688     logging.info("Shutting down instance %s on node %s",
3689                  instance.name, source_node)
3690
3691     result = self.rpc.call_instance_shutdown(source_node, instance)
3692     msg = result.RemoteFailMsg()
3693     if msg:
3694       if self.op.ignore_consistency:
3695         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3696                              " Proceeding anyway. Please make sure node"
3697                              " %s is down. Error details: %s",
3698                              instance.name, source_node, source_node, msg)
3699       else:
3700         raise errors.OpExecError("Could not shutdown instance %s on"
3701                                  " node %s: %s" %
3702                                  (instance.name, source_node, msg))
3703
3704     feedback_fn("* deactivating the instance's disks on source node")
3705     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3706       raise errors.OpExecError("Can't shut down the instance's disks.")
3707
3708     instance.primary_node = target_node
3709     # distribute new instance config to the other nodes
3710     self.cfg.Update(instance)
3711
3712     # Only start the instance if it's marked as up
3713     if instance.admin_up:
3714       feedback_fn("* activating the instance's disks on target node")
3715       logging.info("Starting instance %s on node %s",
3716                    instance.name, target_node)
3717
3718       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3719                                                ignore_secondaries=True)
3720       if not disks_ok:
3721         _ShutdownInstanceDisks(self, instance)
3722         raise errors.OpExecError("Can't activate the instance's disks")
3723
3724       feedback_fn("* starting the instance on the target node")
3725       result = self.rpc.call_instance_start(target_node, instance, None, None)
3726       msg = result.RemoteFailMsg()
3727       if msg:
3728         _ShutdownInstanceDisks(self, instance)
3729         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3730                                  (instance.name, target_node, msg))
3731
3732
3733 class LUMigrateInstance(LogicalUnit):
3734   """Migrate an instance.
3735
3736   This is migration without shutting down, compared to the failover,
3737   which is done with shutdown.
3738
3739   """
3740   HPATH = "instance-migrate"
3741   HTYPE = constants.HTYPE_INSTANCE
3742   _OP_REQP = ["instance_name", "live", "cleanup"]
3743
3744   REQ_BGL = False
3745
3746   def ExpandNames(self):
3747     self._ExpandAndLockInstance()
3748     self.needed_locks[locking.LEVEL_NODE] = []
3749     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3750
3751   def DeclareLocks(self, level):
3752     if level == locking.LEVEL_NODE:
3753       self._LockInstancesNodes()
3754
3755   def BuildHooksEnv(self):
3756     """Build hooks env.
3757
3758     This runs on master, primary and secondary nodes of the instance.
3759
3760     """
3761     env = _BuildInstanceHookEnvByObject(self, self.instance)
3762     env["MIGRATE_LIVE"] = self.op.live
3763     env["MIGRATE_CLEANUP"] = self.op.cleanup
3764     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3765     return env, nl, nl
3766
3767   def CheckPrereq(self):
3768     """Check prerequisites.
3769
3770     This checks that the instance is in the cluster.
3771
3772     """
3773     instance = self.cfg.GetInstanceInfo(
3774       self.cfg.ExpandInstanceName(self.op.instance_name))
3775     if instance is None:
3776       raise errors.OpPrereqError("Instance '%s' not known" %
3777                                  self.op.instance_name)
3778
3779     if instance.disk_template != constants.DT_DRBD8:
3780       raise errors.OpPrereqError("Instance's disk layout is not"
3781                                  " drbd8, cannot migrate.")
3782
3783     secondary_nodes = instance.secondary_nodes
3784     if not secondary_nodes:
3785       raise errors.ConfigurationError("No secondary node but using"
3786                                       " drbd8 disk template")
3787
3788     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3789
3790     target_node = secondary_nodes[0]
3791     # check memory requirements on the secondary node
3792     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3793                          instance.name, i_be[constants.BE_MEMORY],
3794                          instance.hypervisor)
3795
3796     # check bridge existance
3797     _CheckInstanceBridgesExist(self, instance, node=target_node)
3798
3799     if not self.op.cleanup:
3800       _CheckNodeNotDrained(self, target_node)
3801       result = self.rpc.call_instance_migratable(instance.primary_node,
3802                                                  instance)
3803       msg = result.RemoteFailMsg()
3804       if msg:
3805         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3806                                    msg)
3807
3808     self.instance = instance
3809
3810   def _WaitUntilSync(self):
3811     """Poll with custom rpc for disk sync.
3812
3813     This uses our own step-based rpc call.
3814
3815     """
3816     self.feedback_fn("* wait until resync is done")
3817     all_done = False
3818     while not all_done:
3819       all_done = True
3820       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3821                                             self.nodes_ip,
3822                                             self.instance.disks)
3823       min_percent = 100
3824       for node, nres in result.items():
3825         msg = nres.RemoteFailMsg()
3826         if msg:
3827           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3828                                    (node, msg))
3829         node_done, node_percent = nres.payload
3830         all_done = all_done and node_done
3831         if node_percent is not None:
3832           min_percent = min(min_percent, node_percent)
3833       if not all_done:
3834         if min_percent < 100:
3835           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3836         time.sleep(2)
3837
3838   def _EnsureSecondary(self, node):
3839     """Demote a node to secondary.
3840
3841     """
3842     self.feedback_fn("* switching node %s to secondary mode" % node)
3843
3844     for dev in self.instance.disks:
3845       self.cfg.SetDiskID(dev, node)
3846
3847     result = self.rpc.call_blockdev_close(node, self.instance.name,
3848                                           self.instance.disks)
3849     msg = result.RemoteFailMsg()
3850     if msg:
3851       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3852                                " error %s" % (node, msg))
3853
3854   def _GoStandalone(self):
3855     """Disconnect from the network.
3856
3857     """
3858     self.feedback_fn("* changing into standalone mode")
3859     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3860                                                self.instance.disks)
3861     for node, nres in result.items():
3862       msg = nres.RemoteFailMsg()
3863       if msg:
3864         raise errors.OpExecError("Cannot disconnect disks node %s,"
3865                                  " error %s" % (node, msg))
3866
3867   def _GoReconnect(self, multimaster):
3868     """Reconnect to the network.
3869
3870     """
3871     if multimaster:
3872       msg = "dual-master"
3873     else:
3874       msg = "single-master"
3875     self.feedback_fn("* changing disks into %s mode" % msg)
3876     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3877                                            self.instance.disks,
3878                                            self.instance.name, multimaster)
3879     for node, nres in result.items():
3880       msg = nres.RemoteFailMsg()
3881       if msg:
3882         raise errors.OpExecError("Cannot change disks config on node %s,"
3883                                  " error: %s" % (node, msg))
3884
3885   def _ExecCleanup(self):
3886     """Try to cleanup after a failed migration.
3887
3888     The cleanup is done by:
3889       - check that the instance is running only on one node
3890         (and update the config if needed)
3891       - change disks on its secondary node to secondary
3892       - wait until disks are fully synchronized
3893       - disconnect from the network
3894       - change disks into single-master mode
3895       - wait again until disks are fully synchronized
3896
3897     """
3898     instance = self.instance
3899     target_node = self.target_node
3900     source_node = self.source_node
3901
3902     # check running on only one node
3903     self.feedback_fn("* checking where the instance actually runs"
3904                      " (if this hangs, the hypervisor might be in"
3905                      " a bad state)")
3906     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3907     for node, result in ins_l.items():
3908       msg = result.RemoteFailMsg()
3909       if msg:
3910         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3911
3912     runningon_source = instance.name in ins_l[source_node].payload
3913     runningon_target = instance.name in ins_l[target_node].payload
3914
3915     if runningon_source and runningon_target:
3916       raise errors.OpExecError("Instance seems to be running on two nodes,"
3917                                " or the hypervisor is confused. You will have"
3918                                " to ensure manually that it runs only on one"
3919                                " and restart this operation.")
3920
3921     if not (runningon_source or runningon_target):
3922       raise errors.OpExecError("Instance does not seem to be running at all."
3923                                " In this case, it's safer to repair by"
3924                                " running 'gnt-instance stop' to ensure disk"
3925                                " shutdown, and then restarting it.")
3926
3927     if runningon_target:
3928       # the migration has actually succeeded, we need to update the config
3929       self.feedback_fn("* instance running on secondary node (%s),"
3930                        " updating config" % target_node)
3931       instance.primary_node = target_node
3932       self.cfg.Update(instance)
3933       demoted_node = source_node
3934     else:
3935       self.feedback_fn("* instance confirmed to be running on its"
3936                        " primary node (%s)" % source_node)
3937       demoted_node = target_node
3938
3939     self._EnsureSecondary(demoted_node)
3940     try:
3941       self._WaitUntilSync()
3942     except errors.OpExecError:
3943       # we ignore here errors, since if the device is standalone, it
3944       # won't be able to sync
3945       pass
3946     self._GoStandalone()
3947     self._GoReconnect(False)
3948     self._WaitUntilSync()
3949
3950     self.feedback_fn("* done")
3951
3952   def _RevertDiskStatus(self):
3953     """Try to revert the disk status after a failed migration.
3954
3955     """
3956     target_node = self.target_node
3957     try:
3958       self._EnsureSecondary(target_node)
3959       self._GoStandalone()
3960       self._GoReconnect(False)
3961       self._WaitUntilSync()
3962     except errors.OpExecError, err:
3963       self.LogWarning("Migration failed and I can't reconnect the"
3964                       " drives: error '%s'\n"
3965                       "Please look and recover the instance status" %
3966                       str(err))
3967
3968   def _AbortMigration(self):
3969     """Call the hypervisor code to abort a started migration.
3970
3971     """
3972     instance = self.instance
3973     target_node = self.target_node
3974     migration_info = self.migration_info
3975
3976     abort_result = self.rpc.call_finalize_migration(target_node,
3977                                                     instance,
3978                                                     migration_info,
3979                                                     False)
3980     abort_msg = abort_result.RemoteFailMsg()
3981     if abort_msg:
3982       logging.error("Aborting migration failed on target node %s: %s" %
3983                     (target_node, abort_msg))
3984       # Don't raise an exception here, as we stil have to try to revert the
3985       # disk status, even if this step failed.
3986
3987   def _ExecMigration(self):
3988     """Migrate an instance.
3989
3990     The migrate is done by:
3991       - change the disks into dual-master mode
3992       - wait until disks are fully synchronized again
3993       - migrate the instance
3994       - change disks on the new secondary node (the old primary) to secondary
3995       - wait until disks are fully synchronized
3996       - change disks into single-master mode
3997
3998     """
3999     instance = self.instance
4000     target_node = self.target_node
4001     source_node = self.source_node
4002
4003     self.feedback_fn("* checking disk consistency between source and target")
4004     for dev in instance.disks:
4005       if not _CheckDiskConsistency(self, dev, target_node, False):
4006         raise errors.OpExecError("Disk %s is degraded or not fully"
4007                                  " synchronized on target node,"
4008                                  " aborting migrate." % dev.iv_name)
4009
4010     # First get the migration information from the remote node
4011     result = self.rpc.call_migration_info(source_node, instance)
4012     msg = result.RemoteFailMsg()
4013     if msg:
4014       log_err = ("Failed fetching source migration information from %s: %s" %
4015                  (source_node, msg))
4016       logging.error(log_err)
4017       raise errors.OpExecError(log_err)
4018
4019     self.migration_info = migration_info = result.payload
4020
4021     # Then switch the disks to master/master mode
4022     self._EnsureSecondary(target_node)
4023     self._GoStandalone()
4024     self._GoReconnect(True)
4025     self._WaitUntilSync()
4026
4027     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4028     result = self.rpc.call_accept_instance(target_node,
4029                                            instance,
4030                                            migration_info,
4031                                            self.nodes_ip[target_node])
4032
4033     msg = result.RemoteFailMsg()
4034     if msg:
4035       logging.error("Instance pre-migration failed, trying to revert"
4036                     " disk status: %s", msg)
4037       self._AbortMigration()
4038       self._RevertDiskStatus()
4039       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4040                                (instance.name, msg))
4041
4042     self.feedback_fn("* migrating instance to %s" % target_node)
4043     time.sleep(10)
4044     result = self.rpc.call_instance_migrate(source_node, instance,
4045                                             self.nodes_ip[target_node],
4046                                             self.op.live)
4047     msg = result.RemoteFailMsg()
4048     if msg:
4049       logging.error("Instance migration failed, trying to revert"
4050                     " disk status: %s", msg)
4051       self._AbortMigration()
4052       self._RevertDiskStatus()
4053       raise errors.OpExecError("Could not migrate instance %s: %s" %
4054                                (instance.name, msg))
4055     time.sleep(10)
4056
4057     instance.primary_node = target_node
4058     # distribute new instance config to the other nodes
4059     self.cfg.Update(instance)
4060
4061     result = self.rpc.call_finalize_migration(target_node,
4062                                               instance,
4063                                               migration_info,
4064                                               True)
4065     msg = result.RemoteFailMsg()
4066     if msg:
4067       logging.error("Instance migration succeeded, but finalization failed:"
4068                     " %s" % msg)
4069       raise errors.OpExecError("Could not finalize instance migration: %s" %
4070                                msg)
4071
4072     self._EnsureSecondary(source_node)
4073     self._WaitUntilSync()
4074     self._GoStandalone()
4075     self._GoReconnect(False)
4076     self._WaitUntilSync()
4077
4078     self.feedback_fn("* done")
4079
4080   def Exec(self, feedback_fn):
4081     """Perform the migration.
4082
4083     """
4084     self.feedback_fn = feedback_fn
4085
4086     self.source_node = self.instance.primary_node
4087     self.target_node = self.instance.secondary_nodes[0]
4088     self.all_nodes = [self.source_node, self.target_node]
4089     self.nodes_ip = {
4090       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4091       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4092       }
4093     if self.op.cleanup:
4094       return self._ExecCleanup()
4095     else:
4096       return self._ExecMigration()
4097
4098
4099 def _CreateBlockDev(lu, node, instance, device, force_create,
4100                     info, force_open):
4101   """Create a tree of block devices on a given node.
4102
4103   If this device type has to be created on secondaries, create it and
4104   all its children.
4105
4106   If not, just recurse to children keeping the same 'force' value.
4107
4108   @param lu: the lu on whose behalf we execute
4109   @param node: the node on which to create the device
4110   @type instance: L{objects.Instance}
4111   @param instance: the instance which owns the device
4112   @type device: L{objects.Disk}
4113   @param device: the device to create
4114   @type force_create: boolean
4115   @param force_create: whether to force creation of this device; this
4116       will be change to True whenever we find a device which has
4117       CreateOnSecondary() attribute
4118   @param info: the extra 'metadata' we should attach to the device
4119       (this will be represented as a LVM tag)
4120   @type force_open: boolean
4121   @param force_open: this parameter will be passes to the
4122       L{backend.BlockdevCreate} function where it specifies
4123       whether we run on primary or not, and it affects both
4124       the child assembly and the device own Open() execution
4125
4126   """
4127   if device.CreateOnSecondary():
4128     force_create = True
4129
4130   if device.children:
4131     for child in device.children:
4132       _CreateBlockDev(lu, node, instance, child, force_create,
4133                       info, force_open)
4134
4135   if not force_create:
4136     return
4137
4138   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4139
4140
4141 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4142   """Create a single block device on a given node.
4143
4144   This will not recurse over children of the device, so they must be
4145   created in advance.
4146
4147   @param lu: the lu on whose behalf we execute
4148   @param node: the node on which to create the device
4149   @type instance: L{objects.Instance}
4150   @param instance: the instance which owns the device
4151   @type device: L{objects.Disk}
4152   @param device: the device to create
4153   @param info: the extra 'metadata' we should attach to the device
4154       (this will be represented as a LVM tag)
4155   @type force_open: boolean
4156   @param force_open: this parameter will be passes to the
4157       L{backend.BlockdevCreate} function where it specifies
4158       whether we run on primary or not, and it affects both
4159       the child assembly and the device own Open() execution
4160
4161   """
4162   lu.cfg.SetDiskID(device, node)
4163   result = lu.rpc.call_blockdev_create(node, device, device.size,
4164                                        instance.name, force_open, info)
4165   msg = result.RemoteFailMsg()
4166   if msg:
4167     raise errors.OpExecError("Can't create block device %s on"
4168                              " node %s for instance %s: %s" %
4169                              (device, node, instance.name, msg))
4170   if device.physical_id is None:
4171     device.physical_id = result.payload
4172
4173
4174 def _GenerateUniqueNames(lu, exts):
4175   """Generate a suitable LV name.
4176
4177   This will generate a logical volume name for the given instance.
4178
4179   """
4180   results = []
4181   for val in exts:
4182     new_id = lu.cfg.GenerateUniqueID()
4183     results.append("%s%s" % (new_id, val))
4184   return results
4185
4186
4187 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4188                          p_minor, s_minor):
4189   """Generate a drbd8 device complete with its children.
4190
4191   """
4192   port = lu.cfg.AllocatePort()
4193   vgname = lu.cfg.GetVGName()
4194   shared_secret = lu.cfg.GenerateDRBDSecret()
4195   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4196                           logical_id=(vgname, names[0]))
4197   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4198                           logical_id=(vgname, names[1]))
4199   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4200                           logical_id=(primary, secondary, port,
4201                                       p_minor, s_minor,
4202                                       shared_secret),
4203                           children=[dev_data, dev_meta],
4204                           iv_name=iv_name)
4205   return drbd_dev
4206
4207
4208 def _GenerateDiskTemplate(lu, template_name,
4209                           instance_name, primary_node,
4210                           secondary_nodes, disk_info,
4211                           file_storage_dir, file_driver,
4212                           base_index):
4213   """Generate the entire disk layout for a given template type.
4214
4215   """
4216   #TODO: compute space requirements
4217
4218   vgname = lu.cfg.GetVGName()
4219   disk_count = len(disk_info)
4220   disks = []
4221   if template_name == constants.DT_DISKLESS:
4222     pass
4223   elif template_name == constants.DT_PLAIN:
4224     if len(secondary_nodes) != 0:
4225       raise errors.ProgrammerError("Wrong template configuration")
4226
4227     names = _GenerateUniqueNames(lu, [".disk%d" % i
4228                                       for i in range(disk_count)])
4229     for idx, disk in enumerate(disk_info):
4230       disk_index = idx + base_index
4231       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4232                               logical_id=(vgname, names[idx]),
4233                               iv_name="disk/%d" % disk_index,
4234                               mode=disk["mode"])
4235       disks.append(disk_dev)
4236   elif template_name == constants.DT_DRBD8:
4237     if len(secondary_nodes) != 1:
4238       raise errors.ProgrammerError("Wrong template configuration")
4239     remote_node = secondary_nodes[0]
4240     minors = lu.cfg.AllocateDRBDMinor(
4241       [primary_node, remote_node] * len(disk_info), instance_name)
4242
4243     names = []
4244     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4245                                                for i in range(disk_count)]):
4246       names.append(lv_prefix + "_data")
4247       names.append(lv_prefix + "_meta")
4248     for idx, disk in enumerate(disk_info):
4249       disk_index = idx + base_index
4250       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4251                                       disk["size"], names[idx*2:idx*2+2],
4252                                       "disk/%d" % disk_index,
4253                                       minors[idx*2], minors[idx*2+1])
4254       disk_dev.mode = disk["mode"]
4255       disks.append(disk_dev)
4256   elif template_name == constants.DT_FILE:
4257     if len(secondary_nodes) != 0:
4258       raise errors.ProgrammerError("Wrong template configuration")
4259
4260     for idx, disk in enumerate(disk_info):
4261       disk_index = idx + base_index
4262       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4263                               iv_name="disk/%d" % disk_index,
4264                               logical_id=(file_driver,
4265                                           "%s/disk%d" % (file_storage_dir,
4266                                                          disk_index)),
4267                               mode=disk["mode"])
4268       disks.append(disk_dev)
4269   else:
4270     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4271   return disks
4272
4273
4274 def _GetInstanceInfoText(instance):
4275   """Compute that text that should be added to the disk's metadata.
4276
4277   """
4278   return "originstname+%s" % instance.name
4279
4280
4281 def _CreateDisks(lu, instance):
4282   """Create all disks for an instance.
4283
4284   This abstracts away some work from AddInstance.
4285
4286   @type lu: L{LogicalUnit}
4287   @param lu: the logical unit on whose behalf we execute
4288   @type instance: L{objects.Instance}
4289   @param instance: the instance whose disks we should create
4290   @rtype: boolean
4291   @return: the success of the creation
4292
4293   """
4294   info = _GetInstanceInfoText(instance)
4295   pnode = instance.primary_node
4296
4297   if instance.disk_template == constants.DT_FILE:
4298     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4299     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4300
4301     if result.failed or not result.data:
4302       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4303
4304     if not result.data[0]:
4305       raise errors.OpExecError("Failed to create directory '%s'" %
4306                                file_storage_dir)
4307
4308   # Note: this needs to be kept in sync with adding of disks in
4309   # LUSetInstanceParams
4310   for device in instance.disks:
4311     logging.info("Creating volume %s for instance %s",
4312                  device.iv_name, instance.name)
4313     #HARDCODE
4314     for node in instance.all_nodes:
4315       f_create = node == pnode
4316       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4317
4318
4319 def _RemoveDisks(lu, instance):
4320   """Remove all disks for an instance.
4321
4322   This abstracts away some work from `AddInstance()` and
4323   `RemoveInstance()`. Note that in case some of the devices couldn't
4324   be removed, the removal will continue with the other ones (compare
4325   with `_CreateDisks()`).
4326
4327   @type lu: L{LogicalUnit}
4328   @param lu: the logical unit on whose behalf we execute
4329   @type instance: L{objects.Instance}
4330   @param instance: the instance whose disks we should remove
4331   @rtype: boolean
4332   @return: the success of the removal
4333
4334   """
4335   logging.info("Removing block devices for instance %s", instance.name)
4336
4337   all_result = True
4338   for device in instance.disks:
4339     for node, disk in device.ComputeNodeTree(instance.primary_node):
4340       lu.cfg.SetDiskID(disk, node)
4341       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4342       if msg:
4343         lu.LogWarning("Could not remove block device %s on node %s,"
4344                       " continuing anyway: %s", device.iv_name, node, msg)
4345         all_result = False
4346
4347   if instance.disk_template == constants.DT_FILE:
4348     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4349     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4350                                                  file_storage_dir)
4351     if result.failed or not result.data:
4352       logging.error("Could not remove directory '%s'", file_storage_dir)
4353       all_result = False
4354
4355   return all_result
4356
4357
4358 def _ComputeDiskSize(disk_template, disks):
4359   """Compute disk size requirements in the volume group
4360
4361   """
4362   # Required free disk space as a function of disk and swap space
4363   req_size_dict = {
4364     constants.DT_DISKLESS: None,
4365     constants.DT_PLAIN: sum(d["size"] for d in disks),
4366     # 128 MB are added for drbd metadata for each disk
4367     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4368     constants.DT_FILE: None,
4369   }
4370
4371   if disk_template not in req_size_dict:
4372     raise errors.ProgrammerError("Disk template '%s' size requirement"
4373                                  " is unknown" %  disk_template)
4374
4375   return req_size_dict[disk_template]
4376
4377
4378 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4379   """Hypervisor parameter validation.
4380
4381   This function abstract the hypervisor parameter validation to be
4382   used in both instance create and instance modify.
4383
4384   @type lu: L{LogicalUnit}
4385   @param lu: the logical unit for which we check
4386   @type nodenames: list
4387   @param nodenames: the list of nodes on which we should check
4388   @type hvname: string
4389   @param hvname: the name of the hypervisor we should use
4390   @type hvparams: dict
4391   @param hvparams: the parameters which we need to check
4392   @raise errors.OpPrereqError: if the parameters are not valid
4393
4394   """
4395   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4396                                                   hvname,
4397                                                   hvparams)
4398   for node in nodenames:
4399     info = hvinfo[node]
4400     if info.offline:
4401       continue
4402     msg = info.RemoteFailMsg()
4403     if msg:
4404       raise errors.OpPrereqError("Hypervisor parameter validation"
4405                                  " failed on node %s: %s" % (node, msg))
4406
4407
4408 class LUCreateInstance(LogicalUnit):
4409   """Create an instance.
4410
4411   """
4412   HPATH = "instance-add"
4413   HTYPE = constants.HTYPE_INSTANCE
4414   _OP_REQP = ["instance_name", "disks", "disk_template",
4415               "mode", "start",
4416               "wait_for_sync", "ip_check", "nics",
4417               "hvparams", "beparams"]
4418   REQ_BGL = False
4419
4420   def _ExpandNode(self, node):
4421     """Expands and checks one node name.
4422
4423     """
4424     node_full = self.cfg.ExpandNodeName(node)
4425     if node_full is None:
4426       raise errors.OpPrereqError("Unknown node %s" % node)
4427     return node_full
4428
4429   def ExpandNames(self):
4430     """ExpandNames for CreateInstance.
4431
4432     Figure out the right locks for instance creation.
4433
4434     """
4435     self.needed_locks = {}
4436
4437     # set optional parameters to none if they don't exist
4438     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4439       if not hasattr(self.op, attr):
4440         setattr(self.op, attr, None)
4441
4442     # cheap checks, mostly valid constants given
4443
4444     # verify creation mode
4445     if self.op.mode not in (constants.INSTANCE_CREATE,
4446                             constants.INSTANCE_IMPORT):
4447       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4448                                  self.op.mode)
4449
4450     # disk template and mirror node verification
4451     if self.op.disk_template not in constants.DISK_TEMPLATES:
4452       raise errors.OpPrereqError("Invalid disk template name")
4453
4454     if self.op.hypervisor is None:
4455       self.op.hypervisor = self.cfg.GetHypervisorType()
4456
4457     cluster = self.cfg.GetClusterInfo()
4458     enabled_hvs = cluster.enabled_hypervisors
4459     if self.op.hypervisor not in enabled_hvs:
4460       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4461                                  " cluster (%s)" % (self.op.hypervisor,
4462                                   ",".join(enabled_hvs)))
4463
4464     # check hypervisor parameter syntax (locally)
4465     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4466     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4467                                   self.op.hvparams)
4468     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4469     hv_type.CheckParameterSyntax(filled_hvp)
4470
4471     # fill and remember the beparams dict
4472     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4473     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4474                                     self.op.beparams)
4475
4476     #### instance parameters check
4477
4478     # instance name verification
4479     hostname1 = utils.HostInfo(self.op.instance_name)
4480     self.op.instance_name = instance_name = hostname1.name
4481
4482     # this is just a preventive check, but someone might still add this
4483     # instance in the meantime, and creation will fail at lock-add time
4484     if instance_name in self.cfg.GetInstanceList():
4485       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4486                                  instance_name)
4487
4488     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4489
4490     # NIC buildup
4491     self.nics = []
4492     for idx, nic in enumerate(self.op.nics):
4493       nic_mode_req = nic.get("mode", None)
4494       nic_mode = nic_mode_req
4495       if nic_mode is None:
4496         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4497
4498       # in routed mode, for the first nic, the default ip is 'auto'
4499       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4500         default_ip_mode = constants.VALUE_AUTO
4501       else:
4502         default_ip_mode = constants.VALUE_NONE
4503
4504       # ip validity checks
4505       ip = nic.get("ip", default_ip_mode)
4506       if ip is None or ip.lower() == constants.VALUE_NONE:
4507         nic_ip = None
4508       elif ip.lower() == constants.VALUE_AUTO:
4509         nic_ip = hostname1.ip
4510       else:
4511         if not utils.IsValidIP(ip):
4512           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4513                                      " like a valid IP" % ip)
4514         nic_ip = ip
4515
4516       # TODO: check the ip for uniqueness !!
4517       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4518         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4519
4520       # MAC address verification
4521       mac = nic.get("mac", constants.VALUE_AUTO)
4522       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4523         if not utils.IsValidMac(mac.lower()):
4524           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4525                                      mac)
4526       # bridge verification
4527       bridge = nic.get("bridge", None)
4528       link = nic.get("link", None)
4529       if bridge and link:
4530         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4531       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4532         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4533       elif bridge:
4534         link = bridge
4535
4536       nicparams = {}
4537       if nic_mode_req:
4538         nicparams[constants.NIC_MODE] = nic_mode_req
4539       if link:
4540         nicparams[constants.NIC_LINK] = link
4541
4542       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4543                                       nicparams)
4544       objects.NIC.CheckParameterSyntax(check_params)
4545       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4546
4547     # disk checks/pre-build
4548     self.disks = []
4549     for disk in self.op.disks:
4550       mode = disk.get("mode", constants.DISK_RDWR)
4551       if mode not in constants.DISK_ACCESS_SET:
4552         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4553                                    mode)
4554       size = disk.get("size", None)
4555       if size is None:
4556         raise errors.OpPrereqError("Missing disk size")
4557       try:
4558         size = int(size)
4559       except ValueError:
4560         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4561       self.disks.append({"size": size, "mode": mode})
4562
4563     # used in CheckPrereq for ip ping check
4564     self.check_ip = hostname1.ip
4565
4566     # file storage checks
4567     if (self.op.file_driver and
4568         not self.op.file_driver in constants.FILE_DRIVER):
4569       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4570                                  self.op.file_driver)
4571
4572     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4573       raise errors.OpPrereqError("File storage directory path not absolute")
4574
4575     ### Node/iallocator related checks
4576     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4577       raise errors.OpPrereqError("One and only one of iallocator and primary"
4578                                  " node must be given")
4579
4580     if self.op.iallocator:
4581       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4582     else:
4583       self.op.pnode = self._ExpandNode(self.op.pnode)
4584       nodelist = [self.op.pnode]
4585       if self.op.snode is not None:
4586         self.op.snode = self._ExpandNode(self.op.snode)
4587         nodelist.append(self.op.snode)
4588       self.needed_locks[locking.LEVEL_NODE] = nodelist
4589
4590     # in case of import lock the source node too
4591     if self.op.mode == constants.INSTANCE_IMPORT:
4592       src_node = getattr(self.op, "src_node", None)
4593       src_path = getattr(self.op, "src_path", None)
4594
4595       if src_path is None:
4596         self.op.src_path = src_path = self.op.instance_name
4597
4598       if src_node is None:
4599         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4600         self.op.src_node = None
4601         if os.path.isabs(src_path):
4602           raise errors.OpPrereqError("Importing an instance from an absolute"
4603                                      " path requires a source node option.")
4604       else:
4605         self.op.src_node = src_node = self._ExpandNode(src_node)
4606         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4607           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4608         if not os.path.isabs(src_path):
4609           self.op.src_path = src_path = \
4610             os.path.join(constants.EXPORT_DIR, src_path)
4611
4612     else: # INSTANCE_CREATE
4613       if getattr(self.op, "os_type", None) is None:
4614         raise errors.OpPrereqError("No guest OS specified")
4615
4616   def _RunAllocator(self):
4617     """Run the allocator based on input opcode.
4618
4619     """
4620     nics = [n.ToDict() for n in self.nics]
4621     ial = IAllocator(self,
4622                      mode=constants.IALLOCATOR_MODE_ALLOC,
4623                      name=self.op.instance_name,
4624                      disk_template=self.op.disk_template,
4625                      tags=[],
4626                      os=self.op.os_type,
4627                      vcpus=self.be_full[constants.BE_VCPUS],
4628                      mem_size=self.be_full[constants.BE_MEMORY],
4629                      disks=self.disks,
4630                      nics=nics,
4631                      hypervisor=self.op.hypervisor,
4632                      )
4633
4634     ial.Run(self.op.iallocator)
4635
4636     if not ial.success:
4637       raise errors.OpPrereqError("Can't compute nodes using"
4638                                  " iallocator '%s': %s" % (self.op.iallocator,
4639                                                            ial.info))
4640     if len(ial.nodes) != ial.required_nodes:
4641       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4642                                  " of nodes (%s), required %s" %
4643                                  (self.op.iallocator, len(ial.nodes),
4644                                   ial.required_nodes))
4645     self.op.pnode = ial.nodes[0]
4646     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4647                  self.op.instance_name, self.op.iallocator,
4648                  ", ".join(ial.nodes))
4649     if ial.required_nodes == 2:
4650       self.op.snode = ial.nodes[1]
4651
4652   def BuildHooksEnv(self):
4653     """Build hooks env.
4654
4655     This runs on master, primary and secondary nodes of the instance.
4656
4657     """
4658     env = {
4659       "ADD_MODE": self.op.mode,
4660       }
4661     if self.op.mode == constants.INSTANCE_IMPORT:
4662       env["SRC_NODE"] = self.op.src_node
4663       env["SRC_PATH"] = self.op.src_path
4664       env["SRC_IMAGES"] = self.src_images
4665
4666     env.update(_BuildInstanceHookEnv(
4667       name=self.op.instance_name,
4668       primary_node=self.op.pnode,
4669       secondary_nodes=self.secondaries,
4670       status=self.op.start,
4671       os_type=self.op.os_type,
4672       memory=self.be_full[constants.BE_MEMORY],
4673       vcpus=self.be_full[constants.BE_VCPUS],
4674       nics=_PreBuildNICHooksList(self, self.nics),
4675       disk_template=self.op.disk_template,
4676       disks=[(d["size"], d["mode"]) for d in self.disks],
4677     ))
4678
4679     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4680           self.secondaries)
4681     return env, nl, nl
4682
4683
4684   def CheckPrereq(self):
4685     """Check prerequisites.
4686
4687     """
4688     if (not self.cfg.GetVGName() and
4689         self.op.disk_template not in constants.DTS_NOT_LVM):
4690       raise errors.OpPrereqError("Cluster does not support lvm-based"
4691                                  " instances")
4692
4693     if self.op.mode == constants.INSTANCE_IMPORT:
4694       src_node = self.op.src_node
4695       src_path = self.op.src_path
4696
4697       if src_node is None:
4698         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4699         exp_list = self.rpc.call_export_list(locked_nodes)
4700         found = False
4701         for node in exp_list:
4702           if exp_list[node].RemoteFailMsg():
4703             continue
4704           if src_path in exp_list[node].payload:
4705             found = True
4706             self.op.src_node = src_node = node
4707             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4708                                                        src_path)
4709             break
4710         if not found:
4711           raise errors.OpPrereqError("No export found for relative path %s" %
4712                                       src_path)
4713
4714       _CheckNodeOnline(self, src_node)
4715       result = self.rpc.call_export_info(src_node, src_path)
4716       msg = result.RemoteFailMsg()
4717       if msg:
4718         raise errors.OpPrereqError("No export or invalid export found in"
4719                                    " dir %s: %s" % (src_path, msg))
4720
4721       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4722       if not export_info.has_section(constants.INISECT_EXP):
4723         raise errors.ProgrammerError("Corrupted export config")
4724
4725       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4726       if (int(ei_version) != constants.EXPORT_VERSION):
4727         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4728                                    (ei_version, constants.EXPORT_VERSION))
4729
4730       # Check that the new instance doesn't have less disks than the export
4731       instance_disks = len(self.disks)
4732       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4733       if instance_disks < export_disks:
4734         raise errors.OpPrereqError("Not enough disks to import."
4735                                    " (instance: %d, export: %d)" %
4736                                    (instance_disks, export_disks))
4737
4738       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4739       disk_images = []
4740       for idx in range(export_disks):
4741         option = 'disk%d_dump' % idx
4742         if export_info.has_option(constants.INISECT_INS, option):
4743           # FIXME: are the old os-es, disk sizes, etc. useful?
4744           export_name = export_info.get(constants.INISECT_INS, option)
4745           image = os.path.join(src_path, export_name)
4746           disk_images.append(image)
4747         else:
4748           disk_images.append(False)
4749
4750       self.src_images = disk_images
4751
4752       old_name = export_info.get(constants.INISECT_INS, 'name')
4753       # FIXME: int() here could throw a ValueError on broken exports
4754       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4755       if self.op.instance_name == old_name:
4756         for idx, nic in enumerate(self.nics):
4757           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4758             nic_mac_ini = 'nic%d_mac' % idx
4759             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4760
4761     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4762     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4763     if self.op.start and not self.op.ip_check:
4764       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4765                                  " adding an instance in start mode")
4766
4767     if self.op.ip_check:
4768       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4769         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4770                                    (self.check_ip, self.op.instance_name))
4771
4772     #### mac address generation
4773     # By generating here the mac address both the allocator and the hooks get
4774     # the real final mac address rather than the 'auto' or 'generate' value.
4775     # There is a race condition between the generation and the instance object
4776     # creation, which means that we know the mac is valid now, but we're not
4777     # sure it will be when we actually add the instance. If things go bad
4778     # adding the instance will abort because of a duplicate mac, and the
4779     # creation job will fail.
4780     for nic in self.nics:
4781       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4782         nic.mac = self.cfg.GenerateMAC()
4783
4784     #### allocator run
4785
4786     if self.op.iallocator is not None:
4787       self._RunAllocator()
4788
4789     #### node related checks
4790
4791     # check primary node
4792     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4793     assert self.pnode is not None, \
4794       "Cannot retrieve locked node %s" % self.op.pnode
4795     if pnode.offline:
4796       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4797                                  pnode.name)
4798     if pnode.drained:
4799       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4800                                  pnode.name)
4801
4802     self.secondaries = []
4803
4804     # mirror node verification
4805     if self.op.disk_template in constants.DTS_NET_MIRROR:
4806       if self.op.snode is None:
4807         raise errors.OpPrereqError("The networked disk templates need"
4808                                    " a mirror node")
4809       if self.op.snode == pnode.name:
4810         raise errors.OpPrereqError("The secondary node cannot be"
4811                                    " the primary node.")
4812       _CheckNodeOnline(self, self.op.snode)
4813       _CheckNodeNotDrained(self, self.op.snode)
4814       self.secondaries.append(self.op.snode)
4815
4816     nodenames = [pnode.name] + self.secondaries
4817
4818     req_size = _ComputeDiskSize(self.op.disk_template,
4819                                 self.disks)
4820
4821     # Check lv size requirements
4822     if req_size is not None:
4823       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4824                                          self.op.hypervisor)
4825       for node in nodenames:
4826         info = nodeinfo[node]
4827         msg = info.RemoteFailMsg()
4828         if msg:
4829           raise errors.OpPrereqError("Cannot get current information"
4830                                      " from node %s: %s" % (node, msg))
4831         info = info.payload
4832         vg_free = info.get('vg_free', None)
4833         if not isinstance(vg_free, int):
4834           raise errors.OpPrereqError("Can't compute free disk space on"
4835                                      " node %s" % node)
4836         if req_size > vg_free:
4837           raise errors.OpPrereqError("Not enough disk space on target node %s."
4838                                      " %d MB available, %d MB required" %
4839                                      (node, vg_free, req_size))
4840
4841     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4842
4843     # os verification
4844     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4845     result.Raise()
4846     if not isinstance(result.data, objects.OS):
4847       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4848                                  " primary node"  % self.op.os_type)
4849
4850     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4851
4852     # memory check on primary node
4853     if self.op.start:
4854       _CheckNodeFreeMemory(self, self.pnode.name,
4855                            "creating instance %s" % self.op.instance_name,
4856                            self.be_full[constants.BE_MEMORY],
4857                            self.op.hypervisor)
4858
4859   def Exec(self, feedback_fn):
4860     """Create and add the instance to the cluster.
4861
4862     """
4863     instance = self.op.instance_name
4864     pnode_name = self.pnode.name
4865
4866     ht_kind = self.op.hypervisor
4867     if ht_kind in constants.HTS_REQ_PORT:
4868       network_port = self.cfg.AllocatePort()
4869     else:
4870       network_port = None
4871
4872     ##if self.op.vnc_bind_address is None:
4873     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4874
4875     # this is needed because os.path.join does not accept None arguments
4876     if self.op.file_storage_dir is None:
4877       string_file_storage_dir = ""
4878     else:
4879       string_file_storage_dir = self.op.file_storage_dir
4880
4881     # build the full file storage dir path
4882     file_storage_dir = os.path.normpath(os.path.join(
4883                                         self.cfg.GetFileStorageDir(),
4884                                         string_file_storage_dir, instance))
4885
4886
4887     disks = _GenerateDiskTemplate(self,
4888                                   self.op.disk_template,
4889                                   instance, pnode_name,
4890                                   self.secondaries,
4891                                   self.disks,
4892                                   file_storage_dir,
4893                                   self.op.file_driver,
4894                                   0)
4895
4896     iobj = objects.Instance(name=instance, os=self.op.os_type,
4897                             primary_node=pnode_name,
4898                             nics=self.nics, disks=disks,
4899                             disk_template=self.op.disk_template,
4900                             admin_up=False,
4901                             network_port=network_port,
4902                             beparams=self.op.beparams,
4903                             hvparams=self.op.hvparams,
4904                             hypervisor=self.op.hypervisor,
4905                             )
4906
4907     feedback_fn("* creating instance disks...")
4908     try:
4909       _CreateDisks(self, iobj)
4910     except errors.OpExecError:
4911       self.LogWarning("Device creation failed, reverting...")
4912       try:
4913         _RemoveDisks(self, iobj)
4914       finally:
4915         self.cfg.ReleaseDRBDMinors(instance)
4916         raise
4917
4918     feedback_fn("adding instance %s to cluster config" % instance)
4919
4920     self.cfg.AddInstance(iobj)
4921     # Declare that we don't want to remove the instance lock anymore, as we've
4922     # added the instance to the config
4923     del self.remove_locks[locking.LEVEL_INSTANCE]
4924     # Unlock all the nodes
4925     if self.op.mode == constants.INSTANCE_IMPORT:
4926       nodes_keep = [self.op.src_node]
4927       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4928                        if node != self.op.src_node]
4929       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4930       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4931     else:
4932       self.context.glm.release(locking.LEVEL_NODE)
4933       del self.acquired_locks[locking.LEVEL_NODE]
4934
4935     if self.op.wait_for_sync:
4936       disk_abort = not _WaitForSync(self, iobj)
4937     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4938       # make sure the disks are not degraded (still sync-ing is ok)
4939       time.sleep(15)
4940       feedback_fn("* checking mirrors status")
4941       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4942     else:
4943       disk_abort = False
4944
4945     if disk_abort:
4946       _RemoveDisks(self, iobj)
4947       self.cfg.RemoveInstance(iobj.name)
4948       # Make sure the instance lock gets removed
4949       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4950       raise errors.OpExecError("There are some degraded disks for"
4951                                " this instance")
4952
4953     feedback_fn("creating os for instance %s on node %s" %
4954                 (instance, pnode_name))
4955
4956     if iobj.disk_template != constants.DT_DISKLESS:
4957       if self.op.mode == constants.INSTANCE_CREATE:
4958         feedback_fn("* running the instance OS create scripts...")
4959         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4960         msg = result.RemoteFailMsg()
4961         if msg:
4962           raise errors.OpExecError("Could not add os for instance %s"
4963                                    " on node %s: %s" %
4964                                    (instance, pnode_name, msg))
4965
4966       elif self.op.mode == constants.INSTANCE_IMPORT:
4967         feedback_fn("* running the instance OS import scripts...")
4968         src_node = self.op.src_node
4969         src_images = self.src_images
4970         cluster_name = self.cfg.GetClusterName()
4971         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4972                                                          src_node, src_images,
4973                                                          cluster_name)
4974         msg = import_result.RemoteFailMsg()
4975         if msg:
4976           self.LogWarning("Error while importing the disk images for instance"
4977                           " %s on node %s: %s" % (instance, pnode_name, msg))
4978       else:
4979         # also checked in the prereq part
4980         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4981                                      % self.op.mode)
4982
4983     if self.op.start:
4984       iobj.admin_up = True
4985       self.cfg.Update(iobj)
4986       logging.info("Starting instance %s on node %s", instance, pnode_name)
4987       feedback_fn("* starting instance...")
4988       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4989       msg = result.RemoteFailMsg()
4990       if msg:
4991         raise errors.OpExecError("Could not start instance: %s" % msg)
4992
4993
4994 class LUConnectConsole(NoHooksLU):
4995   """Connect to an instance's console.
4996
4997   This is somewhat special in that it returns the command line that
4998   you need to run on the master node in order to connect to the
4999   console.
5000
5001   """
5002   _OP_REQP = ["instance_name"]
5003   REQ_BGL = False
5004
5005   def ExpandNames(self):
5006     self._ExpandAndLockInstance()
5007
5008   def CheckPrereq(self):
5009     """Check prerequisites.
5010
5011     This checks that the instance is in the cluster.
5012
5013     """
5014     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5015     assert self.instance is not None, \
5016       "Cannot retrieve locked instance %s" % self.op.instance_name
5017     _CheckNodeOnline(self, self.instance.primary_node)
5018
5019   def Exec(self, feedback_fn):
5020     """Connect to the console of an instance
5021
5022     """
5023     instance = self.instance
5024     node = instance.primary_node
5025
5026     node_insts = self.rpc.call_instance_list([node],
5027                                              [instance.hypervisor])[node]
5028     msg = node_insts.RemoteFailMsg()
5029     if msg:
5030       raise errors.OpExecError("Can't get node information from %s: %s" %
5031                                (node, msg))
5032
5033     if instance.name not in node_insts.payload:
5034       raise errors.OpExecError("Instance %s is not running." % instance.name)
5035
5036     logging.debug("Connecting to console of %s on %s", instance.name, node)
5037
5038     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5039     cluster = self.cfg.GetClusterInfo()
5040     # beparams and hvparams are passed separately, to avoid editing the
5041     # instance and then saving the defaults in the instance itself.
5042     hvparams = cluster.FillHV(instance)
5043     beparams = cluster.FillBE(instance)
5044     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5045
5046     # build ssh cmdline
5047     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5048
5049
5050 class LUReplaceDisks(LogicalUnit):
5051   """Replace the disks of an instance.
5052
5053   """
5054   HPATH = "mirrors-replace"
5055   HTYPE = constants.HTYPE_INSTANCE
5056   _OP_REQP = ["instance_name", "mode", "disks"]
5057   REQ_BGL = False
5058
5059   def CheckArguments(self):
5060     if not hasattr(self.op, "remote_node"):
5061       self.op.remote_node = None
5062     if not hasattr(self.op, "iallocator"):
5063       self.op.iallocator = None
5064
5065     # check for valid parameter combination
5066     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5067     if self.op.mode == constants.REPLACE_DISK_CHG:
5068       if cnt == 2:
5069         raise errors.OpPrereqError("When changing the secondary either an"
5070                                    " iallocator script must be used or the"
5071                                    " new node given")
5072       elif cnt == 0:
5073         raise errors.OpPrereqError("Give either the iallocator or the new"
5074                                    " secondary, not both")
5075     else: # not replacing the secondary
5076       if cnt != 2:
5077         raise errors.OpPrereqError("The iallocator and new node options can"
5078                                    " be used only when changing the"
5079                                    " secondary node")
5080
5081   def ExpandNames(self):
5082     self._ExpandAndLockInstance()
5083
5084     if self.op.iallocator is not None:
5085       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5086     elif self.op.remote_node is not None:
5087       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5088       if remote_node is None:
5089         raise errors.OpPrereqError("Node '%s' not known" %
5090                                    self.op.remote_node)
5091       self.op.remote_node = remote_node
5092       # Warning: do not remove the locking of the new secondary here
5093       # unless DRBD8.AddChildren is changed to work in parallel;
5094       # currently it doesn't since parallel invocations of
5095       # FindUnusedMinor will conflict
5096       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5097       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5098     else:
5099       self.needed_locks[locking.LEVEL_NODE] = []
5100       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5101
5102   def DeclareLocks(self, level):
5103     # If we're not already locking all nodes in the set we have to declare the
5104     # instance's primary/secondary nodes.
5105     if (level == locking.LEVEL_NODE and
5106         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5107       self._LockInstancesNodes()
5108
5109   def _RunAllocator(self):
5110     """Compute a new secondary node using an IAllocator.
5111
5112     """
5113     ial = IAllocator(self,
5114                      mode=constants.IALLOCATOR_MODE_RELOC,
5115                      name=self.op.instance_name,
5116                      relocate_from=[self.sec_node])
5117
5118     ial.Run(self.op.iallocator)
5119
5120     if not ial.success:
5121       raise errors.OpPrereqError("Can't compute nodes using"
5122                                  " iallocator '%s': %s" % (self.op.iallocator,
5123                                                            ial.info))
5124     if len(ial.nodes) != ial.required_nodes:
5125       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5126                                  " of nodes (%s), required %s" %
5127                                  (len(ial.nodes), ial.required_nodes))
5128     self.op.remote_node = ial.nodes[0]
5129     self.LogInfo("Selected new secondary for the instance: %s",
5130                  self.op.remote_node)
5131
5132   def BuildHooksEnv(self):
5133     """Build hooks env.
5134
5135     This runs on the master, the primary and all the secondaries.
5136
5137     """
5138     env = {
5139       "MODE": self.op.mode,
5140       "NEW_SECONDARY": self.op.remote_node,
5141       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5142       }
5143     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5144     nl = [
5145       self.cfg.GetMasterNode(),
5146       self.instance.primary_node,
5147       ]
5148     if self.op.remote_node is not None:
5149       nl.append(self.op.remote_node)
5150     return env, nl, nl
5151
5152   def CheckPrereq(self):
5153     """Check prerequisites.
5154
5155     This checks that the instance is in the cluster.
5156
5157     """
5158     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5159     assert instance is not None, \
5160       "Cannot retrieve locked instance %s" % self.op.instance_name
5161     self.instance = instance
5162
5163     if instance.disk_template != constants.DT_DRBD8:
5164       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5165                                  " instances")
5166
5167     if len(instance.secondary_nodes) != 1:
5168       raise errors.OpPrereqError("The instance has a strange layout,"
5169                                  " expected one secondary but found %d" %
5170                                  len(instance.secondary_nodes))
5171
5172     self.sec_node = instance.secondary_nodes[0]
5173
5174     if self.op.iallocator is not None:
5175       self._RunAllocator()
5176
5177     remote_node = self.op.remote_node
5178     if remote_node is not None:
5179       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5180       assert self.remote_node_info is not None, \
5181         "Cannot retrieve locked node %s" % remote_node
5182     else:
5183       self.remote_node_info = None
5184     if remote_node == instance.primary_node:
5185       raise errors.OpPrereqError("The specified node is the primary node of"
5186                                  " the instance.")
5187     elif remote_node == self.sec_node:
5188       raise errors.OpPrereqError("The specified node is already the"
5189                                  " secondary node of the instance.")
5190
5191     if self.op.mode == constants.REPLACE_DISK_PRI:
5192       n1 = self.tgt_node = instance.primary_node
5193       n2 = self.oth_node = self.sec_node
5194     elif self.op.mode == constants.REPLACE_DISK_SEC:
5195       n1 = self.tgt_node = self.sec_node
5196       n2 = self.oth_node = instance.primary_node
5197     elif self.op.mode == constants.REPLACE_DISK_CHG:
5198       n1 = self.new_node = remote_node
5199       n2 = self.oth_node = instance.primary_node
5200       self.tgt_node = self.sec_node
5201       _CheckNodeNotDrained(self, remote_node)
5202     else:
5203       raise errors.ProgrammerError("Unhandled disk replace mode")
5204
5205     _CheckNodeOnline(self, n1)
5206     _CheckNodeOnline(self, n2)
5207
5208     if not self.op.disks:
5209       self.op.disks = range(len(instance.disks))
5210
5211     for disk_idx in self.op.disks:
5212       instance.FindDisk(disk_idx)
5213
5214   def _ExecD8DiskOnly(self, feedback_fn):
5215     """Replace a disk on the primary or secondary for dbrd8.
5216
5217     The algorithm for replace is quite complicated:
5218
5219       1. for each disk to be replaced:
5220
5221         1. create new LVs on the target node with unique names
5222         1. detach old LVs from the drbd device
5223         1. rename old LVs to name_replaced.<time_t>
5224         1. rename new LVs to old LVs
5225         1. attach the new LVs (with the old names now) to the drbd device
5226
5227       1. wait for sync across all devices
5228
5229       1. for each modified disk:
5230
5231         1. remove old LVs (which have the name name_replaces.<time_t>)
5232
5233     Failures are not very well handled.
5234
5235     """
5236     steps_total = 6
5237     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5238     instance = self.instance
5239     iv_names = {}
5240     vgname = self.cfg.GetVGName()
5241     # start of work
5242     cfg = self.cfg
5243     tgt_node = self.tgt_node
5244     oth_node = self.oth_node
5245
5246     # Step: check device activation
5247     self.proc.LogStep(1, steps_total, "check device existence")
5248     info("checking volume groups")
5249     my_vg = cfg.GetVGName()
5250     results = self.rpc.call_vg_list([oth_node, tgt_node])
5251     if not results:
5252       raise errors.OpExecError("Can't list volume groups on the nodes")
5253     for node in oth_node, tgt_node:
5254       res = results[node]
5255       msg = res.RemoteFailMsg()
5256       if msg:
5257         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5258       if my_vg not in res.payload:
5259         raise errors.OpExecError("Volume group '%s' not found on %s" %
5260                                  (my_vg, node))
5261     for idx, dev in enumerate(instance.disks):
5262       if idx not in self.op.disks:
5263         continue
5264       for node in tgt_node, oth_node:
5265         info("checking disk/%d on %s" % (idx, node))
5266         cfg.SetDiskID(dev, node)
5267         result = self.rpc.call_blockdev_find(node, dev)
5268         msg = result.RemoteFailMsg()
5269         if not msg and not result.payload:
5270           msg = "disk not found"
5271         if msg:
5272           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5273                                    (idx, node, msg))
5274
5275     # Step: check other node consistency
5276     self.proc.LogStep(2, steps_total, "check peer consistency")
5277     for idx, dev in enumerate(instance.disks):
5278       if idx not in self.op.disks:
5279         continue
5280       info("checking disk/%d consistency on %s" % (idx, oth_node))
5281       if not _CheckDiskConsistency(self, dev, oth_node,
5282                                    oth_node==instance.primary_node):
5283         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5284                                  " to replace disks on this node (%s)" %
5285                                  (oth_node, tgt_node))
5286
5287     # Step: create new storage
5288     self.proc.LogStep(3, steps_total, "allocate new storage")
5289     for idx, dev in enumerate(instance.disks):
5290       if idx not in self.op.disks:
5291         continue
5292       size = dev.size
5293       cfg.SetDiskID(dev, tgt_node)
5294       lv_names = [".disk%d_%s" % (idx, suf)
5295                   for suf in ["data", "meta"]]
5296       names = _GenerateUniqueNames(self, lv_names)
5297       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5298                              logical_id=(vgname, names[0]))
5299       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5300                              logical_id=(vgname, names[1]))
5301       new_lvs = [lv_data, lv_meta]
5302       old_lvs = dev.children
5303       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5304       info("creating new local storage on %s for %s" %
5305            (tgt_node, dev.iv_name))
5306       # we pass force_create=True to force the LVM creation
5307       for new_lv in new_lvs:
5308         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5309                         _GetInstanceInfoText(instance), False)
5310
5311     # Step: for each lv, detach+rename*2+attach
5312     self.proc.LogStep(4, steps_total, "change drbd configuration")
5313     for dev, old_lvs, new_lvs in iv_names.itervalues():
5314       info("detaching %s drbd from local storage" % dev.iv_name)
5315       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5316       msg = result.RemoteFailMsg()
5317       if msg:
5318         raise errors.OpExecError("Can't detach drbd from local storage on node"
5319                                  " %s for device %s: %s" %
5320                                  (tgt_node, dev.iv_name, msg))
5321       #dev.children = []
5322       #cfg.Update(instance)
5323
5324       # ok, we created the new LVs, so now we know we have the needed
5325       # storage; as such, we proceed on the target node to rename
5326       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5327       # using the assumption that logical_id == physical_id (which in
5328       # turn is the unique_id on that node)
5329
5330       # FIXME(iustin): use a better name for the replaced LVs
5331       temp_suffix = int(time.time())
5332       ren_fn = lambda d, suff: (d.physical_id[0],
5333                                 d.physical_id[1] + "_replaced-%s" % suff)
5334       # build the rename list based on what LVs exist on the node
5335       rlist = []
5336       for to_ren in old_lvs:
5337         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5338         if not result.RemoteFailMsg() and result.payload:
5339           # device exists
5340           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5341
5342       info("renaming the old LVs on the target node")
5343       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5344       msg = result.RemoteFailMsg()
5345       if msg:
5346         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5347                                  (tgt_node, msg))
5348       # now we rename the new LVs to the old LVs
5349       info("renaming the new LVs on the target node")
5350       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5351       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5352       msg = result.RemoteFailMsg()
5353       if msg:
5354         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5355                                  (tgt_node, msg))
5356
5357       for old, new in zip(old_lvs, new_lvs):
5358         new.logical_id = old.logical_id
5359         cfg.SetDiskID(new, tgt_node)
5360
5361       for disk in old_lvs:
5362         disk.logical_id = ren_fn(disk, temp_suffix)
5363         cfg.SetDiskID(disk, tgt_node)
5364
5365       # now that the new lvs have the old name, we can add them to the device
5366       info("adding new mirror component on %s" % tgt_node)
5367       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5368       msg = result.RemoteFailMsg()
5369       if msg:
5370         for new_lv in new_lvs:
5371           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5372           if msg:
5373             warning("Can't rollback device %s: %s", dev, msg,
5374                     hint="cleanup manually the unused logical volumes")
5375         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5376
5377       dev.children = new_lvs
5378       cfg.Update(instance)
5379
5380     # Step: wait for sync
5381
5382     # this can fail as the old devices are degraded and _WaitForSync
5383     # does a combined result over all disks, so we don't check its
5384     # return value
5385     self.proc.LogStep(5, steps_total, "sync devices")
5386     _WaitForSync(self, instance, unlock=True)
5387
5388     # so check manually all the devices
5389     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5390       cfg.SetDiskID(dev, instance.primary_node)
5391       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5392       msg = result.RemoteFailMsg()
5393       if not msg and not result.payload:
5394         msg = "disk not found"
5395       if msg:
5396         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5397                                  (name, msg))
5398       if result.payload[5]:
5399         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5400
5401     # Step: remove old storage
5402     self.proc.LogStep(6, steps_total, "removing old storage")
5403     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5404       info("remove logical volumes for %s" % name)
5405       for lv in old_lvs:
5406         cfg.SetDiskID(lv, tgt_node)
5407         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5408         if msg:
5409           warning("Can't remove old LV: %s" % msg,
5410                   hint="manually remove unused LVs")
5411           continue
5412
5413   def _ExecD8Secondary(self, feedback_fn):
5414     """Replace the secondary node for drbd8.
5415
5416     The algorithm for replace is quite complicated:
5417       - for all disks of the instance:
5418         - create new LVs on the new node with same names
5419         - shutdown the drbd device on the old secondary
5420         - disconnect the drbd network on the primary
5421         - create the drbd device on the new secondary
5422         - network attach the drbd on the primary, using an artifice:
5423           the drbd code for Attach() will connect to the network if it
5424           finds a device which is connected to the good local disks but
5425           not network enabled
5426       - wait for sync across all devices
5427       - remove all disks from the old secondary
5428
5429     Failures are not very well handled.
5430
5431     """
5432     steps_total = 6
5433     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5434     instance = self.instance
5435     iv_names = {}
5436     # start of work
5437     cfg = self.cfg
5438     old_node = self.tgt_node
5439     new_node = self.new_node
5440     pri_node = instance.primary_node
5441     nodes_ip = {
5442       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5443       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5444       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5445       }
5446
5447     # Step: check device activation
5448     self.proc.LogStep(1, steps_total, "check device existence")
5449     info("checking volume groups")
5450     my_vg = cfg.GetVGName()
5451     results = self.rpc.call_vg_list([pri_node, new_node])
5452     for node in pri_node, new_node:
5453       res = results[node]
5454       msg = res.RemoteFailMsg()
5455       if msg:
5456         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5457       if my_vg not in res.payload:
5458         raise errors.OpExecError("Volume group '%s' not found on %s" %
5459                                  (my_vg, node))
5460     for idx, dev in enumerate(instance.disks):
5461       if idx not in self.op.disks:
5462         continue
5463       info("checking disk/%d on %s" % (idx, pri_node))
5464       cfg.SetDiskID(dev, pri_node)
5465       result = self.rpc.call_blockdev_find(pri_node, dev)
5466       msg = result.RemoteFailMsg()
5467       if not msg and not result.payload:
5468         msg = "disk not found"
5469       if msg:
5470         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5471                                  (idx, pri_node, msg))
5472
5473     # Step: check other node consistency
5474     self.proc.LogStep(2, steps_total, "check peer consistency")
5475     for idx, dev in enumerate(instance.disks):
5476       if idx not in self.op.disks:
5477         continue
5478       info("checking disk/%d consistency on %s" % (idx, pri_node))
5479       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5480         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5481                                  " unsafe to replace the secondary" %
5482                                  pri_node)
5483
5484     # Step: create new storage
5485     self.proc.LogStep(3, steps_total, "allocate new storage")
5486     for idx, dev in enumerate(instance.disks):
5487       info("adding new local storage on %s for disk/%d" %
5488            (new_node, idx))
5489       # we pass force_create=True to force LVM creation
5490       for new_lv in dev.children:
5491         _CreateBlockDev(self, new_node, instance, new_lv, True,
5492                         _GetInstanceInfoText(instance), False)
5493
5494     # Step 4: dbrd minors and drbd setups changes
5495     # after this, we must manually remove the drbd minors on both the
5496     # error and the success paths
5497     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5498                                    instance.name)
5499     logging.debug("Allocated minors %s" % (minors,))
5500     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5501     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5502       size = dev.size
5503       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5504       # create new devices on new_node; note that we create two IDs:
5505       # one without port, so the drbd will be activated without
5506       # networking information on the new node at this stage, and one
5507       # with network, for the latter activation in step 4
5508       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5509       if pri_node == o_node1:
5510         p_minor = o_minor1
5511       else:
5512         p_minor = o_minor2
5513
5514       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5515       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5516
5517       iv_names[idx] = (dev, dev.children, new_net_id)
5518       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5519                     new_net_id)
5520       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5521                               logical_id=new_alone_id,
5522                               children=dev.children)
5523       try:
5524         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5525                               _GetInstanceInfoText(instance), False)
5526       except errors.GenericError:
5527         self.cfg.ReleaseDRBDMinors(instance.name)
5528         raise
5529
5530     for idx, dev in enumerate(instance.disks):
5531       # we have new devices, shutdown the drbd on the old secondary
5532       info("shutting down drbd for disk/%d on old node" % idx)
5533       cfg.SetDiskID(dev, old_node)
5534       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5535       if msg:
5536         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5537                 (idx, msg),
5538                 hint="Please cleanup this device manually as soon as possible")
5539
5540     info("detaching primary drbds from the network (=> standalone)")
5541     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5542                                                instance.disks)[pri_node]
5543
5544     msg = result.RemoteFailMsg()
5545     if msg:
5546       # detaches didn't succeed (unlikely)
5547       self.cfg.ReleaseDRBDMinors(instance.name)
5548       raise errors.OpExecError("Can't detach the disks from the network on"
5549                                " old node: %s" % (msg,))
5550
5551     # if we managed to detach at least one, we update all the disks of
5552     # the instance to point to the new secondary
5553     info("updating instance configuration")
5554     for dev, _, new_logical_id in iv_names.itervalues():
5555       dev.logical_id = new_logical_id
5556       cfg.SetDiskID(dev, pri_node)
5557     cfg.Update(instance)
5558
5559     # and now perform the drbd attach
5560     info("attaching primary drbds to new secondary (standalone => connected)")
5561     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5562                                            instance.disks, instance.name,
5563                                            False)
5564     for to_node, to_result in result.items():
5565       msg = to_result.RemoteFailMsg()
5566       if msg:
5567         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5568                 hint="please do a gnt-instance info to see the"
5569                 " status of disks")
5570
5571     # this can fail as the old devices are degraded and _WaitForSync
5572     # does a combined result over all disks, so we don't check its
5573     # return value
5574     self.proc.LogStep(5, steps_total, "sync devices")
5575     _WaitForSync(self, instance, unlock=True)
5576
5577     # so check manually all the devices
5578     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5579       cfg.SetDiskID(dev, pri_node)
5580       result = self.rpc.call_blockdev_find(pri_node, dev)
5581       msg = result.RemoteFailMsg()
5582       if not msg and not result.payload:
5583         msg = "disk not found"
5584       if msg:
5585         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5586                                  (idx, msg))
5587       if result.payload[5]:
5588         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5589
5590     self.proc.LogStep(6, steps_total, "removing old storage")
5591     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5592       info("remove logical volumes for disk/%d" % idx)
5593       for lv in old_lvs:
5594         cfg.SetDiskID(lv, old_node)
5595         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5596         if msg:
5597           warning("Can't remove LV on old secondary: %s", msg,
5598                   hint="Cleanup stale volumes by hand")
5599
5600   def Exec(self, feedback_fn):
5601     """Execute disk replacement.
5602
5603     This dispatches the disk replacement to the appropriate handler.
5604
5605     """
5606     instance = self.instance
5607
5608     # Activate the instance disks if we're replacing them on a down instance
5609     if not instance.admin_up:
5610       _StartInstanceDisks(self, instance, True)
5611
5612     if self.op.mode == constants.REPLACE_DISK_CHG:
5613       fn = self._ExecD8Secondary
5614     else:
5615       fn = self._ExecD8DiskOnly
5616
5617     ret = fn(feedback_fn)
5618
5619     # Deactivate the instance disks if we're replacing them on a down instance
5620     if not instance.admin_up:
5621       _SafeShutdownInstanceDisks(self, instance)
5622
5623     return ret
5624
5625
5626 class LUGrowDisk(LogicalUnit):
5627   """Grow a disk of an instance.
5628
5629   """
5630   HPATH = "disk-grow"
5631   HTYPE = constants.HTYPE_INSTANCE
5632   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5633   REQ_BGL = False
5634
5635   def ExpandNames(self):
5636     self._ExpandAndLockInstance()
5637     self.needed_locks[locking.LEVEL_NODE] = []
5638     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5639
5640   def DeclareLocks(self, level):
5641     if level == locking.LEVEL_NODE:
5642       self._LockInstancesNodes()
5643
5644   def BuildHooksEnv(self):
5645     """Build hooks env.
5646
5647     This runs on the master, the primary and all the secondaries.
5648
5649     """
5650     env = {
5651       "DISK": self.op.disk,
5652       "AMOUNT": self.op.amount,
5653       }
5654     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5655     nl = [
5656       self.cfg.GetMasterNode(),
5657       self.instance.primary_node,
5658       ]
5659     return env, nl, nl
5660
5661   def CheckPrereq(self):
5662     """Check prerequisites.
5663
5664     This checks that the instance is in the cluster.
5665
5666     """
5667     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5668     assert instance is not None, \
5669       "Cannot retrieve locked instance %s" % self.op.instance_name
5670     nodenames = list(instance.all_nodes)
5671     for node in nodenames:
5672       _CheckNodeOnline(self, node)
5673
5674
5675     self.instance = instance
5676
5677     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5678       raise errors.OpPrereqError("Instance's disk layout does not support"
5679                                  " growing.")
5680
5681     self.disk = instance.FindDisk(self.op.disk)
5682
5683     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5684                                        instance.hypervisor)
5685     for node in nodenames:
5686       info = nodeinfo[node]
5687       msg = info.RemoteFailMsg()
5688       if msg:
5689         raise errors.OpPrereqError("Cannot get current information"
5690                                    " from node %s:" % (node, msg))
5691       vg_free = info.payload.get('vg_free', None)
5692       if not isinstance(vg_free, int):
5693         raise errors.OpPrereqError("Can't compute free disk space on"
5694                                    " node %s" % node)
5695       if self.op.amount > vg_free:
5696         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5697                                    " %d MiB available, %d MiB required" %
5698                                    (node, vg_free, self.op.amount))
5699
5700   def Exec(self, feedback_fn):
5701     """Execute disk grow.
5702
5703     """
5704     instance = self.instance
5705     disk = self.disk
5706     for node in instance.all_nodes:
5707       self.cfg.SetDiskID(disk, node)
5708       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5709       msg = result.RemoteFailMsg()
5710       if msg:
5711         raise errors.OpExecError("Grow request failed to node %s: %s" %
5712                                  (node, msg))
5713     disk.RecordGrow(self.op.amount)
5714     self.cfg.Update(instance)
5715     if self.op.wait_for_sync:
5716       disk_abort = not _WaitForSync(self, instance)
5717       if disk_abort:
5718         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5719                              " status.\nPlease check the instance.")
5720
5721
5722 class LUQueryInstanceData(NoHooksLU):
5723   """Query runtime instance data.
5724
5725   """
5726   _OP_REQP = ["instances", "static"]
5727   REQ_BGL = False
5728
5729   def ExpandNames(self):
5730     self.needed_locks = {}
5731     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5732
5733     if not isinstance(self.op.instances, list):
5734       raise errors.OpPrereqError("Invalid argument type 'instances'")
5735
5736     if self.op.instances:
5737       self.wanted_names = []
5738       for name in self.op.instances:
5739         full_name = self.cfg.ExpandInstanceName(name)
5740         if full_name is None:
5741           raise errors.OpPrereqError("Instance '%s' not known" % name)
5742         self.wanted_names.append(full_name)
5743       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5744     else:
5745       self.wanted_names = None
5746       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5747
5748     self.needed_locks[locking.LEVEL_NODE] = []
5749     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5750
5751   def DeclareLocks(self, level):
5752     if level == locking.LEVEL_NODE:
5753       self._LockInstancesNodes()
5754
5755   def CheckPrereq(self):
5756     """Check prerequisites.
5757
5758     This only checks the optional instance list against the existing names.
5759
5760     """
5761     if self.wanted_names is None:
5762       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5763
5764     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5765                              in self.wanted_names]
5766     return
5767
5768   def _ComputeDiskStatus(self, instance, snode, dev):
5769     """Compute block device status.
5770
5771     """
5772     static = self.op.static
5773     if not static:
5774       self.cfg.SetDiskID(dev, instance.primary_node)
5775       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5776       if dev_pstatus.offline:
5777         dev_pstatus = None
5778       else:
5779         msg = dev_pstatus.RemoteFailMsg()
5780         if msg:
5781           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5782                                    (instance.name, msg))
5783         dev_pstatus = dev_pstatus.payload
5784     else:
5785       dev_pstatus = None
5786
5787     if dev.dev_type in constants.LDS_DRBD:
5788       # we change the snode then (otherwise we use the one passed in)
5789       if dev.logical_id[0] == instance.primary_node:
5790         snode = dev.logical_id[1]
5791       else:
5792         snode = dev.logical_id[0]
5793
5794     if snode and not static:
5795       self.cfg.SetDiskID(dev, snode)
5796       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5797       if dev_sstatus.offline:
5798         dev_sstatus = None
5799       else:
5800         msg = dev_sstatus.RemoteFailMsg()
5801         if msg:
5802           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5803                                    (instance.name, msg))
5804         dev_sstatus = dev_sstatus.payload
5805     else:
5806       dev_sstatus = None
5807
5808     if dev.children:
5809       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5810                       for child in dev.children]
5811     else:
5812       dev_children = []
5813
5814     data = {
5815       "iv_name": dev.iv_name,
5816       "dev_type": dev.dev_type,
5817       "logical_id": dev.logical_id,
5818       "physical_id": dev.physical_id,
5819       "pstatus": dev_pstatus,
5820       "sstatus": dev_sstatus,
5821       "children": dev_children,
5822       "mode": dev.mode,
5823       }
5824
5825     return data
5826
5827   def Exec(self, feedback_fn):
5828     """Gather and return data"""
5829     result = {}
5830
5831     cluster = self.cfg.GetClusterInfo()
5832
5833     for instance in self.wanted_instances:
5834       if not self.op.static:
5835         remote_info = self.rpc.call_instance_info(instance.primary_node,
5836                                                   instance.name,
5837                                                   instance.hypervisor)
5838         msg = remote_info.RemoteFailMsg()
5839         if msg:
5840           raise errors.OpExecError("Error checking node %s: %s" %
5841                                    (instance.primary_node, msg))
5842         remote_info = remote_info.payload
5843         if remote_info and "state" in remote_info:
5844           remote_state = "up"
5845         else:
5846           remote_state = "down"
5847       else:
5848         remote_state = None
5849       if instance.admin_up:
5850         config_state = "up"
5851       else:
5852         config_state = "down"
5853
5854       disks = [self._ComputeDiskStatus(instance, None, device)
5855                for device in instance.disks]
5856
5857       idict = {
5858         "name": instance.name,
5859         "config_state": config_state,
5860         "run_state": remote_state,
5861         "pnode": instance.primary_node,
5862         "snodes": instance.secondary_nodes,
5863         "os": instance.os,
5864         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5865         "disks": disks,
5866         "hypervisor": instance.hypervisor,
5867         "network_port": instance.network_port,
5868         "hv_instance": instance.hvparams,
5869         "hv_actual": cluster.FillHV(instance),
5870         "be_instance": instance.beparams,
5871         "be_actual": cluster.FillBE(instance),
5872         }
5873
5874       result[instance.name] = idict
5875
5876     return result
5877
5878
5879 class LUSetInstanceParams(LogicalUnit):
5880   """Modifies an instances's parameters.
5881
5882   """
5883   HPATH = "instance-modify"
5884   HTYPE = constants.HTYPE_INSTANCE
5885   _OP_REQP = ["instance_name"]
5886   REQ_BGL = False
5887
5888   def CheckArguments(self):
5889     if not hasattr(self.op, 'nics'):
5890       self.op.nics = []
5891     if not hasattr(self.op, 'disks'):
5892       self.op.disks = []
5893     if not hasattr(self.op, 'beparams'):
5894       self.op.beparams = {}
5895     if not hasattr(self.op, 'hvparams'):
5896       self.op.hvparams = {}
5897     self.op.force = getattr(self.op, "force", False)
5898     if not (self.op.nics or self.op.disks or
5899             self.op.hvparams or self.op.beparams):
5900       raise errors.OpPrereqError("No changes submitted")
5901
5902     # Disk validation
5903     disk_addremove = 0
5904     for disk_op, disk_dict in self.op.disks:
5905       if disk_op == constants.DDM_REMOVE:
5906         disk_addremove += 1
5907         continue
5908       elif disk_op == constants.DDM_ADD:
5909         disk_addremove += 1
5910       else:
5911         if not isinstance(disk_op, int):
5912           raise errors.OpPrereqError("Invalid disk index")
5913       if disk_op == constants.DDM_ADD:
5914         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5915         if mode not in constants.DISK_ACCESS_SET:
5916           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5917         size = disk_dict.get('size', None)
5918         if size is None:
5919           raise errors.OpPrereqError("Required disk parameter size missing")
5920         try:
5921           size = int(size)
5922         except ValueError, err:
5923           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5924                                      str(err))
5925         disk_dict['size'] = size
5926       else:
5927         # modification of disk
5928         if 'size' in disk_dict:
5929           raise errors.OpPrereqError("Disk size change not possible, use"
5930                                      " grow-disk")
5931
5932     if disk_addremove > 1:
5933       raise errors.OpPrereqError("Only one disk add or remove operation"
5934                                  " supported at a time")
5935
5936     # NIC validation
5937     nic_addremove = 0
5938     for nic_op, nic_dict in self.op.nics:
5939       if nic_op == constants.DDM_REMOVE:
5940         nic_addremove += 1
5941         continue
5942       elif nic_op == constants.DDM_ADD:
5943         nic_addremove += 1
5944       else:
5945         if not isinstance(nic_op, int):
5946           raise errors.OpPrereqError("Invalid nic index")
5947
5948       # nic_dict should be a dict
5949       nic_ip = nic_dict.get('ip', None)
5950       if nic_ip is not None:
5951         if nic_ip.lower() == constants.VALUE_NONE:
5952           nic_dict['ip'] = None
5953         else:
5954           if not utils.IsValidIP(nic_ip):
5955             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5956
5957       nic_bridge = nic_dict.get('bridge', None)
5958       nic_link = nic_dict.get('link', None)
5959       if nic_bridge and nic_link:
5960         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5961       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5962         nic_dict['bridge'] = None
5963       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5964         nic_dict['link'] = None
5965
5966       if nic_op == constants.DDM_ADD:
5967         nic_mac = nic_dict.get('mac', None)
5968         if nic_mac is None:
5969           nic_dict['mac'] = constants.VALUE_AUTO
5970
5971       if 'mac' in nic_dict:
5972         nic_mac = nic_dict['mac']
5973         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5974           if not utils.IsValidMac(nic_mac):
5975             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5976         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5977           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5978                                      " modifying an existing nic")
5979
5980     if nic_addremove > 1:
5981       raise errors.OpPrereqError("Only one NIC add or remove operation"
5982                                  " supported at a time")
5983
5984   def ExpandNames(self):
5985     self._ExpandAndLockInstance()
5986     self.needed_locks[locking.LEVEL_NODE] = []
5987     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5988
5989   def DeclareLocks(self, level):
5990     if level == locking.LEVEL_NODE:
5991       self._LockInstancesNodes()
5992
5993   def BuildHooksEnv(self):
5994     """Build hooks env.
5995
5996     This runs on the master, primary and secondaries.
5997
5998     """
5999     args = dict()
6000     if constants.BE_MEMORY in self.be_new:
6001       args['memory'] = self.be_new[constants.BE_MEMORY]
6002     if constants.BE_VCPUS in self.be_new:
6003       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6004     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6005     # information at all.
6006     if self.op.nics:
6007       args['nics'] = []
6008       nic_override = dict(self.op.nics)
6009       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6010       for idx, nic in enumerate(self.instance.nics):
6011         if idx in nic_override:
6012           this_nic_override = nic_override[idx]
6013         else:
6014           this_nic_override = {}
6015         if 'ip' in this_nic_override:
6016           ip = this_nic_override['ip']
6017         else:
6018           ip = nic.ip
6019         if 'mac' in this_nic_override:
6020           mac = this_nic_override['mac']
6021         else:
6022           mac = nic.mac
6023         if idx in self.nic_pnew:
6024           nicparams = self.nic_pnew[idx]
6025         else:
6026           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6027         mode = nicparams[constants.NIC_MODE]
6028         link = nicparams[constants.NIC_LINK]
6029         args['nics'].append((ip, mac, mode, link))
6030       if constants.DDM_ADD in nic_override:
6031         ip = nic_override[constants.DDM_ADD].get('ip', None)
6032         mac = nic_override[constants.DDM_ADD]['mac']
6033         nicparams = self.nic_pnew[constants.DDM_ADD]
6034         mode = nicparams[constants.NIC_MODE]
6035         link = nicparams[constants.NIC_LINK]
6036         args['nics'].append((ip, mac, mode, link))
6037       elif constants.DDM_REMOVE in nic_override:
6038         del args['nics'][-1]
6039
6040     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6041     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6042     return env, nl, nl
6043
6044   def _GetUpdatedParams(self, old_params, update_dict,
6045                         default_values, parameter_types):
6046     """Return the new params dict for the given params.
6047
6048     @type old_params: dict
6049     @type old_params: old parameters
6050     @type update_dict: dict
6051     @type update_dict: dict containing new parameter values,
6052                        or constants.VALUE_DEFAULT to reset the
6053                        parameter to its default value
6054     @type default_values: dict
6055     @param default_values: default values for the filled parameters
6056     @type parameter_types: dict
6057     @param parameter_types: dict mapping target dict keys to types
6058                             in constants.ENFORCEABLE_TYPES
6059     @rtype: (dict, dict)
6060     @return: (new_parameters, filled_parameters)
6061
6062     """
6063     params_copy = copy.deepcopy(old_params)
6064     for key, val in update_dict.iteritems():
6065       if val == constants.VALUE_DEFAULT:
6066         try:
6067           del params_copy[key]
6068         except KeyError:
6069           pass
6070       else:
6071         params_copy[key] = val
6072     utils.ForceDictType(params_copy, parameter_types)
6073     params_filled = objects.FillDict(default_values, params_copy)
6074     return (params_copy, params_filled)
6075
6076   def CheckPrereq(self):
6077     """Check prerequisites.
6078
6079     This only checks the instance list against the existing names.
6080
6081     """
6082     force = self.force = self.op.force
6083
6084     # checking the new params on the primary/secondary nodes
6085
6086     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6087     cluster = self.cluster = self.cfg.GetClusterInfo()
6088     assert self.instance is not None, \
6089       "Cannot retrieve locked instance %s" % self.op.instance_name
6090     pnode = instance.primary_node
6091     nodelist = list(instance.all_nodes)
6092
6093     # hvparams processing
6094     if self.op.hvparams:
6095       i_hvdict, hv_new = self._GetUpdatedParams(
6096                              instance.hvparams, self.op.hvparams,
6097                              cluster.hvparams[instance.hypervisor],
6098                              constants.HVS_PARAMETER_TYPES)
6099       # local check
6100       hypervisor.GetHypervisor(
6101         instance.hypervisor).CheckParameterSyntax(hv_new)
6102       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6103       self.hv_new = hv_new # the new actual values
6104       self.hv_inst = i_hvdict # the new dict (without defaults)
6105     else:
6106       self.hv_new = self.hv_inst = {}
6107
6108     # beparams processing
6109     if self.op.beparams:
6110       i_bedict, be_new = self._GetUpdatedParams(
6111                              instance.beparams, self.op.beparams,
6112                              cluster.beparams[constants.PP_DEFAULT],
6113                              constants.BES_PARAMETER_TYPES)
6114       self.be_new = be_new # the new actual values
6115       self.be_inst = i_bedict # the new dict (without defaults)
6116     else:
6117       self.be_new = self.be_inst = {}
6118
6119     self.warn = []
6120
6121     if constants.BE_MEMORY in self.op.beparams and not self.force:
6122       mem_check_list = [pnode]
6123       if be_new[constants.BE_AUTO_BALANCE]:
6124         # either we changed auto_balance to yes or it was from before
6125         mem_check_list.extend(instance.secondary_nodes)
6126       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6127                                                   instance.hypervisor)
6128       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6129                                          instance.hypervisor)
6130       pninfo = nodeinfo[pnode]
6131       msg = pninfo.RemoteFailMsg()
6132       if msg:
6133         # Assume the primary node is unreachable and go ahead
6134         self.warn.append("Can't get info from primary node %s: %s" %
6135                          (pnode,  msg))
6136       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6137         self.warn.append("Node data from primary node %s doesn't contain"
6138                          " free memory information" % pnode)
6139       elif instance_info.RemoteFailMsg():
6140         self.warn.append("Can't get instance runtime information: %s" %
6141                         instance_info.RemoteFailMsg())
6142       else:
6143         if instance_info.payload:
6144           current_mem = int(instance_info.payload['memory'])
6145         else:
6146           # Assume instance not running
6147           # (there is a slight race condition here, but it's not very probable,
6148           # and we have no other way to check)
6149           current_mem = 0
6150         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6151                     pninfo.payload['memory_free'])
6152         if miss_mem > 0:
6153           raise errors.OpPrereqError("This change will prevent the instance"
6154                                      " from starting, due to %d MB of memory"
6155                                      " missing on its primary node" % miss_mem)
6156
6157       if be_new[constants.BE_AUTO_BALANCE]:
6158         for node, nres in nodeinfo.items():
6159           if node not in instance.secondary_nodes:
6160             continue
6161           msg = nres.RemoteFailMsg()
6162           if msg:
6163             self.warn.append("Can't get info from secondary node %s: %s" %
6164                              (node, msg))
6165           elif not isinstance(nres.payload.get('memory_free', None), int):
6166             self.warn.append("Secondary node %s didn't return free"
6167                              " memory information" % node)
6168           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6169             self.warn.append("Not enough memory to failover instance to"
6170                              " secondary node %s" % node)
6171
6172     # NIC processing
6173     self.nic_pnew = {}
6174     self.nic_pinst = {}
6175     for nic_op, nic_dict in self.op.nics:
6176       if nic_op == constants.DDM_REMOVE:
6177         if not instance.nics:
6178           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6179         continue
6180       if nic_op != constants.DDM_ADD:
6181         # an existing nic
6182         if nic_op < 0 or nic_op >= len(instance.nics):
6183           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6184                                      " are 0 to %d" %
6185                                      (nic_op, len(instance.nics)))
6186         old_nic_params = instance.nics[nic_op].nicparams
6187         old_nic_ip = instance.nics[nic_op].ip
6188       else:
6189         old_nic_params = {}
6190         old_nic_ip = None
6191
6192       update_params_dict = dict([(key, nic_dict[key])
6193                                  for key in constants.NICS_PARAMETERS
6194                                  if key in nic_dict])
6195
6196       if 'bridge' in nic_dict:
6197         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6198
6199       new_nic_params, new_filled_nic_params = \
6200           self._GetUpdatedParams(old_nic_params, update_params_dict,
6201                                  cluster.nicparams[constants.PP_DEFAULT],
6202                                  constants.NICS_PARAMETER_TYPES)
6203       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6204       self.nic_pinst[nic_op] = new_nic_params
6205       self.nic_pnew[nic_op] = new_filled_nic_params
6206       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6207
6208       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6209         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6210         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6211         msg = result.RemoteFailMsg()
6212         if msg:
6213           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6214           if self.force:
6215             self.warn.append(msg)
6216           else:
6217             raise errors.OpPrereqError(msg)
6218       if new_nic_mode == constants.NIC_MODE_ROUTED:
6219         if 'ip' in nic_dict:
6220           nic_ip = nic_dict['ip']
6221         else:
6222           nic_ip = old_nic_ip
6223         if nic_ip is None:
6224           raise errors.OpPrereqError('Cannot set the nic ip to None'
6225                                      ' on a routed nic')
6226       if 'mac' in nic_dict:
6227         nic_mac = nic_dict['mac']
6228         if nic_mac is None:
6229           raise errors.OpPrereqError('Cannot set the nic mac to None')
6230         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6231           # otherwise generate the mac
6232           nic_dict['mac'] = self.cfg.GenerateMAC()
6233         else:
6234           # or validate/reserve the current one
6235           if self.cfg.IsMacInUse(nic_mac):
6236             raise errors.OpPrereqError("MAC address %s already in use"
6237                                        " in cluster" % nic_mac)
6238
6239     # DISK processing
6240     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6241       raise errors.OpPrereqError("Disk operations not supported for"
6242                                  " diskless instances")
6243     for disk_op, disk_dict in self.op.disks:
6244       if disk_op == constants.DDM_REMOVE:
6245         if len(instance.disks) == 1:
6246           raise errors.OpPrereqError("Cannot remove the last disk of"
6247                                      " an instance")
6248         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6249         ins_l = ins_l[pnode]
6250         msg = ins_l.RemoteFailMsg()
6251         if msg:
6252           raise errors.OpPrereqError("Can't contact node %s: %s" %
6253                                      (pnode, msg))
6254         if instance.name in ins_l.payload:
6255           raise errors.OpPrereqError("Instance is running, can't remove"
6256                                      " disks.")
6257
6258       if (disk_op == constants.DDM_ADD and
6259           len(instance.nics) >= constants.MAX_DISKS):
6260         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6261                                    " add more" % constants.MAX_DISKS)
6262       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6263         # an existing disk
6264         if disk_op < 0 or disk_op >= len(instance.disks):
6265           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6266                                      " are 0 to %d" %
6267                                      (disk_op, len(instance.disks)))
6268
6269     return
6270
6271   def Exec(self, feedback_fn):
6272     """Modifies an instance.
6273
6274     All parameters take effect only at the next restart of the instance.
6275
6276     """
6277     # Process here the warnings from CheckPrereq, as we don't have a
6278     # feedback_fn there.
6279     for warn in self.warn:
6280       feedback_fn("WARNING: %s" % warn)
6281
6282     result = []
6283     instance = self.instance
6284     cluster = self.cluster
6285     # disk changes
6286     for disk_op, disk_dict in self.op.disks:
6287       if disk_op == constants.DDM_REMOVE:
6288         # remove the last disk
6289         device = instance.disks.pop()
6290         device_idx = len(instance.disks)
6291         for node, disk in device.ComputeNodeTree(instance.primary_node):
6292           self.cfg.SetDiskID(disk, node)
6293           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6294           if msg:
6295             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6296                             " continuing anyway", device_idx, node, msg)
6297         result.append(("disk/%d" % device_idx, "remove"))
6298       elif disk_op == constants.DDM_ADD:
6299         # add a new disk
6300         if instance.disk_template == constants.DT_FILE:
6301           file_driver, file_path = instance.disks[0].logical_id
6302           file_path = os.path.dirname(file_path)
6303         else:
6304           file_driver = file_path = None
6305         disk_idx_base = len(instance.disks)
6306         new_disk = _GenerateDiskTemplate(self,
6307                                          instance.disk_template,
6308                                          instance.name, instance.primary_node,
6309                                          instance.secondary_nodes,
6310                                          [disk_dict],
6311                                          file_path,
6312                                          file_driver,
6313                                          disk_idx_base)[0]
6314         instance.disks.append(new_disk)
6315         info = _GetInstanceInfoText(instance)
6316
6317         logging.info("Creating volume %s for instance %s",
6318                      new_disk.iv_name, instance.name)
6319         # Note: this needs to be kept in sync with _CreateDisks
6320         #HARDCODE
6321         for node in instance.all_nodes:
6322           f_create = node == instance.primary_node
6323           try:
6324             _CreateBlockDev(self, node, instance, new_disk,
6325                             f_create, info, f_create)
6326           except errors.OpExecError, err:
6327             self.LogWarning("Failed to create volume %s (%s) on"
6328                             " node %s: %s",
6329                             new_disk.iv_name, new_disk, node, err)
6330         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6331                        (new_disk.size, new_disk.mode)))
6332       else:
6333         # change a given disk
6334         instance.disks[disk_op].mode = disk_dict['mode']
6335         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6336     # NIC changes
6337     for nic_op, nic_dict in self.op.nics:
6338       if nic_op == constants.DDM_REMOVE:
6339         # remove the last nic
6340         del instance.nics[-1]
6341         result.append(("nic.%d" % len(instance.nics), "remove"))
6342       elif nic_op == constants.DDM_ADD:
6343         # mac and bridge should be set, by now
6344         mac = nic_dict['mac']
6345         ip = nic_dict.get('ip', None)
6346         nicparams = self.nic_pinst[constants.DDM_ADD]
6347         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6348         instance.nics.append(new_nic)
6349         result.append(("nic.%d" % (len(instance.nics) - 1),
6350                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6351                        (new_nic.mac, new_nic.ip,
6352                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6353                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6354                        )))
6355       else:
6356         for key in 'mac', 'ip':
6357           if key in nic_dict:
6358             setattr(instance.nics[nic_op], key, nic_dict[key])
6359         if nic_op in self.nic_pnew:
6360           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6361         for key, val in nic_dict.iteritems():
6362           result.append(("nic.%s/%d" % (key, nic_op), val))
6363
6364     # hvparams changes
6365     if self.op.hvparams:
6366       instance.hvparams = self.hv_inst
6367       for key, val in self.op.hvparams.iteritems():
6368         result.append(("hv/%s" % key, val))
6369
6370     # beparams changes
6371     if self.op.beparams:
6372       instance.beparams = self.be_inst
6373       for key, val in self.op.beparams.iteritems():
6374         result.append(("be/%s" % key, val))
6375
6376     self.cfg.Update(instance)
6377
6378     return result
6379
6380
6381 class LUQueryExports(NoHooksLU):
6382   """Query the exports list
6383
6384   """
6385   _OP_REQP = ['nodes']
6386   REQ_BGL = False
6387
6388   def ExpandNames(self):
6389     self.needed_locks = {}
6390     self.share_locks[locking.LEVEL_NODE] = 1
6391     if not self.op.nodes:
6392       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6393     else:
6394       self.needed_locks[locking.LEVEL_NODE] = \
6395         _GetWantedNodes(self, self.op.nodes)
6396
6397   def CheckPrereq(self):
6398     """Check prerequisites.
6399
6400     """
6401     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6402
6403   def Exec(self, feedback_fn):
6404     """Compute the list of all the exported system images.
6405
6406     @rtype: dict
6407     @return: a dictionary with the structure node->(export-list)
6408         where export-list is a list of the instances exported on
6409         that node.
6410
6411     """
6412     rpcresult = self.rpc.call_export_list(self.nodes)
6413     result = {}
6414     for node in rpcresult:
6415       if rpcresult[node].RemoteFailMsg():
6416         result[node] = False
6417       else:
6418         result[node] = rpcresult[node].payload
6419
6420     return result
6421
6422
6423 class LUExportInstance(LogicalUnit):
6424   """Export an instance to an image in the cluster.
6425
6426   """
6427   HPATH = "instance-export"
6428   HTYPE = constants.HTYPE_INSTANCE
6429   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6430   REQ_BGL = False
6431
6432   def ExpandNames(self):
6433     self._ExpandAndLockInstance()
6434     # FIXME: lock only instance primary and destination node
6435     #
6436     # Sad but true, for now we have do lock all nodes, as we don't know where
6437     # the previous export might be, and and in this LU we search for it and
6438     # remove it from its current node. In the future we could fix this by:
6439     #  - making a tasklet to search (share-lock all), then create the new one,
6440     #    then one to remove, after
6441     #  - removing the removal operation altoghether
6442     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6443
6444   def DeclareLocks(self, level):
6445     """Last minute lock declaration."""
6446     # All nodes are locked anyway, so nothing to do here.
6447
6448   def BuildHooksEnv(self):
6449     """Build hooks env.
6450
6451     This will run on the master, primary node and target node.
6452
6453     """
6454     env = {
6455       "EXPORT_NODE": self.op.target_node,
6456       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6457       }
6458     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6459     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6460           self.op.target_node]
6461     return env, nl, nl
6462
6463   def CheckPrereq(self):
6464     """Check prerequisites.
6465
6466     This checks that the instance and node names are valid.
6467
6468     """
6469     instance_name = self.op.instance_name
6470     self.instance = self.cfg.GetInstanceInfo(instance_name)
6471     assert self.instance is not None, \
6472           "Cannot retrieve locked instance %s" % self.op.instance_name
6473     _CheckNodeOnline(self, self.instance.primary_node)
6474
6475     self.dst_node = self.cfg.GetNodeInfo(
6476       self.cfg.ExpandNodeName(self.op.target_node))
6477
6478     if self.dst_node is None:
6479       # This is wrong node name, not a non-locked node
6480       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6481     _CheckNodeOnline(self, self.dst_node.name)
6482     _CheckNodeNotDrained(self, self.dst_node.name)
6483
6484     # instance disk type verification
6485     for disk in self.instance.disks:
6486       if disk.dev_type == constants.LD_FILE:
6487         raise errors.OpPrereqError("Export not supported for instances with"
6488                                    " file-based disks")
6489
6490   def Exec(self, feedback_fn):
6491     """Export an instance to an image in the cluster.
6492
6493     """
6494     instance = self.instance
6495     dst_node = self.dst_node
6496     src_node = instance.primary_node
6497     if self.op.shutdown:
6498       # shutdown the instance, but not the disks
6499       result = self.rpc.call_instance_shutdown(src_node, instance)
6500       msg = result.RemoteFailMsg()
6501       if msg:
6502         raise errors.OpExecError("Could not shutdown instance %s on"
6503                                  " node %s: %s" %
6504                                  (instance.name, src_node, msg))
6505
6506     vgname = self.cfg.GetVGName()
6507
6508     snap_disks = []
6509
6510     # set the disks ID correctly since call_instance_start needs the
6511     # correct drbd minor to create the symlinks
6512     for disk in instance.disks:
6513       self.cfg.SetDiskID(disk, src_node)
6514
6515     try:
6516       for disk in instance.disks:
6517         # result.payload will be a snapshot of an lvm leaf of the one we passed
6518         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6519         msg = result.RemoteFailMsg()
6520         if msg:
6521           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6522                           disk.logical_id[1], src_node, msg)
6523           snap_disks.append(False)
6524         else:
6525           disk_id = (vgname, result.payload)
6526           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6527                                  logical_id=disk_id, physical_id=disk_id,
6528                                  iv_name=disk.iv_name)
6529           snap_disks.append(new_dev)
6530
6531     finally:
6532       if self.op.shutdown and instance.admin_up:
6533         result = self.rpc.call_instance_start(src_node, instance, None, None)
6534         msg = result.RemoteFailMsg()
6535         if msg:
6536           _ShutdownInstanceDisks(self, instance)
6537           raise errors.OpExecError("Could not start instance: %s" % msg)
6538
6539     # TODO: check for size
6540
6541     cluster_name = self.cfg.GetClusterName()
6542     for idx, dev in enumerate(snap_disks):
6543       if dev:
6544         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6545                                                instance, cluster_name, idx)
6546         msg = result.RemoteFailMsg()
6547         if msg:
6548           self.LogWarning("Could not export block device %s from node %s to"
6549                           " node %s: %s", dev.logical_id[1], src_node,
6550                           dst_node.name, msg)
6551         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6552         if msg:
6553           self.LogWarning("Could not remove snapshot block device %s from node"
6554                           " %s: %s", dev.logical_id[1], src_node, msg)
6555
6556     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6557     msg = result.RemoteFailMsg()
6558     if msg:
6559       self.LogWarning("Could not finalize export for instance %s"
6560                       " on node %s: %s", instance.name, dst_node.name, msg)
6561
6562     nodelist = self.cfg.GetNodeList()
6563     nodelist.remove(dst_node.name)
6564
6565     # on one-node clusters nodelist will be empty after the removal
6566     # if we proceed the backup would be removed because OpQueryExports
6567     # substitutes an empty list with the full cluster node list.
6568     iname = instance.name
6569     if nodelist:
6570       exportlist = self.rpc.call_export_list(nodelist)
6571       for node in exportlist:
6572         if exportlist[node].RemoteFailMsg():
6573           continue
6574         if iname in exportlist[node].payload:
6575           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6576           if msg:
6577             self.LogWarning("Could not remove older export for instance %s"
6578                             " on node %s: %s", iname, node, msg)
6579
6580
6581 class LURemoveExport(NoHooksLU):
6582   """Remove exports related to the named instance.
6583
6584   """
6585   _OP_REQP = ["instance_name"]
6586   REQ_BGL = False
6587
6588   def ExpandNames(self):
6589     self.needed_locks = {}
6590     # We need all nodes to be locked in order for RemoveExport to work, but we
6591     # don't need to lock the instance itself, as nothing will happen to it (and
6592     # we can remove exports also for a removed instance)
6593     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6594
6595   def CheckPrereq(self):
6596     """Check prerequisites.
6597     """
6598     pass
6599
6600   def Exec(self, feedback_fn):
6601     """Remove any export.
6602
6603     """
6604     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6605     # If the instance was not found we'll try with the name that was passed in.
6606     # This will only work if it was an FQDN, though.
6607     fqdn_warn = False
6608     if not instance_name:
6609       fqdn_warn = True
6610       instance_name = self.op.instance_name
6611
6612     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6613     exportlist = self.rpc.call_export_list(locked_nodes)
6614     found = False
6615     for node in exportlist:
6616       msg = exportlist[node].RemoteFailMsg()
6617       if msg:
6618         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6619         continue
6620       if instance_name in exportlist[node].payload:
6621         found = True
6622         result = self.rpc.call_export_remove(node, instance_name)
6623         msg = result.RemoteFailMsg()
6624         if msg:
6625           logging.error("Could not remove export for instance %s"
6626                         " on node %s: %s", instance_name, node, msg)
6627
6628     if fqdn_warn and not found:
6629       feedback_fn("Export not found. If trying to remove an export belonging"
6630                   " to a deleted instance please use its Fully Qualified"
6631                   " Domain Name.")
6632
6633
6634 class TagsLU(NoHooksLU):
6635   """Generic tags LU.
6636
6637   This is an abstract class which is the parent of all the other tags LUs.
6638
6639   """
6640
6641   def ExpandNames(self):
6642     self.needed_locks = {}
6643     if self.op.kind == constants.TAG_NODE:
6644       name = self.cfg.ExpandNodeName(self.op.name)
6645       if name is None:
6646         raise errors.OpPrereqError("Invalid node name (%s)" %
6647                                    (self.op.name,))
6648       self.op.name = name
6649       self.needed_locks[locking.LEVEL_NODE] = name
6650     elif self.op.kind == constants.TAG_INSTANCE:
6651       name = self.cfg.ExpandInstanceName(self.op.name)
6652       if name is None:
6653         raise errors.OpPrereqError("Invalid instance name (%s)" %
6654                                    (self.op.name,))
6655       self.op.name = name
6656       self.needed_locks[locking.LEVEL_INSTANCE] = name
6657
6658   def CheckPrereq(self):
6659     """Check prerequisites.
6660
6661     """
6662     if self.op.kind == constants.TAG_CLUSTER:
6663       self.target = self.cfg.GetClusterInfo()
6664     elif self.op.kind == constants.TAG_NODE:
6665       self.target = self.cfg.GetNodeInfo(self.op.name)
6666     elif self.op.kind == constants.TAG_INSTANCE:
6667       self.target = self.cfg.GetInstanceInfo(self.op.name)
6668     else:
6669       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6670                                  str(self.op.kind))
6671
6672
6673 class LUGetTags(TagsLU):
6674   """Returns the tags of a given object.
6675
6676   """
6677   _OP_REQP = ["kind", "name"]
6678   REQ_BGL = False
6679
6680   def Exec(self, feedback_fn):
6681     """Returns the tag list.
6682
6683     """
6684     return list(self.target.GetTags())
6685
6686
6687 class LUSearchTags(NoHooksLU):
6688   """Searches the tags for a given pattern.
6689
6690   """
6691   _OP_REQP = ["pattern"]
6692   REQ_BGL = False
6693
6694   def ExpandNames(self):
6695     self.needed_locks = {}
6696
6697   def CheckPrereq(self):
6698     """Check prerequisites.
6699
6700     This checks the pattern passed for validity by compiling it.
6701
6702     """
6703     try:
6704       self.re = re.compile(self.op.pattern)
6705     except re.error, err:
6706       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6707                                  (self.op.pattern, err))
6708
6709   def Exec(self, feedback_fn):
6710     """Returns the tag list.
6711
6712     """
6713     cfg = self.cfg
6714     tgts = [("/cluster", cfg.GetClusterInfo())]
6715     ilist = cfg.GetAllInstancesInfo().values()
6716     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6717     nlist = cfg.GetAllNodesInfo().values()
6718     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6719     results = []
6720     for path, target in tgts:
6721       for tag in target.GetTags():
6722         if self.re.search(tag):
6723           results.append((path, tag))
6724     return results
6725
6726
6727 class LUAddTags(TagsLU):
6728   """Sets a tag on a given object.
6729
6730   """
6731   _OP_REQP = ["kind", "name", "tags"]
6732   REQ_BGL = False
6733
6734   def CheckPrereq(self):
6735     """Check prerequisites.
6736
6737     This checks the type and length of the tag name and value.
6738
6739     """
6740     TagsLU.CheckPrereq(self)
6741     for tag in self.op.tags:
6742       objects.TaggableObject.ValidateTag(tag)
6743
6744   def Exec(self, feedback_fn):
6745     """Sets the tag.
6746
6747     """
6748     try:
6749       for tag in self.op.tags:
6750         self.target.AddTag(tag)
6751     except errors.TagError, err:
6752       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6753     try:
6754       self.cfg.Update(self.target)
6755     except errors.ConfigurationError:
6756       raise errors.OpRetryError("There has been a modification to the"
6757                                 " config file and the operation has been"
6758                                 " aborted. Please retry.")
6759
6760
6761 class LUDelTags(TagsLU):
6762   """Delete a list of tags from a given object.
6763
6764   """
6765   _OP_REQP = ["kind", "name", "tags"]
6766   REQ_BGL = False
6767
6768   def CheckPrereq(self):
6769     """Check prerequisites.
6770
6771     This checks that we have the given tag.
6772
6773     """
6774     TagsLU.CheckPrereq(self)
6775     for tag in self.op.tags:
6776       objects.TaggableObject.ValidateTag(tag)
6777     del_tags = frozenset(self.op.tags)
6778     cur_tags = self.target.GetTags()
6779     if not del_tags <= cur_tags:
6780       diff_tags = del_tags - cur_tags
6781       diff_names = ["'%s'" % tag for tag in diff_tags]
6782       diff_names.sort()
6783       raise errors.OpPrereqError("Tag(s) %s not found" %
6784                                  (",".join(diff_names)))
6785
6786   def Exec(self, feedback_fn):
6787     """Remove the tag from the object.
6788
6789     """
6790     for tag in self.op.tags:
6791       self.target.RemoveTag(tag)
6792     try:
6793       self.cfg.Update(self.target)
6794     except errors.ConfigurationError:
6795       raise errors.OpRetryError("There has been a modification to the"
6796                                 " config file and the operation has been"
6797                                 " aborted. Please retry.")
6798
6799
6800 class LUTestDelay(NoHooksLU):
6801   """Sleep for a specified amount of time.
6802
6803   This LU sleeps on the master and/or nodes for a specified amount of
6804   time.
6805
6806   """
6807   _OP_REQP = ["duration", "on_master", "on_nodes"]
6808   REQ_BGL = False
6809
6810   def ExpandNames(self):
6811     """Expand names and set required locks.
6812
6813     This expands the node list, if any.
6814
6815     """
6816     self.needed_locks = {}
6817     if self.op.on_nodes:
6818       # _GetWantedNodes can be used here, but is not always appropriate to use
6819       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6820       # more information.
6821       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6822       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6823
6824   def CheckPrereq(self):
6825     """Check prerequisites.
6826
6827     """
6828
6829   def Exec(self, feedback_fn):
6830     """Do the actual sleep.
6831
6832     """
6833     if self.op.on_master:
6834       if not utils.TestDelay(self.op.duration):
6835         raise errors.OpExecError("Error during master delay test")
6836     if self.op.on_nodes:
6837       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6838       if not result:
6839         raise errors.OpExecError("Complete failure from rpc call")
6840       for node, node_result in result.items():
6841         node_result.Raise()
6842         if not node_result.data:
6843           raise errors.OpExecError("Failure during rpc call to node %s,"
6844                                    " result: %s" % (node, node_result.data))
6845
6846
6847 class IAllocator(object):
6848   """IAllocator framework.
6849
6850   An IAllocator instance has three sets of attributes:
6851     - cfg that is needed to query the cluster
6852     - input data (all members of the _KEYS class attribute are required)
6853     - four buffer attributes (in|out_data|text), that represent the
6854       input (to the external script) in text and data structure format,
6855       and the output from it, again in two formats
6856     - the result variables from the script (success, info, nodes) for
6857       easy usage
6858
6859   """
6860   _ALLO_KEYS = [
6861     "mem_size", "disks", "disk_template",
6862     "os", "tags", "nics", "vcpus", "hypervisor",
6863     ]
6864   _RELO_KEYS = [
6865     "relocate_from",
6866     ]
6867
6868   def __init__(self, lu, mode, name, **kwargs):
6869     self.lu = lu
6870     # init buffer variables
6871     self.in_text = self.out_text = self.in_data = self.out_data = None
6872     # init all input fields so that pylint is happy
6873     self.mode = mode
6874     self.name = name
6875     self.mem_size = self.disks = self.disk_template = None
6876     self.os = self.tags = self.nics = self.vcpus = None
6877     self.hypervisor = None
6878     self.relocate_from = None
6879     # computed fields
6880     self.required_nodes = None
6881     # init result fields
6882     self.success = self.info = self.nodes = None
6883     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6884       keyset = self._ALLO_KEYS
6885     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6886       keyset = self._RELO_KEYS
6887     else:
6888       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6889                                    " IAllocator" % self.mode)
6890     for key in kwargs:
6891       if key not in keyset:
6892         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6893                                      " IAllocator" % key)
6894       setattr(self, key, kwargs[key])
6895     for key in keyset:
6896       if key not in kwargs:
6897         raise errors.ProgrammerError("Missing input parameter '%s' to"
6898                                      " IAllocator" % key)
6899     self._BuildInputData()
6900
6901   def _ComputeClusterData(self):
6902     """Compute the generic allocator input data.
6903
6904     This is the data that is independent of the actual operation.
6905
6906     """
6907     cfg = self.lu.cfg
6908     cluster_info = cfg.GetClusterInfo()
6909     # cluster data
6910     data = {
6911       "version": constants.IALLOCATOR_VERSION,
6912       "cluster_name": cfg.GetClusterName(),
6913       "cluster_tags": list(cluster_info.GetTags()),
6914       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6915       # we don't have job IDs
6916       }
6917     iinfo = cfg.GetAllInstancesInfo().values()
6918     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6919
6920     # node data
6921     node_results = {}
6922     node_list = cfg.GetNodeList()
6923
6924     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6925       hypervisor_name = self.hypervisor
6926     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6927       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6928
6929     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6930                                            hypervisor_name)
6931     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6932                        cluster_info.enabled_hypervisors)
6933     for nname, nresult in node_data.items():
6934       # first fill in static (config-based) values
6935       ninfo = cfg.GetNodeInfo(nname)
6936       pnr = {
6937         "tags": list(ninfo.GetTags()),
6938         "primary_ip": ninfo.primary_ip,
6939         "secondary_ip": ninfo.secondary_ip,
6940         "offline": ninfo.offline,
6941         "drained": ninfo.drained,
6942         "master_candidate": ninfo.master_candidate,
6943         }
6944
6945       if not ninfo.offline:
6946         msg = nresult.RemoteFailMsg()
6947         if msg:
6948           raise errors.OpExecError("Can't get data for node %s: %s" %
6949                                    (nname, msg))
6950         msg = node_iinfo[nname].RemoteFailMsg()
6951         if msg:
6952           raise errors.OpExecError("Can't get node instance info"
6953                                    " from node %s: %s" % (nname, msg))
6954         remote_info = nresult.payload
6955         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6956                      'vg_size', 'vg_free', 'cpu_total']:
6957           if attr not in remote_info:
6958             raise errors.OpExecError("Node '%s' didn't return attribute"
6959                                      " '%s'" % (nname, attr))
6960           if not isinstance(remote_info[attr], int):
6961             raise errors.OpExecError("Node '%s' returned invalid value"
6962                                      " for '%s': %s" %
6963                                      (nname, attr, remote_info[attr]))
6964         # compute memory used by primary instances
6965         i_p_mem = i_p_up_mem = 0
6966         for iinfo, beinfo in i_list:
6967           if iinfo.primary_node == nname:
6968             i_p_mem += beinfo[constants.BE_MEMORY]
6969             if iinfo.name not in node_iinfo[nname].payload:
6970               i_used_mem = 0
6971             else:
6972               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6973             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6974             remote_info['memory_free'] -= max(0, i_mem_diff)
6975
6976             if iinfo.admin_up:
6977               i_p_up_mem += beinfo[constants.BE_MEMORY]
6978
6979         # compute memory used by instances
6980         pnr_dyn = {
6981           "total_memory": remote_info['memory_total'],
6982           "reserved_memory": remote_info['memory_dom0'],
6983           "free_memory": remote_info['memory_free'],
6984           "total_disk": remote_info['vg_size'],
6985           "free_disk": remote_info['vg_free'],
6986           "total_cpus": remote_info['cpu_total'],
6987           "i_pri_memory": i_p_mem,
6988           "i_pri_up_memory": i_p_up_mem,
6989           }
6990         pnr.update(pnr_dyn)
6991
6992       node_results[nname] = pnr
6993     data["nodes"] = node_results
6994
6995     # instance data
6996     instance_data = {}
6997     for iinfo, beinfo in i_list:
6998       nic_data = []
6999       for nic in iinfo.nics:
7000         filled_params = objects.FillDict(
7001             cluster_info.nicparams[constants.PP_DEFAULT],
7002             nic.nicparams)
7003         nic_dict = {"mac": nic.mac,
7004                     "ip": nic.ip,
7005                     "mode": filled_params[constants.NIC_MODE],
7006                     "link": filled_params[constants.NIC_LINK],
7007                    }
7008         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7009           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7010         nic_data.append(nic_dict)
7011       pir = {
7012         "tags": list(iinfo.GetTags()),
7013         "admin_up": iinfo.admin_up,
7014         "vcpus": beinfo[constants.BE_VCPUS],
7015         "memory": beinfo[constants.BE_MEMORY],
7016         "os": iinfo.os,
7017         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7018         "nics": nic_data,
7019         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7020         "disk_template": iinfo.disk_template,
7021         "hypervisor": iinfo.hypervisor,
7022         }
7023       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7024                                                  pir["disks"])
7025       instance_data[iinfo.name] = pir
7026
7027     data["instances"] = instance_data
7028
7029     self.in_data = data
7030
7031   def _AddNewInstance(self):
7032     """Add new instance data to allocator structure.
7033
7034     This in combination with _AllocatorGetClusterData will create the
7035     correct structure needed as input for the allocator.
7036
7037     The checks for the completeness of the opcode must have already been
7038     done.
7039
7040     """
7041     data = self.in_data
7042
7043     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7044
7045     if self.disk_template in constants.DTS_NET_MIRROR:
7046       self.required_nodes = 2
7047     else:
7048       self.required_nodes = 1
7049     request = {
7050       "type": "allocate",
7051       "name": self.name,
7052       "disk_template": self.disk_template,
7053       "tags": self.tags,
7054       "os": self.os,
7055       "vcpus": self.vcpus,
7056       "memory": self.mem_size,
7057       "disks": self.disks,
7058       "disk_space_total": disk_space,
7059       "nics": self.nics,
7060       "required_nodes": self.required_nodes,
7061       }
7062     data["request"] = request
7063
7064   def _AddRelocateInstance(self):
7065     """Add relocate instance data to allocator structure.
7066
7067     This in combination with _IAllocatorGetClusterData will create the
7068     correct structure needed as input for the allocator.
7069
7070     The checks for the completeness of the opcode must have already been
7071     done.
7072
7073     """
7074     instance = self.lu.cfg.GetInstanceInfo(self.name)
7075     if instance is None:
7076       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7077                                    " IAllocator" % self.name)
7078
7079     if instance.disk_template not in constants.DTS_NET_MIRROR:
7080       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7081
7082     if len(instance.secondary_nodes) != 1:
7083       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7084
7085     self.required_nodes = 1
7086     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7087     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7088
7089     request = {
7090       "type": "relocate",
7091       "name": self.name,
7092       "disk_space_total": disk_space,
7093       "required_nodes": self.required_nodes,
7094       "relocate_from": self.relocate_from,
7095       }
7096     self.in_data["request"] = request
7097
7098   def _BuildInputData(self):
7099     """Build input data structures.
7100
7101     """
7102     self._ComputeClusterData()
7103
7104     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7105       self._AddNewInstance()
7106     else:
7107       self._AddRelocateInstance()
7108
7109     self.in_text = serializer.Dump(self.in_data)
7110
7111   def Run(self, name, validate=True, call_fn=None):
7112     """Run an instance allocator and return the results.
7113
7114     """
7115     if call_fn is None:
7116       call_fn = self.lu.rpc.call_iallocator_runner
7117     data = self.in_text
7118
7119     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7120     result.Raise()
7121
7122     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7123       raise errors.OpExecError("Invalid result from master iallocator runner")
7124
7125     rcode, stdout, stderr, fail = result.data
7126
7127     if rcode == constants.IARUN_NOTFOUND:
7128       raise errors.OpExecError("Can't find allocator '%s'" % name)
7129     elif rcode == constants.IARUN_FAILURE:
7130       raise errors.OpExecError("Instance allocator call failed: %s,"
7131                                " output: %s" % (fail, stdout+stderr))
7132     self.out_text = stdout
7133     if validate:
7134       self._ValidateResult()
7135
7136   def _ValidateResult(self):
7137     """Process the allocator results.
7138
7139     This will process and if successful save the result in
7140     self.out_data and the other parameters.
7141
7142     """
7143     try:
7144       rdict = serializer.Load(self.out_text)
7145     except Exception, err:
7146       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7147
7148     if not isinstance(rdict, dict):
7149       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7150
7151     for key in "success", "info", "nodes":
7152       if key not in rdict:
7153         raise errors.OpExecError("Can't parse iallocator results:"
7154                                  " missing key '%s'" % key)
7155       setattr(self, key, rdict[key])
7156
7157     if not isinstance(rdict["nodes"], list):
7158       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7159                                " is not a list")
7160     self.out_data = rdict
7161
7162
7163 class LUTestAllocator(NoHooksLU):
7164   """Run allocator tests.
7165
7166   This LU runs the allocator tests
7167
7168   """
7169   _OP_REQP = ["direction", "mode", "name"]
7170
7171   def CheckPrereq(self):
7172     """Check prerequisites.
7173
7174     This checks the opcode parameters depending on the director and mode test.
7175
7176     """
7177     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7178       for attr in ["name", "mem_size", "disks", "disk_template",
7179                    "os", "tags", "nics", "vcpus"]:
7180         if not hasattr(self.op, attr):
7181           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7182                                      attr)
7183       iname = self.cfg.ExpandInstanceName(self.op.name)
7184       if iname is not None:
7185         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7186                                    iname)
7187       if not isinstance(self.op.nics, list):
7188         raise errors.OpPrereqError("Invalid parameter 'nics'")
7189       for row in self.op.nics:
7190         if (not isinstance(row, dict) or
7191             "mac" not in row or
7192             "ip" not in row or
7193             "bridge" not in row):
7194           raise errors.OpPrereqError("Invalid contents of the"
7195                                      " 'nics' parameter")
7196       if not isinstance(self.op.disks, list):
7197         raise errors.OpPrereqError("Invalid parameter 'disks'")
7198       for row in self.op.disks:
7199         if (not isinstance(row, dict) or
7200             "size" not in row or
7201             not isinstance(row["size"], int) or
7202             "mode" not in row or
7203             row["mode"] not in ['r', 'w']):
7204           raise errors.OpPrereqError("Invalid contents of the"
7205                                      " 'disks' parameter")
7206       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7207         self.op.hypervisor = self.cfg.GetHypervisorType()
7208     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7209       if not hasattr(self.op, "name"):
7210         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7211       fname = self.cfg.ExpandInstanceName(self.op.name)
7212       if fname is None:
7213         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7214                                    self.op.name)
7215       self.op.name = fname
7216       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7217     else:
7218       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7219                                  self.op.mode)
7220
7221     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7222       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7223         raise errors.OpPrereqError("Missing allocator name")
7224     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7225       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7226                                  self.op.direction)
7227
7228   def Exec(self, feedback_fn):
7229     """Run the allocator test.
7230
7231     """
7232     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7233       ial = IAllocator(self,
7234                        mode=self.op.mode,
7235                        name=self.op.name,
7236                        mem_size=self.op.mem_size,
7237                        disks=self.op.disks,
7238                        disk_template=self.op.disk_template,
7239                        os=self.op.os,
7240                        tags=self.op.tags,
7241                        nics=self.op.nics,
7242                        vcpus=self.op.vcpus,
7243                        hypervisor=self.op.hypervisor,
7244                        )
7245     else:
7246       ial = IAllocator(self,
7247                        mode=self.op.mode,
7248                        name=self.op.name,
7249                        relocate_from=list(self.relocate_from),
7250                        )
7251
7252     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7253       result = ial.in_text
7254     else:
7255       ial.Run(self.op.allocator, validate=False)
7256       result = ial.out_text
7257     return result