QueryClusterInfo: don't show default_bridge
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckNicsBridgesExist(lu, target_nics, target_node,
578                                profile=constants.PP_DEFAULT):
579   """Check that the brigdes needed by a list of nics exist.
580
581   """
582   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
583   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
584                 for nic in target_nics]
585   brlist = [params[constants.NIC_LINK] for params in paramslist
586             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
587   if brlist:
588     result = lu.rpc.call_bridges_exist(target_node, brlist)
589     result.Raise()
590     if not result.data:
591       raise errors.OpPrereqError("One or more target bridges %s does not"
592                                  " exist on destination node '%s'" %
593                                  (brlist, target_node))
594
595
596 def _CheckInstanceBridgesExist(lu, instance, node=None):
597   """Check that the brigdes needed by an instance exist.
598
599   """
600   if node is None:
601     node=instance.primary_node
602   _CheckNicsBridgesExist(lu, instance.nics, node)
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signalled by raising errors.OpPrereqError.
617
618     """
619     master = self.cfg.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.cfg.GetMasterNode()
635     result = self.rpc.call_node_stop_master(master, False)
636     result.Raise()
637     if not result.data:
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     return master
643
644
645 class LUVerifyCluster(LogicalUnit):
646   """Verifies the cluster status.
647
648   """
649   HPATH = "cluster-verify"
650   HTYPE = constants.HTYPE_CLUSTER
651   _OP_REQP = ["skip_checks"]
652   REQ_BGL = False
653
654   def ExpandNames(self):
655     self.needed_locks = {
656       locking.LEVEL_NODE: locking.ALL_SET,
657       locking.LEVEL_INSTANCE: locking.ALL_SET,
658     }
659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660
661   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662                   node_result, feedback_fn, master_files,
663                   drbd_map, vg_name):
664     """Run multiple tests against a node.
665
666     Test list:
667
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     @type nodeinfo: L{objects.Node}
674     @param nodeinfo: the node to check
675     @param file_list: required list of files
676     @param local_cksum: dictionary of local files and their checksums
677     @param node_result: the results from the node
678     @param feedback_fn: function used to accumulate results
679     @param master_files: list of files that only masters should have
680     @param drbd_map: the useddrbd minors for this node, in
681         form of minor: (instance, must_exist) which correspond to instances
682         and their running status
683     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684
685     """
686     node = nodeinfo.name
687
688     # main result, node_result should be a non-empty dict
689     if not node_result or not isinstance(node_result, dict):
690       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691       return True
692
693     # compares ganeti version
694     local_version = constants.PROTOCOL_VERSION
695     remote_version = node_result.get('version', None)
696     if not (remote_version and isinstance(remote_version, (list, tuple)) and
697             len(remote_version) == 2):
698       feedback_fn("  - ERROR: connection to %s failed" % (node))
699       return True
700
701     if local_version != remote_version[0]:
702       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703                   " node %s %s" % (local_version, node, remote_version[0]))
704       return True
705
706     # node seems compatible, we can actually try to look into its results
707
708     bad = False
709
710     # full package version
711     if constants.RELEASE_VERSION != remote_version[1]:
712       feedback_fn("  - WARNING: software version mismatch: master %s,"
713                   " node %s %s" %
714                   (constants.RELEASE_VERSION, node, remote_version[1]))
715
716     # checks vg existence and size > 20G
717     if vg_name is not None:
718       vglist = node_result.get(constants.NV_VGLIST, None)
719       if not vglist:
720         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721                         (node,))
722         bad = True
723       else:
724         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725                                               constants.MIN_VG_SIZE)
726         if vgstatus:
727           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728           bad = True
729
730     # checks config file checksum
731
732     remote_cksum = node_result.get(constants.NV_FILELIST, None)
733     if not isinstance(remote_cksum, dict):
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       for file_name in file_list:
738         node_is_mc = nodeinfo.master_candidate
739         must_have_file = file_name not in master_files
740         if file_name not in remote_cksum:
741           if node_is_mc or must_have_file:
742             bad = True
743             feedback_fn("  - ERROR: file '%s' missing" % file_name)
744         elif remote_cksum[file_name] != local_cksum[file_name]:
745           if node_is_mc or must_have_file:
746             bad = True
747             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748           else:
749             # not candidate and this is not a must-have file
750             bad = True
751             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
752                         " '%s'" % file_name)
753         else:
754           # all good, except non-master/non-must have combination
755           if not node_is_mc and not must_have_file:
756             feedback_fn("  - ERROR: file '%s' should not exist on non master"
757                         " candidates" % file_name)
758
759     # checks ssh to any
760
761     if constants.NV_NODELIST not in node_result:
762       bad = True
763       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764     else:
765       if node_result[constants.NV_NODELIST]:
766         bad = True
767         for node in node_result[constants.NV_NODELIST]:
768           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769                           (node, node_result[constants.NV_NODELIST][node]))
770
771     if constants.NV_NODENETTEST not in node_result:
772       bad = True
773       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774     else:
775       if node_result[constants.NV_NODENETTEST]:
776         bad = True
777         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778         for node in nlist:
779           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780                           (node, node_result[constants.NV_NODENETTEST][node]))
781
782     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783     if isinstance(hyp_result, dict):
784       for hv_name, hv_result in hyp_result.iteritems():
785         if hv_result is not None:
786           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787                       (hv_name, hv_result))
788
789     # check used drbd list
790     if vg_name is not None:
791       used_minors = node_result.get(constants.NV_DRBDLIST, [])
792       if not isinstance(used_minors, (tuple, list)):
793         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794                     str(used_minors))
795       else:
796         for minor, (iname, must_exist) in drbd_map.items():
797           if minor not in used_minors and must_exist:
798             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799                         " not active" % (minor, iname))
800             bad = True
801         for minor in used_minors:
802           if minor not in drbd_map:
803             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804                         minor)
805             bad = True
806
807     return bad
808
809   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810                       node_instance, feedback_fn, n_offline):
811     """Verify an instance.
812
813     This function checks to see if the required block devices are
814     available on the instance's node.
815
816     """
817     bad = False
818
819     node_current = instanceconfig.primary_node
820
821     node_vol_should = {}
822     instanceconfig.MapLVsByNode(node_vol_should)
823
824     for node in node_vol_should:
825       if node in n_offline:
826         # ignore missing volumes on offline nodes
827         continue
828       for volume in node_vol_should[node]:
829         if node not in node_vol_is or volume not in node_vol_is[node]:
830           feedback_fn("  - ERROR: volume %s missing on node %s" %
831                           (volume, node))
832           bad = True
833
834     if instanceconfig.admin_up:
835       if ((node_current not in node_instance or
836           not instance in node_instance[node_current]) and
837           node_current not in n_offline):
838         feedback_fn("  - ERROR: instance %s not running on node %s" %
839                         (instance, node_current))
840         bad = True
841
842     for node in node_instance:
843       if (not node == node_current):
844         if instance in node_instance[node]:
845           feedback_fn("  - ERROR: instance %s should not run on node %s" %
846                           (instance, node))
847           bad = True
848
849     return bad
850
851   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852     """Verify if there are any unknown volumes in the cluster.
853
854     The .os, .swap and backup volumes are ignored. All other volumes are
855     reported as unknown.
856
857     """
858     bad = False
859
860     for node in node_vol_is:
861       for volume in node_vol_is[node]:
862         if node not in node_vol_should or volume not in node_vol_should[node]:
863           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864                       (volume, node))
865           bad = True
866     return bad
867
868   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869     """Verify the list of running instances.
870
871     This checks what instances are running but unknown to the cluster.
872
873     """
874     bad = False
875     for node in node_instance:
876       for runninginstance in node_instance[node]:
877         if runninginstance not in instancelist:
878           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879                           (runninginstance, node))
880           bad = True
881     return bad
882
883   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884     """Verify N+1 Memory Resilience.
885
886     Check that if one single node dies we can still start all the instances it
887     was primary for.
888
889     """
890     bad = False
891
892     for node, nodeinfo in node_info.iteritems():
893       # This code checks that every node which is now listed as secondary has
894       # enough memory to host all instances it is supposed to should a single
895       # other node in the cluster fail.
896       # FIXME: not ready for failover to an arbitrary node
897       # FIXME: does not support file-backed instances
898       # WARNING: we currently take into account down instances as well as up
899       # ones, considering that even if they're down someone might want to start
900       # them even in the event of a node failure.
901       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902         needed_mem = 0
903         for instance in instances:
904           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905           if bep[constants.BE_AUTO_BALANCE]:
906             needed_mem += bep[constants.BE_MEMORY]
907         if nodeinfo['mfree'] < needed_mem:
908           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
909                       " failovers should node %s fail" % (node, prinode))
910           bad = True
911     return bad
912
913   def CheckPrereq(self):
914     """Check prerequisites.
915
916     Transform the list of checks we're going to skip into a set and check that
917     all its members are valid.
918
919     """
920     self.skip_set = frozenset(self.op.skip_checks)
921     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922       raise errors.OpPrereqError("Invalid checks to be skipped specified")
923
924   def BuildHooksEnv(self):
925     """Build hooks env.
926
927     Cluster-Verify hooks just rone in the post phase and their failure makes
928     the output be logged in the verify output and the verification to fail.
929
930     """
931     all_nodes = self.cfg.GetNodeList()
932     env = {
933       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934       }
935     for node in self.cfg.GetAllNodesInfo().values():
936       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937
938     return env, [], all_nodes
939
940   def Exec(self, feedback_fn):
941     """Verify integrity of cluster, performing various test on nodes.
942
943     """
944     bad = False
945     feedback_fn("* Verifying global settings")
946     for msg in self.cfg.VerifyConfig():
947       feedback_fn("  - ERROR: %s" % msg)
948
949     vg_name = self.cfg.GetVGName()
950     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951     nodelist = utils.NiceSort(self.cfg.GetNodeList())
952     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955                         for iname in instancelist)
956     i_non_redundant = [] # Non redundant instances
957     i_non_a_balanced = [] # Non auto-balanced instances
958     n_offline = [] # List of offline nodes
959     n_drained = [] # List of nodes being drained
960     node_volume = {}
961     node_instance = {}
962     node_info = {}
963     instance_cfg = {}
964
965     # FIXME: verify OS list
966     # do local checksums
967     master_files = [constants.CLUSTER_CONF_FILE]
968
969     file_names = ssconf.SimpleStore().GetFileList()
970     file_names.append(constants.SSL_CERT_FILE)
971     file_names.append(constants.RAPI_CERT_FILE)
972     file_names.extend(master_files)
973
974     local_checksums = utils.FingerprintFiles(file_names)
975
976     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977     node_verify_param = {
978       constants.NV_FILELIST: file_names,
979       constants.NV_NODELIST: [node.name for node in nodeinfo
980                               if not node.offline],
981       constants.NV_HYPERVISOR: hypervisors,
982       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983                                   node.secondary_ip) for node in nodeinfo
984                                  if not node.offline],
985       constants.NV_INSTANCELIST: hypervisors,
986       constants.NV_VERSION: None,
987       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988       }
989     if vg_name is not None:
990       node_verify_param[constants.NV_VGLIST] = None
991       node_verify_param[constants.NV_LVLIST] = vg_name
992       node_verify_param[constants.NV_DRBDLIST] = None
993     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994                                            self.cfg.GetClusterName())
995
996     cluster = self.cfg.GetClusterInfo()
997     master_node = self.cfg.GetMasterNode()
998     all_drbd_map = self.cfg.ComputeDRBDMap()
999
1000     for node_i in nodeinfo:
1001       node = node_i.name
1002       nresult = all_nvinfo[node].data
1003
1004       if node_i.offline:
1005         feedback_fn("* Skipping offline node %s" % (node,))
1006         n_offline.append(node)
1007         continue
1008
1009       if node == master_node:
1010         ntype = "master"
1011       elif node_i.master_candidate:
1012         ntype = "master candidate"
1013       elif node_i.drained:
1014         ntype = "drained"
1015         n_drained.append(node)
1016       else:
1017         ntype = "regular"
1018       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019
1020       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022         bad = True
1023         continue
1024
1025       node_drbd = {}
1026       for minor, instance in all_drbd_map[node].items():
1027         if instance not in instanceinfo:
1028           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029                       instance)
1030           # ghost instance should not be running, but otherwise we
1031           # don't give double warnings (both ghost instance and
1032           # unallocated minor in use)
1033           node_drbd[minor] = (instance, False)
1034         else:
1035           instance = instanceinfo[instance]
1036           node_drbd[minor] = (instance.name, instance.admin_up)
1037       result = self._VerifyNode(node_i, file_names, local_checksums,
1038                                 nresult, feedback_fn, master_files,
1039                                 node_drbd, vg_name)
1040       bad = bad or result
1041
1042       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043       if vg_name is None:
1044         node_volume[node] = {}
1045       elif isinstance(lvdata, basestring):
1046         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047                     (node, utils.SafeEncode(lvdata)))
1048         bad = True
1049         node_volume[node] = {}
1050       elif not isinstance(lvdata, dict):
1051         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052         bad = True
1053         continue
1054       else:
1055         node_volume[node] = lvdata
1056
1057       # node_instance
1058       idata = nresult.get(constants.NV_INSTANCELIST, None)
1059       if not isinstance(idata, list):
1060         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061                     (node,))
1062         bad = True
1063         continue
1064
1065       node_instance[node] = idata
1066
1067       # node_info
1068       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069       if not isinstance(nodeinfo, dict):
1070         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071         bad = True
1072         continue
1073
1074       try:
1075         node_info[node] = {
1076           "mfree": int(nodeinfo['memory_free']),
1077           "pinst": [],
1078           "sinst": [],
1079           # dictionary holding all instances this node is secondary for,
1080           # grouped by their primary node. Each key is a cluster node, and each
1081           # value is a list of instances which have the key as primary and the
1082           # current node as secondary.  this is handy to calculate N+1 memory
1083           # availability if you can only failover from a primary to its
1084           # secondary.
1085           "sinst-by-pnode": {},
1086         }
1087         # FIXME: devise a free space model for file based instances as well
1088         if vg_name is not None:
1089           if (constants.NV_VGLIST not in nresult or
1090               vg_name not in nresult[constants.NV_VGLIST]):
1091             feedback_fn("  - ERROR: node %s didn't return data for the"
1092                         " volume group '%s' - it is either missing or broken" %
1093                         (node, vg_name))
1094             bad = True
1095             continue
1096           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097       except (ValueError, KeyError):
1098         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099                     " from node %s" % (node,))
1100         bad = True
1101         continue
1102
1103     node_vol_should = {}
1104
1105     for instance in instancelist:
1106       feedback_fn("* Verifying instance %s" % instance)
1107       inst_config = instanceinfo[instance]
1108       result =  self._VerifyInstance(instance, inst_config, node_volume,
1109                                      node_instance, feedback_fn, n_offline)
1110       bad = bad or result
1111       inst_nodes_offline = []
1112
1113       inst_config.MapLVsByNode(node_vol_should)
1114
1115       instance_cfg[instance] = inst_config
1116
1117       pnode = inst_config.primary_node
1118       if pnode in node_info:
1119         node_info[pnode]['pinst'].append(instance)
1120       elif pnode not in n_offline:
1121         feedback_fn("  - ERROR: instance %s, connection to primary node"
1122                     " %s failed" % (instance, pnode))
1123         bad = True
1124
1125       if pnode in n_offline:
1126         inst_nodes_offline.append(pnode)
1127
1128       # If the instance is non-redundant we cannot survive losing its primary
1129       # node, so we are not N+1 compliant. On the other hand we have no disk
1130       # templates with more than one secondary so that situation is not well
1131       # supported either.
1132       # FIXME: does not support file-backed instances
1133       if len(inst_config.secondary_nodes) == 0:
1134         i_non_redundant.append(instance)
1135       elif len(inst_config.secondary_nodes) > 1:
1136         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137                     % instance)
1138
1139       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140         i_non_a_balanced.append(instance)
1141
1142       for snode in inst_config.secondary_nodes:
1143         if snode in node_info:
1144           node_info[snode]['sinst'].append(instance)
1145           if pnode not in node_info[snode]['sinst-by-pnode']:
1146             node_info[snode]['sinst-by-pnode'][pnode] = []
1147           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148         elif snode not in n_offline:
1149           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150                       " %s failed" % (instance, snode))
1151           bad = True
1152         if snode in n_offline:
1153           inst_nodes_offline.append(snode)
1154
1155       if inst_nodes_offline:
1156         # warn that the instance lives on offline nodes, and set bad=True
1157         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158                     ", ".join(inst_nodes_offline))
1159         bad = True
1160
1161     feedback_fn("* Verifying orphan volumes")
1162     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163                                        feedback_fn)
1164     bad = bad or result
1165
1166     feedback_fn("* Verifying remaining instances")
1167     result = self._VerifyOrphanInstances(instancelist, node_instance,
1168                                          feedback_fn)
1169     bad = bad or result
1170
1171     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172       feedback_fn("* Verifying N+1 Memory redundancy")
1173       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174       bad = bad or result
1175
1176     feedback_fn("* Other Notes")
1177     if i_non_redundant:
1178       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179                   % len(i_non_redundant))
1180
1181     if i_non_a_balanced:
1182       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183                   % len(i_non_a_balanced))
1184
1185     if n_offline:
1186       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187
1188     if n_drained:
1189       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190
1191     return not bad
1192
1193   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194     """Analize the post-hooks' result
1195
1196     This method analyses the hook result, handles it, and sends some
1197     nicely-formatted feedback back to the user.
1198
1199     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201     @param hooks_results: the results of the multi-node hooks rpc call
1202     @param feedback_fn: function used send feedback back to the caller
1203     @param lu_result: previous Exec result
1204     @return: the new Exec result, based on the previous result
1205         and hook results
1206
1207     """
1208     # We only really run POST phase hooks, and are only interested in
1209     # their results
1210     if phase == constants.HOOKS_PHASE_POST:
1211       # Used to change hooks' output to proper indentation
1212       indent_re = re.compile('^', re.M)
1213       feedback_fn("* Hooks Results")
1214       if not hooks_results:
1215         feedback_fn("  - ERROR: general communication failure")
1216         lu_result = 1
1217       else:
1218         for node_name in hooks_results:
1219           show_node_header = True
1220           res = hooks_results[node_name]
1221           if res.failed or res.data is False or not isinstance(res.data, list):
1222             if res.offline:
1223               # no need to warn or set fail return value
1224               continue
1225             feedback_fn("    Communication failure in hooks execution")
1226             lu_result = 1
1227             continue
1228           for script, hkr, output in res.data:
1229             if hkr == constants.HKR_FAIL:
1230               # The node header is only shown once, if there are
1231               # failing hooks on that node
1232               if show_node_header:
1233                 feedback_fn("  Node %s:" % node_name)
1234                 show_node_header = False
1235               feedback_fn("    ERROR: Script %s failed, output:" % script)
1236               output = indent_re.sub('      ', output)
1237               feedback_fn("%s" % output)
1238               lu_result = 1
1239
1240       return lu_result
1241
1242
1243 class LUVerifyDisks(NoHooksLU):
1244   """Verifies the cluster disks status.
1245
1246   """
1247   _OP_REQP = []
1248   REQ_BGL = False
1249
1250   def ExpandNames(self):
1251     self.needed_locks = {
1252       locking.LEVEL_NODE: locking.ALL_SET,
1253       locking.LEVEL_INSTANCE: locking.ALL_SET,
1254     }
1255     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This has no prerequisites.
1261
1262     """
1263     pass
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster disks.
1267
1268     """
1269     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270
1271     vg_name = self.cfg.GetVGName()
1272     nodes = utils.NiceSort(self.cfg.GetNodeList())
1273     instances = [self.cfg.GetInstanceInfo(name)
1274                  for name in self.cfg.GetInstanceList()]
1275
1276     nv_dict = {}
1277     for inst in instances:
1278       inst_lvs = {}
1279       if (not inst.admin_up or
1280           inst.disk_template not in constants.DTS_NET_MIRROR):
1281         continue
1282       inst.MapLVsByNode(inst_lvs)
1283       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284       for node, vol_list in inst_lvs.iteritems():
1285         for vol in vol_list:
1286           nv_dict[(node, vol)] = inst
1287
1288     if not nv_dict:
1289       return result
1290
1291     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292
1293     to_act = set()
1294     for node in nodes:
1295       # node_volume
1296       lvs = node_lvs[node]
1297       if lvs.failed:
1298         if not lvs.offline:
1299           self.LogWarning("Connection to node %s failed: %s" %
1300                           (node, lvs.data))
1301         continue
1302       lvs = lvs.data
1303       if isinstance(lvs, basestring):
1304         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1305         res_nlvm[node] = lvs
1306         continue
1307       elif not isinstance(lvs, dict):
1308         logging.warning("Connection to node %s failed or invalid data"
1309                         " returned", node)
1310         res_nodes.append(node)
1311         continue
1312
1313       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1314         inst = nv_dict.pop((node, lv_name), None)
1315         if (not lv_online and inst is not None
1316             and inst.name not in res_instances):
1317           res_instances.append(inst.name)
1318
1319     # any leftover items in nv_dict are missing LVs, let's arrange the
1320     # data better
1321     for key, inst in nv_dict.iteritems():
1322       if inst.name not in res_missing:
1323         res_missing[inst.name] = []
1324       res_missing[inst.name].append(key)
1325
1326     return result
1327
1328
1329 class LURenameCluster(LogicalUnit):
1330   """Rename the cluster.
1331
1332   """
1333   HPATH = "cluster-rename"
1334   HTYPE = constants.HTYPE_CLUSTER
1335   _OP_REQP = ["name"]
1336
1337   def BuildHooksEnv(self):
1338     """Build hooks env.
1339
1340     """
1341     env = {
1342       "OP_TARGET": self.cfg.GetClusterName(),
1343       "NEW_NAME": self.op.name,
1344       }
1345     mn = self.cfg.GetMasterNode()
1346     return env, [mn], [mn]
1347
1348   def CheckPrereq(self):
1349     """Verify that the passed name is a valid one.
1350
1351     """
1352     hostname = utils.HostInfo(self.op.name)
1353
1354     new_name = hostname.name
1355     self.ip = new_ip = hostname.ip
1356     old_name = self.cfg.GetClusterName()
1357     old_ip = self.cfg.GetMasterIP()
1358     if new_name == old_name and new_ip == old_ip:
1359       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1360                                  " cluster has changed")
1361     if new_ip != old_ip:
1362       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1363         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1364                                    " reachable on the network. Aborting." %
1365                                    new_ip)
1366
1367     self.op.name = new_name
1368
1369   def Exec(self, feedback_fn):
1370     """Rename the cluster.
1371
1372     """
1373     clustername = self.op.name
1374     ip = self.ip
1375
1376     # shutdown the master IP
1377     master = self.cfg.GetMasterNode()
1378     result = self.rpc.call_node_stop_master(master, False)
1379     if result.failed or not result.data:
1380       raise errors.OpExecError("Could not disable the master role")
1381
1382     try:
1383       cluster = self.cfg.GetClusterInfo()
1384       cluster.cluster_name = clustername
1385       cluster.master_ip = ip
1386       self.cfg.Update(cluster)
1387
1388       # update the known hosts file
1389       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1390       node_list = self.cfg.GetNodeList()
1391       try:
1392         node_list.remove(master)
1393       except ValueError:
1394         pass
1395       result = self.rpc.call_upload_file(node_list,
1396                                          constants.SSH_KNOWN_HOSTS_FILE)
1397       for to_node, to_result in result.iteritems():
1398          msg = to_result.RemoteFailMsg()
1399          if msg:
1400            msg = ("Copy of file %s to node %s failed: %s" %
1401                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1402            self.proc.LogWarning(msg)
1403
1404     finally:
1405       result = self.rpc.call_node_start_master(master, False)
1406       if result.failed or not result.data:
1407         self.LogWarning("Could not re-enable the master role on"
1408                         " the master, please restart manually.")
1409
1410
1411 def _RecursiveCheckIfLVMBased(disk):
1412   """Check if the given disk or its children are lvm-based.
1413
1414   @type disk: L{objects.Disk}
1415   @param disk: the disk to check
1416   @rtype: booleean
1417   @return: boolean indicating whether a LD_LV dev_type was found or not
1418
1419   """
1420   if disk.children:
1421     for chdisk in disk.children:
1422       if _RecursiveCheckIfLVMBased(chdisk):
1423         return True
1424   return disk.dev_type == constants.LD_LV
1425
1426
1427 class LUSetClusterParams(LogicalUnit):
1428   """Change the parameters of the cluster.
1429
1430   """
1431   HPATH = "cluster-modify"
1432   HTYPE = constants.HTYPE_CLUSTER
1433   _OP_REQP = []
1434   REQ_BGL = False
1435
1436   def CheckArguments(self):
1437     """Check parameters
1438
1439     """
1440     if not hasattr(self.op, "candidate_pool_size"):
1441       self.op.candidate_pool_size = None
1442     if self.op.candidate_pool_size is not None:
1443       try:
1444         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1445       except (ValueError, TypeError), err:
1446         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1447                                    str(err))
1448       if self.op.candidate_pool_size < 1:
1449         raise errors.OpPrereqError("At least one master candidate needed")
1450
1451   def ExpandNames(self):
1452     # FIXME: in the future maybe other cluster params won't require checking on
1453     # all nodes to be modified.
1454     self.needed_locks = {
1455       locking.LEVEL_NODE: locking.ALL_SET,
1456     }
1457     self.share_locks[locking.LEVEL_NODE] = 1
1458
1459   def BuildHooksEnv(self):
1460     """Build hooks env.
1461
1462     """
1463     env = {
1464       "OP_TARGET": self.cfg.GetClusterName(),
1465       "NEW_VG_NAME": self.op.vg_name,
1466       }
1467     mn = self.cfg.GetMasterNode()
1468     return env, [mn], [mn]
1469
1470   def CheckPrereq(self):
1471     """Check prerequisites.
1472
1473     This checks whether the given params don't conflict and
1474     if the given volume group is valid.
1475
1476     """
1477     if self.op.vg_name is not None and not self.op.vg_name:
1478       instances = self.cfg.GetAllInstancesInfo().values()
1479       for inst in instances:
1480         for disk in inst.disks:
1481           if _RecursiveCheckIfLVMBased(disk):
1482             raise errors.OpPrereqError("Cannot disable lvm storage while"
1483                                        " lvm-based instances exist")
1484
1485     node_list = self.acquired_locks[locking.LEVEL_NODE]
1486
1487     # if vg_name not None, checks given volume group on all nodes
1488     if self.op.vg_name:
1489       vglist = self.rpc.call_vg_list(node_list)
1490       for node in node_list:
1491         if vglist[node].failed:
1492           # ignoring down node
1493           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1494           continue
1495         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1496                                               self.op.vg_name,
1497                                               constants.MIN_VG_SIZE)
1498         if vgstatus:
1499           raise errors.OpPrereqError("Error on node '%s': %s" %
1500                                      (node, vgstatus))
1501
1502     self.cluster = cluster = self.cfg.GetClusterInfo()
1503     # validate params changes
1504     if self.op.beparams:
1505       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1506       self.new_beparams = objects.FillDict(
1507         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1508
1509     if self.op.nicparams:
1510       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1511       self.new_nicparams = objects.FillDict(
1512         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1513       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1514
1515     # hypervisor list/parameters
1516     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1517     if self.op.hvparams:
1518       if not isinstance(self.op.hvparams, dict):
1519         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1520       for hv_name, hv_dict in self.op.hvparams.items():
1521         if hv_name not in self.new_hvparams:
1522           self.new_hvparams[hv_name] = hv_dict
1523         else:
1524           self.new_hvparams[hv_name].update(hv_dict)
1525
1526     if self.op.enabled_hypervisors is not None:
1527       self.hv_list = self.op.enabled_hypervisors
1528     else:
1529       self.hv_list = cluster.enabled_hypervisors
1530
1531     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1532       # either the enabled list has changed, or the parameters have, validate
1533       for hv_name, hv_params in self.new_hvparams.items():
1534         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1535             (self.op.enabled_hypervisors and
1536              hv_name in self.op.enabled_hypervisors)):
1537           # either this is a new hypervisor, or its parameters have changed
1538           hv_class = hypervisor.GetHypervisor(hv_name)
1539           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1540           hv_class.CheckParameterSyntax(hv_params)
1541           _CheckHVParams(self, node_list, hv_name, hv_params)
1542
1543   def Exec(self, feedback_fn):
1544     """Change the parameters of the cluster.
1545
1546     """
1547     if self.op.vg_name is not None:
1548       new_volume = self.op.vg_name
1549       if not new_volume:
1550         new_volume = None
1551       if new_volume != self.cfg.GetVGName():
1552         self.cfg.SetVGName(new_volume)
1553       else:
1554         feedback_fn("Cluster LVM configuration already in desired"
1555                     " state, not changing")
1556     if self.op.hvparams:
1557       self.cluster.hvparams = self.new_hvparams
1558     if self.op.enabled_hypervisors is not None:
1559       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1560     if self.op.beparams:
1561       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1562     if self.op.nicparams:
1563       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1564
1565     if self.op.candidate_pool_size is not None:
1566       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1567
1568     self.cfg.Update(self.cluster)
1569
1570     # we want to update nodes after the cluster so that if any errors
1571     # happen, we have recorded and saved the cluster info
1572     if self.op.candidate_pool_size is not None:
1573       _AdjustCandidatePool(self)
1574
1575
1576 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1577   """Distribute additional files which are part of the cluster configuration.
1578
1579   ConfigWriter takes care of distributing the config and ssconf files, but
1580   there are more files which should be distributed to all nodes. This function
1581   makes sure those are copied.
1582
1583   @param lu: calling logical unit
1584   @param additional_nodes: list of nodes not in the config to distribute to
1585
1586   """
1587   # 1. Gather target nodes
1588   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1589   dist_nodes = lu.cfg.GetNodeList()
1590   if additional_nodes is not None:
1591     dist_nodes.extend(additional_nodes)
1592   if myself.name in dist_nodes:
1593     dist_nodes.remove(myself.name)
1594   # 2. Gather files to distribute
1595   dist_files = set([constants.ETC_HOSTS,
1596                     constants.SSH_KNOWN_HOSTS_FILE,
1597                     constants.RAPI_CERT_FILE,
1598                     constants.RAPI_USERS_FILE,
1599                    ])
1600
1601   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1602   for hv_name in enabled_hypervisors:
1603     hv_class = hypervisor.GetHypervisor(hv_name)
1604     dist_files.update(hv_class.GetAncillaryFiles())
1605
1606   # 3. Perform the files upload
1607   for fname in dist_files:
1608     if os.path.exists(fname):
1609       result = lu.rpc.call_upload_file(dist_nodes, fname)
1610       for to_node, to_result in result.items():
1611          msg = to_result.RemoteFailMsg()
1612          if msg:
1613            msg = ("Copy of file %s to node %s failed: %s" %
1614                    (fname, to_node, msg))
1615            lu.proc.LogWarning(msg)
1616
1617
1618 class LURedistributeConfig(NoHooksLU):
1619   """Force the redistribution of cluster configuration.
1620
1621   This is a very simple LU.
1622
1623   """
1624   _OP_REQP = []
1625   REQ_BGL = False
1626
1627   def ExpandNames(self):
1628     self.needed_locks = {
1629       locking.LEVEL_NODE: locking.ALL_SET,
1630     }
1631     self.share_locks[locking.LEVEL_NODE] = 1
1632
1633   def CheckPrereq(self):
1634     """Check prerequisites.
1635
1636     """
1637
1638   def Exec(self, feedback_fn):
1639     """Redistribute the configuration.
1640
1641     """
1642     self.cfg.Update(self.cfg.GetClusterInfo())
1643     _RedistributeAncillaryFiles(self)
1644
1645
1646 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1647   """Sleep and poll for an instance's disk to sync.
1648
1649   """
1650   if not instance.disks:
1651     return True
1652
1653   if not oneshot:
1654     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1655
1656   node = instance.primary_node
1657
1658   for dev in instance.disks:
1659     lu.cfg.SetDiskID(dev, node)
1660
1661   retries = 0
1662   while True:
1663     max_time = 0
1664     done = True
1665     cumul_degraded = False
1666     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1667     if rstats.failed or not rstats.data:
1668       lu.LogWarning("Can't get any data from node %s", node)
1669       retries += 1
1670       if retries >= 10:
1671         raise errors.RemoteError("Can't contact node %s for mirror data,"
1672                                  " aborting." % node)
1673       time.sleep(6)
1674       continue
1675     rstats = rstats.data
1676     retries = 0
1677     for i, mstat in enumerate(rstats):
1678       if mstat is None:
1679         lu.LogWarning("Can't compute data for node %s/%s",
1680                            node, instance.disks[i].iv_name)
1681         continue
1682       # we ignore the ldisk parameter
1683       perc_done, est_time, is_degraded, _ = mstat
1684       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1685       if perc_done is not None:
1686         done = False
1687         if est_time is not None:
1688           rem_time = "%d estimated seconds remaining" % est_time
1689           max_time = est_time
1690         else:
1691           rem_time = "no time estimate"
1692         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1693                         (instance.disks[i].iv_name, perc_done, rem_time))
1694     if done or oneshot:
1695       break
1696
1697     time.sleep(min(60, max_time))
1698
1699   if done:
1700     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1701   return not cumul_degraded
1702
1703
1704 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1705   """Check that mirrors are not degraded.
1706
1707   The ldisk parameter, if True, will change the test from the
1708   is_degraded attribute (which represents overall non-ok status for
1709   the device(s)) to the ldisk (representing the local storage status).
1710
1711   """
1712   lu.cfg.SetDiskID(dev, node)
1713   if ldisk:
1714     idx = 6
1715   else:
1716     idx = 5
1717
1718   result = True
1719   if on_primary or dev.AssembleOnSecondary():
1720     rstats = lu.rpc.call_blockdev_find(node, dev)
1721     msg = rstats.RemoteFailMsg()
1722     if msg:
1723       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1724       result = False
1725     elif not rstats.payload:
1726       lu.LogWarning("Can't find disk on node %s", node)
1727       result = False
1728     else:
1729       result = result and (not rstats.payload[idx])
1730   if dev.children:
1731     for child in dev.children:
1732       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1733
1734   return result
1735
1736
1737 class LUDiagnoseOS(NoHooksLU):
1738   """Logical unit for OS diagnose/query.
1739
1740   """
1741   _OP_REQP = ["output_fields", "names"]
1742   REQ_BGL = False
1743   _FIELDS_STATIC = utils.FieldSet()
1744   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1745
1746   def ExpandNames(self):
1747     if self.op.names:
1748       raise errors.OpPrereqError("Selective OS query not supported")
1749
1750     _CheckOutputFields(static=self._FIELDS_STATIC,
1751                        dynamic=self._FIELDS_DYNAMIC,
1752                        selected=self.op.output_fields)
1753
1754     # Lock all nodes, in shared mode
1755     # Temporary removal of locks, should be reverted later
1756     # TODO: reintroduce locks when they are lighter-weight
1757     self.needed_locks = {}
1758     #self.share_locks[locking.LEVEL_NODE] = 1
1759     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1760
1761   def CheckPrereq(self):
1762     """Check prerequisites.
1763
1764     """
1765
1766   @staticmethod
1767   def _DiagnoseByOS(node_list, rlist):
1768     """Remaps a per-node return list into an a per-os per-node dictionary
1769
1770     @param node_list: a list with the names of all nodes
1771     @param rlist: a map with node names as keys and OS objects as values
1772
1773     @rtype: dict
1774     @return: a dictionary with osnames as keys and as value another map, with
1775         nodes as keys and list of OS objects as values, eg::
1776
1777           {"debian-etch": {"node1": [<object>,...],
1778                            "node2": [<object>,]}
1779           }
1780
1781     """
1782     all_os = {}
1783     # we build here the list of nodes that didn't fail the RPC (at RPC
1784     # level), so that nodes with a non-responding node daemon don't
1785     # make all OSes invalid
1786     good_nodes = [node_name for node_name in rlist
1787                   if not rlist[node_name].failed]
1788     for node_name, nr in rlist.iteritems():
1789       if nr.failed or not nr.data:
1790         continue
1791       for os_obj in nr.data:
1792         if os_obj.name not in all_os:
1793           # build a list of nodes for this os containing empty lists
1794           # for each node in node_list
1795           all_os[os_obj.name] = {}
1796           for nname in good_nodes:
1797             all_os[os_obj.name][nname] = []
1798         all_os[os_obj.name][node_name].append(os_obj)
1799     return all_os
1800
1801   def Exec(self, feedback_fn):
1802     """Compute the list of OSes.
1803
1804     """
1805     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1806     node_data = self.rpc.call_os_diagnose(valid_nodes)
1807     if node_data == False:
1808       raise errors.OpExecError("Can't gather the list of OSes")
1809     pol = self._DiagnoseByOS(valid_nodes, node_data)
1810     output = []
1811     for os_name, os_data in pol.iteritems():
1812       row = []
1813       for field in self.op.output_fields:
1814         if field == "name":
1815           val = os_name
1816         elif field == "valid":
1817           val = utils.all([osl and osl[0] for osl in os_data.values()])
1818         elif field == "node_status":
1819           val = {}
1820           for node_name, nos_list in os_data.iteritems():
1821             val[node_name] = [(v.status, v.path) for v in nos_list]
1822         else:
1823           raise errors.ParameterError(field)
1824         row.append(val)
1825       output.append(row)
1826
1827     return output
1828
1829
1830 class LURemoveNode(LogicalUnit):
1831   """Logical unit for removing a node.
1832
1833   """
1834   HPATH = "node-remove"
1835   HTYPE = constants.HTYPE_NODE
1836   _OP_REQP = ["node_name"]
1837
1838   def BuildHooksEnv(self):
1839     """Build hooks env.
1840
1841     This doesn't run on the target node in the pre phase as a failed
1842     node would then be impossible to remove.
1843
1844     """
1845     env = {
1846       "OP_TARGET": self.op.node_name,
1847       "NODE_NAME": self.op.node_name,
1848       }
1849     all_nodes = self.cfg.GetNodeList()
1850     all_nodes.remove(self.op.node_name)
1851     return env, all_nodes, all_nodes
1852
1853   def CheckPrereq(self):
1854     """Check prerequisites.
1855
1856     This checks:
1857      - the node exists in the configuration
1858      - it does not have primary or secondary instances
1859      - it's not the master
1860
1861     Any errors are signalled by raising errors.OpPrereqError.
1862
1863     """
1864     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1865     if node is None:
1866       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1867
1868     instance_list = self.cfg.GetInstanceList()
1869
1870     masternode = self.cfg.GetMasterNode()
1871     if node.name == masternode:
1872       raise errors.OpPrereqError("Node is the master node,"
1873                                  " you need to failover first.")
1874
1875     for instance_name in instance_list:
1876       instance = self.cfg.GetInstanceInfo(instance_name)
1877       if node.name in instance.all_nodes:
1878         raise errors.OpPrereqError("Instance %s is still running on the node,"
1879                                    " please remove first." % instance_name)
1880     self.op.node_name = node.name
1881     self.node = node
1882
1883   def Exec(self, feedback_fn):
1884     """Removes the node from the cluster.
1885
1886     """
1887     node = self.node
1888     logging.info("Stopping the node daemon and removing configs from node %s",
1889                  node.name)
1890
1891     self.context.RemoveNode(node.name)
1892
1893     self.rpc.call_node_leave_cluster(node.name)
1894
1895     # Promote nodes to master candidate as needed
1896     _AdjustCandidatePool(self)
1897
1898
1899 class LUQueryNodes(NoHooksLU):
1900   """Logical unit for querying nodes.
1901
1902   """
1903   _OP_REQP = ["output_fields", "names", "use_locking"]
1904   REQ_BGL = False
1905   _FIELDS_DYNAMIC = utils.FieldSet(
1906     "dtotal", "dfree",
1907     "mtotal", "mnode", "mfree",
1908     "bootid",
1909     "ctotal", "cnodes", "csockets",
1910     )
1911
1912   _FIELDS_STATIC = utils.FieldSet(
1913     "name", "pinst_cnt", "sinst_cnt",
1914     "pinst_list", "sinst_list",
1915     "pip", "sip", "tags",
1916     "serial_no",
1917     "master_candidate",
1918     "master",
1919     "offline",
1920     "drained",
1921     )
1922
1923   def ExpandNames(self):
1924     _CheckOutputFields(static=self._FIELDS_STATIC,
1925                        dynamic=self._FIELDS_DYNAMIC,
1926                        selected=self.op.output_fields)
1927
1928     self.needed_locks = {}
1929     self.share_locks[locking.LEVEL_NODE] = 1
1930
1931     if self.op.names:
1932       self.wanted = _GetWantedNodes(self, self.op.names)
1933     else:
1934       self.wanted = locking.ALL_SET
1935
1936     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1937     self.do_locking = self.do_node_query and self.op.use_locking
1938     if self.do_locking:
1939       # if we don't request only static fields, we need to lock the nodes
1940       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1941
1942
1943   def CheckPrereq(self):
1944     """Check prerequisites.
1945
1946     """
1947     # The validation of the node list is done in the _GetWantedNodes,
1948     # if non empty, and if empty, there's no validation to do
1949     pass
1950
1951   def Exec(self, feedback_fn):
1952     """Computes the list of nodes and their attributes.
1953
1954     """
1955     all_info = self.cfg.GetAllNodesInfo()
1956     if self.do_locking:
1957       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1958     elif self.wanted != locking.ALL_SET:
1959       nodenames = self.wanted
1960       missing = set(nodenames).difference(all_info.keys())
1961       if missing:
1962         raise errors.OpExecError(
1963           "Some nodes were removed before retrieving their data: %s" % missing)
1964     else:
1965       nodenames = all_info.keys()
1966
1967     nodenames = utils.NiceSort(nodenames)
1968     nodelist = [all_info[name] for name in nodenames]
1969
1970     # begin data gathering
1971
1972     if self.do_node_query:
1973       live_data = {}
1974       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1975                                           self.cfg.GetHypervisorType())
1976       for name in nodenames:
1977         nodeinfo = node_data[name]
1978         if not nodeinfo.failed and nodeinfo.data:
1979           nodeinfo = nodeinfo.data
1980           fn = utils.TryConvert
1981           live_data[name] = {
1982             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1983             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1984             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1985             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1986             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1987             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1988             "bootid": nodeinfo.get('bootid', None),
1989             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1990             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1991             }
1992         else:
1993           live_data[name] = {}
1994     else:
1995       live_data = dict.fromkeys(nodenames, {})
1996
1997     node_to_primary = dict([(name, set()) for name in nodenames])
1998     node_to_secondary = dict([(name, set()) for name in nodenames])
1999
2000     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2001                              "sinst_cnt", "sinst_list"))
2002     if inst_fields & frozenset(self.op.output_fields):
2003       instancelist = self.cfg.GetInstanceList()
2004
2005       for instance_name in instancelist:
2006         inst = self.cfg.GetInstanceInfo(instance_name)
2007         if inst.primary_node in node_to_primary:
2008           node_to_primary[inst.primary_node].add(inst.name)
2009         for secnode in inst.secondary_nodes:
2010           if secnode in node_to_secondary:
2011             node_to_secondary[secnode].add(inst.name)
2012
2013     master_node = self.cfg.GetMasterNode()
2014
2015     # end data gathering
2016
2017     output = []
2018     for node in nodelist:
2019       node_output = []
2020       for field in self.op.output_fields:
2021         if field == "name":
2022           val = node.name
2023         elif field == "pinst_list":
2024           val = list(node_to_primary[node.name])
2025         elif field == "sinst_list":
2026           val = list(node_to_secondary[node.name])
2027         elif field == "pinst_cnt":
2028           val = len(node_to_primary[node.name])
2029         elif field == "sinst_cnt":
2030           val = len(node_to_secondary[node.name])
2031         elif field == "pip":
2032           val = node.primary_ip
2033         elif field == "sip":
2034           val = node.secondary_ip
2035         elif field == "tags":
2036           val = list(node.GetTags())
2037         elif field == "serial_no":
2038           val = node.serial_no
2039         elif field == "master_candidate":
2040           val = node.master_candidate
2041         elif field == "master":
2042           val = node.name == master_node
2043         elif field == "offline":
2044           val = node.offline
2045         elif field == "drained":
2046           val = node.drained
2047         elif self._FIELDS_DYNAMIC.Matches(field):
2048           val = live_data[node.name].get(field, None)
2049         else:
2050           raise errors.ParameterError(field)
2051         node_output.append(val)
2052       output.append(node_output)
2053
2054     return output
2055
2056
2057 class LUQueryNodeVolumes(NoHooksLU):
2058   """Logical unit for getting volumes on node(s).
2059
2060   """
2061   _OP_REQP = ["nodes", "output_fields"]
2062   REQ_BGL = False
2063   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2064   _FIELDS_STATIC = utils.FieldSet("node")
2065
2066   def ExpandNames(self):
2067     _CheckOutputFields(static=self._FIELDS_STATIC,
2068                        dynamic=self._FIELDS_DYNAMIC,
2069                        selected=self.op.output_fields)
2070
2071     self.needed_locks = {}
2072     self.share_locks[locking.LEVEL_NODE] = 1
2073     if not self.op.nodes:
2074       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2075     else:
2076       self.needed_locks[locking.LEVEL_NODE] = \
2077         _GetWantedNodes(self, self.op.nodes)
2078
2079   def CheckPrereq(self):
2080     """Check prerequisites.
2081
2082     This checks that the fields required are valid output fields.
2083
2084     """
2085     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2086
2087   def Exec(self, feedback_fn):
2088     """Computes the list of nodes and their attributes.
2089
2090     """
2091     nodenames = self.nodes
2092     volumes = self.rpc.call_node_volumes(nodenames)
2093
2094     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2095              in self.cfg.GetInstanceList()]
2096
2097     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2098
2099     output = []
2100     for node in nodenames:
2101       if node not in volumes or volumes[node].failed or not volumes[node].data:
2102         continue
2103
2104       node_vols = volumes[node].data[:]
2105       node_vols.sort(key=lambda vol: vol['dev'])
2106
2107       for vol in node_vols:
2108         node_output = []
2109         for field in self.op.output_fields:
2110           if field == "node":
2111             val = node
2112           elif field == "phys":
2113             val = vol['dev']
2114           elif field == "vg":
2115             val = vol['vg']
2116           elif field == "name":
2117             val = vol['name']
2118           elif field == "size":
2119             val = int(float(vol['size']))
2120           elif field == "instance":
2121             for inst in ilist:
2122               if node not in lv_by_node[inst]:
2123                 continue
2124               if vol['name'] in lv_by_node[inst][node]:
2125                 val = inst.name
2126                 break
2127             else:
2128               val = '-'
2129           else:
2130             raise errors.ParameterError(field)
2131           node_output.append(str(val))
2132
2133         output.append(node_output)
2134
2135     return output
2136
2137
2138 class LUAddNode(LogicalUnit):
2139   """Logical unit for adding node to the cluster.
2140
2141   """
2142   HPATH = "node-add"
2143   HTYPE = constants.HTYPE_NODE
2144   _OP_REQP = ["node_name"]
2145
2146   def BuildHooksEnv(self):
2147     """Build hooks env.
2148
2149     This will run on all nodes before, and on all nodes + the new node after.
2150
2151     """
2152     env = {
2153       "OP_TARGET": self.op.node_name,
2154       "NODE_NAME": self.op.node_name,
2155       "NODE_PIP": self.op.primary_ip,
2156       "NODE_SIP": self.op.secondary_ip,
2157       }
2158     nodes_0 = self.cfg.GetNodeList()
2159     nodes_1 = nodes_0 + [self.op.node_name, ]
2160     return env, nodes_0, nodes_1
2161
2162   def CheckPrereq(self):
2163     """Check prerequisites.
2164
2165     This checks:
2166      - the new node is not already in the config
2167      - it is resolvable
2168      - its parameters (single/dual homed) matches the cluster
2169
2170     Any errors are signalled by raising errors.OpPrereqError.
2171
2172     """
2173     node_name = self.op.node_name
2174     cfg = self.cfg
2175
2176     dns_data = utils.HostInfo(node_name)
2177
2178     node = dns_data.name
2179     primary_ip = self.op.primary_ip = dns_data.ip
2180     secondary_ip = getattr(self.op, "secondary_ip", None)
2181     if secondary_ip is None:
2182       secondary_ip = primary_ip
2183     if not utils.IsValidIP(secondary_ip):
2184       raise errors.OpPrereqError("Invalid secondary IP given")
2185     self.op.secondary_ip = secondary_ip
2186
2187     node_list = cfg.GetNodeList()
2188     if not self.op.readd and node in node_list:
2189       raise errors.OpPrereqError("Node %s is already in the configuration" %
2190                                  node)
2191     elif self.op.readd and node not in node_list:
2192       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2193
2194     for existing_node_name in node_list:
2195       existing_node = cfg.GetNodeInfo(existing_node_name)
2196
2197       if self.op.readd and node == existing_node_name:
2198         if (existing_node.primary_ip != primary_ip or
2199             existing_node.secondary_ip != secondary_ip):
2200           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2201                                      " address configuration as before")
2202         continue
2203
2204       if (existing_node.primary_ip == primary_ip or
2205           existing_node.secondary_ip == primary_ip or
2206           existing_node.primary_ip == secondary_ip or
2207           existing_node.secondary_ip == secondary_ip):
2208         raise errors.OpPrereqError("New node ip address(es) conflict with"
2209                                    " existing node %s" % existing_node.name)
2210
2211     # check that the type of the node (single versus dual homed) is the
2212     # same as for the master
2213     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2214     master_singlehomed = myself.secondary_ip == myself.primary_ip
2215     newbie_singlehomed = secondary_ip == primary_ip
2216     if master_singlehomed != newbie_singlehomed:
2217       if master_singlehomed:
2218         raise errors.OpPrereqError("The master has no private ip but the"
2219                                    " new node has one")
2220       else:
2221         raise errors.OpPrereqError("The master has a private ip but the"
2222                                    " new node doesn't have one")
2223
2224     # checks reachablity
2225     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2226       raise errors.OpPrereqError("Node not reachable by ping")
2227
2228     if not newbie_singlehomed:
2229       # check reachability from my secondary ip to newbie's secondary ip
2230       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2231                            source=myself.secondary_ip):
2232         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2233                                    " based ping to noded port")
2234
2235     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2236     mc_now, _ = self.cfg.GetMasterCandidateStats()
2237     master_candidate = mc_now < cp_size
2238
2239     self.new_node = objects.Node(name=node,
2240                                  primary_ip=primary_ip,
2241                                  secondary_ip=secondary_ip,
2242                                  master_candidate=master_candidate,
2243                                  offline=False, drained=False)
2244
2245   def Exec(self, feedback_fn):
2246     """Adds the new node to the cluster.
2247
2248     """
2249     new_node = self.new_node
2250     node = new_node.name
2251
2252     # check connectivity
2253     result = self.rpc.call_version([node])[node]
2254     result.Raise()
2255     if result.data:
2256       if constants.PROTOCOL_VERSION == result.data:
2257         logging.info("Communication to node %s fine, sw version %s match",
2258                      node, result.data)
2259       else:
2260         raise errors.OpExecError("Version mismatch master version %s,"
2261                                  " node version %s" %
2262                                  (constants.PROTOCOL_VERSION, result.data))
2263     else:
2264       raise errors.OpExecError("Cannot get version from the new node")
2265
2266     # setup ssh on node
2267     logging.info("Copy ssh key to node %s", node)
2268     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2269     keyarray = []
2270     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2271                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2272                 priv_key, pub_key]
2273
2274     for i in keyfiles:
2275       f = open(i, 'r')
2276       try:
2277         keyarray.append(f.read())
2278       finally:
2279         f.close()
2280
2281     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2282                                     keyarray[2],
2283                                     keyarray[3], keyarray[4], keyarray[5])
2284
2285     msg = result.RemoteFailMsg()
2286     if msg:
2287       raise errors.OpExecError("Cannot transfer ssh keys to the"
2288                                " new node: %s" % msg)
2289
2290     # Add node to our /etc/hosts, and add key to known_hosts
2291     if self.cfg.GetClusterInfo().modify_etc_hosts:
2292       utils.AddHostToEtcHosts(new_node.name)
2293
2294     if new_node.secondary_ip != new_node.primary_ip:
2295       result = self.rpc.call_node_has_ip_address(new_node.name,
2296                                                  new_node.secondary_ip)
2297       if result.failed or not result.data:
2298         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2299                                  " you gave (%s). Please fix and re-run this"
2300                                  " command." % new_node.secondary_ip)
2301
2302     node_verify_list = [self.cfg.GetMasterNode()]
2303     node_verify_param = {
2304       'nodelist': [node],
2305       # TODO: do a node-net-test as well?
2306     }
2307
2308     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2309                                        self.cfg.GetClusterName())
2310     for verifier in node_verify_list:
2311       if result[verifier].failed or not result[verifier].data:
2312         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2313                                  " for remote verification" % verifier)
2314       if result[verifier].data['nodelist']:
2315         for failed in result[verifier].data['nodelist']:
2316           feedback_fn("ssh/hostname verification failed %s -> %s" %
2317                       (verifier, result[verifier].data['nodelist'][failed]))
2318         raise errors.OpExecError("ssh/hostname verification failed.")
2319
2320     if self.op.readd:
2321       _RedistributeAncillaryFiles(self)
2322       self.context.ReaddNode(new_node)
2323     else:
2324       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2325       self.context.AddNode(new_node)
2326
2327
2328 class LUSetNodeParams(LogicalUnit):
2329   """Modifies the parameters of a node.
2330
2331   """
2332   HPATH = "node-modify"
2333   HTYPE = constants.HTYPE_NODE
2334   _OP_REQP = ["node_name"]
2335   REQ_BGL = False
2336
2337   def CheckArguments(self):
2338     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2339     if node_name is None:
2340       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2341     self.op.node_name = node_name
2342     _CheckBooleanOpField(self.op, 'master_candidate')
2343     _CheckBooleanOpField(self.op, 'offline')
2344     _CheckBooleanOpField(self.op, 'drained')
2345     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2346     if all_mods.count(None) == 3:
2347       raise errors.OpPrereqError("Please pass at least one modification")
2348     if all_mods.count(True) > 1:
2349       raise errors.OpPrereqError("Can't set the node into more than one"
2350                                  " state at the same time")
2351
2352   def ExpandNames(self):
2353     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2354
2355   def BuildHooksEnv(self):
2356     """Build hooks env.
2357
2358     This runs on the master node.
2359
2360     """
2361     env = {
2362       "OP_TARGET": self.op.node_name,
2363       "MASTER_CANDIDATE": str(self.op.master_candidate),
2364       "OFFLINE": str(self.op.offline),
2365       "DRAINED": str(self.op.drained),
2366       }
2367     nl = [self.cfg.GetMasterNode(),
2368           self.op.node_name]
2369     return env, nl, nl
2370
2371   def CheckPrereq(self):
2372     """Check prerequisites.
2373
2374     This only checks the instance list against the existing names.
2375
2376     """
2377     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2378
2379     if ((self.op.master_candidate == False or self.op.offline == True or
2380          self.op.drained == True) and node.master_candidate):
2381       # we will demote the node from master_candidate
2382       if self.op.node_name == self.cfg.GetMasterNode():
2383         raise errors.OpPrereqError("The master node has to be a"
2384                                    " master candidate, online and not drained")
2385       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2386       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2387       if num_candidates <= cp_size:
2388         msg = ("Not enough master candidates (desired"
2389                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2390         if self.op.force:
2391           self.LogWarning(msg)
2392         else:
2393           raise errors.OpPrereqError(msg)
2394
2395     if (self.op.master_candidate == True and
2396         ((node.offline and not self.op.offline == False) or
2397          (node.drained and not self.op.drained == False))):
2398       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2399                                  " to master_candidate" % node.name)
2400
2401     return
2402
2403   def Exec(self, feedback_fn):
2404     """Modifies a node.
2405
2406     """
2407     node = self.node
2408
2409     result = []
2410     changed_mc = False
2411
2412     if self.op.offline is not None:
2413       node.offline = self.op.offline
2414       result.append(("offline", str(self.op.offline)))
2415       if self.op.offline == True:
2416         if node.master_candidate:
2417           node.master_candidate = False
2418           changed_mc = True
2419           result.append(("master_candidate", "auto-demotion due to offline"))
2420         if node.drained:
2421           node.drained = False
2422           result.append(("drained", "clear drained status due to offline"))
2423
2424     if self.op.master_candidate is not None:
2425       node.master_candidate = self.op.master_candidate
2426       changed_mc = True
2427       result.append(("master_candidate", str(self.op.master_candidate)))
2428       if self.op.master_candidate == False:
2429         rrc = self.rpc.call_node_demote_from_mc(node.name)
2430         msg = rrc.RemoteFailMsg()
2431         if msg:
2432           self.LogWarning("Node failed to demote itself: %s" % msg)
2433
2434     if self.op.drained is not None:
2435       node.drained = self.op.drained
2436       result.append(("drained", str(self.op.drained)))
2437       if self.op.drained == True:
2438         if node.master_candidate:
2439           node.master_candidate = False
2440           changed_mc = True
2441           result.append(("master_candidate", "auto-demotion due to drain"))
2442         if node.offline:
2443           node.offline = False
2444           result.append(("offline", "clear offline status due to drain"))
2445
2446     # this will trigger configuration file update, if needed
2447     self.cfg.Update(node)
2448     # this will trigger job queue propagation or cleanup
2449     if changed_mc:
2450       self.context.ReaddNode(node)
2451
2452     return result
2453
2454
2455 class LUPowercycleNode(NoHooksLU):
2456   """Powercycles a node.
2457
2458   """
2459   _OP_REQP = ["node_name", "force"]
2460   REQ_BGL = False
2461
2462   def CheckArguments(self):
2463     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2464     if node_name is None:
2465       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2466     self.op.node_name = node_name
2467     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2468       raise errors.OpPrereqError("The node is the master and the force"
2469                                  " parameter was not set")
2470
2471   def ExpandNames(self):
2472     """Locking for PowercycleNode.
2473
2474     This is a last-resource option and shouldn't block on other
2475     jobs. Therefore, we grab no locks.
2476
2477     """
2478     self.needed_locks = {}
2479
2480   def CheckPrereq(self):
2481     """Check prerequisites.
2482
2483     This LU has no prereqs.
2484
2485     """
2486     pass
2487
2488   def Exec(self, feedback_fn):
2489     """Reboots a node.
2490
2491     """
2492     result = self.rpc.call_node_powercycle(self.op.node_name,
2493                                            self.cfg.GetHypervisorType())
2494     msg = result.RemoteFailMsg()
2495     if msg:
2496       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2497     return result.payload
2498
2499
2500 class LUQueryClusterInfo(NoHooksLU):
2501   """Query cluster configuration.
2502
2503   """
2504   _OP_REQP = []
2505   REQ_BGL = False
2506
2507   def ExpandNames(self):
2508     self.needed_locks = {}
2509
2510   def CheckPrereq(self):
2511     """No prerequsites needed for this LU.
2512
2513     """
2514     pass
2515
2516   def Exec(self, feedback_fn):
2517     """Return cluster config.
2518
2519     """
2520     cluster = self.cfg.GetClusterInfo()
2521     result = {
2522       "software_version": constants.RELEASE_VERSION,
2523       "protocol_version": constants.PROTOCOL_VERSION,
2524       "config_version": constants.CONFIG_VERSION,
2525       "os_api_version": constants.OS_API_VERSION,
2526       "export_version": constants.EXPORT_VERSION,
2527       "architecture": (platform.architecture()[0], platform.machine()),
2528       "name": cluster.cluster_name,
2529       "master": cluster.master_node,
2530       "default_hypervisor": cluster.default_hypervisor,
2531       "enabled_hypervisors": cluster.enabled_hypervisors,
2532       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2533                         for hypervisor in cluster.enabled_hypervisors]),
2534       "beparams": cluster.beparams,
2535       "nicparams": cluster.nicparams,
2536       "candidate_pool_size": cluster.candidate_pool_size,
2537       "master_netdev": cluster.master_netdev,
2538       "volume_group_name": cluster.volume_group_name,
2539       "file_storage_dir": cluster.file_storage_dir,
2540       }
2541
2542     return result
2543
2544
2545 class LUQueryConfigValues(NoHooksLU):
2546   """Return configuration values.
2547
2548   """
2549   _OP_REQP = []
2550   REQ_BGL = False
2551   _FIELDS_DYNAMIC = utils.FieldSet()
2552   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2553
2554   def ExpandNames(self):
2555     self.needed_locks = {}
2556
2557     _CheckOutputFields(static=self._FIELDS_STATIC,
2558                        dynamic=self._FIELDS_DYNAMIC,
2559                        selected=self.op.output_fields)
2560
2561   def CheckPrereq(self):
2562     """No prerequisites.
2563
2564     """
2565     pass
2566
2567   def Exec(self, feedback_fn):
2568     """Dump a representation of the cluster config to the standard output.
2569
2570     """
2571     values = []
2572     for field in self.op.output_fields:
2573       if field == "cluster_name":
2574         entry = self.cfg.GetClusterName()
2575       elif field == "master_node":
2576         entry = self.cfg.GetMasterNode()
2577       elif field == "drain_flag":
2578         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2579       else:
2580         raise errors.ParameterError(field)
2581       values.append(entry)
2582     return values
2583
2584
2585 class LUActivateInstanceDisks(NoHooksLU):
2586   """Bring up an instance's disks.
2587
2588   """
2589   _OP_REQP = ["instance_name"]
2590   REQ_BGL = False
2591
2592   def ExpandNames(self):
2593     self._ExpandAndLockInstance()
2594     self.needed_locks[locking.LEVEL_NODE] = []
2595     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2596
2597   def DeclareLocks(self, level):
2598     if level == locking.LEVEL_NODE:
2599       self._LockInstancesNodes()
2600
2601   def CheckPrereq(self):
2602     """Check prerequisites.
2603
2604     This checks that the instance is in the cluster.
2605
2606     """
2607     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2608     assert self.instance is not None, \
2609       "Cannot retrieve locked instance %s" % self.op.instance_name
2610     _CheckNodeOnline(self, self.instance.primary_node)
2611
2612   def Exec(self, feedback_fn):
2613     """Activate the disks.
2614
2615     """
2616     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2617     if not disks_ok:
2618       raise errors.OpExecError("Cannot activate block devices")
2619
2620     return disks_info
2621
2622
2623 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2624   """Prepare the block devices for an instance.
2625
2626   This sets up the block devices on all nodes.
2627
2628   @type lu: L{LogicalUnit}
2629   @param lu: the logical unit on whose behalf we execute
2630   @type instance: L{objects.Instance}
2631   @param instance: the instance for whose disks we assemble
2632   @type ignore_secondaries: boolean
2633   @param ignore_secondaries: if true, errors on secondary nodes
2634       won't result in an error return from the function
2635   @return: False if the operation failed, otherwise a list of
2636       (host, instance_visible_name, node_visible_name)
2637       with the mapping from node devices to instance devices
2638
2639   """
2640   device_info = []
2641   disks_ok = True
2642   iname = instance.name
2643   # With the two passes mechanism we try to reduce the window of
2644   # opportunity for the race condition of switching DRBD to primary
2645   # before handshaking occured, but we do not eliminate it
2646
2647   # The proper fix would be to wait (with some limits) until the
2648   # connection has been made and drbd transitions from WFConnection
2649   # into any other network-connected state (Connected, SyncTarget,
2650   # SyncSource, etc.)
2651
2652   # 1st pass, assemble on all nodes in secondary mode
2653   for inst_disk in instance.disks:
2654     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2655       lu.cfg.SetDiskID(node_disk, node)
2656       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2657       msg = result.RemoteFailMsg()
2658       if msg:
2659         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2660                            " (is_primary=False, pass=1): %s",
2661                            inst_disk.iv_name, node, msg)
2662         if not ignore_secondaries:
2663           disks_ok = False
2664
2665   # FIXME: race condition on drbd migration to primary
2666
2667   # 2nd pass, do only the primary node
2668   for inst_disk in instance.disks:
2669     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2670       if node != instance.primary_node:
2671         continue
2672       lu.cfg.SetDiskID(node_disk, node)
2673       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2674       msg = result.RemoteFailMsg()
2675       if msg:
2676         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2677                            " (is_primary=True, pass=2): %s",
2678                            inst_disk.iv_name, node, msg)
2679         disks_ok = False
2680     device_info.append((instance.primary_node, inst_disk.iv_name,
2681                         result.payload))
2682
2683   # leave the disks configured for the primary node
2684   # this is a workaround that would be fixed better by
2685   # improving the logical/physical id handling
2686   for disk in instance.disks:
2687     lu.cfg.SetDiskID(disk, instance.primary_node)
2688
2689   return disks_ok, device_info
2690
2691
2692 def _StartInstanceDisks(lu, instance, force):
2693   """Start the disks of an instance.
2694
2695   """
2696   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2697                                            ignore_secondaries=force)
2698   if not disks_ok:
2699     _ShutdownInstanceDisks(lu, instance)
2700     if force is not None and not force:
2701       lu.proc.LogWarning("", hint="If the message above refers to a"
2702                          " secondary node,"
2703                          " you can retry the operation using '--force'.")
2704     raise errors.OpExecError("Disk consistency error")
2705
2706
2707 class LUDeactivateInstanceDisks(NoHooksLU):
2708   """Shutdown an instance's disks.
2709
2710   """
2711   _OP_REQP = ["instance_name"]
2712   REQ_BGL = False
2713
2714   def ExpandNames(self):
2715     self._ExpandAndLockInstance()
2716     self.needed_locks[locking.LEVEL_NODE] = []
2717     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2718
2719   def DeclareLocks(self, level):
2720     if level == locking.LEVEL_NODE:
2721       self._LockInstancesNodes()
2722
2723   def CheckPrereq(self):
2724     """Check prerequisites.
2725
2726     This checks that the instance is in the cluster.
2727
2728     """
2729     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2730     assert self.instance is not None, \
2731       "Cannot retrieve locked instance %s" % self.op.instance_name
2732
2733   def Exec(self, feedback_fn):
2734     """Deactivate the disks
2735
2736     """
2737     instance = self.instance
2738     _SafeShutdownInstanceDisks(self, instance)
2739
2740
2741 def _SafeShutdownInstanceDisks(lu, instance):
2742   """Shutdown block devices of an instance.
2743
2744   This function checks if an instance is running, before calling
2745   _ShutdownInstanceDisks.
2746
2747   """
2748   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2749                                       [instance.hypervisor])
2750   ins_l = ins_l[instance.primary_node]
2751   if ins_l.failed or not isinstance(ins_l.data, list):
2752     raise errors.OpExecError("Can't contact node '%s'" %
2753                              instance.primary_node)
2754
2755   if instance.name in ins_l.data:
2756     raise errors.OpExecError("Instance is running, can't shutdown"
2757                              " block devices.")
2758
2759   _ShutdownInstanceDisks(lu, instance)
2760
2761
2762 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2763   """Shutdown block devices of an instance.
2764
2765   This does the shutdown on all nodes of the instance.
2766
2767   If the ignore_primary is false, errors on the primary node are
2768   ignored.
2769
2770   """
2771   all_result = True
2772   for disk in instance.disks:
2773     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2774       lu.cfg.SetDiskID(top_disk, node)
2775       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2776       msg = result.RemoteFailMsg()
2777       if msg:
2778         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2779                       disk.iv_name, node, msg)
2780         if not ignore_primary or node != instance.primary_node:
2781           all_result = False
2782   return all_result
2783
2784
2785 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2786   """Checks if a node has enough free memory.
2787
2788   This function check if a given node has the needed amount of free
2789   memory. In case the node has less memory or we cannot get the
2790   information from the node, this function raise an OpPrereqError
2791   exception.
2792
2793   @type lu: C{LogicalUnit}
2794   @param lu: a logical unit from which we get configuration data
2795   @type node: C{str}
2796   @param node: the node to check
2797   @type reason: C{str}
2798   @param reason: string to use in the error message
2799   @type requested: C{int}
2800   @param requested: the amount of memory in MiB to check for
2801   @type hypervisor_name: C{str}
2802   @param hypervisor_name: the hypervisor to ask for memory stats
2803   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2804       we cannot check the node
2805
2806   """
2807   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2808   nodeinfo[node].Raise()
2809   free_mem = nodeinfo[node].data.get('memory_free')
2810   if not isinstance(free_mem, int):
2811     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2812                              " was '%s'" % (node, free_mem))
2813   if requested > free_mem:
2814     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2815                              " needed %s MiB, available %s MiB" %
2816                              (node, reason, requested, free_mem))
2817
2818
2819 class LUStartupInstance(LogicalUnit):
2820   """Starts an instance.
2821
2822   """
2823   HPATH = "instance-start"
2824   HTYPE = constants.HTYPE_INSTANCE
2825   _OP_REQP = ["instance_name", "force"]
2826   REQ_BGL = False
2827
2828   def ExpandNames(self):
2829     self._ExpandAndLockInstance()
2830
2831   def BuildHooksEnv(self):
2832     """Build hooks env.
2833
2834     This runs on master, primary and secondary nodes of the instance.
2835
2836     """
2837     env = {
2838       "FORCE": self.op.force,
2839       }
2840     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2841     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2842     return env, nl, nl
2843
2844   def CheckPrereq(self):
2845     """Check prerequisites.
2846
2847     This checks that the instance is in the cluster.
2848
2849     """
2850     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2851     assert self.instance is not None, \
2852       "Cannot retrieve locked instance %s" % self.op.instance_name
2853
2854     # extra beparams
2855     self.beparams = getattr(self.op, "beparams", {})
2856     if self.beparams:
2857       if not isinstance(self.beparams, dict):
2858         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2859                                    " dict" % (type(self.beparams), ))
2860       # fill the beparams dict
2861       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2862       self.op.beparams = self.beparams
2863
2864     # extra hvparams
2865     self.hvparams = getattr(self.op, "hvparams", {})
2866     if self.hvparams:
2867       if not isinstance(self.hvparams, dict):
2868         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2869                                    " dict" % (type(self.hvparams), ))
2870
2871       # check hypervisor parameter syntax (locally)
2872       cluster = self.cfg.GetClusterInfo()
2873       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2874       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2875                                     instance.hvparams)
2876       filled_hvp.update(self.hvparams)
2877       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2878       hv_type.CheckParameterSyntax(filled_hvp)
2879       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2880       self.op.hvparams = self.hvparams
2881
2882     _CheckNodeOnline(self, instance.primary_node)
2883
2884     bep = self.cfg.GetClusterInfo().FillBE(instance)
2885     # check bridges existance
2886     _CheckInstanceBridgesExist(self, instance)
2887
2888     remote_info = self.rpc.call_instance_info(instance.primary_node,
2889                                               instance.name,
2890                                               instance.hypervisor)
2891     remote_info.Raise()
2892     if not remote_info.data:
2893       _CheckNodeFreeMemory(self, instance.primary_node,
2894                            "starting instance %s" % instance.name,
2895                            bep[constants.BE_MEMORY], instance.hypervisor)
2896
2897   def Exec(self, feedback_fn):
2898     """Start the instance.
2899
2900     """
2901     instance = self.instance
2902     force = self.op.force
2903
2904     self.cfg.MarkInstanceUp(instance.name)
2905
2906     node_current = instance.primary_node
2907
2908     _StartInstanceDisks(self, instance, force)
2909
2910     result = self.rpc.call_instance_start(node_current, instance,
2911                                           self.hvparams, self.beparams)
2912     msg = result.RemoteFailMsg()
2913     if msg:
2914       _ShutdownInstanceDisks(self, instance)
2915       raise errors.OpExecError("Could not start instance: %s" % msg)
2916
2917
2918 class LURebootInstance(LogicalUnit):
2919   """Reboot an instance.
2920
2921   """
2922   HPATH = "instance-reboot"
2923   HTYPE = constants.HTYPE_INSTANCE
2924   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2925   REQ_BGL = False
2926
2927   def ExpandNames(self):
2928     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2929                                    constants.INSTANCE_REBOOT_HARD,
2930                                    constants.INSTANCE_REBOOT_FULL]:
2931       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2932                                   (constants.INSTANCE_REBOOT_SOFT,
2933                                    constants.INSTANCE_REBOOT_HARD,
2934                                    constants.INSTANCE_REBOOT_FULL))
2935     self._ExpandAndLockInstance()
2936
2937   def BuildHooksEnv(self):
2938     """Build hooks env.
2939
2940     This runs on master, primary and secondary nodes of the instance.
2941
2942     """
2943     env = {
2944       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2945       "REBOOT_TYPE": self.op.reboot_type,
2946       }
2947     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2948     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2949     return env, nl, nl
2950
2951   def CheckPrereq(self):
2952     """Check prerequisites.
2953
2954     This checks that the instance is in the cluster.
2955
2956     """
2957     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2958     assert self.instance is not None, \
2959       "Cannot retrieve locked instance %s" % self.op.instance_name
2960
2961     _CheckNodeOnline(self, instance.primary_node)
2962
2963     # check bridges existance
2964     _CheckInstanceBridgesExist(self, instance)
2965
2966   def Exec(self, feedback_fn):
2967     """Reboot the instance.
2968
2969     """
2970     instance = self.instance
2971     ignore_secondaries = self.op.ignore_secondaries
2972     reboot_type = self.op.reboot_type
2973
2974     node_current = instance.primary_node
2975
2976     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2977                        constants.INSTANCE_REBOOT_HARD]:
2978       for disk in instance.disks:
2979         self.cfg.SetDiskID(disk, node_current)
2980       result = self.rpc.call_instance_reboot(node_current, instance,
2981                                              reboot_type)
2982       msg = result.RemoteFailMsg()
2983       if msg:
2984         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2985     else:
2986       result = self.rpc.call_instance_shutdown(node_current, instance)
2987       msg = result.RemoteFailMsg()
2988       if msg:
2989         raise errors.OpExecError("Could not shutdown instance for"
2990                                  " full reboot: %s" % msg)
2991       _ShutdownInstanceDisks(self, instance)
2992       _StartInstanceDisks(self, instance, ignore_secondaries)
2993       result = self.rpc.call_instance_start(node_current, instance, None, None)
2994       msg = result.RemoteFailMsg()
2995       if msg:
2996         _ShutdownInstanceDisks(self, instance)
2997         raise errors.OpExecError("Could not start instance for"
2998                                  " full reboot: %s" % msg)
2999
3000     self.cfg.MarkInstanceUp(instance.name)
3001
3002
3003 class LUShutdownInstance(LogicalUnit):
3004   """Shutdown an instance.
3005
3006   """
3007   HPATH = "instance-stop"
3008   HTYPE = constants.HTYPE_INSTANCE
3009   _OP_REQP = ["instance_name"]
3010   REQ_BGL = False
3011
3012   def ExpandNames(self):
3013     self._ExpandAndLockInstance()
3014
3015   def BuildHooksEnv(self):
3016     """Build hooks env.
3017
3018     This runs on master, primary and secondary nodes of the instance.
3019
3020     """
3021     env = _BuildInstanceHookEnvByObject(self, self.instance)
3022     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3023     return env, nl, nl
3024
3025   def CheckPrereq(self):
3026     """Check prerequisites.
3027
3028     This checks that the instance is in the cluster.
3029
3030     """
3031     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3032     assert self.instance is not None, \
3033       "Cannot retrieve locked instance %s" % self.op.instance_name
3034     _CheckNodeOnline(self, self.instance.primary_node)
3035
3036   def Exec(self, feedback_fn):
3037     """Shutdown the instance.
3038
3039     """
3040     instance = self.instance
3041     node_current = instance.primary_node
3042     self.cfg.MarkInstanceDown(instance.name)
3043     result = self.rpc.call_instance_shutdown(node_current, instance)
3044     msg = result.RemoteFailMsg()
3045     if msg:
3046       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3047
3048     _ShutdownInstanceDisks(self, instance)
3049
3050
3051 class LUReinstallInstance(LogicalUnit):
3052   """Reinstall an instance.
3053
3054   """
3055   HPATH = "instance-reinstall"
3056   HTYPE = constants.HTYPE_INSTANCE
3057   _OP_REQP = ["instance_name"]
3058   REQ_BGL = False
3059
3060   def ExpandNames(self):
3061     self._ExpandAndLockInstance()
3062
3063   def BuildHooksEnv(self):
3064     """Build hooks env.
3065
3066     This runs on master, primary and secondary nodes of the instance.
3067
3068     """
3069     env = _BuildInstanceHookEnvByObject(self, self.instance)
3070     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3071     return env, nl, nl
3072
3073   def CheckPrereq(self):
3074     """Check prerequisites.
3075
3076     This checks that the instance is in the cluster and is not running.
3077
3078     """
3079     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3080     assert instance is not None, \
3081       "Cannot retrieve locked instance %s" % self.op.instance_name
3082     _CheckNodeOnline(self, instance.primary_node)
3083
3084     if instance.disk_template == constants.DT_DISKLESS:
3085       raise errors.OpPrereqError("Instance '%s' has no disks" %
3086                                  self.op.instance_name)
3087     if instance.admin_up:
3088       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3089                                  self.op.instance_name)
3090     remote_info = self.rpc.call_instance_info(instance.primary_node,
3091                                               instance.name,
3092                                               instance.hypervisor)
3093     remote_info.Raise()
3094     if remote_info.data:
3095       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3096                                  (self.op.instance_name,
3097                                   instance.primary_node))
3098
3099     self.op.os_type = getattr(self.op, "os_type", None)
3100     if self.op.os_type is not None:
3101       # OS verification
3102       pnode = self.cfg.GetNodeInfo(
3103         self.cfg.ExpandNodeName(instance.primary_node))
3104       if pnode is None:
3105         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3106                                    self.op.pnode)
3107       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3108       result.Raise()
3109       if not isinstance(result.data, objects.OS):
3110         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3111                                    " primary node"  % self.op.os_type)
3112
3113     self.instance = instance
3114
3115   def Exec(self, feedback_fn):
3116     """Reinstall the instance.
3117
3118     """
3119     inst = self.instance
3120
3121     if self.op.os_type is not None:
3122       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3123       inst.os = self.op.os_type
3124       self.cfg.Update(inst)
3125
3126     _StartInstanceDisks(self, inst, None)
3127     try:
3128       feedback_fn("Running the instance OS create scripts...")
3129       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3130       msg = result.RemoteFailMsg()
3131       if msg:
3132         raise errors.OpExecError("Could not install OS for instance %s"
3133                                  " on node %s: %s" %
3134                                  (inst.name, inst.primary_node, msg))
3135     finally:
3136       _ShutdownInstanceDisks(self, inst)
3137
3138
3139 class LURenameInstance(LogicalUnit):
3140   """Rename an instance.
3141
3142   """
3143   HPATH = "instance-rename"
3144   HTYPE = constants.HTYPE_INSTANCE
3145   _OP_REQP = ["instance_name", "new_name"]
3146
3147   def BuildHooksEnv(self):
3148     """Build hooks env.
3149
3150     This runs on master, primary and secondary nodes of the instance.
3151
3152     """
3153     env = _BuildInstanceHookEnvByObject(self, self.instance)
3154     env["INSTANCE_NEW_NAME"] = self.op.new_name
3155     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3156     return env, nl, nl
3157
3158   def CheckPrereq(self):
3159     """Check prerequisites.
3160
3161     This checks that the instance is in the cluster and is not running.
3162
3163     """
3164     instance = self.cfg.GetInstanceInfo(
3165       self.cfg.ExpandInstanceName(self.op.instance_name))
3166     if instance is None:
3167       raise errors.OpPrereqError("Instance '%s' not known" %
3168                                  self.op.instance_name)
3169     _CheckNodeOnline(self, instance.primary_node)
3170
3171     if instance.admin_up:
3172       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3173                                  self.op.instance_name)
3174     remote_info = self.rpc.call_instance_info(instance.primary_node,
3175                                               instance.name,
3176                                               instance.hypervisor)
3177     remote_info.Raise()
3178     if remote_info.data:
3179       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3180                                  (self.op.instance_name,
3181                                   instance.primary_node))
3182     self.instance = instance
3183
3184     # new name verification
3185     name_info = utils.HostInfo(self.op.new_name)
3186
3187     self.op.new_name = new_name = name_info.name
3188     instance_list = self.cfg.GetInstanceList()
3189     if new_name in instance_list:
3190       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3191                                  new_name)
3192
3193     if not getattr(self.op, "ignore_ip", False):
3194       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3195         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3196                                    (name_info.ip, new_name))
3197
3198
3199   def Exec(self, feedback_fn):
3200     """Reinstall the instance.
3201
3202     """
3203     inst = self.instance
3204     old_name = inst.name
3205
3206     if inst.disk_template == constants.DT_FILE:
3207       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3208
3209     self.cfg.RenameInstance(inst.name, self.op.new_name)
3210     # Change the instance lock. This is definitely safe while we hold the BGL
3211     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3212     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3213
3214     # re-read the instance from the configuration after rename
3215     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3216
3217     if inst.disk_template == constants.DT_FILE:
3218       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3219       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3220                                                      old_file_storage_dir,
3221                                                      new_file_storage_dir)
3222       result.Raise()
3223       if not result.data:
3224         raise errors.OpExecError("Could not connect to node '%s' to rename"
3225                                  " directory '%s' to '%s' (but the instance"
3226                                  " has been renamed in Ganeti)" % (
3227                                  inst.primary_node, old_file_storage_dir,
3228                                  new_file_storage_dir))
3229
3230       if not result.data[0]:
3231         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3232                                  " (but the instance has been renamed in"
3233                                  " Ganeti)" % (old_file_storage_dir,
3234                                                new_file_storage_dir))
3235
3236     _StartInstanceDisks(self, inst, None)
3237     try:
3238       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3239                                                  old_name)
3240       msg = result.RemoteFailMsg()
3241       if msg:
3242         msg = ("Could not run OS rename script for instance %s on node %s"
3243                " (but the instance has been renamed in Ganeti): %s" %
3244                (inst.name, inst.primary_node, msg))
3245         self.proc.LogWarning(msg)
3246     finally:
3247       _ShutdownInstanceDisks(self, inst)
3248
3249
3250 class LURemoveInstance(LogicalUnit):
3251   """Remove an instance.
3252
3253   """
3254   HPATH = "instance-remove"
3255   HTYPE = constants.HTYPE_INSTANCE
3256   _OP_REQP = ["instance_name", "ignore_failures"]
3257   REQ_BGL = False
3258
3259   def ExpandNames(self):
3260     self._ExpandAndLockInstance()
3261     self.needed_locks[locking.LEVEL_NODE] = []
3262     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3263
3264   def DeclareLocks(self, level):
3265     if level == locking.LEVEL_NODE:
3266       self._LockInstancesNodes()
3267
3268   def BuildHooksEnv(self):
3269     """Build hooks env.
3270
3271     This runs on master, primary and secondary nodes of the instance.
3272
3273     """
3274     env = _BuildInstanceHookEnvByObject(self, self.instance)
3275     nl = [self.cfg.GetMasterNode()]
3276     return env, nl, nl
3277
3278   def CheckPrereq(self):
3279     """Check prerequisites.
3280
3281     This checks that the instance is in the cluster.
3282
3283     """
3284     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3285     assert self.instance is not None, \
3286       "Cannot retrieve locked instance %s" % self.op.instance_name
3287
3288   def Exec(self, feedback_fn):
3289     """Remove the instance.
3290
3291     """
3292     instance = self.instance
3293     logging.info("Shutting down instance %s on node %s",
3294                  instance.name, instance.primary_node)
3295
3296     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3297     msg = result.RemoteFailMsg()
3298     if msg:
3299       if self.op.ignore_failures:
3300         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3301       else:
3302         raise errors.OpExecError("Could not shutdown instance %s on"
3303                                  " node %s: %s" %
3304                                  (instance.name, instance.primary_node, msg))
3305
3306     logging.info("Removing block devices for instance %s", instance.name)
3307
3308     if not _RemoveDisks(self, instance):
3309       if self.op.ignore_failures:
3310         feedback_fn("Warning: can't remove instance's disks")
3311       else:
3312         raise errors.OpExecError("Can't remove instance's disks")
3313
3314     logging.info("Removing instance %s out of cluster config", instance.name)
3315
3316     self.cfg.RemoveInstance(instance.name)
3317     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3318
3319
3320 class LUQueryInstances(NoHooksLU):
3321   """Logical unit for querying instances.
3322
3323   """
3324   _OP_REQP = ["output_fields", "names", "use_locking"]
3325   REQ_BGL = False
3326   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3327                                     "admin_state",
3328                                     "disk_template", "ip", "mac", "bridge",
3329                                     "sda_size", "sdb_size", "vcpus", "tags",
3330                                     "network_port", "beparams",
3331                                     r"(disk)\.(size)/([0-9]+)",
3332                                     r"(disk)\.(sizes)", "disk_usage",
3333                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3334                                     r"(nic)\.(macs|ips|bridges)",
3335                                     r"(disk|nic)\.(count)",
3336                                     "serial_no", "hypervisor", "hvparams",] +
3337                                   ["hv/%s" % name
3338                                    for name in constants.HVS_PARAMETERS] +
3339                                   ["be/%s" % name
3340                                    for name in constants.BES_PARAMETERS])
3341   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3342
3343
3344   def ExpandNames(self):
3345     _CheckOutputFields(static=self._FIELDS_STATIC,
3346                        dynamic=self._FIELDS_DYNAMIC,
3347                        selected=self.op.output_fields)
3348
3349     self.needed_locks = {}
3350     self.share_locks[locking.LEVEL_INSTANCE] = 1
3351     self.share_locks[locking.LEVEL_NODE] = 1
3352
3353     if self.op.names:
3354       self.wanted = _GetWantedInstances(self, self.op.names)
3355     else:
3356       self.wanted = locking.ALL_SET
3357
3358     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3359     self.do_locking = self.do_node_query and self.op.use_locking
3360     if self.do_locking:
3361       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3362       self.needed_locks[locking.LEVEL_NODE] = []
3363       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3364
3365   def DeclareLocks(self, level):
3366     if level == locking.LEVEL_NODE and self.do_locking:
3367       self._LockInstancesNodes()
3368
3369   def CheckPrereq(self):
3370     """Check prerequisites.
3371
3372     """
3373     pass
3374
3375   def Exec(self, feedback_fn):
3376     """Computes the list of nodes and their attributes.
3377
3378     """
3379     all_info = self.cfg.GetAllInstancesInfo()
3380     if self.wanted == locking.ALL_SET:
3381       # caller didn't specify instance names, so ordering is not important
3382       if self.do_locking:
3383         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3384       else:
3385         instance_names = all_info.keys()
3386       instance_names = utils.NiceSort(instance_names)
3387     else:
3388       # caller did specify names, so we must keep the ordering
3389       if self.do_locking:
3390         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3391       else:
3392         tgt_set = all_info.keys()
3393       missing = set(self.wanted).difference(tgt_set)
3394       if missing:
3395         raise errors.OpExecError("Some instances were removed before"
3396                                  " retrieving their data: %s" % missing)
3397       instance_names = self.wanted
3398
3399     instance_list = [all_info[iname] for iname in instance_names]
3400
3401     # begin data gathering
3402
3403     nodes = frozenset([inst.primary_node for inst in instance_list])
3404     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3405
3406     bad_nodes = []
3407     off_nodes = []
3408     if self.do_node_query:
3409       live_data = {}
3410       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3411       for name in nodes:
3412         result = node_data[name]
3413         if result.offline:
3414           # offline nodes will be in both lists
3415           off_nodes.append(name)
3416         if result.failed:
3417           bad_nodes.append(name)
3418         else:
3419           if result.data:
3420             live_data.update(result.data)
3421             # else no instance is alive
3422     else:
3423       live_data = dict([(name, {}) for name in instance_names])
3424
3425     # end data gathering
3426
3427     HVPREFIX = "hv/"
3428     BEPREFIX = "be/"
3429     output = []
3430     for instance in instance_list:
3431       iout = []
3432       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3433       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3434       for field in self.op.output_fields:
3435         st_match = self._FIELDS_STATIC.Matches(field)
3436         if field == "name":
3437           val = instance.name
3438         elif field == "os":
3439           val = instance.os
3440         elif field == "pnode":
3441           val = instance.primary_node
3442         elif field == "snodes":
3443           val = list(instance.secondary_nodes)
3444         elif field == "admin_state":
3445           val = instance.admin_up
3446         elif field == "oper_state":
3447           if instance.primary_node in bad_nodes:
3448             val = None
3449           else:
3450             val = bool(live_data.get(instance.name))
3451         elif field == "status":
3452           if instance.primary_node in off_nodes:
3453             val = "ERROR_nodeoffline"
3454           elif instance.primary_node in bad_nodes:
3455             val = "ERROR_nodedown"
3456           else:
3457             running = bool(live_data.get(instance.name))
3458             if running:
3459               if instance.admin_up:
3460                 val = "running"
3461               else:
3462                 val = "ERROR_up"
3463             else:
3464               if instance.admin_up:
3465                 val = "ERROR_down"
3466               else:
3467                 val = "ADMIN_down"
3468         elif field == "oper_ram":
3469           if instance.primary_node in bad_nodes:
3470             val = None
3471           elif instance.name in live_data:
3472             val = live_data[instance.name].get("memory", "?")
3473           else:
3474             val = "-"
3475         elif field == "disk_template":
3476           val = instance.disk_template
3477         elif field == "ip":
3478           val = instance.nics[0].ip
3479         elif field == "bridge":
3480           val = instance.nics[0].bridge
3481         elif field == "mac":
3482           val = instance.nics[0].mac
3483         elif field == "sda_size" or field == "sdb_size":
3484           idx = ord(field[2]) - ord('a')
3485           try:
3486             val = instance.FindDisk(idx).size
3487           except errors.OpPrereqError:
3488             val = None
3489         elif field == "disk_usage": # total disk usage per node
3490           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3491           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3492         elif field == "tags":
3493           val = list(instance.GetTags())
3494         elif field == "serial_no":
3495           val = instance.serial_no
3496         elif field == "network_port":
3497           val = instance.network_port
3498         elif field == "hypervisor":
3499           val = instance.hypervisor
3500         elif field == "hvparams":
3501           val = i_hv
3502         elif (field.startswith(HVPREFIX) and
3503               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3504           val = i_hv.get(field[len(HVPREFIX):], None)
3505         elif field == "beparams":
3506           val = i_be
3507         elif (field.startswith(BEPREFIX) and
3508               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3509           val = i_be.get(field[len(BEPREFIX):], None)
3510         elif st_match and st_match.groups():
3511           # matches a variable list
3512           st_groups = st_match.groups()
3513           if st_groups and st_groups[0] == "disk":
3514             if st_groups[1] == "count":
3515               val = len(instance.disks)
3516             elif st_groups[1] == "sizes":
3517               val = [disk.size for disk in instance.disks]
3518             elif st_groups[1] == "size":
3519               try:
3520                 val = instance.FindDisk(st_groups[2]).size
3521               except errors.OpPrereqError:
3522                 val = None
3523             else:
3524               assert False, "Unhandled disk parameter"
3525           elif st_groups[0] == "nic":
3526             if st_groups[1] == "count":
3527               val = len(instance.nics)
3528             elif st_groups[1] == "macs":
3529               val = [nic.mac for nic in instance.nics]
3530             elif st_groups[1] == "ips":
3531               val = [nic.ip for nic in instance.nics]
3532             elif st_groups[1] == "bridges":
3533               val = [nic.bridge for nic in instance.nics]
3534             else:
3535               # index-based item
3536               nic_idx = int(st_groups[2])
3537               if nic_idx >= len(instance.nics):
3538                 val = None
3539               else:
3540                 if st_groups[1] == "mac":
3541                   val = instance.nics[nic_idx].mac
3542                 elif st_groups[1] == "ip":
3543                   val = instance.nics[nic_idx].ip
3544                 elif st_groups[1] == "bridge":
3545                   val = instance.nics[nic_idx].bridge
3546                 else:
3547                   assert False, "Unhandled NIC parameter"
3548           else:
3549             assert False, "Unhandled variable parameter"
3550         else:
3551           raise errors.ParameterError(field)
3552         iout.append(val)
3553       output.append(iout)
3554
3555     return output
3556
3557
3558 class LUFailoverInstance(LogicalUnit):
3559   """Failover an instance.
3560
3561   """
3562   HPATH = "instance-failover"
3563   HTYPE = constants.HTYPE_INSTANCE
3564   _OP_REQP = ["instance_name", "ignore_consistency"]
3565   REQ_BGL = False
3566
3567   def ExpandNames(self):
3568     self._ExpandAndLockInstance()
3569     self.needed_locks[locking.LEVEL_NODE] = []
3570     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3571
3572   def DeclareLocks(self, level):
3573     if level == locking.LEVEL_NODE:
3574       self._LockInstancesNodes()
3575
3576   def BuildHooksEnv(self):
3577     """Build hooks env.
3578
3579     This runs on master, primary and secondary nodes of the instance.
3580
3581     """
3582     env = {
3583       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3584       }
3585     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3586     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3587     return env, nl, nl
3588
3589   def CheckPrereq(self):
3590     """Check prerequisites.
3591
3592     This checks that the instance is in the cluster.
3593
3594     """
3595     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3596     assert self.instance is not None, \
3597       "Cannot retrieve locked instance %s" % self.op.instance_name
3598
3599     bep = self.cfg.GetClusterInfo().FillBE(instance)
3600     if instance.disk_template not in constants.DTS_NET_MIRROR:
3601       raise errors.OpPrereqError("Instance's disk layout is not"
3602                                  " network mirrored, cannot failover.")
3603
3604     secondary_nodes = instance.secondary_nodes
3605     if not secondary_nodes:
3606       raise errors.ProgrammerError("no secondary node but using "
3607                                    "a mirrored disk template")
3608
3609     target_node = secondary_nodes[0]
3610     _CheckNodeOnline(self, target_node)
3611     _CheckNodeNotDrained(self, target_node)
3612     # check memory requirements on the secondary node
3613     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3614                          instance.name, bep[constants.BE_MEMORY],
3615                          instance.hypervisor)
3616     # check bridge existance
3617     _CheckInstanceBridgesExist(self, instance, node=target_node)
3618
3619   def Exec(self, feedback_fn):
3620     """Failover an instance.
3621
3622     The failover is done by shutting it down on its present node and
3623     starting it on the secondary.
3624
3625     """
3626     instance = self.instance
3627
3628     source_node = instance.primary_node
3629     target_node = instance.secondary_nodes[0]
3630
3631     feedback_fn("* checking disk consistency between source and target")
3632     for dev in instance.disks:
3633       # for drbd, these are drbd over lvm
3634       if not _CheckDiskConsistency(self, dev, target_node, False):
3635         if instance.admin_up and not self.op.ignore_consistency:
3636           raise errors.OpExecError("Disk %s is degraded on target node,"
3637                                    " aborting failover." % dev.iv_name)
3638
3639     feedback_fn("* shutting down instance on source node")
3640     logging.info("Shutting down instance %s on node %s",
3641                  instance.name, source_node)
3642
3643     result = self.rpc.call_instance_shutdown(source_node, instance)
3644     msg = result.RemoteFailMsg()
3645     if msg:
3646       if self.op.ignore_consistency:
3647         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3648                              " Proceeding anyway. Please make sure node"
3649                              " %s is down. Error details: %s",
3650                              instance.name, source_node, source_node, msg)
3651       else:
3652         raise errors.OpExecError("Could not shutdown instance %s on"
3653                                  " node %s: %s" %
3654                                  (instance.name, source_node, msg))
3655
3656     feedback_fn("* deactivating the instance's disks on source node")
3657     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3658       raise errors.OpExecError("Can't shut down the instance's disks.")
3659
3660     instance.primary_node = target_node
3661     # distribute new instance config to the other nodes
3662     self.cfg.Update(instance)
3663
3664     # Only start the instance if it's marked as up
3665     if instance.admin_up:
3666       feedback_fn("* activating the instance's disks on target node")
3667       logging.info("Starting instance %s on node %s",
3668                    instance.name, target_node)
3669
3670       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3671                                                ignore_secondaries=True)
3672       if not disks_ok:
3673         _ShutdownInstanceDisks(self, instance)
3674         raise errors.OpExecError("Can't activate the instance's disks")
3675
3676       feedback_fn("* starting the instance on the target node")
3677       result = self.rpc.call_instance_start(target_node, instance, None, None)
3678       msg = result.RemoteFailMsg()
3679       if msg:
3680         _ShutdownInstanceDisks(self, instance)
3681         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3682                                  (instance.name, target_node, msg))
3683
3684
3685 class LUMigrateInstance(LogicalUnit):
3686   """Migrate an instance.
3687
3688   This is migration without shutting down, compared to the failover,
3689   which is done with shutdown.
3690
3691   """
3692   HPATH = "instance-migrate"
3693   HTYPE = constants.HTYPE_INSTANCE
3694   _OP_REQP = ["instance_name", "live", "cleanup"]
3695
3696   REQ_BGL = False
3697
3698   def ExpandNames(self):
3699     self._ExpandAndLockInstance()
3700     self.needed_locks[locking.LEVEL_NODE] = []
3701     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3702
3703   def DeclareLocks(self, level):
3704     if level == locking.LEVEL_NODE:
3705       self._LockInstancesNodes()
3706
3707   def BuildHooksEnv(self):
3708     """Build hooks env.
3709
3710     This runs on master, primary and secondary nodes of the instance.
3711
3712     """
3713     env = _BuildInstanceHookEnvByObject(self, self.instance)
3714     env["MIGRATE_LIVE"] = self.op.live
3715     env["MIGRATE_CLEANUP"] = self.op.cleanup
3716     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3717     return env, nl, nl
3718
3719   def CheckPrereq(self):
3720     """Check prerequisites.
3721
3722     This checks that the instance is in the cluster.
3723
3724     """
3725     instance = self.cfg.GetInstanceInfo(
3726       self.cfg.ExpandInstanceName(self.op.instance_name))
3727     if instance is None:
3728       raise errors.OpPrereqError("Instance '%s' not known" %
3729                                  self.op.instance_name)
3730
3731     if instance.disk_template != constants.DT_DRBD8:
3732       raise errors.OpPrereqError("Instance's disk layout is not"
3733                                  " drbd8, cannot migrate.")
3734
3735     secondary_nodes = instance.secondary_nodes
3736     if not secondary_nodes:
3737       raise errors.ConfigurationError("No secondary node but using"
3738                                       " drbd8 disk template")
3739
3740     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3741
3742     target_node = secondary_nodes[0]
3743     # check memory requirements on the secondary node
3744     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3745                          instance.name, i_be[constants.BE_MEMORY],
3746                          instance.hypervisor)
3747
3748     # check bridge existance
3749     _CheckInstanceBridgesExist(self, instance, node=target_node)
3750
3751     if not self.op.cleanup:
3752       _CheckNodeNotDrained(self, target_node)
3753       result = self.rpc.call_instance_migratable(instance.primary_node,
3754                                                  instance)
3755       msg = result.RemoteFailMsg()
3756       if msg:
3757         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3758                                    msg)
3759
3760     self.instance = instance
3761
3762   def _WaitUntilSync(self):
3763     """Poll with custom rpc for disk sync.
3764
3765     This uses our own step-based rpc call.
3766
3767     """
3768     self.feedback_fn("* wait until resync is done")
3769     all_done = False
3770     while not all_done:
3771       all_done = True
3772       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3773                                             self.nodes_ip,
3774                                             self.instance.disks)
3775       min_percent = 100
3776       for node, nres in result.items():
3777         msg = nres.RemoteFailMsg()
3778         if msg:
3779           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3780                                    (node, msg))
3781         node_done, node_percent = nres.payload
3782         all_done = all_done and node_done
3783         if node_percent is not None:
3784           min_percent = min(min_percent, node_percent)
3785       if not all_done:
3786         if min_percent < 100:
3787           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3788         time.sleep(2)
3789
3790   def _EnsureSecondary(self, node):
3791     """Demote a node to secondary.
3792
3793     """
3794     self.feedback_fn("* switching node %s to secondary mode" % node)
3795
3796     for dev in self.instance.disks:
3797       self.cfg.SetDiskID(dev, node)
3798
3799     result = self.rpc.call_blockdev_close(node, self.instance.name,
3800                                           self.instance.disks)
3801     msg = result.RemoteFailMsg()
3802     if msg:
3803       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3804                                " error %s" % (node, msg))
3805
3806   def _GoStandalone(self):
3807     """Disconnect from the network.
3808
3809     """
3810     self.feedback_fn("* changing into standalone mode")
3811     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3812                                                self.instance.disks)
3813     for node, nres in result.items():
3814       msg = nres.RemoteFailMsg()
3815       if msg:
3816         raise errors.OpExecError("Cannot disconnect disks node %s,"
3817                                  " error %s" % (node, msg))
3818
3819   def _GoReconnect(self, multimaster):
3820     """Reconnect to the network.
3821
3822     """
3823     if multimaster:
3824       msg = "dual-master"
3825     else:
3826       msg = "single-master"
3827     self.feedback_fn("* changing disks into %s mode" % msg)
3828     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3829                                            self.instance.disks,
3830                                            self.instance.name, multimaster)
3831     for node, nres in result.items():
3832       msg = nres.RemoteFailMsg()
3833       if msg:
3834         raise errors.OpExecError("Cannot change disks config on node %s,"
3835                                  " error: %s" % (node, msg))
3836
3837   def _ExecCleanup(self):
3838     """Try to cleanup after a failed migration.
3839
3840     The cleanup is done by:
3841       - check that the instance is running only on one node
3842         (and update the config if needed)
3843       - change disks on its secondary node to secondary
3844       - wait until disks are fully synchronized
3845       - disconnect from the network
3846       - change disks into single-master mode
3847       - wait again until disks are fully synchronized
3848
3849     """
3850     instance = self.instance
3851     target_node = self.target_node
3852     source_node = self.source_node
3853
3854     # check running on only one node
3855     self.feedback_fn("* checking where the instance actually runs"
3856                      " (if this hangs, the hypervisor might be in"
3857                      " a bad state)")
3858     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3859     for node, result in ins_l.items():
3860       result.Raise()
3861       if not isinstance(result.data, list):
3862         raise errors.OpExecError("Can't contact node '%s'" % node)
3863
3864     runningon_source = instance.name in ins_l[source_node].data
3865     runningon_target = instance.name in ins_l[target_node].data
3866
3867     if runningon_source and runningon_target:
3868       raise errors.OpExecError("Instance seems to be running on two nodes,"
3869                                " or the hypervisor is confused. You will have"
3870                                " to ensure manually that it runs only on one"
3871                                " and restart this operation.")
3872
3873     if not (runningon_source or runningon_target):
3874       raise errors.OpExecError("Instance does not seem to be running at all."
3875                                " In this case, it's safer to repair by"
3876                                " running 'gnt-instance stop' to ensure disk"
3877                                " shutdown, and then restarting it.")
3878
3879     if runningon_target:
3880       # the migration has actually succeeded, we need to update the config
3881       self.feedback_fn("* instance running on secondary node (%s),"
3882                        " updating config" % target_node)
3883       instance.primary_node = target_node
3884       self.cfg.Update(instance)
3885       demoted_node = source_node
3886     else:
3887       self.feedback_fn("* instance confirmed to be running on its"
3888                        " primary node (%s)" % source_node)
3889       demoted_node = target_node
3890
3891     self._EnsureSecondary(demoted_node)
3892     try:
3893       self._WaitUntilSync()
3894     except errors.OpExecError:
3895       # we ignore here errors, since if the device is standalone, it
3896       # won't be able to sync
3897       pass
3898     self._GoStandalone()
3899     self._GoReconnect(False)
3900     self._WaitUntilSync()
3901
3902     self.feedback_fn("* done")
3903
3904   def _RevertDiskStatus(self):
3905     """Try to revert the disk status after a failed migration.
3906
3907     """
3908     target_node = self.target_node
3909     try:
3910       self._EnsureSecondary(target_node)
3911       self._GoStandalone()
3912       self._GoReconnect(False)
3913       self._WaitUntilSync()
3914     except errors.OpExecError, err:
3915       self.LogWarning("Migration failed and I can't reconnect the"
3916                       " drives: error '%s'\n"
3917                       "Please look and recover the instance status" %
3918                       str(err))
3919
3920   def _AbortMigration(self):
3921     """Call the hypervisor code to abort a started migration.
3922
3923     """
3924     instance = self.instance
3925     target_node = self.target_node
3926     migration_info = self.migration_info
3927
3928     abort_result = self.rpc.call_finalize_migration(target_node,
3929                                                     instance,
3930                                                     migration_info,
3931                                                     False)
3932     abort_msg = abort_result.RemoteFailMsg()
3933     if abort_msg:
3934       logging.error("Aborting migration failed on target node %s: %s" %
3935                     (target_node, abort_msg))
3936       # Don't raise an exception here, as we stil have to try to revert the
3937       # disk status, even if this step failed.
3938
3939   def _ExecMigration(self):
3940     """Migrate an instance.
3941
3942     The migrate is done by:
3943       - change the disks into dual-master mode
3944       - wait until disks are fully synchronized again
3945       - migrate the instance
3946       - change disks on the new secondary node (the old primary) to secondary
3947       - wait until disks are fully synchronized
3948       - change disks into single-master mode
3949
3950     """
3951     instance = self.instance
3952     target_node = self.target_node
3953     source_node = self.source_node
3954
3955     self.feedback_fn("* checking disk consistency between source and target")
3956     for dev in instance.disks:
3957       if not _CheckDiskConsistency(self, dev, target_node, False):
3958         raise errors.OpExecError("Disk %s is degraded or not fully"
3959                                  " synchronized on target node,"
3960                                  " aborting migrate." % dev.iv_name)
3961
3962     # First get the migration information from the remote node
3963     result = self.rpc.call_migration_info(source_node, instance)
3964     msg = result.RemoteFailMsg()
3965     if msg:
3966       log_err = ("Failed fetching source migration information from %s: %s" %
3967                  (source_node, msg))
3968       logging.error(log_err)
3969       raise errors.OpExecError(log_err)
3970
3971     self.migration_info = migration_info = result.payload
3972
3973     # Then switch the disks to master/master mode
3974     self._EnsureSecondary(target_node)
3975     self._GoStandalone()
3976     self._GoReconnect(True)
3977     self._WaitUntilSync()
3978
3979     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3980     result = self.rpc.call_accept_instance(target_node,
3981                                            instance,
3982                                            migration_info,
3983                                            self.nodes_ip[target_node])
3984
3985     msg = result.RemoteFailMsg()
3986     if msg:
3987       logging.error("Instance pre-migration failed, trying to revert"
3988                     " disk status: %s", msg)
3989       self._AbortMigration()
3990       self._RevertDiskStatus()
3991       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3992                                (instance.name, msg))
3993
3994     self.feedback_fn("* migrating instance to %s" % target_node)
3995     time.sleep(10)
3996     result = self.rpc.call_instance_migrate(source_node, instance,
3997                                             self.nodes_ip[target_node],
3998                                             self.op.live)
3999     msg = result.RemoteFailMsg()
4000     if msg:
4001       logging.error("Instance migration failed, trying to revert"
4002                     " disk status: %s", msg)
4003       self._AbortMigration()
4004       self._RevertDiskStatus()
4005       raise errors.OpExecError("Could not migrate instance %s: %s" %
4006                                (instance.name, msg))
4007     time.sleep(10)
4008
4009     instance.primary_node = target_node
4010     # distribute new instance config to the other nodes
4011     self.cfg.Update(instance)
4012
4013     result = self.rpc.call_finalize_migration(target_node,
4014                                               instance,
4015                                               migration_info,
4016                                               True)
4017     msg = result.RemoteFailMsg()
4018     if msg:
4019       logging.error("Instance migration succeeded, but finalization failed:"
4020                     " %s" % msg)
4021       raise errors.OpExecError("Could not finalize instance migration: %s" %
4022                                msg)
4023
4024     self._EnsureSecondary(source_node)
4025     self._WaitUntilSync()
4026     self._GoStandalone()
4027     self._GoReconnect(False)
4028     self._WaitUntilSync()
4029
4030     self.feedback_fn("* done")
4031
4032   def Exec(self, feedback_fn):
4033     """Perform the migration.
4034
4035     """
4036     self.feedback_fn = feedback_fn
4037
4038     self.source_node = self.instance.primary_node
4039     self.target_node = self.instance.secondary_nodes[0]
4040     self.all_nodes = [self.source_node, self.target_node]
4041     self.nodes_ip = {
4042       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4043       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4044       }
4045     if self.op.cleanup:
4046       return self._ExecCleanup()
4047     else:
4048       return self._ExecMigration()
4049
4050
4051 def _CreateBlockDev(lu, node, instance, device, force_create,
4052                     info, force_open):
4053   """Create a tree of block devices on a given node.
4054
4055   If this device type has to be created on secondaries, create it and
4056   all its children.
4057
4058   If not, just recurse to children keeping the same 'force' value.
4059
4060   @param lu: the lu on whose behalf we execute
4061   @param node: the node on which to create the device
4062   @type instance: L{objects.Instance}
4063   @param instance: the instance which owns the device
4064   @type device: L{objects.Disk}
4065   @param device: the device to create
4066   @type force_create: boolean
4067   @param force_create: whether to force creation of this device; this
4068       will be change to True whenever we find a device which has
4069       CreateOnSecondary() attribute
4070   @param info: the extra 'metadata' we should attach to the device
4071       (this will be represented as a LVM tag)
4072   @type force_open: boolean
4073   @param force_open: this parameter will be passes to the
4074       L{backend.BlockdevCreate} function where it specifies
4075       whether we run on primary or not, and it affects both
4076       the child assembly and the device own Open() execution
4077
4078   """
4079   if device.CreateOnSecondary():
4080     force_create = True
4081
4082   if device.children:
4083     for child in device.children:
4084       _CreateBlockDev(lu, node, instance, child, force_create,
4085                       info, force_open)
4086
4087   if not force_create:
4088     return
4089
4090   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4091
4092
4093 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4094   """Create a single block device on a given node.
4095
4096   This will not recurse over children of the device, so they must be
4097   created in advance.
4098
4099   @param lu: the lu on whose behalf we execute
4100   @param node: the node on which to create the device
4101   @type instance: L{objects.Instance}
4102   @param instance: the instance which owns the device
4103   @type device: L{objects.Disk}
4104   @param device: the device to create
4105   @param info: the extra 'metadata' we should attach to the device
4106       (this will be represented as a LVM tag)
4107   @type force_open: boolean
4108   @param force_open: this parameter will be passes to the
4109       L{backend.BlockdevCreate} function where it specifies
4110       whether we run on primary or not, and it affects both
4111       the child assembly and the device own Open() execution
4112
4113   """
4114   lu.cfg.SetDiskID(device, node)
4115   result = lu.rpc.call_blockdev_create(node, device, device.size,
4116                                        instance.name, force_open, info)
4117   msg = result.RemoteFailMsg()
4118   if msg:
4119     raise errors.OpExecError("Can't create block device %s on"
4120                              " node %s for instance %s: %s" %
4121                              (device, node, instance.name, msg))
4122   if device.physical_id is None:
4123     device.physical_id = result.payload
4124
4125
4126 def _GenerateUniqueNames(lu, exts):
4127   """Generate a suitable LV name.
4128
4129   This will generate a logical volume name for the given instance.
4130
4131   """
4132   results = []
4133   for val in exts:
4134     new_id = lu.cfg.GenerateUniqueID()
4135     results.append("%s%s" % (new_id, val))
4136   return results
4137
4138
4139 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4140                          p_minor, s_minor):
4141   """Generate a drbd8 device complete with its children.
4142
4143   """
4144   port = lu.cfg.AllocatePort()
4145   vgname = lu.cfg.GetVGName()
4146   shared_secret = lu.cfg.GenerateDRBDSecret()
4147   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4148                           logical_id=(vgname, names[0]))
4149   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4150                           logical_id=(vgname, names[1]))
4151   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4152                           logical_id=(primary, secondary, port,
4153                                       p_minor, s_minor,
4154                                       shared_secret),
4155                           children=[dev_data, dev_meta],
4156                           iv_name=iv_name)
4157   return drbd_dev
4158
4159
4160 def _GenerateDiskTemplate(lu, template_name,
4161                           instance_name, primary_node,
4162                           secondary_nodes, disk_info,
4163                           file_storage_dir, file_driver,
4164                           base_index):
4165   """Generate the entire disk layout for a given template type.
4166
4167   """
4168   #TODO: compute space requirements
4169
4170   vgname = lu.cfg.GetVGName()
4171   disk_count = len(disk_info)
4172   disks = []
4173   if template_name == constants.DT_DISKLESS:
4174     pass
4175   elif template_name == constants.DT_PLAIN:
4176     if len(secondary_nodes) != 0:
4177       raise errors.ProgrammerError("Wrong template configuration")
4178
4179     names = _GenerateUniqueNames(lu, [".disk%d" % i
4180                                       for i in range(disk_count)])
4181     for idx, disk in enumerate(disk_info):
4182       disk_index = idx + base_index
4183       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4184                               logical_id=(vgname, names[idx]),
4185                               iv_name="disk/%d" % disk_index,
4186                               mode=disk["mode"])
4187       disks.append(disk_dev)
4188   elif template_name == constants.DT_DRBD8:
4189     if len(secondary_nodes) != 1:
4190       raise errors.ProgrammerError("Wrong template configuration")
4191     remote_node = secondary_nodes[0]
4192     minors = lu.cfg.AllocateDRBDMinor(
4193       [primary_node, remote_node] * len(disk_info), instance_name)
4194
4195     names = []
4196     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4197                                                for i in range(disk_count)]):
4198       names.append(lv_prefix + "_data")
4199       names.append(lv_prefix + "_meta")
4200     for idx, disk in enumerate(disk_info):
4201       disk_index = idx + base_index
4202       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4203                                       disk["size"], names[idx*2:idx*2+2],
4204                                       "disk/%d" % disk_index,
4205                                       minors[idx*2], minors[idx*2+1])
4206       disk_dev.mode = disk["mode"]
4207       disks.append(disk_dev)
4208   elif template_name == constants.DT_FILE:
4209     if len(secondary_nodes) != 0:
4210       raise errors.ProgrammerError("Wrong template configuration")
4211
4212     for idx, disk in enumerate(disk_info):
4213       disk_index = idx + base_index
4214       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4215                               iv_name="disk/%d" % disk_index,
4216                               logical_id=(file_driver,
4217                                           "%s/disk%d" % (file_storage_dir,
4218                                                          disk_index)),
4219                               mode=disk["mode"])
4220       disks.append(disk_dev)
4221   else:
4222     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4223   return disks
4224
4225
4226 def _GetInstanceInfoText(instance):
4227   """Compute that text that should be added to the disk's metadata.
4228
4229   """
4230   return "originstname+%s" % instance.name
4231
4232
4233 def _CreateDisks(lu, instance):
4234   """Create all disks for an instance.
4235
4236   This abstracts away some work from AddInstance.
4237
4238   @type lu: L{LogicalUnit}
4239   @param lu: the logical unit on whose behalf we execute
4240   @type instance: L{objects.Instance}
4241   @param instance: the instance whose disks we should create
4242   @rtype: boolean
4243   @return: the success of the creation
4244
4245   """
4246   info = _GetInstanceInfoText(instance)
4247   pnode = instance.primary_node
4248
4249   if instance.disk_template == constants.DT_FILE:
4250     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4251     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4252
4253     if result.failed or not result.data:
4254       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4255
4256     if not result.data[0]:
4257       raise errors.OpExecError("Failed to create directory '%s'" %
4258                                file_storage_dir)
4259
4260   # Note: this needs to be kept in sync with adding of disks in
4261   # LUSetInstanceParams
4262   for device in instance.disks:
4263     logging.info("Creating volume %s for instance %s",
4264                  device.iv_name, instance.name)
4265     #HARDCODE
4266     for node in instance.all_nodes:
4267       f_create = node == pnode
4268       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4269
4270
4271 def _RemoveDisks(lu, instance):
4272   """Remove all disks for an instance.
4273
4274   This abstracts away some work from `AddInstance()` and
4275   `RemoveInstance()`. Note that in case some of the devices couldn't
4276   be removed, the removal will continue with the other ones (compare
4277   with `_CreateDisks()`).
4278
4279   @type lu: L{LogicalUnit}
4280   @param lu: the logical unit on whose behalf we execute
4281   @type instance: L{objects.Instance}
4282   @param instance: the instance whose disks we should remove
4283   @rtype: boolean
4284   @return: the success of the removal
4285
4286   """
4287   logging.info("Removing block devices for instance %s", instance.name)
4288
4289   all_result = True
4290   for device in instance.disks:
4291     for node, disk in device.ComputeNodeTree(instance.primary_node):
4292       lu.cfg.SetDiskID(disk, node)
4293       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4294       if msg:
4295         lu.LogWarning("Could not remove block device %s on node %s,"
4296                       " continuing anyway: %s", device.iv_name, node, msg)
4297         all_result = False
4298
4299   if instance.disk_template == constants.DT_FILE:
4300     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4301     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4302                                                  file_storage_dir)
4303     if result.failed or not result.data:
4304       logging.error("Could not remove directory '%s'", file_storage_dir)
4305       all_result = False
4306
4307   return all_result
4308
4309
4310 def _ComputeDiskSize(disk_template, disks):
4311   """Compute disk size requirements in the volume group
4312
4313   """
4314   # Required free disk space as a function of disk and swap space
4315   req_size_dict = {
4316     constants.DT_DISKLESS: None,
4317     constants.DT_PLAIN: sum(d["size"] for d in disks),
4318     # 128 MB are added for drbd metadata for each disk
4319     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4320     constants.DT_FILE: None,
4321   }
4322
4323   if disk_template not in req_size_dict:
4324     raise errors.ProgrammerError("Disk template '%s' size requirement"
4325                                  " is unknown" %  disk_template)
4326
4327   return req_size_dict[disk_template]
4328
4329
4330 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4331   """Hypervisor parameter validation.
4332
4333   This function abstract the hypervisor parameter validation to be
4334   used in both instance create and instance modify.
4335
4336   @type lu: L{LogicalUnit}
4337   @param lu: the logical unit for which we check
4338   @type nodenames: list
4339   @param nodenames: the list of nodes on which we should check
4340   @type hvname: string
4341   @param hvname: the name of the hypervisor we should use
4342   @type hvparams: dict
4343   @param hvparams: the parameters which we need to check
4344   @raise errors.OpPrereqError: if the parameters are not valid
4345
4346   """
4347   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4348                                                   hvname,
4349                                                   hvparams)
4350   for node in nodenames:
4351     info = hvinfo[node]
4352     if info.offline:
4353       continue
4354     msg = info.RemoteFailMsg()
4355     if msg:
4356       raise errors.OpPrereqError("Hypervisor parameter validation"
4357                                  " failed on node %s: %s" % (node, msg))
4358
4359
4360 class LUCreateInstance(LogicalUnit):
4361   """Create an instance.
4362
4363   """
4364   HPATH = "instance-add"
4365   HTYPE = constants.HTYPE_INSTANCE
4366   _OP_REQP = ["instance_name", "disks", "disk_template",
4367               "mode", "start",
4368               "wait_for_sync", "ip_check", "nics",
4369               "hvparams", "beparams"]
4370   REQ_BGL = False
4371
4372   def _ExpandNode(self, node):
4373     """Expands and checks one node name.
4374
4375     """
4376     node_full = self.cfg.ExpandNodeName(node)
4377     if node_full is None:
4378       raise errors.OpPrereqError("Unknown node %s" % node)
4379     return node_full
4380
4381   def ExpandNames(self):
4382     """ExpandNames for CreateInstance.
4383
4384     Figure out the right locks for instance creation.
4385
4386     """
4387     self.needed_locks = {}
4388
4389     # set optional parameters to none if they don't exist
4390     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4391       if not hasattr(self.op, attr):
4392         setattr(self.op, attr, None)
4393
4394     # cheap checks, mostly valid constants given
4395
4396     # verify creation mode
4397     if self.op.mode not in (constants.INSTANCE_CREATE,
4398                             constants.INSTANCE_IMPORT):
4399       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4400                                  self.op.mode)
4401
4402     # disk template and mirror node verification
4403     if self.op.disk_template not in constants.DISK_TEMPLATES:
4404       raise errors.OpPrereqError("Invalid disk template name")
4405
4406     if self.op.hypervisor is None:
4407       self.op.hypervisor = self.cfg.GetHypervisorType()
4408
4409     cluster = self.cfg.GetClusterInfo()
4410     enabled_hvs = cluster.enabled_hypervisors
4411     if self.op.hypervisor not in enabled_hvs:
4412       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4413                                  " cluster (%s)" % (self.op.hypervisor,
4414                                   ",".join(enabled_hvs)))
4415
4416     # check hypervisor parameter syntax (locally)
4417     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4418     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4419                                   self.op.hvparams)
4420     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4421     hv_type.CheckParameterSyntax(filled_hvp)
4422
4423     # fill and remember the beparams dict
4424     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4425     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4426                                     self.op.beparams)
4427
4428     #### instance parameters check
4429
4430     # instance name verification
4431     hostname1 = utils.HostInfo(self.op.instance_name)
4432     self.op.instance_name = instance_name = hostname1.name
4433
4434     # this is just a preventive check, but someone might still add this
4435     # instance in the meantime, and creation will fail at lock-add time
4436     if instance_name in self.cfg.GetInstanceList():
4437       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4438                                  instance_name)
4439
4440     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4441
4442     # NIC buildup
4443     self.nics = []
4444     for idx, nic in enumerate(self.op.nics):
4445       nic_mode_req = nic.get("mode", None)
4446       nic_mode = nic_mode_req
4447       if nic_mode is None:
4448         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4449
4450       # in routed mode, for the first nic, the default ip is 'auto'
4451       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4452         default_ip_mode = constants.VALUE_AUTO
4453       else:
4454         default_ip_mode = constants.VALUE_NONE
4455
4456       # ip validity checks
4457       ip = nic.get("ip", default_ip_mode)
4458       if ip is None or ip.lower() == constants.VALUE_NONE:
4459         nic_ip = None
4460       elif ip.lower() == constants.VALUE_AUTO:
4461         nic_ip = hostname1.ip
4462       else:
4463         if not utils.IsValidIP(ip):
4464           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4465                                      " like a valid IP" % ip)
4466         nic_ip = ip
4467
4468       # TODO: check the ip for uniqueness !!
4469       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4470         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4471
4472       # MAC address verification
4473       mac = nic.get("mac", constants.VALUE_AUTO)
4474       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4475         if not utils.IsValidMac(mac.lower()):
4476           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4477                                      mac)
4478       # bridge verification
4479       bridge = nic.get("bridge", None)
4480       link = nic.get("link", None)
4481       if bridge and link:
4482         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4483       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4484         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4485       elif bridge:
4486         link = bridge
4487
4488       nicparams = {}
4489       if nic_mode_req:
4490         nicparams[constants.NIC_MODE] = nic_mode_req
4491       if link:
4492         nicparams[constants.NIC_LINK] = link
4493
4494       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4495                                       nicparams)
4496       objects.NIC.CheckParameterSyntax(check_params)
4497       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4498
4499     # disk checks/pre-build
4500     self.disks = []
4501     for disk in self.op.disks:
4502       mode = disk.get("mode", constants.DISK_RDWR)
4503       if mode not in constants.DISK_ACCESS_SET:
4504         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4505                                    mode)
4506       size = disk.get("size", None)
4507       if size is None:
4508         raise errors.OpPrereqError("Missing disk size")
4509       try:
4510         size = int(size)
4511       except ValueError:
4512         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4513       self.disks.append({"size": size, "mode": mode})
4514
4515     # used in CheckPrereq for ip ping check
4516     self.check_ip = hostname1.ip
4517
4518     # file storage checks
4519     if (self.op.file_driver and
4520         not self.op.file_driver in constants.FILE_DRIVER):
4521       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4522                                  self.op.file_driver)
4523
4524     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4525       raise errors.OpPrereqError("File storage directory path not absolute")
4526
4527     ### Node/iallocator related checks
4528     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4529       raise errors.OpPrereqError("One and only one of iallocator and primary"
4530                                  " node must be given")
4531
4532     if self.op.iallocator:
4533       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4534     else:
4535       self.op.pnode = self._ExpandNode(self.op.pnode)
4536       nodelist = [self.op.pnode]
4537       if self.op.snode is not None:
4538         self.op.snode = self._ExpandNode(self.op.snode)
4539         nodelist.append(self.op.snode)
4540       self.needed_locks[locking.LEVEL_NODE] = nodelist
4541
4542     # in case of import lock the source node too
4543     if self.op.mode == constants.INSTANCE_IMPORT:
4544       src_node = getattr(self.op, "src_node", None)
4545       src_path = getattr(self.op, "src_path", None)
4546
4547       if src_path is None:
4548         self.op.src_path = src_path = self.op.instance_name
4549
4550       if src_node is None:
4551         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4552         self.op.src_node = None
4553         if os.path.isabs(src_path):
4554           raise errors.OpPrereqError("Importing an instance from an absolute"
4555                                      " path requires a source node option.")
4556       else:
4557         self.op.src_node = src_node = self._ExpandNode(src_node)
4558         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4559           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4560         if not os.path.isabs(src_path):
4561           self.op.src_path = src_path = \
4562             os.path.join(constants.EXPORT_DIR, src_path)
4563
4564     else: # INSTANCE_CREATE
4565       if getattr(self.op, "os_type", None) is None:
4566         raise errors.OpPrereqError("No guest OS specified")
4567
4568   def _RunAllocator(self):
4569     """Run the allocator based on input opcode.
4570
4571     """
4572     nics = [n.ToDict() for n in self.nics]
4573     ial = IAllocator(self,
4574                      mode=constants.IALLOCATOR_MODE_ALLOC,
4575                      name=self.op.instance_name,
4576                      disk_template=self.op.disk_template,
4577                      tags=[],
4578                      os=self.op.os_type,
4579                      vcpus=self.be_full[constants.BE_VCPUS],
4580                      mem_size=self.be_full[constants.BE_MEMORY],
4581                      disks=self.disks,
4582                      nics=nics,
4583                      hypervisor=self.op.hypervisor,
4584                      )
4585
4586     ial.Run(self.op.iallocator)
4587
4588     if not ial.success:
4589       raise errors.OpPrereqError("Can't compute nodes using"
4590                                  " iallocator '%s': %s" % (self.op.iallocator,
4591                                                            ial.info))
4592     if len(ial.nodes) != ial.required_nodes:
4593       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4594                                  " of nodes (%s), required %s" %
4595                                  (self.op.iallocator, len(ial.nodes),
4596                                   ial.required_nodes))
4597     self.op.pnode = ial.nodes[0]
4598     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4599                  self.op.instance_name, self.op.iallocator,
4600                  ", ".join(ial.nodes))
4601     if ial.required_nodes == 2:
4602       self.op.snode = ial.nodes[1]
4603
4604   def BuildHooksEnv(self):
4605     """Build hooks env.
4606
4607     This runs on master, primary and secondary nodes of the instance.
4608
4609     """
4610     env = {
4611       "ADD_MODE": self.op.mode,
4612       }
4613     if self.op.mode == constants.INSTANCE_IMPORT:
4614       env["SRC_NODE"] = self.op.src_node
4615       env["SRC_PATH"] = self.op.src_path
4616       env["SRC_IMAGES"] = self.src_images
4617
4618     env.update(_BuildInstanceHookEnv(
4619       name=self.op.instance_name,
4620       primary_node=self.op.pnode,
4621       secondary_nodes=self.secondaries,
4622       status=self.op.start,
4623       os_type=self.op.os_type,
4624       memory=self.be_full[constants.BE_MEMORY],
4625       vcpus=self.be_full[constants.BE_VCPUS],
4626       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4627       disk_template=self.op.disk_template,
4628       disks=[(d["size"], d["mode"]) for d in self.disks],
4629     ))
4630
4631     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4632           self.secondaries)
4633     return env, nl, nl
4634
4635
4636   def CheckPrereq(self):
4637     """Check prerequisites.
4638
4639     """
4640     if (not self.cfg.GetVGName() and
4641         self.op.disk_template not in constants.DTS_NOT_LVM):
4642       raise errors.OpPrereqError("Cluster does not support lvm-based"
4643                                  " instances")
4644
4645     if self.op.mode == constants.INSTANCE_IMPORT:
4646       src_node = self.op.src_node
4647       src_path = self.op.src_path
4648
4649       if src_node is None:
4650         exp_list = self.rpc.call_export_list(
4651           self.acquired_locks[locking.LEVEL_NODE])
4652         found = False
4653         for node in exp_list:
4654           if not exp_list[node].failed and src_path in exp_list[node].data:
4655             found = True
4656             self.op.src_node = src_node = node
4657             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4658                                                        src_path)
4659             break
4660         if not found:
4661           raise errors.OpPrereqError("No export found for relative path %s" %
4662                                       src_path)
4663
4664       _CheckNodeOnline(self, src_node)
4665       result = self.rpc.call_export_info(src_node, src_path)
4666       result.Raise()
4667       if not result.data:
4668         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4669
4670       export_info = result.data
4671       if not export_info.has_section(constants.INISECT_EXP):
4672         raise errors.ProgrammerError("Corrupted export config")
4673
4674       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4675       if (int(ei_version) != constants.EXPORT_VERSION):
4676         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4677                                    (ei_version, constants.EXPORT_VERSION))
4678
4679       # Check that the new instance doesn't have less disks than the export
4680       instance_disks = len(self.disks)
4681       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4682       if instance_disks < export_disks:
4683         raise errors.OpPrereqError("Not enough disks to import."
4684                                    " (instance: %d, export: %d)" %
4685                                    (instance_disks, export_disks))
4686
4687       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4688       disk_images = []
4689       for idx in range(export_disks):
4690         option = 'disk%d_dump' % idx
4691         if export_info.has_option(constants.INISECT_INS, option):
4692           # FIXME: are the old os-es, disk sizes, etc. useful?
4693           export_name = export_info.get(constants.INISECT_INS, option)
4694           image = os.path.join(src_path, export_name)
4695           disk_images.append(image)
4696         else:
4697           disk_images.append(False)
4698
4699       self.src_images = disk_images
4700
4701       old_name = export_info.get(constants.INISECT_INS, 'name')
4702       # FIXME: int() here could throw a ValueError on broken exports
4703       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4704       if self.op.instance_name == old_name:
4705         for idx, nic in enumerate(self.nics):
4706           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4707             nic_mac_ini = 'nic%d_mac' % idx
4708             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4709
4710     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4711     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4712     if self.op.start and not self.op.ip_check:
4713       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4714                                  " adding an instance in start mode")
4715
4716     if self.op.ip_check:
4717       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4718         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4719                                    (self.check_ip, self.op.instance_name))
4720
4721     #### mac address generation
4722     # By generating here the mac address both the allocator and the hooks get
4723     # the real final mac address rather than the 'auto' or 'generate' value.
4724     # There is a race condition between the generation and the instance object
4725     # creation, which means that we know the mac is valid now, but we're not
4726     # sure it will be when we actually add the instance. If things go bad
4727     # adding the instance will abort because of a duplicate mac, and the
4728     # creation job will fail.
4729     for nic in self.nics:
4730       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4731         nic.mac = self.cfg.GenerateMAC()
4732
4733     #### allocator run
4734
4735     if self.op.iallocator is not None:
4736       self._RunAllocator()
4737
4738     #### node related checks
4739
4740     # check primary node
4741     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4742     assert self.pnode is not None, \
4743       "Cannot retrieve locked node %s" % self.op.pnode
4744     if pnode.offline:
4745       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4746                                  pnode.name)
4747     if pnode.drained:
4748       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4749                                  pnode.name)
4750
4751     self.secondaries = []
4752
4753     # mirror node verification
4754     if self.op.disk_template in constants.DTS_NET_MIRROR:
4755       if self.op.snode is None:
4756         raise errors.OpPrereqError("The networked disk templates need"
4757                                    " a mirror node")
4758       if self.op.snode == pnode.name:
4759         raise errors.OpPrereqError("The secondary node cannot be"
4760                                    " the primary node.")
4761       _CheckNodeOnline(self, self.op.snode)
4762       _CheckNodeNotDrained(self, self.op.snode)
4763       self.secondaries.append(self.op.snode)
4764
4765     nodenames = [pnode.name] + self.secondaries
4766
4767     req_size = _ComputeDiskSize(self.op.disk_template,
4768                                 self.disks)
4769
4770     # Check lv size requirements
4771     if req_size is not None:
4772       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4773                                          self.op.hypervisor)
4774       for node in nodenames:
4775         info = nodeinfo[node]
4776         info.Raise()
4777         info = info.data
4778         if not info:
4779           raise errors.OpPrereqError("Cannot get current information"
4780                                      " from node '%s'" % node)
4781         vg_free = info.get('vg_free', None)
4782         if not isinstance(vg_free, int):
4783           raise errors.OpPrereqError("Can't compute free disk space on"
4784                                      " node %s" % node)
4785         if req_size > info['vg_free']:
4786           raise errors.OpPrereqError("Not enough disk space on target node %s."
4787                                      " %d MB available, %d MB required" %
4788                                      (node, info['vg_free'], req_size))
4789
4790     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4791
4792     # os verification
4793     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4794     result.Raise()
4795     if not isinstance(result.data, objects.OS):
4796       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4797                                  " primary node"  % self.op.os_type)
4798
4799     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4800
4801     # memory check on primary node
4802     if self.op.start:
4803       _CheckNodeFreeMemory(self, self.pnode.name,
4804                            "creating instance %s" % self.op.instance_name,
4805                            self.be_full[constants.BE_MEMORY],
4806                            self.op.hypervisor)
4807
4808   def Exec(self, feedback_fn):
4809     """Create and add the instance to the cluster.
4810
4811     """
4812     instance = self.op.instance_name
4813     pnode_name = self.pnode.name
4814
4815     ht_kind = self.op.hypervisor
4816     if ht_kind in constants.HTS_REQ_PORT:
4817       network_port = self.cfg.AllocatePort()
4818     else:
4819       network_port = None
4820
4821     ##if self.op.vnc_bind_address is None:
4822     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4823
4824     # this is needed because os.path.join does not accept None arguments
4825     if self.op.file_storage_dir is None:
4826       string_file_storage_dir = ""
4827     else:
4828       string_file_storage_dir = self.op.file_storage_dir
4829
4830     # build the full file storage dir path
4831     file_storage_dir = os.path.normpath(os.path.join(
4832                                         self.cfg.GetFileStorageDir(),
4833                                         string_file_storage_dir, instance))
4834
4835
4836     disks = _GenerateDiskTemplate(self,
4837                                   self.op.disk_template,
4838                                   instance, pnode_name,
4839                                   self.secondaries,
4840                                   self.disks,
4841                                   file_storage_dir,
4842                                   self.op.file_driver,
4843                                   0)
4844
4845     iobj = objects.Instance(name=instance, os=self.op.os_type,
4846                             primary_node=pnode_name,
4847                             nics=self.nics, disks=disks,
4848                             disk_template=self.op.disk_template,
4849                             admin_up=False,
4850                             network_port=network_port,
4851                             beparams=self.op.beparams,
4852                             hvparams=self.op.hvparams,
4853                             hypervisor=self.op.hypervisor,
4854                             )
4855
4856     feedback_fn("* creating instance disks...")
4857     try:
4858       _CreateDisks(self, iobj)
4859     except errors.OpExecError:
4860       self.LogWarning("Device creation failed, reverting...")
4861       try:
4862         _RemoveDisks(self, iobj)
4863       finally:
4864         self.cfg.ReleaseDRBDMinors(instance)
4865         raise
4866
4867     feedback_fn("adding instance %s to cluster config" % instance)
4868
4869     self.cfg.AddInstance(iobj)
4870     # Declare that we don't want to remove the instance lock anymore, as we've
4871     # added the instance to the config
4872     del self.remove_locks[locking.LEVEL_INSTANCE]
4873     # Unlock all the nodes
4874     if self.op.mode == constants.INSTANCE_IMPORT:
4875       nodes_keep = [self.op.src_node]
4876       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4877                        if node != self.op.src_node]
4878       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4879       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4880     else:
4881       self.context.glm.release(locking.LEVEL_NODE)
4882       del self.acquired_locks[locking.LEVEL_NODE]
4883
4884     if self.op.wait_for_sync:
4885       disk_abort = not _WaitForSync(self, iobj)
4886     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4887       # make sure the disks are not degraded (still sync-ing is ok)
4888       time.sleep(15)
4889       feedback_fn("* checking mirrors status")
4890       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4891     else:
4892       disk_abort = False
4893
4894     if disk_abort:
4895       _RemoveDisks(self, iobj)
4896       self.cfg.RemoveInstance(iobj.name)
4897       # Make sure the instance lock gets removed
4898       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4899       raise errors.OpExecError("There are some degraded disks for"
4900                                " this instance")
4901
4902     feedback_fn("creating os for instance %s on node %s" %
4903                 (instance, pnode_name))
4904
4905     if iobj.disk_template != constants.DT_DISKLESS:
4906       if self.op.mode == constants.INSTANCE_CREATE:
4907         feedback_fn("* running the instance OS create scripts...")
4908         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4909         msg = result.RemoteFailMsg()
4910         if msg:
4911           raise errors.OpExecError("Could not add os for instance %s"
4912                                    " on node %s: %s" %
4913                                    (instance, pnode_name, msg))
4914
4915       elif self.op.mode == constants.INSTANCE_IMPORT:
4916         feedback_fn("* running the instance OS import scripts...")
4917         src_node = self.op.src_node
4918         src_images = self.src_images
4919         cluster_name = self.cfg.GetClusterName()
4920         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4921                                                          src_node, src_images,
4922                                                          cluster_name)
4923         import_result.Raise()
4924         for idx, result in enumerate(import_result.data):
4925           if not result:
4926             self.LogWarning("Could not import the image %s for instance"
4927                             " %s, disk %d, on node %s" %
4928                             (src_images[idx], instance, idx, pnode_name))
4929       else:
4930         # also checked in the prereq part
4931         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4932                                      % self.op.mode)
4933
4934     if self.op.start:
4935       iobj.admin_up = True
4936       self.cfg.Update(iobj)
4937       logging.info("Starting instance %s on node %s", instance, pnode_name)
4938       feedback_fn("* starting instance...")
4939       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4940       msg = result.RemoteFailMsg()
4941       if msg:
4942         raise errors.OpExecError("Could not start instance: %s" % msg)
4943
4944
4945 class LUConnectConsole(NoHooksLU):
4946   """Connect to an instance's console.
4947
4948   This is somewhat special in that it returns the command line that
4949   you need to run on the master node in order to connect to the
4950   console.
4951
4952   """
4953   _OP_REQP = ["instance_name"]
4954   REQ_BGL = False
4955
4956   def ExpandNames(self):
4957     self._ExpandAndLockInstance()
4958
4959   def CheckPrereq(self):
4960     """Check prerequisites.
4961
4962     This checks that the instance is in the cluster.
4963
4964     """
4965     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4966     assert self.instance is not None, \
4967       "Cannot retrieve locked instance %s" % self.op.instance_name
4968     _CheckNodeOnline(self, self.instance.primary_node)
4969
4970   def Exec(self, feedback_fn):
4971     """Connect to the console of an instance
4972
4973     """
4974     instance = self.instance
4975     node = instance.primary_node
4976
4977     node_insts = self.rpc.call_instance_list([node],
4978                                              [instance.hypervisor])[node]
4979     node_insts.Raise()
4980
4981     if instance.name not in node_insts.data:
4982       raise errors.OpExecError("Instance %s is not running." % instance.name)
4983
4984     logging.debug("Connecting to console of %s on %s", instance.name, node)
4985
4986     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4987     cluster = self.cfg.GetClusterInfo()
4988     # beparams and hvparams are passed separately, to avoid editing the
4989     # instance and then saving the defaults in the instance itself.
4990     hvparams = cluster.FillHV(instance)
4991     beparams = cluster.FillBE(instance)
4992     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4993
4994     # build ssh cmdline
4995     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4996
4997
4998 class LUReplaceDisks(LogicalUnit):
4999   """Replace the disks of an instance.
5000
5001   """
5002   HPATH = "mirrors-replace"
5003   HTYPE = constants.HTYPE_INSTANCE
5004   _OP_REQP = ["instance_name", "mode", "disks"]
5005   REQ_BGL = False
5006
5007   def CheckArguments(self):
5008     if not hasattr(self.op, "remote_node"):
5009       self.op.remote_node = None
5010     if not hasattr(self.op, "iallocator"):
5011       self.op.iallocator = None
5012
5013     # check for valid parameter combination
5014     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5015     if self.op.mode == constants.REPLACE_DISK_CHG:
5016       if cnt == 2:
5017         raise errors.OpPrereqError("When changing the secondary either an"
5018                                    " iallocator script must be used or the"
5019                                    " new node given")
5020       elif cnt == 0:
5021         raise errors.OpPrereqError("Give either the iallocator or the new"
5022                                    " secondary, not both")
5023     else: # not replacing the secondary
5024       if cnt != 2:
5025         raise errors.OpPrereqError("The iallocator and new node options can"
5026                                    " be used only when changing the"
5027                                    " secondary node")
5028
5029   def ExpandNames(self):
5030     self._ExpandAndLockInstance()
5031
5032     if self.op.iallocator is not None:
5033       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5034     elif self.op.remote_node is not None:
5035       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5036       if remote_node is None:
5037         raise errors.OpPrereqError("Node '%s' not known" %
5038                                    self.op.remote_node)
5039       self.op.remote_node = remote_node
5040       # Warning: do not remove the locking of the new secondary here
5041       # unless DRBD8.AddChildren is changed to work in parallel;
5042       # currently it doesn't since parallel invocations of
5043       # FindUnusedMinor will conflict
5044       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5045       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5046     else:
5047       self.needed_locks[locking.LEVEL_NODE] = []
5048       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5049
5050   def DeclareLocks(self, level):
5051     # If we're not already locking all nodes in the set we have to declare the
5052     # instance's primary/secondary nodes.
5053     if (level == locking.LEVEL_NODE and
5054         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5055       self._LockInstancesNodes()
5056
5057   def _RunAllocator(self):
5058     """Compute a new secondary node using an IAllocator.
5059
5060     """
5061     ial = IAllocator(self,
5062                      mode=constants.IALLOCATOR_MODE_RELOC,
5063                      name=self.op.instance_name,
5064                      relocate_from=[self.sec_node])
5065
5066     ial.Run(self.op.iallocator)
5067
5068     if not ial.success:
5069       raise errors.OpPrereqError("Can't compute nodes using"
5070                                  " iallocator '%s': %s" % (self.op.iallocator,
5071                                                            ial.info))
5072     if len(ial.nodes) != ial.required_nodes:
5073       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5074                                  " of nodes (%s), required %s" %
5075                                  (len(ial.nodes), ial.required_nodes))
5076     self.op.remote_node = ial.nodes[0]
5077     self.LogInfo("Selected new secondary for the instance: %s",
5078                  self.op.remote_node)
5079
5080   def BuildHooksEnv(self):
5081     """Build hooks env.
5082
5083     This runs on the master, the primary and all the secondaries.
5084
5085     """
5086     env = {
5087       "MODE": self.op.mode,
5088       "NEW_SECONDARY": self.op.remote_node,
5089       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5090       }
5091     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5092     nl = [
5093       self.cfg.GetMasterNode(),
5094       self.instance.primary_node,
5095       ]
5096     if self.op.remote_node is not None:
5097       nl.append(self.op.remote_node)
5098     return env, nl, nl
5099
5100   def CheckPrereq(self):
5101     """Check prerequisites.
5102
5103     This checks that the instance is in the cluster.
5104
5105     """
5106     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5107     assert instance is not None, \
5108       "Cannot retrieve locked instance %s" % self.op.instance_name
5109     self.instance = instance
5110
5111     if instance.disk_template != constants.DT_DRBD8:
5112       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5113                                  " instances")
5114
5115     if len(instance.secondary_nodes) != 1:
5116       raise errors.OpPrereqError("The instance has a strange layout,"
5117                                  " expected one secondary but found %d" %
5118                                  len(instance.secondary_nodes))
5119
5120     self.sec_node = instance.secondary_nodes[0]
5121
5122     if self.op.iallocator is not None:
5123       self._RunAllocator()
5124
5125     remote_node = self.op.remote_node
5126     if remote_node is not None:
5127       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5128       assert self.remote_node_info is not None, \
5129         "Cannot retrieve locked node %s" % remote_node
5130     else:
5131       self.remote_node_info = None
5132     if remote_node == instance.primary_node:
5133       raise errors.OpPrereqError("The specified node is the primary node of"
5134                                  " the instance.")
5135     elif remote_node == self.sec_node:
5136       raise errors.OpPrereqError("The specified node is already the"
5137                                  " secondary node of the instance.")
5138
5139     if self.op.mode == constants.REPLACE_DISK_PRI:
5140       n1 = self.tgt_node = instance.primary_node
5141       n2 = self.oth_node = self.sec_node
5142     elif self.op.mode == constants.REPLACE_DISK_SEC:
5143       n1 = self.tgt_node = self.sec_node
5144       n2 = self.oth_node = instance.primary_node
5145     elif self.op.mode == constants.REPLACE_DISK_CHG:
5146       n1 = self.new_node = remote_node
5147       n2 = self.oth_node = instance.primary_node
5148       self.tgt_node = self.sec_node
5149       _CheckNodeNotDrained(self, remote_node)
5150     else:
5151       raise errors.ProgrammerError("Unhandled disk replace mode")
5152
5153     _CheckNodeOnline(self, n1)
5154     _CheckNodeOnline(self, n2)
5155
5156     if not self.op.disks:
5157       self.op.disks = range(len(instance.disks))
5158
5159     for disk_idx in self.op.disks:
5160       instance.FindDisk(disk_idx)
5161
5162   def _ExecD8DiskOnly(self, feedback_fn):
5163     """Replace a disk on the primary or secondary for dbrd8.
5164
5165     The algorithm for replace is quite complicated:
5166
5167       1. for each disk to be replaced:
5168
5169         1. create new LVs on the target node with unique names
5170         1. detach old LVs from the drbd device
5171         1. rename old LVs to name_replaced.<time_t>
5172         1. rename new LVs to old LVs
5173         1. attach the new LVs (with the old names now) to the drbd device
5174
5175       1. wait for sync across all devices
5176
5177       1. for each modified disk:
5178
5179         1. remove old LVs (which have the name name_replaces.<time_t>)
5180
5181     Failures are not very well handled.
5182
5183     """
5184     steps_total = 6
5185     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5186     instance = self.instance
5187     iv_names = {}
5188     vgname = self.cfg.GetVGName()
5189     # start of work
5190     cfg = self.cfg
5191     tgt_node = self.tgt_node
5192     oth_node = self.oth_node
5193
5194     # Step: check device activation
5195     self.proc.LogStep(1, steps_total, "check device existence")
5196     info("checking volume groups")
5197     my_vg = cfg.GetVGName()
5198     results = self.rpc.call_vg_list([oth_node, tgt_node])
5199     if not results:
5200       raise errors.OpExecError("Can't list volume groups on the nodes")
5201     for node in oth_node, tgt_node:
5202       res = results[node]
5203       if res.failed or not res.data or my_vg not in res.data:
5204         raise errors.OpExecError("Volume group '%s' not found on %s" %
5205                                  (my_vg, node))
5206     for idx, dev in enumerate(instance.disks):
5207       if idx not in self.op.disks:
5208         continue
5209       for node in tgt_node, oth_node:
5210         info("checking disk/%d on %s" % (idx, node))
5211         cfg.SetDiskID(dev, node)
5212         result = self.rpc.call_blockdev_find(node, dev)
5213         msg = result.RemoteFailMsg()
5214         if not msg and not result.payload:
5215           msg = "disk not found"
5216         if msg:
5217           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5218                                    (idx, node, msg))
5219
5220     # Step: check other node consistency
5221     self.proc.LogStep(2, steps_total, "check peer consistency")
5222     for idx, dev in enumerate(instance.disks):
5223       if idx not in self.op.disks:
5224         continue
5225       info("checking disk/%d consistency on %s" % (idx, oth_node))
5226       if not _CheckDiskConsistency(self, dev, oth_node,
5227                                    oth_node==instance.primary_node):
5228         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5229                                  " to replace disks on this node (%s)" %
5230                                  (oth_node, tgt_node))
5231
5232     # Step: create new storage
5233     self.proc.LogStep(3, steps_total, "allocate new storage")
5234     for idx, dev in enumerate(instance.disks):
5235       if idx not in self.op.disks:
5236         continue
5237       size = dev.size
5238       cfg.SetDiskID(dev, tgt_node)
5239       lv_names = [".disk%d_%s" % (idx, suf)
5240                   for suf in ["data", "meta"]]
5241       names = _GenerateUniqueNames(self, lv_names)
5242       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5243                              logical_id=(vgname, names[0]))
5244       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5245                              logical_id=(vgname, names[1]))
5246       new_lvs = [lv_data, lv_meta]
5247       old_lvs = dev.children
5248       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5249       info("creating new local storage on %s for %s" %
5250            (tgt_node, dev.iv_name))
5251       # we pass force_create=True to force the LVM creation
5252       for new_lv in new_lvs:
5253         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5254                         _GetInstanceInfoText(instance), False)
5255
5256     # Step: for each lv, detach+rename*2+attach
5257     self.proc.LogStep(4, steps_total, "change drbd configuration")
5258     for dev, old_lvs, new_lvs in iv_names.itervalues():
5259       info("detaching %s drbd from local storage" % dev.iv_name)
5260       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5261       msg = result.RemoteFailMsg()
5262       if msg:
5263         raise errors.OpExecError("Can't detach drbd from local storage on node"
5264                                  " %s for device %s: %s" %
5265                                  (tgt_node, dev.iv_name, msg))
5266       #dev.children = []
5267       #cfg.Update(instance)
5268
5269       # ok, we created the new LVs, so now we know we have the needed
5270       # storage; as such, we proceed on the target node to rename
5271       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5272       # using the assumption that logical_id == physical_id (which in
5273       # turn is the unique_id on that node)
5274
5275       # FIXME(iustin): use a better name for the replaced LVs
5276       temp_suffix = int(time.time())
5277       ren_fn = lambda d, suff: (d.physical_id[0],
5278                                 d.physical_id[1] + "_replaced-%s" % suff)
5279       # build the rename list based on what LVs exist on the node
5280       rlist = []
5281       for to_ren in old_lvs:
5282         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5283         if not result.RemoteFailMsg() and result.payload:
5284           # device exists
5285           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5286
5287       info("renaming the old LVs on the target node")
5288       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5289       msg = result.RemoteFailMsg()
5290       if msg:
5291         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5292                                  (tgt_node, msg))
5293       # now we rename the new LVs to the old LVs
5294       info("renaming the new LVs on the target node")
5295       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5296       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5297       msg = result.RemoteFailMsg()
5298       if msg:
5299         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5300                                  (tgt_node, msg))
5301
5302       for old, new in zip(old_lvs, new_lvs):
5303         new.logical_id = old.logical_id
5304         cfg.SetDiskID(new, tgt_node)
5305
5306       for disk in old_lvs:
5307         disk.logical_id = ren_fn(disk, temp_suffix)
5308         cfg.SetDiskID(disk, tgt_node)
5309
5310       # now that the new lvs have the old name, we can add them to the device
5311       info("adding new mirror component on %s" % tgt_node)
5312       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5313       msg = result.RemoteFailMsg()
5314       if msg:
5315         for new_lv in new_lvs:
5316           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5317           if msg:
5318             warning("Can't rollback device %s: %s", dev, msg,
5319                     hint="cleanup manually the unused logical volumes")
5320         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5321
5322       dev.children = new_lvs
5323       cfg.Update(instance)
5324
5325     # Step: wait for sync
5326
5327     # this can fail as the old devices are degraded and _WaitForSync
5328     # does a combined result over all disks, so we don't check its
5329     # return value
5330     self.proc.LogStep(5, steps_total, "sync devices")
5331     _WaitForSync(self, instance, unlock=True)
5332
5333     # so check manually all the devices
5334     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5335       cfg.SetDiskID(dev, instance.primary_node)
5336       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5337       msg = result.RemoteFailMsg()
5338       if not msg and not result.payload:
5339         msg = "disk not found"
5340       if msg:
5341         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5342                                  (name, msg))
5343       if result.payload[5]:
5344         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5345
5346     # Step: remove old storage
5347     self.proc.LogStep(6, steps_total, "removing old storage")
5348     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5349       info("remove logical volumes for %s" % name)
5350       for lv in old_lvs:
5351         cfg.SetDiskID(lv, tgt_node)
5352         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5353         if msg:
5354           warning("Can't remove old LV: %s" % msg,
5355                   hint="manually remove unused LVs")
5356           continue
5357
5358   def _ExecD8Secondary(self, feedback_fn):
5359     """Replace the secondary node for drbd8.
5360
5361     The algorithm for replace is quite complicated:
5362       - for all disks of the instance:
5363         - create new LVs on the new node with same names
5364         - shutdown the drbd device on the old secondary
5365         - disconnect the drbd network on the primary
5366         - create the drbd device on the new secondary
5367         - network attach the drbd on the primary, using an artifice:
5368           the drbd code for Attach() will connect to the network if it
5369           finds a device which is connected to the good local disks but
5370           not network enabled
5371       - wait for sync across all devices
5372       - remove all disks from the old secondary
5373
5374     Failures are not very well handled.
5375
5376     """
5377     steps_total = 6
5378     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5379     instance = self.instance
5380     iv_names = {}
5381     # start of work
5382     cfg = self.cfg
5383     old_node = self.tgt_node
5384     new_node = self.new_node
5385     pri_node = instance.primary_node
5386     nodes_ip = {
5387       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5388       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5389       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5390       }
5391
5392     # Step: check device activation
5393     self.proc.LogStep(1, steps_total, "check device existence")
5394     info("checking volume groups")
5395     my_vg = cfg.GetVGName()
5396     results = self.rpc.call_vg_list([pri_node, new_node])
5397     for node in pri_node, new_node:
5398       res = results[node]
5399       if res.failed or not res.data or my_vg not in res.data:
5400         raise errors.OpExecError("Volume group '%s' not found on %s" %
5401                                  (my_vg, node))
5402     for idx, dev in enumerate(instance.disks):
5403       if idx not in self.op.disks:
5404         continue
5405       info("checking disk/%d on %s" % (idx, pri_node))
5406       cfg.SetDiskID(dev, pri_node)
5407       result = self.rpc.call_blockdev_find(pri_node, dev)
5408       msg = result.RemoteFailMsg()
5409       if not msg and not result.payload:
5410         msg = "disk not found"
5411       if msg:
5412         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5413                                  (idx, pri_node, msg))
5414
5415     # Step: check other node consistency
5416     self.proc.LogStep(2, steps_total, "check peer consistency")
5417     for idx, dev in enumerate(instance.disks):
5418       if idx not in self.op.disks:
5419         continue
5420       info("checking disk/%d consistency on %s" % (idx, pri_node))
5421       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5422         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5423                                  " unsafe to replace the secondary" %
5424                                  pri_node)
5425
5426     # Step: create new storage
5427     self.proc.LogStep(3, steps_total, "allocate new storage")
5428     for idx, dev in enumerate(instance.disks):
5429       info("adding new local storage on %s for disk/%d" %
5430            (new_node, idx))
5431       # we pass force_create=True to force LVM creation
5432       for new_lv in dev.children:
5433         _CreateBlockDev(self, new_node, instance, new_lv, True,
5434                         _GetInstanceInfoText(instance), False)
5435
5436     # Step 4: dbrd minors and drbd setups changes
5437     # after this, we must manually remove the drbd minors on both the
5438     # error and the success paths
5439     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5440                                    instance.name)
5441     logging.debug("Allocated minors %s" % (minors,))
5442     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5443     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5444       size = dev.size
5445       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5446       # create new devices on new_node; note that we create two IDs:
5447       # one without port, so the drbd will be activated without
5448       # networking information on the new node at this stage, and one
5449       # with network, for the latter activation in step 4
5450       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5451       if pri_node == o_node1:
5452         p_minor = o_minor1
5453       else:
5454         p_minor = o_minor2
5455
5456       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5457       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5458
5459       iv_names[idx] = (dev, dev.children, new_net_id)
5460       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5461                     new_net_id)
5462       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5463                               logical_id=new_alone_id,
5464                               children=dev.children)
5465       try:
5466         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5467                               _GetInstanceInfoText(instance), False)
5468       except errors.GenericError:
5469         self.cfg.ReleaseDRBDMinors(instance.name)
5470         raise
5471
5472     for idx, dev in enumerate(instance.disks):
5473       # we have new devices, shutdown the drbd on the old secondary
5474       info("shutting down drbd for disk/%d on old node" % idx)
5475       cfg.SetDiskID(dev, old_node)
5476       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5477       if msg:
5478         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5479                 (idx, msg),
5480                 hint="Please cleanup this device manually as soon as possible")
5481
5482     info("detaching primary drbds from the network (=> standalone)")
5483     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5484                                                instance.disks)[pri_node]
5485
5486     msg = result.RemoteFailMsg()
5487     if msg:
5488       # detaches didn't succeed (unlikely)
5489       self.cfg.ReleaseDRBDMinors(instance.name)
5490       raise errors.OpExecError("Can't detach the disks from the network on"
5491                                " old node: %s" % (msg,))
5492
5493     # if we managed to detach at least one, we update all the disks of
5494     # the instance to point to the new secondary
5495     info("updating instance configuration")
5496     for dev, _, new_logical_id in iv_names.itervalues():
5497       dev.logical_id = new_logical_id
5498       cfg.SetDiskID(dev, pri_node)
5499     cfg.Update(instance)
5500
5501     # and now perform the drbd attach
5502     info("attaching primary drbds to new secondary (standalone => connected)")
5503     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5504                                            instance.disks, instance.name,
5505                                            False)
5506     for to_node, to_result in result.items():
5507       msg = to_result.RemoteFailMsg()
5508       if msg:
5509         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5510                 hint="please do a gnt-instance info to see the"
5511                 " status of disks")
5512
5513     # this can fail as the old devices are degraded and _WaitForSync
5514     # does a combined result over all disks, so we don't check its
5515     # return value
5516     self.proc.LogStep(5, steps_total, "sync devices")
5517     _WaitForSync(self, instance, unlock=True)
5518
5519     # so check manually all the devices
5520     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5521       cfg.SetDiskID(dev, pri_node)
5522       result = self.rpc.call_blockdev_find(pri_node, dev)
5523       msg = result.RemoteFailMsg()
5524       if not msg and not result.payload:
5525         msg = "disk not found"
5526       if msg:
5527         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5528                                  (idx, msg))
5529       if result.payload[5]:
5530         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5531
5532     self.proc.LogStep(6, steps_total, "removing old storage")
5533     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5534       info("remove logical volumes for disk/%d" % idx)
5535       for lv in old_lvs:
5536         cfg.SetDiskID(lv, old_node)
5537         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5538         if msg:
5539           warning("Can't remove LV on old secondary: %s", msg,
5540                   hint="Cleanup stale volumes by hand")
5541
5542   def Exec(self, feedback_fn):
5543     """Execute disk replacement.
5544
5545     This dispatches the disk replacement to the appropriate handler.
5546
5547     """
5548     instance = self.instance
5549
5550     # Activate the instance disks if we're replacing them on a down instance
5551     if not instance.admin_up:
5552       _StartInstanceDisks(self, instance, True)
5553
5554     if self.op.mode == constants.REPLACE_DISK_CHG:
5555       fn = self._ExecD8Secondary
5556     else:
5557       fn = self._ExecD8DiskOnly
5558
5559     ret = fn(feedback_fn)
5560
5561     # Deactivate the instance disks if we're replacing them on a down instance
5562     if not instance.admin_up:
5563       _SafeShutdownInstanceDisks(self, instance)
5564
5565     return ret
5566
5567
5568 class LUGrowDisk(LogicalUnit):
5569   """Grow a disk of an instance.
5570
5571   """
5572   HPATH = "disk-grow"
5573   HTYPE = constants.HTYPE_INSTANCE
5574   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5575   REQ_BGL = False
5576
5577   def ExpandNames(self):
5578     self._ExpandAndLockInstance()
5579     self.needed_locks[locking.LEVEL_NODE] = []
5580     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5581
5582   def DeclareLocks(self, level):
5583     if level == locking.LEVEL_NODE:
5584       self._LockInstancesNodes()
5585
5586   def BuildHooksEnv(self):
5587     """Build hooks env.
5588
5589     This runs on the master, the primary and all the secondaries.
5590
5591     """
5592     env = {
5593       "DISK": self.op.disk,
5594       "AMOUNT": self.op.amount,
5595       }
5596     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5597     nl = [
5598       self.cfg.GetMasterNode(),
5599       self.instance.primary_node,
5600       ]
5601     return env, nl, nl
5602
5603   def CheckPrereq(self):
5604     """Check prerequisites.
5605
5606     This checks that the instance is in the cluster.
5607
5608     """
5609     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5610     assert instance is not None, \
5611       "Cannot retrieve locked instance %s" % self.op.instance_name
5612     nodenames = list(instance.all_nodes)
5613     for node in nodenames:
5614       _CheckNodeOnline(self, node)
5615
5616
5617     self.instance = instance
5618
5619     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5620       raise errors.OpPrereqError("Instance's disk layout does not support"
5621                                  " growing.")
5622
5623     self.disk = instance.FindDisk(self.op.disk)
5624
5625     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5626                                        instance.hypervisor)
5627     for node in nodenames:
5628       info = nodeinfo[node]
5629       if info.failed or not info.data:
5630         raise errors.OpPrereqError("Cannot get current information"
5631                                    " from node '%s'" % node)
5632       vg_free = info.data.get('vg_free', None)
5633       if not isinstance(vg_free, int):
5634         raise errors.OpPrereqError("Can't compute free disk space on"
5635                                    " node %s" % node)
5636       if self.op.amount > vg_free:
5637         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5638                                    " %d MiB available, %d MiB required" %
5639                                    (node, vg_free, self.op.amount))
5640
5641   def Exec(self, feedback_fn):
5642     """Execute disk grow.
5643
5644     """
5645     instance = self.instance
5646     disk = self.disk
5647     for node in instance.all_nodes:
5648       self.cfg.SetDiskID(disk, node)
5649       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5650       msg = result.RemoteFailMsg()
5651       if msg:
5652         raise errors.OpExecError("Grow request failed to node %s: %s" %
5653                                  (node, msg))
5654     disk.RecordGrow(self.op.amount)
5655     self.cfg.Update(instance)
5656     if self.op.wait_for_sync:
5657       disk_abort = not _WaitForSync(self, instance)
5658       if disk_abort:
5659         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5660                              " status.\nPlease check the instance.")
5661
5662
5663 class LUQueryInstanceData(NoHooksLU):
5664   """Query runtime instance data.
5665
5666   """
5667   _OP_REQP = ["instances", "static"]
5668   REQ_BGL = False
5669
5670   def ExpandNames(self):
5671     self.needed_locks = {}
5672     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5673
5674     if not isinstance(self.op.instances, list):
5675       raise errors.OpPrereqError("Invalid argument type 'instances'")
5676
5677     if self.op.instances:
5678       self.wanted_names = []
5679       for name in self.op.instances:
5680         full_name = self.cfg.ExpandInstanceName(name)
5681         if full_name is None:
5682           raise errors.OpPrereqError("Instance '%s' not known" % name)
5683         self.wanted_names.append(full_name)
5684       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5685     else:
5686       self.wanted_names = None
5687       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5688
5689     self.needed_locks[locking.LEVEL_NODE] = []
5690     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5691
5692   def DeclareLocks(self, level):
5693     if level == locking.LEVEL_NODE:
5694       self._LockInstancesNodes()
5695
5696   def CheckPrereq(self):
5697     """Check prerequisites.
5698
5699     This only checks the optional instance list against the existing names.
5700
5701     """
5702     if self.wanted_names is None:
5703       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5704
5705     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5706                              in self.wanted_names]
5707     return
5708
5709   def _ComputeDiskStatus(self, instance, snode, dev):
5710     """Compute block device status.
5711
5712     """
5713     static = self.op.static
5714     if not static:
5715       self.cfg.SetDiskID(dev, instance.primary_node)
5716       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5717       if dev_pstatus.offline:
5718         dev_pstatus = None
5719       else:
5720         msg = dev_pstatus.RemoteFailMsg()
5721         if msg:
5722           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5723                                    (instance.name, msg))
5724         dev_pstatus = dev_pstatus.payload
5725     else:
5726       dev_pstatus = None
5727
5728     if dev.dev_type in constants.LDS_DRBD:
5729       # we change the snode then (otherwise we use the one passed in)
5730       if dev.logical_id[0] == instance.primary_node:
5731         snode = dev.logical_id[1]
5732       else:
5733         snode = dev.logical_id[0]
5734
5735     if snode and not static:
5736       self.cfg.SetDiskID(dev, snode)
5737       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5738       if dev_sstatus.offline:
5739         dev_sstatus = None
5740       else:
5741         msg = dev_sstatus.RemoteFailMsg()
5742         if msg:
5743           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5744                                    (instance.name, msg))
5745         dev_sstatus = dev_sstatus.payload
5746     else:
5747       dev_sstatus = None
5748
5749     if dev.children:
5750       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5751                       for child in dev.children]
5752     else:
5753       dev_children = []
5754
5755     data = {
5756       "iv_name": dev.iv_name,
5757       "dev_type": dev.dev_type,
5758       "logical_id": dev.logical_id,
5759       "physical_id": dev.physical_id,
5760       "pstatus": dev_pstatus,
5761       "sstatus": dev_sstatus,
5762       "children": dev_children,
5763       "mode": dev.mode,
5764       }
5765
5766     return data
5767
5768   def Exec(self, feedback_fn):
5769     """Gather and return data"""
5770     result = {}
5771
5772     cluster = self.cfg.GetClusterInfo()
5773
5774     for instance in self.wanted_instances:
5775       if not self.op.static:
5776         remote_info = self.rpc.call_instance_info(instance.primary_node,
5777                                                   instance.name,
5778                                                   instance.hypervisor)
5779         remote_info.Raise()
5780         remote_info = remote_info.data
5781         if remote_info and "state" in remote_info:
5782           remote_state = "up"
5783         else:
5784           remote_state = "down"
5785       else:
5786         remote_state = None
5787       if instance.admin_up:
5788         config_state = "up"
5789       else:
5790         config_state = "down"
5791
5792       disks = [self._ComputeDiskStatus(instance, None, device)
5793                for device in instance.disks]
5794
5795       idict = {
5796         "name": instance.name,
5797         "config_state": config_state,
5798         "run_state": remote_state,
5799         "pnode": instance.primary_node,
5800         "snodes": instance.secondary_nodes,
5801         "os": instance.os,
5802         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5803         "disks": disks,
5804         "hypervisor": instance.hypervisor,
5805         "network_port": instance.network_port,
5806         "hv_instance": instance.hvparams,
5807         "hv_actual": cluster.FillHV(instance),
5808         "be_instance": instance.beparams,
5809         "be_actual": cluster.FillBE(instance),
5810         }
5811
5812       result[instance.name] = idict
5813
5814     return result
5815
5816
5817 class LUSetInstanceParams(LogicalUnit):
5818   """Modifies an instances's parameters.
5819
5820   """
5821   HPATH = "instance-modify"
5822   HTYPE = constants.HTYPE_INSTANCE
5823   _OP_REQP = ["instance_name"]
5824   REQ_BGL = False
5825
5826   def CheckArguments(self):
5827     if not hasattr(self.op, 'nics'):
5828       self.op.nics = []
5829     if not hasattr(self.op, 'disks'):
5830       self.op.disks = []
5831     if not hasattr(self.op, 'beparams'):
5832       self.op.beparams = {}
5833     if not hasattr(self.op, 'hvparams'):
5834       self.op.hvparams = {}
5835     self.op.force = getattr(self.op, "force", False)
5836     if not (self.op.nics or self.op.disks or
5837             self.op.hvparams or self.op.beparams):
5838       raise errors.OpPrereqError("No changes submitted")
5839
5840     # Disk validation
5841     disk_addremove = 0
5842     for disk_op, disk_dict in self.op.disks:
5843       if disk_op == constants.DDM_REMOVE:
5844         disk_addremove += 1
5845         continue
5846       elif disk_op == constants.DDM_ADD:
5847         disk_addremove += 1
5848       else:
5849         if not isinstance(disk_op, int):
5850           raise errors.OpPrereqError("Invalid disk index")
5851       if disk_op == constants.DDM_ADD:
5852         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5853         if mode not in constants.DISK_ACCESS_SET:
5854           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5855         size = disk_dict.get('size', None)
5856         if size is None:
5857           raise errors.OpPrereqError("Required disk parameter size missing")
5858         try:
5859           size = int(size)
5860         except ValueError, err:
5861           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5862                                      str(err))
5863         disk_dict['size'] = size
5864       else:
5865         # modification of disk
5866         if 'size' in disk_dict:
5867           raise errors.OpPrereqError("Disk size change not possible, use"
5868                                      " grow-disk")
5869
5870     if disk_addremove > 1:
5871       raise errors.OpPrereqError("Only one disk add or remove operation"
5872                                  " supported at a time")
5873
5874     # NIC validation
5875     nic_addremove = 0
5876     for nic_op, nic_dict in self.op.nics:
5877       if nic_op == constants.DDM_REMOVE:
5878         nic_addremove += 1
5879         continue
5880       elif nic_op == constants.DDM_ADD:
5881         nic_addremove += 1
5882       else:
5883         if not isinstance(nic_op, int):
5884           raise errors.OpPrereqError("Invalid nic index")
5885
5886       # nic_dict should be a dict
5887       nic_ip = nic_dict.get('ip', None)
5888       if nic_ip is not None:
5889         if nic_ip.lower() == constants.VALUE_NONE:
5890           nic_dict['ip'] = None
5891         else:
5892           if not utils.IsValidIP(nic_ip):
5893             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5894
5895       nic_bridge = nic_dict.get('bridge', None)
5896       nic_link = nic_dict.get('link', None)
5897       if nic_bridge and nic_link:
5898         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5899       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5900         nic_dict['bridge'] = None
5901       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5902         nic_dict['link'] = None
5903
5904       if nic_op == constants.DDM_ADD:
5905         nic_mac = nic_dict.get('mac', None)
5906         if nic_mac is None:
5907           nic_dict['mac'] = constants.VALUE_AUTO
5908
5909       if 'mac' in nic_dict:
5910         nic_mac = nic_dict['mac']
5911         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5912           if not utils.IsValidMac(nic_mac):
5913             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5914         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5915           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5916                                      " modifying an existing nic")
5917
5918     if nic_addremove > 1:
5919       raise errors.OpPrereqError("Only one NIC add or remove operation"
5920                                  " supported at a time")
5921
5922   def ExpandNames(self):
5923     self._ExpandAndLockInstance()
5924     self.needed_locks[locking.LEVEL_NODE] = []
5925     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5926
5927   def DeclareLocks(self, level):
5928     if level == locking.LEVEL_NODE:
5929       self._LockInstancesNodes()
5930
5931   def BuildHooksEnv(self):
5932     """Build hooks env.
5933
5934     This runs on the master, primary and secondaries.
5935
5936     """
5937     args = dict()
5938     if constants.BE_MEMORY in self.be_new:
5939       args['memory'] = self.be_new[constants.BE_MEMORY]
5940     if constants.BE_VCPUS in self.be_new:
5941       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5942     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5943     # information at all.
5944     if self.op.nics:
5945       args['nics'] = []
5946       nic_override = dict(self.op.nics)
5947       for idx, nic in enumerate(self.instance.nics):
5948         if idx in nic_override:
5949           this_nic_override = nic_override[idx]
5950         else:
5951           this_nic_override = {}
5952         if 'ip' in this_nic_override:
5953           ip = this_nic_override['ip']
5954         else:
5955           ip = nic.ip
5956         if 'bridge' in this_nic_override:
5957           bridge = this_nic_override['bridge']
5958         else:
5959           bridge = nic.bridge
5960         if 'mac' in this_nic_override:
5961           mac = this_nic_override['mac']
5962         else:
5963           mac = nic.mac
5964         args['nics'].append((ip, bridge, mac))
5965       if constants.DDM_ADD in nic_override:
5966         ip = nic_override[constants.DDM_ADD].get('ip', None)
5967         bridge = nic_override[constants.DDM_ADD]['bridge']
5968         mac = nic_override[constants.DDM_ADD]['mac']
5969         args['nics'].append((ip, bridge, mac))
5970       elif constants.DDM_REMOVE in nic_override:
5971         del args['nics'][-1]
5972
5973     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5974     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5975     return env, nl, nl
5976
5977   def _GetUpdatedParams(self, old_params, update_dict,
5978                         default_values, parameter_types):
5979     """Return the new params dict for the given params.
5980
5981     @type old_params: dict
5982     @type old_params: old parameters
5983     @type update_dict: dict
5984     @type update_dict: dict containing new parameter values,
5985                        or constants.VALUE_DEFAULT to reset the
5986                        parameter to its default value
5987     @type default_values: dict
5988     @param default_values: default values for the filled parameters
5989     @type parameter_types: dict
5990     @param parameter_types: dict mapping target dict keys to types
5991                             in constants.ENFORCEABLE_TYPES
5992     @rtype: (dict, dict)
5993     @return: (new_parameters, filled_parameters)
5994
5995     """
5996     params_copy = copy.deepcopy(old_params)
5997     for key, val in update_dict.iteritems():
5998       if val == constants.VALUE_DEFAULT:
5999         try:
6000           del params_copy[key]
6001         except KeyError:
6002           pass
6003       else:
6004         params_copy[key] = val
6005     utils.ForceDictType(params_copy, parameter_types)
6006     params_filled = objects.FillDict(default_values, params_copy)
6007     return (params_copy, params_filled)
6008
6009   def CheckPrereq(self):
6010     """Check prerequisites.
6011
6012     This only checks the instance list against the existing names.
6013
6014     """
6015     force = self.force = self.op.force
6016
6017     # checking the new params on the primary/secondary nodes
6018
6019     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6020     cluster = self.cluster = self.cfg.GetClusterInfo()
6021     assert self.instance is not None, \
6022       "Cannot retrieve locked instance %s" % self.op.instance_name
6023     pnode = instance.primary_node
6024     nodelist = list(instance.all_nodes)
6025
6026     # hvparams processing
6027     if self.op.hvparams:
6028       i_hvdict, hv_new = self._GetUpdatedParams(
6029                              instance.hvparams, self.op.hvparams,
6030                              cluster.hvparams[instance.hypervisor],
6031                              constants.HVS_PARAMETER_TYPES)
6032       # local check
6033       hypervisor.GetHypervisor(
6034         instance.hypervisor).CheckParameterSyntax(hv_new)
6035       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6036       self.hv_new = hv_new # the new actual values
6037       self.hv_inst = i_hvdict # the new dict (without defaults)
6038     else:
6039       self.hv_new = self.hv_inst = {}
6040
6041     # beparams processing
6042     if self.op.beparams:
6043       i_bedict, be_new = self._GetUpdatedParams(
6044                              instance.beparams, self.op.beparams,
6045                              cluster.beparams[constants.PP_DEFAULT],
6046                              constants.BES_PARAMETER_TYPES)
6047       self.be_new = be_new # the new actual values
6048       self.be_inst = i_bedict # the new dict (without defaults)
6049     else:
6050       self.be_new = self.be_inst = {}
6051
6052     self.warn = []
6053
6054     if constants.BE_MEMORY in self.op.beparams and not self.force:
6055       mem_check_list = [pnode]
6056       if be_new[constants.BE_AUTO_BALANCE]:
6057         # either we changed auto_balance to yes or it was from before
6058         mem_check_list.extend(instance.secondary_nodes)
6059       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6060                                                   instance.hypervisor)
6061       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6062                                          instance.hypervisor)
6063       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6064         # Assume the primary node is unreachable and go ahead
6065         self.warn.append("Can't get info from primary node %s" % pnode)
6066       else:
6067         if not instance_info.failed and instance_info.data:
6068           current_mem = int(instance_info.data['memory'])
6069         else:
6070           # Assume instance not running
6071           # (there is a slight race condition here, but it's not very probable,
6072           # and we have no other way to check)
6073           current_mem = 0
6074         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6075                     nodeinfo[pnode].data['memory_free'])
6076         if miss_mem > 0:
6077           raise errors.OpPrereqError("This change will prevent the instance"
6078                                      " from starting, due to %d MB of memory"
6079                                      " missing on its primary node" % miss_mem)
6080
6081       if be_new[constants.BE_AUTO_BALANCE]:
6082         for node, nres in nodeinfo.iteritems():
6083           if node not in instance.secondary_nodes:
6084             continue
6085           if nres.failed or not isinstance(nres.data, dict):
6086             self.warn.append("Can't get info from secondary node %s" % node)
6087           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6088             self.warn.append("Not enough memory to failover instance to"
6089                              " secondary node %s" % node)
6090
6091     # NIC processing
6092     self.nic_pnew = {}
6093     self.nic_pinst = {}
6094     for nic_op, nic_dict in self.op.nics:
6095       if nic_op == constants.DDM_REMOVE:
6096         if not instance.nics:
6097           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6098         continue
6099       if nic_op != constants.DDM_ADD:
6100         # an existing nic
6101         if nic_op < 0 or nic_op >= len(instance.nics):
6102           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6103                                      " are 0 to %d" %
6104                                      (nic_op, len(instance.nics)))
6105         old_nic_params = instance.nics[nic_op].nicparams
6106         old_nic_ip = instance.nics[nic_op].ip
6107       else:
6108         old_nic_params = {}
6109         old_nic_ip = None
6110
6111       update_params_dict = dict([(key, nic_dict[key])
6112                                  for key in constants.NICS_PARAMETERS
6113                                  if key in nic_dict])
6114
6115       if 'bridge' in nic_dict:
6116         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6117
6118       new_nic_params, new_filled_nic_params = \
6119           self._GetUpdatedParams(old_nic_params, update_params_dict,
6120                                  cluster.nicparams[constants.PP_DEFAULT],
6121                                  constants.NICS_PARAMETER_TYPES)
6122       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6123       self.nic_pinst[nic_op] = new_nic_params
6124       self.nic_pnew[nic_op] = new_filled_nic_params
6125       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6126
6127       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6128         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6129         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6130         result.Raise()
6131         if not result.data:
6132           msg = ("Bridge '%s' doesn't exist on one of"
6133                  " the instance nodes" % nic_bridge)
6134           if self.force:
6135             self.warn.append(msg)
6136           else:
6137             raise errors.OpPrereqError(msg)
6138       if new_nic_mode == constants.NIC_MODE_ROUTED:
6139         if 'ip' in nic_dict:
6140           nic_ip = nic_dict['ip']
6141         else:
6142           nic_ip = old_nic_ip
6143         if nic_ip is None:
6144           raise errors.OpPrereqError('Cannot set the nic ip to None'
6145                                      ' on a routed nic')
6146       if 'mac' in nic_dict:
6147         nic_mac = nic_dict['mac']
6148         if nic_mac is None:
6149           raise errors.OpPrereqError('Cannot set the nic mac to None')
6150         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6151           # otherwise generate the mac
6152           nic_dict['mac'] = self.cfg.GenerateMAC()
6153         else:
6154           # or validate/reserve the current one
6155           if self.cfg.IsMacInUse(nic_mac):
6156             raise errors.OpPrereqError("MAC address %s already in use"
6157                                        " in cluster" % nic_mac)
6158
6159     # DISK processing
6160     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6161       raise errors.OpPrereqError("Disk operations not supported for"
6162                                  " diskless instances")
6163     for disk_op, disk_dict in self.op.disks:
6164       if disk_op == constants.DDM_REMOVE:
6165         if len(instance.disks) == 1:
6166           raise errors.OpPrereqError("Cannot remove the last disk of"
6167                                      " an instance")
6168         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6169         ins_l = ins_l[pnode]
6170         if ins_l.failed or not isinstance(ins_l.data, list):
6171           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6172         if instance.name in ins_l.data:
6173           raise errors.OpPrereqError("Instance is running, can't remove"
6174                                      " disks.")
6175
6176       if (disk_op == constants.DDM_ADD and
6177           len(instance.nics) >= constants.MAX_DISKS):
6178         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6179                                    " add more" % constants.MAX_DISKS)
6180       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6181         # an existing disk
6182         if disk_op < 0 or disk_op >= len(instance.disks):
6183           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6184                                      " are 0 to %d" %
6185                                      (disk_op, len(instance.disks)))
6186
6187     return
6188
6189   def Exec(self, feedback_fn):
6190     """Modifies an instance.
6191
6192     All parameters take effect only at the next restart of the instance.
6193
6194     """
6195     # Process here the warnings from CheckPrereq, as we don't have a
6196     # feedback_fn there.
6197     for warn in self.warn:
6198       feedback_fn("WARNING: %s" % warn)
6199
6200     result = []
6201     instance = self.instance
6202     cluster = self.cluster
6203     # disk changes
6204     for disk_op, disk_dict in self.op.disks:
6205       if disk_op == constants.DDM_REMOVE:
6206         # remove the last disk
6207         device = instance.disks.pop()
6208         device_idx = len(instance.disks)
6209         for node, disk in device.ComputeNodeTree(instance.primary_node):
6210           self.cfg.SetDiskID(disk, node)
6211           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6212           if msg:
6213             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6214                             " continuing anyway", device_idx, node, msg)
6215         result.append(("disk/%d" % device_idx, "remove"))
6216       elif disk_op == constants.DDM_ADD:
6217         # add a new disk
6218         if instance.disk_template == constants.DT_FILE:
6219           file_driver, file_path = instance.disks[0].logical_id
6220           file_path = os.path.dirname(file_path)
6221         else:
6222           file_driver = file_path = None
6223         disk_idx_base = len(instance.disks)
6224         new_disk = _GenerateDiskTemplate(self,
6225                                          instance.disk_template,
6226                                          instance.name, instance.primary_node,
6227                                          instance.secondary_nodes,
6228                                          [disk_dict],
6229                                          file_path,
6230                                          file_driver,
6231                                          disk_idx_base)[0]
6232         instance.disks.append(new_disk)
6233         info = _GetInstanceInfoText(instance)
6234
6235         logging.info("Creating volume %s for instance %s",
6236                      new_disk.iv_name, instance.name)
6237         # Note: this needs to be kept in sync with _CreateDisks
6238         #HARDCODE
6239         for node in instance.all_nodes:
6240           f_create = node == instance.primary_node
6241           try:
6242             _CreateBlockDev(self, node, instance, new_disk,
6243                             f_create, info, f_create)
6244           except errors.OpExecError, err:
6245             self.LogWarning("Failed to create volume %s (%s) on"
6246                             " node %s: %s",
6247                             new_disk.iv_name, new_disk, node, err)
6248         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6249                        (new_disk.size, new_disk.mode)))
6250       else:
6251         # change a given disk
6252         instance.disks[disk_op].mode = disk_dict['mode']
6253         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6254     # NIC changes
6255     for nic_op, nic_dict in self.op.nics:
6256       if nic_op == constants.DDM_REMOVE:
6257         # remove the last nic
6258         del instance.nics[-1]
6259         result.append(("nic.%d" % len(instance.nics), "remove"))
6260       elif nic_op == constants.DDM_ADD:
6261         # mac and bridge should be set, by now
6262         mac = nic_dict['mac']
6263         ip = nic_dict.get('ip', None)
6264         nicparams = self.nic_pinst[constants.DDM_ADD]
6265         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6266         instance.nics.append(new_nic)
6267         result.append(("nic.%d" % (len(instance.nics) - 1),
6268                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6269                        (new_nic.mac, new_nic.ip,
6270                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6271                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6272                        )))
6273       else:
6274         for key in 'mac', 'ip':
6275           if key in nic_dict:
6276             setattr(instance.nics[nic_op], key, nic_dict[key])
6277         if nic_op in self.nic_pnew:
6278           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6279         for key, val in nic_dict.iteritems():
6280           result.append(("nic.%s/%d" % (key, nic_op), val))
6281
6282     # hvparams changes
6283     if self.op.hvparams:
6284       instance.hvparams = self.hv_inst
6285       for key, val in self.op.hvparams.iteritems():
6286         result.append(("hv/%s" % key, val))
6287
6288     # beparams changes
6289     if self.op.beparams:
6290       instance.beparams = self.be_inst
6291       for key, val in self.op.beparams.iteritems():
6292         result.append(("be/%s" % key, val))
6293
6294     self.cfg.Update(instance)
6295
6296     return result
6297
6298
6299 class LUQueryExports(NoHooksLU):
6300   """Query the exports list
6301
6302   """
6303   _OP_REQP = ['nodes']
6304   REQ_BGL = False
6305
6306   def ExpandNames(self):
6307     self.needed_locks = {}
6308     self.share_locks[locking.LEVEL_NODE] = 1
6309     if not self.op.nodes:
6310       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6311     else:
6312       self.needed_locks[locking.LEVEL_NODE] = \
6313         _GetWantedNodes(self, self.op.nodes)
6314
6315   def CheckPrereq(self):
6316     """Check prerequisites.
6317
6318     """
6319     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6320
6321   def Exec(self, feedback_fn):
6322     """Compute the list of all the exported system images.
6323
6324     @rtype: dict
6325     @return: a dictionary with the structure node->(export-list)
6326         where export-list is a list of the instances exported on
6327         that node.
6328
6329     """
6330     rpcresult = self.rpc.call_export_list(self.nodes)
6331     result = {}
6332     for node in rpcresult:
6333       if rpcresult[node].failed:
6334         result[node] = False
6335       else:
6336         result[node] = rpcresult[node].data
6337
6338     return result
6339
6340
6341 class LUExportInstance(LogicalUnit):
6342   """Export an instance to an image in the cluster.
6343
6344   """
6345   HPATH = "instance-export"
6346   HTYPE = constants.HTYPE_INSTANCE
6347   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6348   REQ_BGL = False
6349
6350   def ExpandNames(self):
6351     self._ExpandAndLockInstance()
6352     # FIXME: lock only instance primary and destination node
6353     #
6354     # Sad but true, for now we have do lock all nodes, as we don't know where
6355     # the previous export might be, and and in this LU we search for it and
6356     # remove it from its current node. In the future we could fix this by:
6357     #  - making a tasklet to search (share-lock all), then create the new one,
6358     #    then one to remove, after
6359     #  - removing the removal operation altoghether
6360     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6361
6362   def DeclareLocks(self, level):
6363     """Last minute lock declaration."""
6364     # All nodes are locked anyway, so nothing to do here.
6365
6366   def BuildHooksEnv(self):
6367     """Build hooks env.
6368
6369     This will run on the master, primary node and target node.
6370
6371     """
6372     env = {
6373       "EXPORT_NODE": self.op.target_node,
6374       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6375       }
6376     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6377     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6378           self.op.target_node]
6379     return env, nl, nl
6380
6381   def CheckPrereq(self):
6382     """Check prerequisites.
6383
6384     This checks that the instance and node names are valid.
6385
6386     """
6387     instance_name = self.op.instance_name
6388     self.instance = self.cfg.GetInstanceInfo(instance_name)
6389     assert self.instance is not None, \
6390           "Cannot retrieve locked instance %s" % self.op.instance_name
6391     _CheckNodeOnline(self, self.instance.primary_node)
6392
6393     self.dst_node = self.cfg.GetNodeInfo(
6394       self.cfg.ExpandNodeName(self.op.target_node))
6395
6396     if self.dst_node is None:
6397       # This is wrong node name, not a non-locked node
6398       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6399     _CheckNodeOnline(self, self.dst_node.name)
6400     _CheckNodeNotDrained(self, self.dst_node.name)
6401
6402     # instance disk type verification
6403     for disk in self.instance.disks:
6404       if disk.dev_type == constants.LD_FILE:
6405         raise errors.OpPrereqError("Export not supported for instances with"
6406                                    " file-based disks")
6407
6408   def Exec(self, feedback_fn):
6409     """Export an instance to an image in the cluster.
6410
6411     """
6412     instance = self.instance
6413     dst_node = self.dst_node
6414     src_node = instance.primary_node
6415     if self.op.shutdown:
6416       # shutdown the instance, but not the disks
6417       result = self.rpc.call_instance_shutdown(src_node, instance)
6418       msg = result.RemoteFailMsg()
6419       if msg:
6420         raise errors.OpExecError("Could not shutdown instance %s on"
6421                                  " node %s: %s" %
6422                                  (instance.name, src_node, msg))
6423
6424     vgname = self.cfg.GetVGName()
6425
6426     snap_disks = []
6427
6428     # set the disks ID correctly since call_instance_start needs the
6429     # correct drbd minor to create the symlinks
6430     for disk in instance.disks:
6431       self.cfg.SetDiskID(disk, src_node)
6432
6433     try:
6434       for disk in instance.disks:
6435         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6436         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6437         if new_dev_name.failed or not new_dev_name.data:
6438           self.LogWarning("Could not snapshot block device %s on node %s",
6439                           disk.logical_id[1], src_node)
6440           snap_disks.append(False)
6441         else:
6442           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6443                                  logical_id=(vgname, new_dev_name.data),
6444                                  physical_id=(vgname, new_dev_name.data),
6445                                  iv_name=disk.iv_name)
6446           snap_disks.append(new_dev)
6447
6448     finally:
6449       if self.op.shutdown and instance.admin_up:
6450         result = self.rpc.call_instance_start(src_node, instance, None, None)
6451         msg = result.RemoteFailMsg()
6452         if msg:
6453           _ShutdownInstanceDisks(self, instance)
6454           raise errors.OpExecError("Could not start instance: %s" % msg)
6455
6456     # TODO: check for size
6457
6458     cluster_name = self.cfg.GetClusterName()
6459     for idx, dev in enumerate(snap_disks):
6460       if dev:
6461         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6462                                                instance, cluster_name, idx)
6463         if result.failed or not result.data:
6464           self.LogWarning("Could not export block device %s from node %s to"
6465                           " node %s", dev.logical_id[1], src_node,
6466                           dst_node.name)
6467         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6468         if msg:
6469           self.LogWarning("Could not remove snapshot block device %s from node"
6470                           " %s: %s", dev.logical_id[1], src_node, msg)
6471
6472     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6473     if result.failed or not result.data:
6474       self.LogWarning("Could not finalize export for instance %s on node %s",
6475                       instance.name, dst_node.name)
6476
6477     nodelist = self.cfg.GetNodeList()
6478     nodelist.remove(dst_node.name)
6479
6480     # on one-node clusters nodelist will be empty after the removal
6481     # if we proceed the backup would be removed because OpQueryExports
6482     # substitutes an empty list with the full cluster node list.
6483     if nodelist:
6484       exportlist = self.rpc.call_export_list(nodelist)
6485       for node in exportlist:
6486         if exportlist[node].failed:
6487           continue
6488         if instance.name in exportlist[node].data:
6489           if not self.rpc.call_export_remove(node, instance.name):
6490             self.LogWarning("Could not remove older export for instance %s"
6491                             " on node %s", instance.name, node)
6492
6493
6494 class LURemoveExport(NoHooksLU):
6495   """Remove exports related to the named instance.
6496
6497   """
6498   _OP_REQP = ["instance_name"]
6499   REQ_BGL = False
6500
6501   def ExpandNames(self):
6502     self.needed_locks = {}
6503     # We need all nodes to be locked in order for RemoveExport to work, but we
6504     # don't need to lock the instance itself, as nothing will happen to it (and
6505     # we can remove exports also for a removed instance)
6506     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6507
6508   def CheckPrereq(self):
6509     """Check prerequisites.
6510     """
6511     pass
6512
6513   def Exec(self, feedback_fn):
6514     """Remove any export.
6515
6516     """
6517     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6518     # If the instance was not found we'll try with the name that was passed in.
6519     # This will only work if it was an FQDN, though.
6520     fqdn_warn = False
6521     if not instance_name:
6522       fqdn_warn = True
6523       instance_name = self.op.instance_name
6524
6525     exportlist = self.rpc.call_export_list(self.acquired_locks[
6526       locking.LEVEL_NODE])
6527     found = False
6528     for node in exportlist:
6529       if exportlist[node].failed:
6530         self.LogWarning("Failed to query node %s, continuing" % node)
6531         continue
6532       if instance_name in exportlist[node].data:
6533         found = True
6534         result = self.rpc.call_export_remove(node, instance_name)
6535         if result.failed or not result.data:
6536           logging.error("Could not remove export for instance %s"
6537                         " on node %s", instance_name, node)
6538
6539     if fqdn_warn and not found:
6540       feedback_fn("Export not found. If trying to remove an export belonging"
6541                   " to a deleted instance please use its Fully Qualified"
6542                   " Domain Name.")
6543
6544
6545 class TagsLU(NoHooksLU):
6546   """Generic tags LU.
6547
6548   This is an abstract class which is the parent of all the other tags LUs.
6549
6550   """
6551
6552   def ExpandNames(self):
6553     self.needed_locks = {}
6554     if self.op.kind == constants.TAG_NODE:
6555       name = self.cfg.ExpandNodeName(self.op.name)
6556       if name is None:
6557         raise errors.OpPrereqError("Invalid node name (%s)" %
6558                                    (self.op.name,))
6559       self.op.name = name
6560       self.needed_locks[locking.LEVEL_NODE] = name
6561     elif self.op.kind == constants.TAG_INSTANCE:
6562       name = self.cfg.ExpandInstanceName(self.op.name)
6563       if name is None:
6564         raise errors.OpPrereqError("Invalid instance name (%s)" %
6565                                    (self.op.name,))
6566       self.op.name = name
6567       self.needed_locks[locking.LEVEL_INSTANCE] = name
6568
6569   def CheckPrereq(self):
6570     """Check prerequisites.
6571
6572     """
6573     if self.op.kind == constants.TAG_CLUSTER:
6574       self.target = self.cfg.GetClusterInfo()
6575     elif self.op.kind == constants.TAG_NODE:
6576       self.target = self.cfg.GetNodeInfo(self.op.name)
6577     elif self.op.kind == constants.TAG_INSTANCE:
6578       self.target = self.cfg.GetInstanceInfo(self.op.name)
6579     else:
6580       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6581                                  str(self.op.kind))
6582
6583
6584 class LUGetTags(TagsLU):
6585   """Returns the tags of a given object.
6586
6587   """
6588   _OP_REQP = ["kind", "name"]
6589   REQ_BGL = False
6590
6591   def Exec(self, feedback_fn):
6592     """Returns the tag list.
6593
6594     """
6595     return list(self.target.GetTags())
6596
6597
6598 class LUSearchTags(NoHooksLU):
6599   """Searches the tags for a given pattern.
6600
6601   """
6602   _OP_REQP = ["pattern"]
6603   REQ_BGL = False
6604
6605   def ExpandNames(self):
6606     self.needed_locks = {}
6607
6608   def CheckPrereq(self):
6609     """Check prerequisites.
6610
6611     This checks the pattern passed for validity by compiling it.
6612
6613     """
6614     try:
6615       self.re = re.compile(self.op.pattern)
6616     except re.error, err:
6617       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6618                                  (self.op.pattern, err))
6619
6620   def Exec(self, feedback_fn):
6621     """Returns the tag list.
6622
6623     """
6624     cfg = self.cfg
6625     tgts = [("/cluster", cfg.GetClusterInfo())]
6626     ilist = cfg.GetAllInstancesInfo().values()
6627     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6628     nlist = cfg.GetAllNodesInfo().values()
6629     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6630     results = []
6631     for path, target in tgts:
6632       for tag in target.GetTags():
6633         if self.re.search(tag):
6634           results.append((path, tag))
6635     return results
6636
6637
6638 class LUAddTags(TagsLU):
6639   """Sets a tag on a given object.
6640
6641   """
6642   _OP_REQP = ["kind", "name", "tags"]
6643   REQ_BGL = False
6644
6645   def CheckPrereq(self):
6646     """Check prerequisites.
6647
6648     This checks the type and length of the tag name and value.
6649
6650     """
6651     TagsLU.CheckPrereq(self)
6652     for tag in self.op.tags:
6653       objects.TaggableObject.ValidateTag(tag)
6654
6655   def Exec(self, feedback_fn):
6656     """Sets the tag.
6657
6658     """
6659     try:
6660       for tag in self.op.tags:
6661         self.target.AddTag(tag)
6662     except errors.TagError, err:
6663       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6664     try:
6665       self.cfg.Update(self.target)
6666     except errors.ConfigurationError:
6667       raise errors.OpRetryError("There has been a modification to the"
6668                                 " config file and the operation has been"
6669                                 " aborted. Please retry.")
6670
6671
6672 class LUDelTags(TagsLU):
6673   """Delete a list of tags from a given object.
6674
6675   """
6676   _OP_REQP = ["kind", "name", "tags"]
6677   REQ_BGL = False
6678
6679   def CheckPrereq(self):
6680     """Check prerequisites.
6681
6682     This checks that we have the given tag.
6683
6684     """
6685     TagsLU.CheckPrereq(self)
6686     for tag in self.op.tags:
6687       objects.TaggableObject.ValidateTag(tag)
6688     del_tags = frozenset(self.op.tags)
6689     cur_tags = self.target.GetTags()
6690     if not del_tags <= cur_tags:
6691       diff_tags = del_tags - cur_tags
6692       diff_names = ["'%s'" % tag for tag in diff_tags]
6693       diff_names.sort()
6694       raise errors.OpPrereqError("Tag(s) %s not found" %
6695                                  (",".join(diff_names)))
6696
6697   def Exec(self, feedback_fn):
6698     """Remove the tag from the object.
6699
6700     """
6701     for tag in self.op.tags:
6702       self.target.RemoveTag(tag)
6703     try:
6704       self.cfg.Update(self.target)
6705     except errors.ConfigurationError:
6706       raise errors.OpRetryError("There has been a modification to the"
6707                                 " config file and the operation has been"
6708                                 " aborted. Please retry.")
6709
6710
6711 class LUTestDelay(NoHooksLU):
6712   """Sleep for a specified amount of time.
6713
6714   This LU sleeps on the master and/or nodes for a specified amount of
6715   time.
6716
6717   """
6718   _OP_REQP = ["duration", "on_master", "on_nodes"]
6719   REQ_BGL = False
6720
6721   def ExpandNames(self):
6722     """Expand names and set required locks.
6723
6724     This expands the node list, if any.
6725
6726     """
6727     self.needed_locks = {}
6728     if self.op.on_nodes:
6729       # _GetWantedNodes can be used here, but is not always appropriate to use
6730       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6731       # more information.
6732       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6733       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6734
6735   def CheckPrereq(self):
6736     """Check prerequisites.
6737
6738     """
6739
6740   def Exec(self, feedback_fn):
6741     """Do the actual sleep.
6742
6743     """
6744     if self.op.on_master:
6745       if not utils.TestDelay(self.op.duration):
6746         raise errors.OpExecError("Error during master delay test")
6747     if self.op.on_nodes:
6748       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6749       if not result:
6750         raise errors.OpExecError("Complete failure from rpc call")
6751       for node, node_result in result.items():
6752         node_result.Raise()
6753         if not node_result.data:
6754           raise errors.OpExecError("Failure during rpc call to node %s,"
6755                                    " result: %s" % (node, node_result.data))
6756
6757
6758 class IAllocator(object):
6759   """IAllocator framework.
6760
6761   An IAllocator instance has three sets of attributes:
6762     - cfg that is needed to query the cluster
6763     - input data (all members of the _KEYS class attribute are required)
6764     - four buffer attributes (in|out_data|text), that represent the
6765       input (to the external script) in text and data structure format,
6766       and the output from it, again in two formats
6767     - the result variables from the script (success, info, nodes) for
6768       easy usage
6769
6770   """
6771   _ALLO_KEYS = [
6772     "mem_size", "disks", "disk_template",
6773     "os", "tags", "nics", "vcpus", "hypervisor",
6774     ]
6775   _RELO_KEYS = [
6776     "relocate_from",
6777     ]
6778
6779   def __init__(self, lu, mode, name, **kwargs):
6780     self.lu = lu
6781     # init buffer variables
6782     self.in_text = self.out_text = self.in_data = self.out_data = None
6783     # init all input fields so that pylint is happy
6784     self.mode = mode
6785     self.name = name
6786     self.mem_size = self.disks = self.disk_template = None
6787     self.os = self.tags = self.nics = self.vcpus = None
6788     self.hypervisor = None
6789     self.relocate_from = None
6790     # computed fields
6791     self.required_nodes = None
6792     # init result fields
6793     self.success = self.info = self.nodes = None
6794     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6795       keyset = self._ALLO_KEYS
6796     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6797       keyset = self._RELO_KEYS
6798     else:
6799       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6800                                    " IAllocator" % self.mode)
6801     for key in kwargs:
6802       if key not in keyset:
6803         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6804                                      " IAllocator" % key)
6805       setattr(self, key, kwargs[key])
6806     for key in keyset:
6807       if key not in kwargs:
6808         raise errors.ProgrammerError("Missing input parameter '%s' to"
6809                                      " IAllocator" % key)
6810     self._BuildInputData()
6811
6812   def _ComputeClusterData(self):
6813     """Compute the generic allocator input data.
6814
6815     This is the data that is independent of the actual operation.
6816
6817     """
6818     cfg = self.lu.cfg
6819     cluster_info = cfg.GetClusterInfo()
6820     # cluster data
6821     data = {
6822       "version": constants.IALLOCATOR_VERSION,
6823       "cluster_name": cfg.GetClusterName(),
6824       "cluster_tags": list(cluster_info.GetTags()),
6825       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6826       # we don't have job IDs
6827       }
6828     iinfo = cfg.GetAllInstancesInfo().values()
6829     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6830
6831     # node data
6832     node_results = {}
6833     node_list = cfg.GetNodeList()
6834
6835     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6836       hypervisor_name = self.hypervisor
6837     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6838       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6839
6840     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6841                                            hypervisor_name)
6842     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6843                        cluster_info.enabled_hypervisors)
6844     for nname, nresult in node_data.items():
6845       # first fill in static (config-based) values
6846       ninfo = cfg.GetNodeInfo(nname)
6847       pnr = {
6848         "tags": list(ninfo.GetTags()),
6849         "primary_ip": ninfo.primary_ip,
6850         "secondary_ip": ninfo.secondary_ip,
6851         "offline": ninfo.offline,
6852         "drained": ninfo.drained,
6853         "master_candidate": ninfo.master_candidate,
6854         }
6855
6856       if not ninfo.offline:
6857         nresult.Raise()
6858         if not isinstance(nresult.data, dict):
6859           raise errors.OpExecError("Can't get data for node %s" % nname)
6860         remote_info = nresult.data
6861         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6862                      'vg_size', 'vg_free', 'cpu_total']:
6863           if attr not in remote_info:
6864             raise errors.OpExecError("Node '%s' didn't return attribute"
6865                                      " '%s'" % (nname, attr))
6866           try:
6867             remote_info[attr] = int(remote_info[attr])
6868           except ValueError, err:
6869             raise errors.OpExecError("Node '%s' returned invalid value"
6870                                      " for '%s': %s" % (nname, attr, err))
6871         # compute memory used by primary instances
6872         i_p_mem = i_p_up_mem = 0
6873         for iinfo, beinfo in i_list:
6874           if iinfo.primary_node == nname:
6875             i_p_mem += beinfo[constants.BE_MEMORY]
6876             if iinfo.name not in node_iinfo[nname].data:
6877               i_used_mem = 0
6878             else:
6879               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6880             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6881             remote_info['memory_free'] -= max(0, i_mem_diff)
6882
6883             if iinfo.admin_up:
6884               i_p_up_mem += beinfo[constants.BE_MEMORY]
6885
6886         # compute memory used by instances
6887         pnr_dyn = {
6888           "total_memory": remote_info['memory_total'],
6889           "reserved_memory": remote_info['memory_dom0'],
6890           "free_memory": remote_info['memory_free'],
6891           "total_disk": remote_info['vg_size'],
6892           "free_disk": remote_info['vg_free'],
6893           "total_cpus": remote_info['cpu_total'],
6894           "i_pri_memory": i_p_mem,
6895           "i_pri_up_memory": i_p_up_mem,
6896           }
6897         pnr.update(pnr_dyn)
6898
6899       node_results[nname] = pnr
6900     data["nodes"] = node_results
6901
6902     # instance data
6903     instance_data = {}
6904     for iinfo, beinfo in i_list:
6905       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6906                   for n in iinfo.nics]
6907       pir = {
6908         "tags": list(iinfo.GetTags()),
6909         "admin_up": iinfo.admin_up,
6910         "vcpus": beinfo[constants.BE_VCPUS],
6911         "memory": beinfo[constants.BE_MEMORY],
6912         "os": iinfo.os,
6913         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6914         "nics": nic_data,
6915         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6916         "disk_template": iinfo.disk_template,
6917         "hypervisor": iinfo.hypervisor,
6918         }
6919       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6920                                                  pir["disks"])
6921       instance_data[iinfo.name] = pir
6922
6923     data["instances"] = instance_data
6924
6925     self.in_data = data
6926
6927   def _AddNewInstance(self):
6928     """Add new instance data to allocator structure.
6929
6930     This in combination with _AllocatorGetClusterData will create the
6931     correct structure needed as input for the allocator.
6932
6933     The checks for the completeness of the opcode must have already been
6934     done.
6935
6936     """
6937     data = self.in_data
6938
6939     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6940
6941     if self.disk_template in constants.DTS_NET_MIRROR:
6942       self.required_nodes = 2
6943     else:
6944       self.required_nodes = 1
6945     request = {
6946       "type": "allocate",
6947       "name": self.name,
6948       "disk_template": self.disk_template,
6949       "tags": self.tags,
6950       "os": self.os,
6951       "vcpus": self.vcpus,
6952       "memory": self.mem_size,
6953       "disks": self.disks,
6954       "disk_space_total": disk_space,
6955       "nics": self.nics,
6956       "required_nodes": self.required_nodes,
6957       }
6958     data["request"] = request
6959
6960   def _AddRelocateInstance(self):
6961     """Add relocate instance data to allocator structure.
6962
6963     This in combination with _IAllocatorGetClusterData will create the
6964     correct structure needed as input for the allocator.
6965
6966     The checks for the completeness of the opcode must have already been
6967     done.
6968
6969     """
6970     instance = self.lu.cfg.GetInstanceInfo(self.name)
6971     if instance is None:
6972       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6973                                    " IAllocator" % self.name)
6974
6975     if instance.disk_template not in constants.DTS_NET_MIRROR:
6976       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6977
6978     if len(instance.secondary_nodes) != 1:
6979       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6980
6981     self.required_nodes = 1
6982     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6983     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6984
6985     request = {
6986       "type": "relocate",
6987       "name": self.name,
6988       "disk_space_total": disk_space,
6989       "required_nodes": self.required_nodes,
6990       "relocate_from": self.relocate_from,
6991       }
6992     self.in_data["request"] = request
6993
6994   def _BuildInputData(self):
6995     """Build input data structures.
6996
6997     """
6998     self._ComputeClusterData()
6999
7000     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7001       self._AddNewInstance()
7002     else:
7003       self._AddRelocateInstance()
7004
7005     self.in_text = serializer.Dump(self.in_data)
7006
7007   def Run(self, name, validate=True, call_fn=None):
7008     """Run an instance allocator and return the results.
7009
7010     """
7011     if call_fn is None:
7012       call_fn = self.lu.rpc.call_iallocator_runner
7013     data = self.in_text
7014
7015     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7016     result.Raise()
7017
7018     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7019       raise errors.OpExecError("Invalid result from master iallocator runner")
7020
7021     rcode, stdout, stderr, fail = result.data
7022
7023     if rcode == constants.IARUN_NOTFOUND:
7024       raise errors.OpExecError("Can't find allocator '%s'" % name)
7025     elif rcode == constants.IARUN_FAILURE:
7026       raise errors.OpExecError("Instance allocator call failed: %s,"
7027                                " output: %s" % (fail, stdout+stderr))
7028     self.out_text = stdout
7029     if validate:
7030       self._ValidateResult()
7031
7032   def _ValidateResult(self):
7033     """Process the allocator results.
7034
7035     This will process and if successful save the result in
7036     self.out_data and the other parameters.
7037
7038     """
7039     try:
7040       rdict = serializer.Load(self.out_text)
7041     except Exception, err:
7042       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7043
7044     if not isinstance(rdict, dict):
7045       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7046
7047     for key in "success", "info", "nodes":
7048       if key not in rdict:
7049         raise errors.OpExecError("Can't parse iallocator results:"
7050                                  " missing key '%s'" % key)
7051       setattr(self, key, rdict[key])
7052
7053     if not isinstance(rdict["nodes"], list):
7054       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7055                                " is not a list")
7056     self.out_data = rdict
7057
7058
7059 class LUTestAllocator(NoHooksLU):
7060   """Run allocator tests.
7061
7062   This LU runs the allocator tests
7063
7064   """
7065   _OP_REQP = ["direction", "mode", "name"]
7066
7067   def CheckPrereq(self):
7068     """Check prerequisites.
7069
7070     This checks the opcode parameters depending on the director and mode test.
7071
7072     """
7073     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7074       for attr in ["name", "mem_size", "disks", "disk_template",
7075                    "os", "tags", "nics", "vcpus"]:
7076         if not hasattr(self.op, attr):
7077           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7078                                      attr)
7079       iname = self.cfg.ExpandInstanceName(self.op.name)
7080       if iname is not None:
7081         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7082                                    iname)
7083       if not isinstance(self.op.nics, list):
7084         raise errors.OpPrereqError("Invalid parameter 'nics'")
7085       for row in self.op.nics:
7086         if (not isinstance(row, dict) or
7087             "mac" not in row or
7088             "ip" not in row or
7089             "bridge" not in row):
7090           raise errors.OpPrereqError("Invalid contents of the"
7091                                      " 'nics' parameter")
7092       if not isinstance(self.op.disks, list):
7093         raise errors.OpPrereqError("Invalid parameter 'disks'")
7094       for row in self.op.disks:
7095         if (not isinstance(row, dict) or
7096             "size" not in row or
7097             not isinstance(row["size"], int) or
7098             "mode" not in row or
7099             row["mode"] not in ['r', 'w']):
7100           raise errors.OpPrereqError("Invalid contents of the"
7101                                      " 'disks' parameter")
7102       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7103         self.op.hypervisor = self.cfg.GetHypervisorType()
7104     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7105       if not hasattr(self.op, "name"):
7106         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7107       fname = self.cfg.ExpandInstanceName(self.op.name)
7108       if fname is None:
7109         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7110                                    self.op.name)
7111       self.op.name = fname
7112       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7113     else:
7114       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7115                                  self.op.mode)
7116
7117     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7118       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7119         raise errors.OpPrereqError("Missing allocator name")
7120     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7121       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7122                                  self.op.direction)
7123
7124   def Exec(self, feedback_fn):
7125     """Run the allocator test.
7126
7127     """
7128     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7129       ial = IAllocator(self,
7130                        mode=self.op.mode,
7131                        name=self.op.name,
7132                        mem_size=self.op.mem_size,
7133                        disks=self.op.disks,
7134                        disk_template=self.op.disk_template,
7135                        os=self.op.os,
7136                        tags=self.op.tags,
7137                        nics=self.op.nics,
7138                        vcpus=self.op.vcpus,
7139                        hypervisor=self.op.hypervisor,
7140                        )
7141     else:
7142       ial = IAllocator(self,
7143                        mode=self.op.mode,
7144                        name=self.op.name,
7145                        relocate_from=list(self.relocate_from),
7146                        )
7147
7148     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7149       result = ial.in_text
7150     else:
7151       ial.Run(self.op.allocator, validate=False)
7152       result = ial.out_text
7153     return result