Fix node readd issues
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, bridge, mac) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517       env["INSTANCE_NIC%d_MAC" % idx] = mac
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541   """Builds instance related env variables for hooks from an object.
542
543   @type lu: L{LogicalUnit}
544   @param lu: the logical unit on whose behalf we execute
545   @type instance: L{objects.Instance}
546   @param instance: the instance for which we should build the
547       environment
548   @type override: dict
549   @param override: dictionary with key/values that will override
550       our values
551   @rtype: dict
552   @return: the hook environment dictionary
553
554   """
555   cluster = lu.cfg.GetClusterInfo()
556   bep = cluster.FillBE(instance)
557   hvp = cluster.FillHV(instance)
558   args = {
559     'name': instance.name,
560     'primary_node': instance.primary_node,
561     'secondary_nodes': instance.secondary_nodes,
562     'os_type': instance.os,
563     'status': instance.admin_up,
564     'memory': bep[constants.BE_MEMORY],
565     'vcpus': bep[constants.BE_VCPUS],
566     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567     'disk_template': instance.disk_template,
568     'disks': [(disk.size, disk.mode) for disk in instance.disks],
569     'bep': bep,
570     'hvp': hvp,
571     'hypervisor': instance.hypervisor,
572   }
573   if override:
574     args.update(override)
575   return _BuildInstanceHookEnv(**args)
576
577
578 def _AdjustCandidatePool(lu):
579   """Adjust the candidate pool after node operations.
580
581   """
582   mod_list = lu.cfg.MaintainCandidatePool()
583   if mod_list:
584     lu.LogInfo("Promoted nodes to master candidate role: %s",
585                ", ".join(node.name for node in mod_list))
586     for name in mod_list:
587       lu.context.ReaddNode(name)
588   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589   if mc_now > mc_max:
590     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591                (mc_now, mc_max))
592
593
594 def _CheckInstanceBridgesExist(lu, instance):
595   """Check that the brigdes needed by an instance exist.
596
597   """
598   # check bridges existance
599   brlist = [nic.bridge for nic in instance.nics]
600   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601   result.Raise()
602   if not result.data:
603     raise errors.OpPrereqError("One or more target bridges %s does not"
604                                " exist on destination node '%s'" %
605                                (brlist, instance.primary_node))
606
607
608 class LUDestroyCluster(NoHooksLU):
609   """Logical unit for destroying the cluster.
610
611   """
612   _OP_REQP = []
613
614   def CheckPrereq(self):
615     """Check prerequisites.
616
617     This checks whether the cluster is empty.
618
619     Any errors are signalled by raising errors.OpPrereqError.
620
621     """
622     master = self.cfg.GetMasterNode()
623
624     nodelist = self.cfg.GetNodeList()
625     if len(nodelist) != 1 or nodelist[0] != master:
626       raise errors.OpPrereqError("There are still %d node(s) in"
627                                  " this cluster." % (len(nodelist) - 1))
628     instancelist = self.cfg.GetInstanceList()
629     if instancelist:
630       raise errors.OpPrereqError("There are still %d instance(s) in"
631                                  " this cluster." % len(instancelist))
632
633   def Exec(self, feedback_fn):
634     """Destroys the cluster.
635
636     """
637     master = self.cfg.GetMasterNode()
638     result = self.rpc.call_node_stop_master(master, False)
639     result.Raise()
640     if not result.data:
641       raise errors.OpExecError("Could not disable the master role")
642     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643     utils.CreateBackup(priv_key)
644     utils.CreateBackup(pub_key)
645     return master
646
647
648 class LUVerifyCluster(LogicalUnit):
649   """Verifies the cluster status.
650
651   """
652   HPATH = "cluster-verify"
653   HTYPE = constants.HTYPE_CLUSTER
654   _OP_REQP = ["skip_checks"]
655   REQ_BGL = False
656
657   def ExpandNames(self):
658     self.needed_locks = {
659       locking.LEVEL_NODE: locking.ALL_SET,
660       locking.LEVEL_INSTANCE: locking.ALL_SET,
661     }
662     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663
664   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665                   node_result, feedback_fn, master_files,
666                   drbd_map, vg_name):
667     """Run multiple tests against a node.
668
669     Test list:
670
671       - compares ganeti version
672       - checks vg existance and size > 20G
673       - checks config file checksum
674       - checks ssh to other nodes
675
676     @type nodeinfo: L{objects.Node}
677     @param nodeinfo: the node to check
678     @param file_list: required list of files
679     @param local_cksum: dictionary of local files and their checksums
680     @param node_result: the results from the node
681     @param feedback_fn: function used to accumulate results
682     @param master_files: list of files that only masters should have
683     @param drbd_map: the useddrbd minors for this node, in
684         form of minor: (instance, must_exist) which correspond to instances
685         and their running status
686     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687
688     """
689     node = nodeinfo.name
690
691     # main result, node_result should be a non-empty dict
692     if not node_result or not isinstance(node_result, dict):
693       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694       return True
695
696     # compares ganeti version
697     local_version = constants.PROTOCOL_VERSION
698     remote_version = node_result.get('version', None)
699     if not (remote_version and isinstance(remote_version, (list, tuple)) and
700             len(remote_version) == 2):
701       feedback_fn("  - ERROR: connection to %s failed" % (node))
702       return True
703
704     if local_version != remote_version[0]:
705       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706                   " node %s %s" % (local_version, node, remote_version[0]))
707       return True
708
709     # node seems compatible, we can actually try to look into its results
710
711     bad = False
712
713     # full package version
714     if constants.RELEASE_VERSION != remote_version[1]:
715       feedback_fn("  - WARNING: software version mismatch: master %s,"
716                   " node %s %s" %
717                   (constants.RELEASE_VERSION, node, remote_version[1]))
718
719     # checks vg existence and size > 20G
720     if vg_name is not None:
721       vglist = node_result.get(constants.NV_VGLIST, None)
722       if not vglist:
723         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724                         (node,))
725         bad = True
726       else:
727         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728                                               constants.MIN_VG_SIZE)
729         if vgstatus:
730           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731           bad = True
732
733     # checks config file checksum
734
735     remote_cksum = node_result.get(constants.NV_FILELIST, None)
736     if not isinstance(remote_cksum, dict):
737       bad = True
738       feedback_fn("  - ERROR: node hasn't returned file checksum data")
739     else:
740       for file_name in file_list:
741         node_is_mc = nodeinfo.master_candidate
742         must_have_file = file_name not in master_files
743         if file_name not in remote_cksum:
744           if node_is_mc or must_have_file:
745             bad = True
746             feedback_fn("  - ERROR: file '%s' missing" % file_name)
747         elif remote_cksum[file_name] != local_cksum[file_name]:
748           if node_is_mc or must_have_file:
749             bad = True
750             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751           else:
752             # not candidate and this is not a must-have file
753             bad = True
754             feedback_fn("  - ERROR: file '%s' should not exist on non master"
755                         " candidates (and the file is outdated)" % file_name)
756         else:
757           # all good, except non-master/non-must have combination
758           if not node_is_mc and not must_have_file:
759             feedback_fn("  - ERROR: file '%s' should not exist on non master"
760                         " candidates" % file_name)
761
762     # checks ssh to any
763
764     if constants.NV_NODELIST not in node_result:
765       bad = True
766       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767     else:
768       if node_result[constants.NV_NODELIST]:
769         bad = True
770         for node in node_result[constants.NV_NODELIST]:
771           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772                           (node, node_result[constants.NV_NODELIST][node]))
773
774     if constants.NV_NODENETTEST not in node_result:
775       bad = True
776       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777     else:
778       if node_result[constants.NV_NODENETTEST]:
779         bad = True
780         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781         for node in nlist:
782           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783                           (node, node_result[constants.NV_NODENETTEST][node]))
784
785     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786     if isinstance(hyp_result, dict):
787       for hv_name, hv_result in hyp_result.iteritems():
788         if hv_result is not None:
789           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790                       (hv_name, hv_result))
791
792     # check used drbd list
793     if vg_name is not None:
794       used_minors = node_result.get(constants.NV_DRBDLIST, [])
795       if not isinstance(used_minors, (tuple, list)):
796         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797                     str(used_minors))
798       else:
799         for minor, (iname, must_exist) in drbd_map.items():
800           if minor not in used_minors and must_exist:
801             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802                         " not active" % (minor, iname))
803             bad = True
804         for minor in used_minors:
805           if minor not in drbd_map:
806             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807                         minor)
808             bad = True
809
810     return bad
811
812   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813                       node_instance, feedback_fn, n_offline):
814     """Verify an instance.
815
816     This function checks to see if the required block devices are
817     available on the instance's node.
818
819     """
820     bad = False
821
822     node_current = instanceconfig.primary_node
823
824     node_vol_should = {}
825     instanceconfig.MapLVsByNode(node_vol_should)
826
827     for node in node_vol_should:
828       if node in n_offline:
829         # ignore missing volumes on offline nodes
830         continue
831       for volume in node_vol_should[node]:
832         if node not in node_vol_is or volume not in node_vol_is[node]:
833           feedback_fn("  - ERROR: volume %s missing on node %s" %
834                           (volume, node))
835           bad = True
836
837     if instanceconfig.admin_up:
838       if ((node_current not in node_instance or
839           not instance in node_instance[node_current]) and
840           node_current not in n_offline):
841         feedback_fn("  - ERROR: instance %s not running on node %s" %
842                         (instance, node_current))
843         bad = True
844
845     for node in node_instance:
846       if (not node == node_current):
847         if instance in node_instance[node]:
848           feedback_fn("  - ERROR: instance %s should not run on node %s" %
849                           (instance, node))
850           bad = True
851
852     return bad
853
854   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855     """Verify if there are any unknown volumes in the cluster.
856
857     The .os, .swap and backup volumes are ignored. All other volumes are
858     reported as unknown.
859
860     """
861     bad = False
862
863     for node in node_vol_is:
864       for volume in node_vol_is[node]:
865         if node not in node_vol_should or volume not in node_vol_should[node]:
866           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867                       (volume, node))
868           bad = True
869     return bad
870
871   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872     """Verify the list of running instances.
873
874     This checks what instances are running but unknown to the cluster.
875
876     """
877     bad = False
878     for node in node_instance:
879       for runninginstance in node_instance[node]:
880         if runninginstance not in instancelist:
881           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882                           (runninginstance, node))
883           bad = True
884     return bad
885
886   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887     """Verify N+1 Memory Resilience.
888
889     Check that if one single node dies we can still start all the instances it
890     was primary for.
891
892     """
893     bad = False
894
895     for node, nodeinfo in node_info.iteritems():
896       # This code checks that every node which is now listed as secondary has
897       # enough memory to host all instances it is supposed to should a single
898       # other node in the cluster fail.
899       # FIXME: not ready for failover to an arbitrary node
900       # FIXME: does not support file-backed instances
901       # WARNING: we currently take into account down instances as well as up
902       # ones, considering that even if they're down someone might want to start
903       # them even in the event of a node failure.
904       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905         needed_mem = 0
906         for instance in instances:
907           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908           if bep[constants.BE_AUTO_BALANCE]:
909             needed_mem += bep[constants.BE_MEMORY]
910         if nodeinfo['mfree'] < needed_mem:
911           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912                       " failovers should node %s fail" % (node, prinode))
913           bad = True
914     return bad
915
916   def CheckPrereq(self):
917     """Check prerequisites.
918
919     Transform the list of checks we're going to skip into a set and check that
920     all its members are valid.
921
922     """
923     self.skip_set = frozenset(self.op.skip_checks)
924     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925       raise errors.OpPrereqError("Invalid checks to be skipped specified")
926
927   def BuildHooksEnv(self):
928     """Build hooks env.
929
930     Cluster-Verify hooks just rone in the post phase and their failure makes
931     the output be logged in the verify output and the verification to fail.
932
933     """
934     all_nodes = self.cfg.GetNodeList()
935     env = {
936       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937       }
938     for node in self.cfg.GetAllNodesInfo().values():
939       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940
941     return env, [], all_nodes
942
943   def Exec(self, feedback_fn):
944     """Verify integrity of cluster, performing various test on nodes.
945
946     """
947     bad = False
948     feedback_fn("* Verifying global settings")
949     for msg in self.cfg.VerifyConfig():
950       feedback_fn("  - ERROR: %s" % msg)
951
952     vg_name = self.cfg.GetVGName()
953     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954     nodelist = utils.NiceSort(self.cfg.GetNodeList())
955     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958                         for iname in instancelist)
959     i_non_redundant = [] # Non redundant instances
960     i_non_a_balanced = [] # Non auto-balanced instances
961     n_offline = [] # List of offline nodes
962     n_drained = [] # List of nodes being drained
963     node_volume = {}
964     node_instance = {}
965     node_info = {}
966     instance_cfg = {}
967
968     # FIXME: verify OS list
969     # do local checksums
970     master_files = [constants.CLUSTER_CONF_FILE]
971
972     file_names = ssconf.SimpleStore().GetFileList()
973     file_names.append(constants.SSL_CERT_FILE)
974     file_names.append(constants.RAPI_CERT_FILE)
975     file_names.extend(master_files)
976
977     local_checksums = utils.FingerprintFiles(file_names)
978
979     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980     node_verify_param = {
981       constants.NV_FILELIST: file_names,
982       constants.NV_NODELIST: [node.name for node in nodeinfo
983                               if not node.offline],
984       constants.NV_HYPERVISOR: hypervisors,
985       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986                                   node.secondary_ip) for node in nodeinfo
987                                  if not node.offline],
988       constants.NV_INSTANCELIST: hypervisors,
989       constants.NV_VERSION: None,
990       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991       }
992     if vg_name is not None:
993       node_verify_param[constants.NV_VGLIST] = None
994       node_verify_param[constants.NV_LVLIST] = vg_name
995       node_verify_param[constants.NV_DRBDLIST] = None
996     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997                                            self.cfg.GetClusterName())
998
999     cluster = self.cfg.GetClusterInfo()
1000     master_node = self.cfg.GetMasterNode()
1001     all_drbd_map = self.cfg.ComputeDRBDMap()
1002
1003     for node_i in nodeinfo:
1004       node = node_i.name
1005       nresult = all_nvinfo[node].data
1006
1007       if node_i.offline:
1008         feedback_fn("* Skipping offline node %s" % (node,))
1009         n_offline.append(node)
1010         continue
1011
1012       if node == master_node:
1013         ntype = "master"
1014       elif node_i.master_candidate:
1015         ntype = "master candidate"
1016       elif node_i.drained:
1017         ntype = "drained"
1018         n_drained.append(node)
1019       else:
1020         ntype = "regular"
1021       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022
1023       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025         bad = True
1026         continue
1027
1028       node_drbd = {}
1029       for minor, instance in all_drbd_map[node].items():
1030         if instance not in instanceinfo:
1031           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032                       instance)
1033           # ghost instance should not be running, but otherwise we
1034           # don't give double warnings (both ghost instance and
1035           # unallocated minor in use)
1036           node_drbd[minor] = (instance, False)
1037         else:
1038           instance = instanceinfo[instance]
1039           node_drbd[minor] = (instance.name, instance.admin_up)
1040       result = self._VerifyNode(node_i, file_names, local_checksums,
1041                                 nresult, feedback_fn, master_files,
1042                                 node_drbd, vg_name)
1043       bad = bad or result
1044
1045       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046       if vg_name is None:
1047         node_volume[node] = {}
1048       elif isinstance(lvdata, basestring):
1049         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050                     (node, utils.SafeEncode(lvdata)))
1051         bad = True
1052         node_volume[node] = {}
1053       elif not isinstance(lvdata, dict):
1054         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055         bad = True
1056         continue
1057       else:
1058         node_volume[node] = lvdata
1059
1060       # node_instance
1061       idata = nresult.get(constants.NV_INSTANCELIST, None)
1062       if not isinstance(idata, list):
1063         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064                     (node,))
1065         bad = True
1066         continue
1067
1068       node_instance[node] = idata
1069
1070       # node_info
1071       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072       if not isinstance(nodeinfo, dict):
1073         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074         bad = True
1075         continue
1076
1077       try:
1078         node_info[node] = {
1079           "mfree": int(nodeinfo['memory_free']),
1080           "pinst": [],
1081           "sinst": [],
1082           # dictionary holding all instances this node is secondary for,
1083           # grouped by their primary node. Each key is a cluster node, and each
1084           # value is a list of instances which have the key as primary and the
1085           # current node as secondary.  this is handy to calculate N+1 memory
1086           # availability if you can only failover from a primary to its
1087           # secondary.
1088           "sinst-by-pnode": {},
1089         }
1090         # FIXME: devise a free space model for file based instances as well
1091         if vg_name is not None:
1092           if (constants.NV_VGLIST not in nresult or
1093               vg_name not in nresult[constants.NV_VGLIST]):
1094             feedback_fn("  - ERROR: node %s didn't return data for the"
1095                         " volume group '%s' - it is either missing or broken" %
1096                         (node, vg_name))
1097             bad = True
1098             continue
1099           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100       except (ValueError, KeyError):
1101         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102                     " from node %s" % (node,))
1103         bad = True
1104         continue
1105
1106     node_vol_should = {}
1107
1108     for instance in instancelist:
1109       feedback_fn("* Verifying instance %s" % instance)
1110       inst_config = instanceinfo[instance]
1111       result =  self._VerifyInstance(instance, inst_config, node_volume,
1112                                      node_instance, feedback_fn, n_offline)
1113       bad = bad or result
1114       inst_nodes_offline = []
1115
1116       inst_config.MapLVsByNode(node_vol_should)
1117
1118       instance_cfg[instance] = inst_config
1119
1120       pnode = inst_config.primary_node
1121       if pnode in node_info:
1122         node_info[pnode]['pinst'].append(instance)
1123       elif pnode not in n_offline:
1124         feedback_fn("  - ERROR: instance %s, connection to primary node"
1125                     " %s failed" % (instance, pnode))
1126         bad = True
1127
1128       if pnode in n_offline:
1129         inst_nodes_offline.append(pnode)
1130
1131       # If the instance is non-redundant we cannot survive losing its primary
1132       # node, so we are not N+1 compliant. On the other hand we have no disk
1133       # templates with more than one secondary so that situation is not well
1134       # supported either.
1135       # FIXME: does not support file-backed instances
1136       if len(inst_config.secondary_nodes) == 0:
1137         i_non_redundant.append(instance)
1138       elif len(inst_config.secondary_nodes) > 1:
1139         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140                     % instance)
1141
1142       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143         i_non_a_balanced.append(instance)
1144
1145       for snode in inst_config.secondary_nodes:
1146         if snode in node_info:
1147           node_info[snode]['sinst'].append(instance)
1148           if pnode not in node_info[snode]['sinst-by-pnode']:
1149             node_info[snode]['sinst-by-pnode'][pnode] = []
1150           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151         elif snode not in n_offline:
1152           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153                       " %s failed" % (instance, snode))
1154           bad = True
1155         if snode in n_offline:
1156           inst_nodes_offline.append(snode)
1157
1158       if inst_nodes_offline:
1159         # warn that the instance lives on offline nodes, and set bad=True
1160         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161                     ", ".join(inst_nodes_offline))
1162         bad = True
1163
1164     feedback_fn("* Verifying orphan volumes")
1165     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166                                        feedback_fn)
1167     bad = bad or result
1168
1169     feedback_fn("* Verifying remaining instances")
1170     result = self._VerifyOrphanInstances(instancelist, node_instance,
1171                                          feedback_fn)
1172     bad = bad or result
1173
1174     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175       feedback_fn("* Verifying N+1 Memory redundancy")
1176       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177       bad = bad or result
1178
1179     feedback_fn("* Other Notes")
1180     if i_non_redundant:
1181       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182                   % len(i_non_redundant))
1183
1184     if i_non_a_balanced:
1185       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186                   % len(i_non_a_balanced))
1187
1188     if n_offline:
1189       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190
1191     if n_drained:
1192       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193
1194     return not bad
1195
1196   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197     """Analize the post-hooks' result
1198
1199     This method analyses the hook result, handles it, and sends some
1200     nicely-formatted feedback back to the user.
1201
1202     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204     @param hooks_results: the results of the multi-node hooks rpc call
1205     @param feedback_fn: function used send feedback back to the caller
1206     @param lu_result: previous Exec result
1207     @return: the new Exec result, based on the previous result
1208         and hook results
1209
1210     """
1211     # We only really run POST phase hooks, and are only interested in
1212     # their results
1213     if phase == constants.HOOKS_PHASE_POST:
1214       # Used to change hooks' output to proper indentation
1215       indent_re = re.compile('^', re.M)
1216       feedback_fn("* Hooks Results")
1217       if not hooks_results:
1218         feedback_fn("  - ERROR: general communication failure")
1219         lu_result = 1
1220       else:
1221         for node_name in hooks_results:
1222           show_node_header = True
1223           res = hooks_results[node_name]
1224           if res.failed or res.data is False or not isinstance(res.data, list):
1225             if res.offline:
1226               # no need to warn or set fail return value
1227               continue
1228             feedback_fn("    Communication failure in hooks execution")
1229             lu_result = 1
1230             continue
1231           for script, hkr, output in res.data:
1232             if hkr == constants.HKR_FAIL:
1233               # The node header is only shown once, if there are
1234               # failing hooks on that node
1235               if show_node_header:
1236                 feedback_fn("  Node %s:" % node_name)
1237                 show_node_header = False
1238               feedback_fn("    ERROR: Script %s failed, output:" % script)
1239               output = indent_re.sub('      ', output)
1240               feedback_fn("%s" % output)
1241               lu_result = 1
1242
1243       return lu_result
1244
1245
1246 class LUVerifyDisks(NoHooksLU):
1247   """Verifies the cluster disks status.
1248
1249   """
1250   _OP_REQP = []
1251   REQ_BGL = False
1252
1253   def ExpandNames(self):
1254     self.needed_locks = {
1255       locking.LEVEL_NODE: locking.ALL_SET,
1256       locking.LEVEL_INSTANCE: locking.ALL_SET,
1257     }
1258     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This has no prerequisites.
1264
1265     """
1266     pass
1267
1268   def Exec(self, feedback_fn):
1269     """Verify integrity of cluster disks.
1270
1271     """
1272     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1273
1274     vg_name = self.cfg.GetVGName()
1275     nodes = utils.NiceSort(self.cfg.GetNodeList())
1276     instances = [self.cfg.GetInstanceInfo(name)
1277                  for name in self.cfg.GetInstanceList()]
1278
1279     nv_dict = {}
1280     for inst in instances:
1281       inst_lvs = {}
1282       if (not inst.admin_up or
1283           inst.disk_template not in constants.DTS_NET_MIRROR):
1284         continue
1285       inst.MapLVsByNode(inst_lvs)
1286       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287       for node, vol_list in inst_lvs.iteritems():
1288         for vol in vol_list:
1289           nv_dict[(node, vol)] = inst
1290
1291     if not nv_dict:
1292       return result
1293
1294     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295
1296     to_act = set()
1297     for node in nodes:
1298       # node_volume
1299       lvs = node_lvs[node]
1300       if lvs.failed:
1301         if not lvs.offline:
1302           self.LogWarning("Connection to node %s failed: %s" %
1303                           (node, lvs.data))
1304         continue
1305       lvs = lvs.data
1306       if isinstance(lvs, basestring):
1307         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308         res_nlvm[node] = lvs
1309         continue
1310       elif not isinstance(lvs, dict):
1311         logging.warning("Connection to node %s failed or invalid data"
1312                         " returned", node)
1313         res_nodes.append(node)
1314         continue
1315
1316       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317         inst = nv_dict.pop((node, lv_name), None)
1318         if (not lv_online and inst is not None
1319             and inst.name not in res_instances):
1320           res_instances.append(inst.name)
1321
1322     # any leftover items in nv_dict are missing LVs, let's arrange the
1323     # data better
1324     for key, inst in nv_dict.iteritems():
1325       if inst.name not in res_missing:
1326         res_missing[inst.name] = []
1327       res_missing[inst.name].append(key)
1328
1329     return result
1330
1331
1332 class LURenameCluster(LogicalUnit):
1333   """Rename the cluster.
1334
1335   """
1336   HPATH = "cluster-rename"
1337   HTYPE = constants.HTYPE_CLUSTER
1338   _OP_REQP = ["name"]
1339
1340   def BuildHooksEnv(self):
1341     """Build hooks env.
1342
1343     """
1344     env = {
1345       "OP_TARGET": self.cfg.GetClusterName(),
1346       "NEW_NAME": self.op.name,
1347       }
1348     mn = self.cfg.GetMasterNode()
1349     return env, [mn], [mn]
1350
1351   def CheckPrereq(self):
1352     """Verify that the passed name is a valid one.
1353
1354     """
1355     hostname = utils.HostInfo(self.op.name)
1356
1357     new_name = hostname.name
1358     self.ip = new_ip = hostname.ip
1359     old_name = self.cfg.GetClusterName()
1360     old_ip = self.cfg.GetMasterIP()
1361     if new_name == old_name and new_ip == old_ip:
1362       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363                                  " cluster has changed")
1364     if new_ip != old_ip:
1365       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367                                    " reachable on the network. Aborting." %
1368                                    new_ip)
1369
1370     self.op.name = new_name
1371
1372   def Exec(self, feedback_fn):
1373     """Rename the cluster.
1374
1375     """
1376     clustername = self.op.name
1377     ip = self.ip
1378
1379     # shutdown the master IP
1380     master = self.cfg.GetMasterNode()
1381     result = self.rpc.call_node_stop_master(master, False)
1382     if result.failed or not result.data:
1383       raise errors.OpExecError("Could not disable the master role")
1384
1385     try:
1386       cluster = self.cfg.GetClusterInfo()
1387       cluster.cluster_name = clustername
1388       cluster.master_ip = ip
1389       self.cfg.Update(cluster)
1390
1391       # update the known hosts file
1392       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393       node_list = self.cfg.GetNodeList()
1394       try:
1395         node_list.remove(master)
1396       except ValueError:
1397         pass
1398       result = self.rpc.call_upload_file(node_list,
1399                                          constants.SSH_KNOWN_HOSTS_FILE)
1400       for to_node, to_result in result.iteritems():
1401         if to_result.failed or not to_result.data:
1402           logging.error("Copy of file %s to node %s failed",
1403                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1404
1405     finally:
1406       result = self.rpc.call_node_start_master(master, False)
1407       if result.failed or not result.data:
1408         self.LogWarning("Could not re-enable the master role on"
1409                         " the master, please restart manually.")
1410
1411
1412 def _RecursiveCheckIfLVMBased(disk):
1413   """Check if the given disk or its children are lvm-based.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the disk to check
1417   @rtype: booleean
1418   @return: boolean indicating whether a LD_LV dev_type was found or not
1419
1420   """
1421   if disk.children:
1422     for chdisk in disk.children:
1423       if _RecursiveCheckIfLVMBased(chdisk):
1424         return True
1425   return disk.dev_type == constants.LD_LV
1426
1427
1428 class LUSetClusterParams(LogicalUnit):
1429   """Change the parameters of the cluster.
1430
1431   """
1432   HPATH = "cluster-modify"
1433   HTYPE = constants.HTYPE_CLUSTER
1434   _OP_REQP = []
1435   REQ_BGL = False
1436
1437   def CheckArguments(self):
1438     """Check parameters
1439
1440     """
1441     if not hasattr(self.op, "candidate_pool_size"):
1442       self.op.candidate_pool_size = None
1443     if self.op.candidate_pool_size is not None:
1444       try:
1445         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446       except (ValueError, TypeError), err:
1447         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1448                                    str(err))
1449       if self.op.candidate_pool_size < 1:
1450         raise errors.OpPrereqError("At least one master candidate needed")
1451
1452   def ExpandNames(self):
1453     # FIXME: in the future maybe other cluster params won't require checking on
1454     # all nodes to be modified.
1455     self.needed_locks = {
1456       locking.LEVEL_NODE: locking.ALL_SET,
1457     }
1458     self.share_locks[locking.LEVEL_NODE] = 1
1459
1460   def BuildHooksEnv(self):
1461     """Build hooks env.
1462
1463     """
1464     env = {
1465       "OP_TARGET": self.cfg.GetClusterName(),
1466       "NEW_VG_NAME": self.op.vg_name,
1467       }
1468     mn = self.cfg.GetMasterNode()
1469     return env, [mn], [mn]
1470
1471   def CheckPrereq(self):
1472     """Check prerequisites.
1473
1474     This checks whether the given params don't conflict and
1475     if the given volume group is valid.
1476
1477     """
1478     if self.op.vg_name is not None and not self.op.vg_name:
1479       instances = self.cfg.GetAllInstancesInfo().values()
1480       for inst in instances:
1481         for disk in inst.disks:
1482           if _RecursiveCheckIfLVMBased(disk):
1483             raise errors.OpPrereqError("Cannot disable lvm storage while"
1484                                        " lvm-based instances exist")
1485
1486     node_list = self.acquired_locks[locking.LEVEL_NODE]
1487
1488     # if vg_name not None, checks given volume group on all nodes
1489     if self.op.vg_name:
1490       vglist = self.rpc.call_vg_list(node_list)
1491       for node in node_list:
1492         if vglist[node].failed:
1493           # ignoring down node
1494           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1495           continue
1496         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1497                                               self.op.vg_name,
1498                                               constants.MIN_VG_SIZE)
1499         if vgstatus:
1500           raise errors.OpPrereqError("Error on node '%s': %s" %
1501                                      (node, vgstatus))
1502
1503     self.cluster = cluster = self.cfg.GetClusterInfo()
1504     # validate beparams changes
1505     if self.op.beparams:
1506       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507       self.new_beparams = cluster.FillDict(
1508         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1509
1510     # hypervisor list/parameters
1511     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512     if self.op.hvparams:
1513       if not isinstance(self.op.hvparams, dict):
1514         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515       for hv_name, hv_dict in self.op.hvparams.items():
1516         if hv_name not in self.new_hvparams:
1517           self.new_hvparams[hv_name] = hv_dict
1518         else:
1519           self.new_hvparams[hv_name].update(hv_dict)
1520
1521     if self.op.enabled_hypervisors is not None:
1522       self.hv_list = self.op.enabled_hypervisors
1523     else:
1524       self.hv_list = cluster.enabled_hypervisors
1525
1526     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527       # either the enabled list has changed, or the parameters have, validate
1528       for hv_name, hv_params in self.new_hvparams.items():
1529         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530             (self.op.enabled_hypervisors and
1531              hv_name in self.op.enabled_hypervisors)):
1532           # either this is a new hypervisor, or its parameters have changed
1533           hv_class = hypervisor.GetHypervisor(hv_name)
1534           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535           hv_class.CheckParameterSyntax(hv_params)
1536           _CheckHVParams(self, node_list, hv_name, hv_params)
1537
1538   def Exec(self, feedback_fn):
1539     """Change the parameters of the cluster.
1540
1541     """
1542     if self.op.vg_name is not None:
1543       new_volume = self.op.vg_name
1544       if not new_volume:
1545         new_volume = None
1546       if new_volume != self.cfg.GetVGName():
1547         self.cfg.SetVGName(new_volume)
1548       else:
1549         feedback_fn("Cluster LVM configuration already in desired"
1550                     " state, not changing")
1551     if self.op.hvparams:
1552       self.cluster.hvparams = self.new_hvparams
1553     if self.op.enabled_hypervisors is not None:
1554       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555     if self.op.beparams:
1556       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557     if self.op.candidate_pool_size is not None:
1558       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1559       # we need to update the pool size here, otherwise the save will fail
1560       _AdjustCandidatePool(self)
1561
1562     self.cfg.Update(self.cluster)
1563
1564
1565 class LURedistributeConfig(NoHooksLU):
1566   """Force the redistribution of cluster configuration.
1567
1568   This is a very simple LU.
1569
1570   """
1571   _OP_REQP = []
1572   REQ_BGL = False
1573
1574   def ExpandNames(self):
1575     self.needed_locks = {
1576       locking.LEVEL_NODE: locking.ALL_SET,
1577     }
1578     self.share_locks[locking.LEVEL_NODE] = 1
1579
1580   def CheckPrereq(self):
1581     """Check prerequisites.
1582
1583     """
1584
1585   def Exec(self, feedback_fn):
1586     """Redistribute the configuration.
1587
1588     """
1589     self.cfg.Update(self.cfg.GetClusterInfo())
1590
1591
1592 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1593   """Sleep and poll for an instance's disk to sync.
1594
1595   """
1596   if not instance.disks:
1597     return True
1598
1599   if not oneshot:
1600     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1601
1602   node = instance.primary_node
1603
1604   for dev in instance.disks:
1605     lu.cfg.SetDiskID(dev, node)
1606
1607   retries = 0
1608   degr_retries = 10 # in seconds, as we sleep 1 second each time
1609   while True:
1610     max_time = 0
1611     done = True
1612     cumul_degraded = False
1613     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1614     if rstats.failed or not rstats.data:
1615       lu.LogWarning("Can't get any data from node %s", node)
1616       retries += 1
1617       if retries >= 10:
1618         raise errors.RemoteError("Can't contact node %s for mirror data,"
1619                                  " aborting." % node)
1620       time.sleep(6)
1621       continue
1622     rstats = rstats.data
1623     retries = 0
1624     for i, mstat in enumerate(rstats):
1625       if mstat is None:
1626         lu.LogWarning("Can't compute data for node %s/%s",
1627                            node, instance.disks[i].iv_name)
1628         continue
1629       # we ignore the ldisk parameter
1630       perc_done, est_time, is_degraded, _ = mstat
1631       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1632       if perc_done is not None:
1633         done = False
1634         if est_time is not None:
1635           rem_time = "%d estimated seconds remaining" % est_time
1636           max_time = est_time
1637         else:
1638           rem_time = "no time estimate"
1639         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1640                         (instance.disks[i].iv_name, perc_done, rem_time))
1641
1642     # if we're done but degraded, let's do a few small retries, to
1643     # make sure we see a stable and not transient situation; therefore
1644     # we force restart of the loop
1645     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1646       logging.info("Degraded disks found, %d retries left", degr_retries)
1647       degr_retries -= 1
1648       time.sleep(1)
1649       continue
1650
1651     if done or oneshot:
1652       break
1653
1654     time.sleep(min(60, max_time))
1655
1656   if done:
1657     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1658   return not cumul_degraded
1659
1660
1661 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1662   """Check that mirrors are not degraded.
1663
1664   The ldisk parameter, if True, will change the test from the
1665   is_degraded attribute (which represents overall non-ok status for
1666   the device(s)) to the ldisk (representing the local storage status).
1667
1668   """
1669   lu.cfg.SetDiskID(dev, node)
1670   if ldisk:
1671     idx = 6
1672   else:
1673     idx = 5
1674
1675   result = True
1676   if on_primary or dev.AssembleOnSecondary():
1677     rstats = lu.rpc.call_blockdev_find(node, dev)
1678     msg = rstats.RemoteFailMsg()
1679     if msg:
1680       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1681       result = False
1682     elif not rstats.payload:
1683       lu.LogWarning("Can't find disk on node %s", node)
1684       result = False
1685     else:
1686       result = result and (not rstats.payload[idx])
1687   if dev.children:
1688     for child in dev.children:
1689       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1690
1691   return result
1692
1693
1694 class LUDiagnoseOS(NoHooksLU):
1695   """Logical unit for OS diagnose/query.
1696
1697   """
1698   _OP_REQP = ["output_fields", "names"]
1699   REQ_BGL = False
1700   _FIELDS_STATIC = utils.FieldSet()
1701   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1702
1703   def ExpandNames(self):
1704     if self.op.names:
1705       raise errors.OpPrereqError("Selective OS query not supported")
1706
1707     _CheckOutputFields(static=self._FIELDS_STATIC,
1708                        dynamic=self._FIELDS_DYNAMIC,
1709                        selected=self.op.output_fields)
1710
1711     # Lock all nodes, in shared mode
1712     # Temporary removal of locks, should be reverted later
1713     # TODO: reintroduce locks when they are lighter-weight
1714     self.needed_locks = {}
1715     #self.share_locks[locking.LEVEL_NODE] = 1
1716     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1717
1718   def CheckPrereq(self):
1719     """Check prerequisites.
1720
1721     """
1722
1723   @staticmethod
1724   def _DiagnoseByOS(node_list, rlist):
1725     """Remaps a per-node return list into an a per-os per-node dictionary
1726
1727     @param node_list: a list with the names of all nodes
1728     @param rlist: a map with node names as keys and OS objects as values
1729
1730     @rtype: dict
1731     @return: a dictionary with osnames as keys and as value another map, with
1732         nodes as keys and list of OS objects as values, eg::
1733
1734           {"debian-etch": {"node1": [<object>,...],
1735                            "node2": [<object>,]}
1736           }
1737
1738     """
1739     all_os = {}
1740     # we build here the list of nodes that didn't fail the RPC (at RPC
1741     # level), so that nodes with a non-responding node daemon don't
1742     # make all OSes invalid
1743     good_nodes = [node_name for node_name in rlist
1744                   if not rlist[node_name].failed]
1745     for node_name, nr in rlist.iteritems():
1746       if nr.failed or not nr.data:
1747         continue
1748       for os_obj in nr.data:
1749         if os_obj.name not in all_os:
1750           # build a list of nodes for this os containing empty lists
1751           # for each node in node_list
1752           all_os[os_obj.name] = {}
1753           for nname in good_nodes:
1754             all_os[os_obj.name][nname] = []
1755         all_os[os_obj.name][node_name].append(os_obj)
1756     return all_os
1757
1758   def Exec(self, feedback_fn):
1759     """Compute the list of OSes.
1760
1761     """
1762     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1763     node_data = self.rpc.call_os_diagnose(valid_nodes)
1764     if node_data == False:
1765       raise errors.OpExecError("Can't gather the list of OSes")
1766     pol = self._DiagnoseByOS(valid_nodes, node_data)
1767     output = []
1768     for os_name, os_data in pol.iteritems():
1769       row = []
1770       for field in self.op.output_fields:
1771         if field == "name":
1772           val = os_name
1773         elif field == "valid":
1774           val = utils.all([osl and osl[0] for osl in os_data.values()])
1775         elif field == "node_status":
1776           val = {}
1777           for node_name, nos_list in os_data.iteritems():
1778             val[node_name] = [(v.status, v.path) for v in nos_list]
1779         else:
1780           raise errors.ParameterError(field)
1781         row.append(val)
1782       output.append(row)
1783
1784     return output
1785
1786
1787 class LURemoveNode(LogicalUnit):
1788   """Logical unit for removing a node.
1789
1790   """
1791   HPATH = "node-remove"
1792   HTYPE = constants.HTYPE_NODE
1793   _OP_REQP = ["node_name"]
1794
1795   def BuildHooksEnv(self):
1796     """Build hooks env.
1797
1798     This doesn't run on the target node in the pre phase as a failed
1799     node would then be impossible to remove.
1800
1801     """
1802     env = {
1803       "OP_TARGET": self.op.node_name,
1804       "NODE_NAME": self.op.node_name,
1805       }
1806     all_nodes = self.cfg.GetNodeList()
1807     all_nodes.remove(self.op.node_name)
1808     return env, all_nodes, all_nodes
1809
1810   def CheckPrereq(self):
1811     """Check prerequisites.
1812
1813     This checks:
1814      - the node exists in the configuration
1815      - it does not have primary or secondary instances
1816      - it's not the master
1817
1818     Any errors are signalled by raising errors.OpPrereqError.
1819
1820     """
1821     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1822     if node is None:
1823       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1824
1825     instance_list = self.cfg.GetInstanceList()
1826
1827     masternode = self.cfg.GetMasterNode()
1828     if node.name == masternode:
1829       raise errors.OpPrereqError("Node is the master node,"
1830                                  " you need to failover first.")
1831
1832     for instance_name in instance_list:
1833       instance = self.cfg.GetInstanceInfo(instance_name)
1834       if node.name in instance.all_nodes:
1835         raise errors.OpPrereqError("Instance %s is still running on the node,"
1836                                    " please remove first." % instance_name)
1837     self.op.node_name = node.name
1838     self.node = node
1839
1840   def Exec(self, feedback_fn):
1841     """Removes the node from the cluster.
1842
1843     """
1844     node = self.node
1845     logging.info("Stopping the node daemon and removing configs from node %s",
1846                  node.name)
1847
1848     self.context.RemoveNode(node.name)
1849
1850     self.rpc.call_node_leave_cluster(node.name)
1851
1852     # Promote nodes to master candidate as needed
1853     _AdjustCandidatePool(self)
1854
1855
1856 class LUQueryNodes(NoHooksLU):
1857   """Logical unit for querying nodes.
1858
1859   """
1860   _OP_REQP = ["output_fields", "names", "use_locking"]
1861   REQ_BGL = False
1862   _FIELDS_DYNAMIC = utils.FieldSet(
1863     "dtotal", "dfree",
1864     "mtotal", "mnode", "mfree",
1865     "bootid",
1866     "ctotal", "cnodes", "csockets",
1867     )
1868
1869   _FIELDS_STATIC = utils.FieldSet(
1870     "name", "pinst_cnt", "sinst_cnt",
1871     "pinst_list", "sinst_list",
1872     "pip", "sip", "tags",
1873     "serial_no",
1874     "master_candidate",
1875     "master",
1876     "offline",
1877     "drained",
1878     "role",
1879     )
1880
1881   def ExpandNames(self):
1882     _CheckOutputFields(static=self._FIELDS_STATIC,
1883                        dynamic=self._FIELDS_DYNAMIC,
1884                        selected=self.op.output_fields)
1885
1886     self.needed_locks = {}
1887     self.share_locks[locking.LEVEL_NODE] = 1
1888
1889     if self.op.names:
1890       self.wanted = _GetWantedNodes(self, self.op.names)
1891     else:
1892       self.wanted = locking.ALL_SET
1893
1894     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1895     self.do_locking = self.do_node_query and self.op.use_locking
1896     if self.do_locking:
1897       # if we don't request only static fields, we need to lock the nodes
1898       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1899
1900
1901   def CheckPrereq(self):
1902     """Check prerequisites.
1903
1904     """
1905     # The validation of the node list is done in the _GetWantedNodes,
1906     # if non empty, and if empty, there's no validation to do
1907     pass
1908
1909   def Exec(self, feedback_fn):
1910     """Computes the list of nodes and their attributes.
1911
1912     """
1913     all_info = self.cfg.GetAllNodesInfo()
1914     if self.do_locking:
1915       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1916     elif self.wanted != locking.ALL_SET:
1917       nodenames = self.wanted
1918       missing = set(nodenames).difference(all_info.keys())
1919       if missing:
1920         raise errors.OpExecError(
1921           "Some nodes were removed before retrieving their data: %s" % missing)
1922     else:
1923       nodenames = all_info.keys()
1924
1925     nodenames = utils.NiceSort(nodenames)
1926     nodelist = [all_info[name] for name in nodenames]
1927
1928     # begin data gathering
1929
1930     if self.do_node_query:
1931       live_data = {}
1932       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1933                                           self.cfg.GetHypervisorType())
1934       for name in nodenames:
1935         nodeinfo = node_data[name]
1936         if not nodeinfo.failed and nodeinfo.data:
1937           nodeinfo = nodeinfo.data
1938           fn = utils.TryConvert
1939           live_data[name] = {
1940             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1941             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1942             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1943             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1944             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1945             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1946             "bootid": nodeinfo.get('bootid', None),
1947             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1948             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1949             }
1950         else:
1951           live_data[name] = {}
1952     else:
1953       live_data = dict.fromkeys(nodenames, {})
1954
1955     node_to_primary = dict([(name, set()) for name in nodenames])
1956     node_to_secondary = dict([(name, set()) for name in nodenames])
1957
1958     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1959                              "sinst_cnt", "sinst_list"))
1960     if inst_fields & frozenset(self.op.output_fields):
1961       instancelist = self.cfg.GetInstanceList()
1962
1963       for instance_name in instancelist:
1964         inst = self.cfg.GetInstanceInfo(instance_name)
1965         if inst.primary_node in node_to_primary:
1966           node_to_primary[inst.primary_node].add(inst.name)
1967         for secnode in inst.secondary_nodes:
1968           if secnode in node_to_secondary:
1969             node_to_secondary[secnode].add(inst.name)
1970
1971     master_node = self.cfg.GetMasterNode()
1972
1973     # end data gathering
1974
1975     output = []
1976     for node in nodelist:
1977       node_output = []
1978       for field in self.op.output_fields:
1979         if field == "name":
1980           val = node.name
1981         elif field == "pinst_list":
1982           val = list(node_to_primary[node.name])
1983         elif field == "sinst_list":
1984           val = list(node_to_secondary[node.name])
1985         elif field == "pinst_cnt":
1986           val = len(node_to_primary[node.name])
1987         elif field == "sinst_cnt":
1988           val = len(node_to_secondary[node.name])
1989         elif field == "pip":
1990           val = node.primary_ip
1991         elif field == "sip":
1992           val = node.secondary_ip
1993         elif field == "tags":
1994           val = list(node.GetTags())
1995         elif field == "serial_no":
1996           val = node.serial_no
1997         elif field == "master_candidate":
1998           val = node.master_candidate
1999         elif field == "master":
2000           val = node.name == master_node
2001         elif field == "offline":
2002           val = node.offline
2003         elif field == "drained":
2004           val = node.drained
2005         elif self._FIELDS_DYNAMIC.Matches(field):
2006           val = live_data[node.name].get(field, None)
2007         elif field == "role":
2008           if node.name == master_node:
2009             val = "M"
2010           elif node.master_candidate:
2011             val = "C"
2012           elif node.drained:
2013             val = "D"
2014           elif node.offline:
2015             val = "O"
2016           else:
2017             val = "R"
2018         else:
2019           raise errors.ParameterError(field)
2020         node_output.append(val)
2021       output.append(node_output)
2022
2023     return output
2024
2025
2026 class LUQueryNodeVolumes(NoHooksLU):
2027   """Logical unit for getting volumes on node(s).
2028
2029   """
2030   _OP_REQP = ["nodes", "output_fields"]
2031   REQ_BGL = False
2032   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2033   _FIELDS_STATIC = utils.FieldSet("node")
2034
2035   def ExpandNames(self):
2036     _CheckOutputFields(static=self._FIELDS_STATIC,
2037                        dynamic=self._FIELDS_DYNAMIC,
2038                        selected=self.op.output_fields)
2039
2040     self.needed_locks = {}
2041     self.share_locks[locking.LEVEL_NODE] = 1
2042     if not self.op.nodes:
2043       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2044     else:
2045       self.needed_locks[locking.LEVEL_NODE] = \
2046         _GetWantedNodes(self, self.op.nodes)
2047
2048   def CheckPrereq(self):
2049     """Check prerequisites.
2050
2051     This checks that the fields required are valid output fields.
2052
2053     """
2054     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2055
2056   def Exec(self, feedback_fn):
2057     """Computes the list of nodes and their attributes.
2058
2059     """
2060     nodenames = self.nodes
2061     volumes = self.rpc.call_node_volumes(nodenames)
2062
2063     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2064              in self.cfg.GetInstanceList()]
2065
2066     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2067
2068     output = []
2069     for node in nodenames:
2070       if node not in volumes or volumes[node].failed or not volumes[node].data:
2071         continue
2072
2073       node_vols = volumes[node].data[:]
2074       node_vols.sort(key=lambda vol: vol['dev'])
2075
2076       for vol in node_vols:
2077         node_output = []
2078         for field in self.op.output_fields:
2079           if field == "node":
2080             val = node
2081           elif field == "phys":
2082             val = vol['dev']
2083           elif field == "vg":
2084             val = vol['vg']
2085           elif field == "name":
2086             val = vol['name']
2087           elif field == "size":
2088             val = int(float(vol['size']))
2089           elif field == "instance":
2090             for inst in ilist:
2091               if node not in lv_by_node[inst]:
2092                 continue
2093               if vol['name'] in lv_by_node[inst][node]:
2094                 val = inst.name
2095                 break
2096             else:
2097               val = '-'
2098           else:
2099             raise errors.ParameterError(field)
2100           node_output.append(str(val))
2101
2102         output.append(node_output)
2103
2104     return output
2105
2106
2107 class LUAddNode(LogicalUnit):
2108   """Logical unit for adding node to the cluster.
2109
2110   """
2111   HPATH = "node-add"
2112   HTYPE = constants.HTYPE_NODE
2113   _OP_REQP = ["node_name"]
2114
2115   def BuildHooksEnv(self):
2116     """Build hooks env.
2117
2118     This will run on all nodes before, and on all nodes + the new node after.
2119
2120     """
2121     env = {
2122       "OP_TARGET": self.op.node_name,
2123       "NODE_NAME": self.op.node_name,
2124       "NODE_PIP": self.op.primary_ip,
2125       "NODE_SIP": self.op.secondary_ip,
2126       }
2127     nodes_0 = self.cfg.GetNodeList()
2128     nodes_1 = nodes_0 + [self.op.node_name, ]
2129     return env, nodes_0, nodes_1
2130
2131   def CheckPrereq(self):
2132     """Check prerequisites.
2133
2134     This checks:
2135      - the new node is not already in the config
2136      - it is resolvable
2137      - its parameters (single/dual homed) matches the cluster
2138
2139     Any errors are signalled by raising errors.OpPrereqError.
2140
2141     """
2142     node_name = self.op.node_name
2143     cfg = self.cfg
2144
2145     dns_data = utils.HostInfo(node_name)
2146
2147     node = dns_data.name
2148     primary_ip = self.op.primary_ip = dns_data.ip
2149     secondary_ip = getattr(self.op, "secondary_ip", None)
2150     if secondary_ip is None:
2151       secondary_ip = primary_ip
2152     if not utils.IsValidIP(secondary_ip):
2153       raise errors.OpPrereqError("Invalid secondary IP given")
2154     self.op.secondary_ip = secondary_ip
2155
2156     node_list = cfg.GetNodeList()
2157     if not self.op.readd and node in node_list:
2158       raise errors.OpPrereqError("Node %s is already in the configuration" %
2159                                  node)
2160     elif self.op.readd and node not in node_list:
2161       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2162
2163     for existing_node_name in node_list:
2164       existing_node = cfg.GetNodeInfo(existing_node_name)
2165
2166       if self.op.readd and node == existing_node_name:
2167         if (existing_node.primary_ip != primary_ip or
2168             existing_node.secondary_ip != secondary_ip):
2169           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2170                                      " address configuration as before")
2171         continue
2172
2173       if (existing_node.primary_ip == primary_ip or
2174           existing_node.secondary_ip == primary_ip or
2175           existing_node.primary_ip == secondary_ip or
2176           existing_node.secondary_ip == secondary_ip):
2177         raise errors.OpPrereqError("New node ip address(es) conflict with"
2178                                    " existing node %s" % existing_node.name)
2179
2180     # check that the type of the node (single versus dual homed) is the
2181     # same as for the master
2182     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2183     master_singlehomed = myself.secondary_ip == myself.primary_ip
2184     newbie_singlehomed = secondary_ip == primary_ip
2185     if master_singlehomed != newbie_singlehomed:
2186       if master_singlehomed:
2187         raise errors.OpPrereqError("The master has no private ip but the"
2188                                    " new node has one")
2189       else:
2190         raise errors.OpPrereqError("The master has a private ip but the"
2191                                    " new node doesn't have one")
2192
2193     # checks reachablity
2194     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2195       raise errors.OpPrereqError("Node not reachable by ping")
2196
2197     if not newbie_singlehomed:
2198       # check reachability from my secondary ip to newbie's secondary ip
2199       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2200                            source=myself.secondary_ip):
2201         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2202                                    " based ping to noded port")
2203
2204     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2205     if self.op.readd:
2206       exceptions = [node]
2207     else:
2208       exceptions = []
2209     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2210     # the new node will increase mc_max with one, so:
2211     mc_max = min(mc_max + 1, cp_size)
2212     self.master_candidate = mc_now < mc_max
2213
2214     if self.op.readd:
2215       self.new_node = self.cfg.GetNodeInfo(node)
2216       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2217     else:
2218       self.new_node = objects.Node(name=node,
2219                                    primary_ip=primary_ip,
2220                                    secondary_ip=secondary_ip,
2221                                    master_candidate=self.master_candidate,
2222                                    offline=False, drained=False)
2223
2224   def Exec(self, feedback_fn):
2225     """Adds the new node to the cluster.
2226
2227     """
2228     new_node = self.new_node
2229     node = new_node.name
2230
2231     # for re-adds, reset the offline/drained/master-candidate flags;
2232     # we need to reset here, otherwise offline would prevent RPC calls
2233     # later in the procedure; this also means that if the re-add
2234     # fails, we are left with a non-offlined, broken node
2235     if self.op.readd:
2236       new_node.drained = new_node.offline = False
2237       self.LogInfo("Readding a node, the offline/drained flags were reset")
2238       # if we demote the node, we do cleanup later in the procedure
2239       new_node.master_candidate = self.master_candidate
2240
2241     # notify the user about any possible mc promotion
2242     if new_node.master_candidate:
2243       self.LogInfo("Node will be a master candidate")
2244
2245     # check connectivity
2246     result = self.rpc.call_version([node])[node]
2247     result.Raise()
2248     if result.data:
2249       if constants.PROTOCOL_VERSION == result.data:
2250         logging.info("Communication to node %s fine, sw version %s match",
2251                      node, result.data)
2252       else:
2253         raise errors.OpExecError("Version mismatch master version %s,"
2254                                  " node version %s" %
2255                                  (constants.PROTOCOL_VERSION, result.data))
2256     else:
2257       raise errors.OpExecError("Cannot get version from the new node")
2258
2259     # setup ssh on node
2260     logging.info("Copy ssh key to node %s", node)
2261     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2262     keyarray = []
2263     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2264                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2265                 priv_key, pub_key]
2266
2267     for i in keyfiles:
2268       f = open(i, 'r')
2269       try:
2270         keyarray.append(f.read())
2271       finally:
2272         f.close()
2273
2274     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2275                                     keyarray[2],
2276                                     keyarray[3], keyarray[4], keyarray[5])
2277
2278     msg = result.RemoteFailMsg()
2279     if msg:
2280       raise errors.OpExecError("Cannot transfer ssh keys to the"
2281                                " new node: %s" % msg)
2282
2283     # Add node to our /etc/hosts, and add key to known_hosts
2284     utils.AddHostToEtcHosts(new_node.name)
2285
2286     if new_node.secondary_ip != new_node.primary_ip:
2287       result = self.rpc.call_node_has_ip_address(new_node.name,
2288                                                  new_node.secondary_ip)
2289       if result.failed or not result.data:
2290         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2291                                  " you gave (%s). Please fix and re-run this"
2292                                  " command." % new_node.secondary_ip)
2293
2294     node_verify_list = [self.cfg.GetMasterNode()]
2295     node_verify_param = {
2296       'nodelist': [node],
2297       # TODO: do a node-net-test as well?
2298     }
2299
2300     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2301                                        self.cfg.GetClusterName())
2302     for verifier in node_verify_list:
2303       if result[verifier].failed or not result[verifier].data:
2304         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2305                                  " for remote verification" % verifier)
2306       if result[verifier].data['nodelist']:
2307         for failed in result[verifier].data['nodelist']:
2308           feedback_fn("ssh/hostname verification failed %s -> %s" %
2309                       (verifier, result[verifier].data['nodelist'][failed]))
2310         raise errors.OpExecError("ssh/hostname verification failed.")
2311
2312     # Distribute updated /etc/hosts and known_hosts to all nodes,
2313     # including the node just added
2314     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2315     dist_nodes = self.cfg.GetNodeList()
2316     if not self.op.readd:
2317       dist_nodes.append(node)
2318     if myself.name in dist_nodes:
2319       dist_nodes.remove(myself.name)
2320
2321     logging.debug("Copying hosts and known_hosts to all nodes")
2322     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2323       result = self.rpc.call_upload_file(dist_nodes, fname)
2324       for to_node, to_result in result.iteritems():
2325         if to_result.failed or not to_result.data:
2326           logging.error("Copy of file %s to node %s failed", fname, to_node)
2327
2328     to_copy = []
2329     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2330     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2331       to_copy.append(constants.VNC_PASSWORD_FILE)
2332
2333     for fname in to_copy:
2334       result = self.rpc.call_upload_file([node], fname)
2335       if result[node].failed or not result[node]:
2336         logging.error("Could not copy file %s to node %s", fname, node)
2337
2338     if self.op.readd:
2339       self.context.ReaddNode(new_node)
2340       # make sure we redistribute the config
2341       self.cfg.Update(new_node)
2342       # and make sure the new node will not have old files around
2343       if not new_node.master_candidate:
2344         result = self.rpc.call_node_demote_from_mc(new_node.name)
2345         msg = result.RemoteFailMsg()
2346         if msg:
2347           self.LogWarning("Node failed to demote itself from master"
2348                           " candidate status: %s" % msg)
2349     else:
2350       self.context.AddNode(new_node)
2351
2352
2353 class LUSetNodeParams(LogicalUnit):
2354   """Modifies the parameters of a node.
2355
2356   """
2357   HPATH = "node-modify"
2358   HTYPE = constants.HTYPE_NODE
2359   _OP_REQP = ["node_name"]
2360   REQ_BGL = False
2361
2362   def CheckArguments(self):
2363     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2364     if node_name is None:
2365       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2366     self.op.node_name = node_name
2367     _CheckBooleanOpField(self.op, 'master_candidate')
2368     _CheckBooleanOpField(self.op, 'offline')
2369     _CheckBooleanOpField(self.op, 'drained')
2370     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2371     if all_mods.count(None) == 3:
2372       raise errors.OpPrereqError("Please pass at least one modification")
2373     if all_mods.count(True) > 1:
2374       raise errors.OpPrereqError("Can't set the node into more than one"
2375                                  " state at the same time")
2376
2377   def ExpandNames(self):
2378     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2379
2380   def BuildHooksEnv(self):
2381     """Build hooks env.
2382
2383     This runs on the master node.
2384
2385     """
2386     env = {
2387       "OP_TARGET": self.op.node_name,
2388       "MASTER_CANDIDATE": str(self.op.master_candidate),
2389       "OFFLINE": str(self.op.offline),
2390       "DRAINED": str(self.op.drained),
2391       }
2392     nl = [self.cfg.GetMasterNode(),
2393           self.op.node_name]
2394     return env, nl, nl
2395
2396   def CheckPrereq(self):
2397     """Check prerequisites.
2398
2399     This only checks the instance list against the existing names.
2400
2401     """
2402     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2403
2404     if ((self.op.master_candidate == False or self.op.offline == True or
2405          self.op.drained == True) and node.master_candidate):
2406       # we will demote the node from master_candidate
2407       if self.op.node_name == self.cfg.GetMasterNode():
2408         raise errors.OpPrereqError("The master node has to be a"
2409                                    " master candidate, online and not drained")
2410       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2411       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2412       if num_candidates <= cp_size:
2413         msg = ("Not enough master candidates (desired"
2414                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2415         if self.op.force:
2416           self.LogWarning(msg)
2417         else:
2418           raise errors.OpPrereqError(msg)
2419
2420     if (self.op.master_candidate == True and
2421         ((node.offline and not self.op.offline == False) or
2422          (node.drained and not self.op.drained == False))):
2423       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2424                                  " to master_candidate" % node.name)
2425
2426     return
2427
2428   def Exec(self, feedback_fn):
2429     """Modifies a node.
2430
2431     """
2432     node = self.node
2433
2434     result = []
2435     changed_mc = False
2436
2437     if self.op.offline is not None:
2438       node.offline = self.op.offline
2439       result.append(("offline", str(self.op.offline)))
2440       if self.op.offline == True:
2441         if node.master_candidate:
2442           node.master_candidate = False
2443           changed_mc = True
2444           result.append(("master_candidate", "auto-demotion due to offline"))
2445         if node.drained:
2446           node.drained = False
2447           result.append(("drained", "clear drained status due to offline"))
2448
2449     if self.op.master_candidate is not None:
2450       node.master_candidate = self.op.master_candidate
2451       changed_mc = True
2452       result.append(("master_candidate", str(self.op.master_candidate)))
2453       if self.op.master_candidate == False:
2454         rrc = self.rpc.call_node_demote_from_mc(node.name)
2455         msg = rrc.RemoteFailMsg()
2456         if msg:
2457           self.LogWarning("Node failed to demote itself: %s" % msg)
2458
2459     if self.op.drained is not None:
2460       node.drained = self.op.drained
2461       result.append(("drained", str(self.op.drained)))
2462       if self.op.drained == True:
2463         if node.master_candidate:
2464           node.master_candidate = False
2465           changed_mc = True
2466           result.append(("master_candidate", "auto-demotion due to drain"))
2467         if node.offline:
2468           node.offline = False
2469           result.append(("offline", "clear offline status due to drain"))
2470
2471     # this will trigger configuration file update, if needed
2472     self.cfg.Update(node)
2473     # this will trigger job queue propagation or cleanup
2474     if changed_mc:
2475       self.context.ReaddNode(node)
2476
2477     return result
2478
2479
2480 class LUQueryClusterInfo(NoHooksLU):
2481   """Query cluster configuration.
2482
2483   """
2484   _OP_REQP = []
2485   REQ_BGL = False
2486
2487   def ExpandNames(self):
2488     self.needed_locks = {}
2489
2490   def CheckPrereq(self):
2491     """No prerequsites needed for this LU.
2492
2493     """
2494     pass
2495
2496   def Exec(self, feedback_fn):
2497     """Return cluster config.
2498
2499     """
2500     cluster = self.cfg.GetClusterInfo()
2501     result = {
2502       "software_version": constants.RELEASE_VERSION,
2503       "protocol_version": constants.PROTOCOL_VERSION,
2504       "config_version": constants.CONFIG_VERSION,
2505       "os_api_version": constants.OS_API_VERSION,
2506       "export_version": constants.EXPORT_VERSION,
2507       "architecture": (platform.architecture()[0], platform.machine()),
2508       "name": cluster.cluster_name,
2509       "master": cluster.master_node,
2510       "default_hypervisor": cluster.default_hypervisor,
2511       "enabled_hypervisors": cluster.enabled_hypervisors,
2512       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2513                         for hypervisor in cluster.enabled_hypervisors]),
2514       "beparams": cluster.beparams,
2515       "candidate_pool_size": cluster.candidate_pool_size,
2516       "default_bridge": cluster.default_bridge,
2517       "master_netdev": cluster.master_netdev,
2518       "volume_group_name": cluster.volume_group_name,
2519       "file_storage_dir": cluster.file_storage_dir,
2520       }
2521
2522     return result
2523
2524
2525 class LUQueryConfigValues(NoHooksLU):
2526   """Return configuration values.
2527
2528   """
2529   _OP_REQP = []
2530   REQ_BGL = False
2531   _FIELDS_DYNAMIC = utils.FieldSet()
2532   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2533
2534   def ExpandNames(self):
2535     self.needed_locks = {}
2536
2537     _CheckOutputFields(static=self._FIELDS_STATIC,
2538                        dynamic=self._FIELDS_DYNAMIC,
2539                        selected=self.op.output_fields)
2540
2541   def CheckPrereq(self):
2542     """No prerequisites.
2543
2544     """
2545     pass
2546
2547   def Exec(self, feedback_fn):
2548     """Dump a representation of the cluster config to the standard output.
2549
2550     """
2551     values = []
2552     for field in self.op.output_fields:
2553       if field == "cluster_name":
2554         entry = self.cfg.GetClusterName()
2555       elif field == "master_node":
2556         entry = self.cfg.GetMasterNode()
2557       elif field == "drain_flag":
2558         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2559       else:
2560         raise errors.ParameterError(field)
2561       values.append(entry)
2562     return values
2563
2564
2565 class LUActivateInstanceDisks(NoHooksLU):
2566   """Bring up an instance's disks.
2567
2568   """
2569   _OP_REQP = ["instance_name"]
2570   REQ_BGL = False
2571
2572   def ExpandNames(self):
2573     self._ExpandAndLockInstance()
2574     self.needed_locks[locking.LEVEL_NODE] = []
2575     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2576
2577   def DeclareLocks(self, level):
2578     if level == locking.LEVEL_NODE:
2579       self._LockInstancesNodes()
2580
2581   def CheckPrereq(self):
2582     """Check prerequisites.
2583
2584     This checks that the instance is in the cluster.
2585
2586     """
2587     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2588     assert self.instance is not None, \
2589       "Cannot retrieve locked instance %s" % self.op.instance_name
2590     _CheckNodeOnline(self, self.instance.primary_node)
2591
2592   def Exec(self, feedback_fn):
2593     """Activate the disks.
2594
2595     """
2596     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2597     if not disks_ok:
2598       raise errors.OpExecError("Cannot activate block devices")
2599
2600     return disks_info
2601
2602
2603 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2604   """Prepare the block devices for an instance.
2605
2606   This sets up the block devices on all nodes.
2607
2608   @type lu: L{LogicalUnit}
2609   @param lu: the logical unit on whose behalf we execute
2610   @type instance: L{objects.Instance}
2611   @param instance: the instance for whose disks we assemble
2612   @type ignore_secondaries: boolean
2613   @param ignore_secondaries: if true, errors on secondary nodes
2614       won't result in an error return from the function
2615   @return: False if the operation failed, otherwise a list of
2616       (host, instance_visible_name, node_visible_name)
2617       with the mapping from node devices to instance devices
2618
2619   """
2620   device_info = []
2621   disks_ok = True
2622   iname = instance.name
2623   # With the two passes mechanism we try to reduce the window of
2624   # opportunity for the race condition of switching DRBD to primary
2625   # before handshaking occured, but we do not eliminate it
2626
2627   # The proper fix would be to wait (with some limits) until the
2628   # connection has been made and drbd transitions from WFConnection
2629   # into any other network-connected state (Connected, SyncTarget,
2630   # SyncSource, etc.)
2631
2632   # 1st pass, assemble on all nodes in secondary mode
2633   for inst_disk in instance.disks:
2634     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2635       lu.cfg.SetDiskID(node_disk, node)
2636       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2637       msg = result.RemoteFailMsg()
2638       if msg:
2639         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2640                            " (is_primary=False, pass=1): %s",
2641                            inst_disk.iv_name, node, msg)
2642         if not ignore_secondaries:
2643           disks_ok = False
2644
2645   # FIXME: race condition on drbd migration to primary
2646
2647   # 2nd pass, do only the primary node
2648   for inst_disk in instance.disks:
2649     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2650       if node != instance.primary_node:
2651         continue
2652       lu.cfg.SetDiskID(node_disk, node)
2653       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2654       msg = result.RemoteFailMsg()
2655       if msg:
2656         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2657                            " (is_primary=True, pass=2): %s",
2658                            inst_disk.iv_name, node, msg)
2659         disks_ok = False
2660     device_info.append((instance.primary_node, inst_disk.iv_name,
2661                         result.payload))
2662
2663   # leave the disks configured for the primary node
2664   # this is a workaround that would be fixed better by
2665   # improving the logical/physical id handling
2666   for disk in instance.disks:
2667     lu.cfg.SetDiskID(disk, instance.primary_node)
2668
2669   return disks_ok, device_info
2670
2671
2672 def _StartInstanceDisks(lu, instance, force):
2673   """Start the disks of an instance.
2674
2675   """
2676   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2677                                            ignore_secondaries=force)
2678   if not disks_ok:
2679     _ShutdownInstanceDisks(lu, instance)
2680     if force is not None and not force:
2681       lu.proc.LogWarning("", hint="If the message above refers to a"
2682                          " secondary node,"
2683                          " you can retry the operation using '--force'.")
2684     raise errors.OpExecError("Disk consistency error")
2685
2686
2687 class LUDeactivateInstanceDisks(NoHooksLU):
2688   """Shutdown an instance's disks.
2689
2690   """
2691   _OP_REQP = ["instance_name"]
2692   REQ_BGL = False
2693
2694   def ExpandNames(self):
2695     self._ExpandAndLockInstance()
2696     self.needed_locks[locking.LEVEL_NODE] = []
2697     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2698
2699   def DeclareLocks(self, level):
2700     if level == locking.LEVEL_NODE:
2701       self._LockInstancesNodes()
2702
2703   def CheckPrereq(self):
2704     """Check prerequisites.
2705
2706     This checks that the instance is in the cluster.
2707
2708     """
2709     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2710     assert self.instance is not None, \
2711       "Cannot retrieve locked instance %s" % self.op.instance_name
2712
2713   def Exec(self, feedback_fn):
2714     """Deactivate the disks
2715
2716     """
2717     instance = self.instance
2718     _SafeShutdownInstanceDisks(self, instance)
2719
2720
2721 def _SafeShutdownInstanceDisks(lu, instance):
2722   """Shutdown block devices of an instance.
2723
2724   This function checks if an instance is running, before calling
2725   _ShutdownInstanceDisks.
2726
2727   """
2728   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2729                                       [instance.hypervisor])
2730   ins_l = ins_l[instance.primary_node]
2731   if ins_l.failed or not isinstance(ins_l.data, list):
2732     raise errors.OpExecError("Can't contact node '%s'" %
2733                              instance.primary_node)
2734
2735   if instance.name in ins_l.data:
2736     raise errors.OpExecError("Instance is running, can't shutdown"
2737                              " block devices.")
2738
2739   _ShutdownInstanceDisks(lu, instance)
2740
2741
2742 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2743   """Shutdown block devices of an instance.
2744
2745   This does the shutdown on all nodes of the instance.
2746
2747   If the ignore_primary is false, errors on the primary node are
2748   ignored.
2749
2750   """
2751   all_result = True
2752   for disk in instance.disks:
2753     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2754       lu.cfg.SetDiskID(top_disk, node)
2755       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2756       msg = result.RemoteFailMsg()
2757       if msg:
2758         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2759                       disk.iv_name, node, msg)
2760         if not ignore_primary or node != instance.primary_node:
2761           all_result = False
2762   return all_result
2763
2764
2765 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2766   """Checks if a node has enough free memory.
2767
2768   This function check if a given node has the needed amount of free
2769   memory. In case the node has less memory or we cannot get the
2770   information from the node, this function raise an OpPrereqError
2771   exception.
2772
2773   @type lu: C{LogicalUnit}
2774   @param lu: a logical unit from which we get configuration data
2775   @type node: C{str}
2776   @param node: the node to check
2777   @type reason: C{str}
2778   @param reason: string to use in the error message
2779   @type requested: C{int}
2780   @param requested: the amount of memory in MiB to check for
2781   @type hypervisor_name: C{str}
2782   @param hypervisor_name: the hypervisor to ask for memory stats
2783   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2784       we cannot check the node
2785
2786   """
2787   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2788   nodeinfo[node].Raise()
2789   free_mem = nodeinfo[node].data.get('memory_free')
2790   if not isinstance(free_mem, int):
2791     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2792                              " was '%s'" % (node, free_mem))
2793   if requested > free_mem:
2794     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2795                              " needed %s MiB, available %s MiB" %
2796                              (node, reason, requested, free_mem))
2797
2798
2799 class LUStartupInstance(LogicalUnit):
2800   """Starts an instance.
2801
2802   """
2803   HPATH = "instance-start"
2804   HTYPE = constants.HTYPE_INSTANCE
2805   _OP_REQP = ["instance_name", "force"]
2806   REQ_BGL = False
2807
2808   def ExpandNames(self):
2809     self._ExpandAndLockInstance()
2810
2811   def BuildHooksEnv(self):
2812     """Build hooks env.
2813
2814     This runs on master, primary and secondary nodes of the instance.
2815
2816     """
2817     env = {
2818       "FORCE": self.op.force,
2819       }
2820     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2821     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2822     return env, nl, nl
2823
2824   def CheckPrereq(self):
2825     """Check prerequisites.
2826
2827     This checks that the instance is in the cluster.
2828
2829     """
2830     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2831     assert self.instance is not None, \
2832       "Cannot retrieve locked instance %s" % self.op.instance_name
2833
2834     # extra beparams
2835     self.beparams = getattr(self.op, "beparams", {})
2836     if self.beparams:
2837       if not isinstance(self.beparams, dict):
2838         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2839                                    " dict" % (type(self.beparams), ))
2840       # fill the beparams dict
2841       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2842       self.op.beparams = self.beparams
2843
2844     # extra hvparams
2845     self.hvparams = getattr(self.op, "hvparams", {})
2846     if self.hvparams:
2847       if not isinstance(self.hvparams, dict):
2848         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2849                                    " dict" % (type(self.hvparams), ))
2850
2851       # check hypervisor parameter syntax (locally)
2852       cluster = self.cfg.GetClusterInfo()
2853       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2854       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2855                                     instance.hvparams)
2856       filled_hvp.update(self.hvparams)
2857       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2858       hv_type.CheckParameterSyntax(filled_hvp)
2859       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2860       self.op.hvparams = self.hvparams
2861
2862     _CheckNodeOnline(self, instance.primary_node)
2863
2864     bep = self.cfg.GetClusterInfo().FillBE(instance)
2865     # check bridges existance
2866     _CheckInstanceBridgesExist(self, instance)
2867
2868     remote_info = self.rpc.call_instance_info(instance.primary_node,
2869                                               instance.name,
2870                                               instance.hypervisor)
2871     remote_info.Raise()
2872     if not remote_info.data:
2873       _CheckNodeFreeMemory(self, instance.primary_node,
2874                            "starting instance %s" % instance.name,
2875                            bep[constants.BE_MEMORY], instance.hypervisor)
2876
2877   def Exec(self, feedback_fn):
2878     """Start the instance.
2879
2880     """
2881     instance = self.instance
2882     force = self.op.force
2883
2884     self.cfg.MarkInstanceUp(instance.name)
2885
2886     node_current = instance.primary_node
2887
2888     _StartInstanceDisks(self, instance, force)
2889
2890     result = self.rpc.call_instance_start(node_current, instance,
2891                                           self.hvparams, self.beparams)
2892     msg = result.RemoteFailMsg()
2893     if msg:
2894       _ShutdownInstanceDisks(self, instance)
2895       raise errors.OpExecError("Could not start instance: %s" % msg)
2896
2897
2898 class LURebootInstance(LogicalUnit):
2899   """Reboot an instance.
2900
2901   """
2902   HPATH = "instance-reboot"
2903   HTYPE = constants.HTYPE_INSTANCE
2904   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2905   REQ_BGL = False
2906
2907   def ExpandNames(self):
2908     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2909                                    constants.INSTANCE_REBOOT_HARD,
2910                                    constants.INSTANCE_REBOOT_FULL]:
2911       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2912                                   (constants.INSTANCE_REBOOT_SOFT,
2913                                    constants.INSTANCE_REBOOT_HARD,
2914                                    constants.INSTANCE_REBOOT_FULL))
2915     self._ExpandAndLockInstance()
2916
2917   def BuildHooksEnv(self):
2918     """Build hooks env.
2919
2920     This runs on master, primary and secondary nodes of the instance.
2921
2922     """
2923     env = {
2924       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2925       "REBOOT_TYPE": self.op.reboot_type,
2926       }
2927     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2928     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2929     return env, nl, nl
2930
2931   def CheckPrereq(self):
2932     """Check prerequisites.
2933
2934     This checks that the instance is in the cluster.
2935
2936     """
2937     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2938     assert self.instance is not None, \
2939       "Cannot retrieve locked instance %s" % self.op.instance_name
2940
2941     _CheckNodeOnline(self, instance.primary_node)
2942
2943     # check bridges existance
2944     _CheckInstanceBridgesExist(self, instance)
2945
2946   def Exec(self, feedback_fn):
2947     """Reboot the instance.
2948
2949     """
2950     instance = self.instance
2951     ignore_secondaries = self.op.ignore_secondaries
2952     reboot_type = self.op.reboot_type
2953
2954     node_current = instance.primary_node
2955
2956     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2957                        constants.INSTANCE_REBOOT_HARD]:
2958       for disk in instance.disks:
2959         self.cfg.SetDiskID(disk, node_current)
2960       result = self.rpc.call_instance_reboot(node_current, instance,
2961                                              reboot_type)
2962       msg = result.RemoteFailMsg()
2963       if msg:
2964         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2965     else:
2966       result = self.rpc.call_instance_shutdown(node_current, instance)
2967       msg = result.RemoteFailMsg()
2968       if msg:
2969         raise errors.OpExecError("Could not shutdown instance for"
2970                                  " full reboot: %s" % msg)
2971       _ShutdownInstanceDisks(self, instance)
2972       _StartInstanceDisks(self, instance, ignore_secondaries)
2973       result = self.rpc.call_instance_start(node_current, instance, None, None)
2974       msg = result.RemoteFailMsg()
2975       if msg:
2976         _ShutdownInstanceDisks(self, instance)
2977         raise errors.OpExecError("Could not start instance for"
2978                                  " full reboot: %s" % msg)
2979
2980     self.cfg.MarkInstanceUp(instance.name)
2981
2982
2983 class LUShutdownInstance(LogicalUnit):
2984   """Shutdown an instance.
2985
2986   """
2987   HPATH = "instance-stop"
2988   HTYPE = constants.HTYPE_INSTANCE
2989   _OP_REQP = ["instance_name"]
2990   REQ_BGL = False
2991
2992   def ExpandNames(self):
2993     self._ExpandAndLockInstance()
2994
2995   def BuildHooksEnv(self):
2996     """Build hooks env.
2997
2998     This runs on master, primary and secondary nodes of the instance.
2999
3000     """
3001     env = _BuildInstanceHookEnvByObject(self, self.instance)
3002     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3003     return env, nl, nl
3004
3005   def CheckPrereq(self):
3006     """Check prerequisites.
3007
3008     This checks that the instance is in the cluster.
3009
3010     """
3011     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3012     assert self.instance is not None, \
3013       "Cannot retrieve locked instance %s" % self.op.instance_name
3014     _CheckNodeOnline(self, self.instance.primary_node)
3015
3016   def Exec(self, feedback_fn):
3017     """Shutdown the instance.
3018
3019     """
3020     instance = self.instance
3021     node_current = instance.primary_node
3022     self.cfg.MarkInstanceDown(instance.name)
3023     result = self.rpc.call_instance_shutdown(node_current, instance)
3024     msg = result.RemoteFailMsg()
3025     if msg:
3026       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3027
3028     _ShutdownInstanceDisks(self, instance)
3029
3030
3031 class LUReinstallInstance(LogicalUnit):
3032   """Reinstall an instance.
3033
3034   """
3035   HPATH = "instance-reinstall"
3036   HTYPE = constants.HTYPE_INSTANCE
3037   _OP_REQP = ["instance_name"]
3038   REQ_BGL = False
3039
3040   def ExpandNames(self):
3041     self._ExpandAndLockInstance()
3042
3043   def BuildHooksEnv(self):
3044     """Build hooks env.
3045
3046     This runs on master, primary and secondary nodes of the instance.
3047
3048     """
3049     env = _BuildInstanceHookEnvByObject(self, self.instance)
3050     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3051     return env, nl, nl
3052
3053   def CheckPrereq(self):
3054     """Check prerequisites.
3055
3056     This checks that the instance is in the cluster and is not running.
3057
3058     """
3059     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3060     assert instance is not None, \
3061       "Cannot retrieve locked instance %s" % self.op.instance_name
3062     _CheckNodeOnline(self, instance.primary_node)
3063
3064     if instance.disk_template == constants.DT_DISKLESS:
3065       raise errors.OpPrereqError("Instance '%s' has no disks" %
3066                                  self.op.instance_name)
3067     if instance.admin_up:
3068       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3069                                  self.op.instance_name)
3070     remote_info = self.rpc.call_instance_info(instance.primary_node,
3071                                               instance.name,
3072                                               instance.hypervisor)
3073     remote_info.Raise()
3074     if remote_info.data:
3075       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3076                                  (self.op.instance_name,
3077                                   instance.primary_node))
3078
3079     self.op.os_type = getattr(self.op, "os_type", None)
3080     if self.op.os_type is not None:
3081       # OS verification
3082       pnode = self.cfg.GetNodeInfo(
3083         self.cfg.ExpandNodeName(instance.primary_node))
3084       if pnode is None:
3085         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3086                                    self.op.pnode)
3087       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3088       result.Raise()
3089       if not isinstance(result.data, objects.OS):
3090         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3091                                    " primary node"  % self.op.os_type)
3092
3093     self.instance = instance
3094
3095   def Exec(self, feedback_fn):
3096     """Reinstall the instance.
3097
3098     """
3099     inst = self.instance
3100
3101     if self.op.os_type is not None:
3102       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3103       inst.os = self.op.os_type
3104       self.cfg.Update(inst)
3105
3106     _StartInstanceDisks(self, inst, None)
3107     try:
3108       feedback_fn("Running the instance OS create scripts...")
3109       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3110       msg = result.RemoteFailMsg()
3111       if msg:
3112         raise errors.OpExecError("Could not install OS for instance %s"
3113                                  " on node %s: %s" %
3114                                  (inst.name, inst.primary_node, msg))
3115     finally:
3116       _ShutdownInstanceDisks(self, inst)
3117
3118
3119 class LURenameInstance(LogicalUnit):
3120   """Rename an instance.
3121
3122   """
3123   HPATH = "instance-rename"
3124   HTYPE = constants.HTYPE_INSTANCE
3125   _OP_REQP = ["instance_name", "new_name"]
3126
3127   def BuildHooksEnv(self):
3128     """Build hooks env.
3129
3130     This runs on master, primary and secondary nodes of the instance.
3131
3132     """
3133     env = _BuildInstanceHookEnvByObject(self, self.instance)
3134     env["INSTANCE_NEW_NAME"] = self.op.new_name
3135     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3136     return env, nl, nl
3137
3138   def CheckPrereq(self):
3139     """Check prerequisites.
3140
3141     This checks that the instance is in the cluster and is not running.
3142
3143     """
3144     instance = self.cfg.GetInstanceInfo(
3145       self.cfg.ExpandInstanceName(self.op.instance_name))
3146     if instance is None:
3147       raise errors.OpPrereqError("Instance '%s' not known" %
3148                                  self.op.instance_name)
3149     _CheckNodeOnline(self, instance.primary_node)
3150
3151     if instance.admin_up:
3152       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3153                                  self.op.instance_name)
3154     remote_info = self.rpc.call_instance_info(instance.primary_node,
3155                                               instance.name,
3156                                               instance.hypervisor)
3157     remote_info.Raise()
3158     if remote_info.data:
3159       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3160                                  (self.op.instance_name,
3161                                   instance.primary_node))
3162     self.instance = instance
3163
3164     # new name verification
3165     name_info = utils.HostInfo(self.op.new_name)
3166
3167     self.op.new_name = new_name = name_info.name
3168     instance_list = self.cfg.GetInstanceList()
3169     if new_name in instance_list:
3170       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3171                                  new_name)
3172
3173     if not getattr(self.op, "ignore_ip", False):
3174       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3175         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3176                                    (name_info.ip, new_name))
3177
3178
3179   def Exec(self, feedback_fn):
3180     """Reinstall the instance.
3181
3182     """
3183     inst = self.instance
3184     old_name = inst.name
3185
3186     if inst.disk_template == constants.DT_FILE:
3187       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3188
3189     self.cfg.RenameInstance(inst.name, self.op.new_name)
3190     # Change the instance lock. This is definitely safe while we hold the BGL
3191     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3192     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3193
3194     # re-read the instance from the configuration after rename
3195     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3196
3197     if inst.disk_template == constants.DT_FILE:
3198       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3199       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3200                                                      old_file_storage_dir,
3201                                                      new_file_storage_dir)
3202       result.Raise()
3203       if not result.data:
3204         raise errors.OpExecError("Could not connect to node '%s' to rename"
3205                                  " directory '%s' to '%s' (but the instance"
3206                                  " has been renamed in Ganeti)" % (
3207                                  inst.primary_node, old_file_storage_dir,
3208                                  new_file_storage_dir))
3209
3210       if not result.data[0]:
3211         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3212                                  " (but the instance has been renamed in"
3213                                  " Ganeti)" % (old_file_storage_dir,
3214                                                new_file_storage_dir))
3215
3216     _StartInstanceDisks(self, inst, None)
3217     try:
3218       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3219                                                  old_name)
3220       msg = result.RemoteFailMsg()
3221       if msg:
3222         msg = ("Could not run OS rename script for instance %s on node %s"
3223                " (but the instance has been renamed in Ganeti): %s" %
3224                (inst.name, inst.primary_node, msg))
3225         self.proc.LogWarning(msg)
3226     finally:
3227       _ShutdownInstanceDisks(self, inst)
3228
3229
3230 class LURemoveInstance(LogicalUnit):
3231   """Remove an instance.
3232
3233   """
3234   HPATH = "instance-remove"
3235   HTYPE = constants.HTYPE_INSTANCE
3236   _OP_REQP = ["instance_name", "ignore_failures"]
3237   REQ_BGL = False
3238
3239   def ExpandNames(self):
3240     self._ExpandAndLockInstance()
3241     self.needed_locks[locking.LEVEL_NODE] = []
3242     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3243
3244   def DeclareLocks(self, level):
3245     if level == locking.LEVEL_NODE:
3246       self._LockInstancesNodes()
3247
3248   def BuildHooksEnv(self):
3249     """Build hooks env.
3250
3251     This runs on master, primary and secondary nodes of the instance.
3252
3253     """
3254     env = _BuildInstanceHookEnvByObject(self, self.instance)
3255     nl = [self.cfg.GetMasterNode()]
3256     return env, nl, nl
3257
3258   def CheckPrereq(self):
3259     """Check prerequisites.
3260
3261     This checks that the instance is in the cluster.
3262
3263     """
3264     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3265     assert self.instance is not None, \
3266       "Cannot retrieve locked instance %s" % self.op.instance_name
3267
3268   def Exec(self, feedback_fn):
3269     """Remove the instance.
3270
3271     """
3272     instance = self.instance
3273     logging.info("Shutting down instance %s on node %s",
3274                  instance.name, instance.primary_node)
3275
3276     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3277     msg = result.RemoteFailMsg()
3278     if msg:
3279       if self.op.ignore_failures:
3280         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3281       else:
3282         raise errors.OpExecError("Could not shutdown instance %s on"
3283                                  " node %s: %s" %
3284                                  (instance.name, instance.primary_node, msg))
3285
3286     logging.info("Removing block devices for instance %s", instance.name)
3287
3288     if not _RemoveDisks(self, instance):
3289       if self.op.ignore_failures:
3290         feedback_fn("Warning: can't remove instance's disks")
3291       else:
3292         raise errors.OpExecError("Can't remove instance's disks")
3293
3294     logging.info("Removing instance %s out of cluster config", instance.name)
3295
3296     self.cfg.RemoveInstance(instance.name)
3297     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3298
3299
3300 class LUQueryInstances(NoHooksLU):
3301   """Logical unit for querying instances.
3302
3303   """
3304   _OP_REQP = ["output_fields", "names", "use_locking"]
3305   REQ_BGL = False
3306   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3307                                     "admin_state",
3308                                     "disk_template", "ip", "mac", "bridge",
3309                                     "sda_size", "sdb_size", "vcpus", "tags",
3310                                     "network_port", "beparams",
3311                                     r"(disk)\.(size)/([0-9]+)",
3312                                     r"(disk)\.(sizes)", "disk_usage",
3313                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3314                                     r"(nic)\.(macs|ips|bridges)",
3315                                     r"(disk|nic)\.(count)",
3316                                     "serial_no", "hypervisor", "hvparams",] +
3317                                   ["hv/%s" % name
3318                                    for name in constants.HVS_PARAMETERS] +
3319                                   ["be/%s" % name
3320                                    for name in constants.BES_PARAMETERS])
3321   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3322
3323
3324   def ExpandNames(self):
3325     _CheckOutputFields(static=self._FIELDS_STATIC,
3326                        dynamic=self._FIELDS_DYNAMIC,
3327                        selected=self.op.output_fields)
3328
3329     self.needed_locks = {}
3330     self.share_locks[locking.LEVEL_INSTANCE] = 1
3331     self.share_locks[locking.LEVEL_NODE] = 1
3332
3333     if self.op.names:
3334       self.wanted = _GetWantedInstances(self, self.op.names)
3335     else:
3336       self.wanted = locking.ALL_SET
3337
3338     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3339     self.do_locking = self.do_node_query and self.op.use_locking
3340     if self.do_locking:
3341       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3342       self.needed_locks[locking.LEVEL_NODE] = []
3343       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3344
3345   def DeclareLocks(self, level):
3346     if level == locking.LEVEL_NODE and self.do_locking:
3347       self._LockInstancesNodes()
3348
3349   def CheckPrereq(self):
3350     """Check prerequisites.
3351
3352     """
3353     pass
3354
3355   def Exec(self, feedback_fn):
3356     """Computes the list of nodes and their attributes.
3357
3358     """
3359     all_info = self.cfg.GetAllInstancesInfo()
3360     if self.wanted == locking.ALL_SET:
3361       # caller didn't specify instance names, so ordering is not important
3362       if self.do_locking:
3363         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3364       else:
3365         instance_names = all_info.keys()
3366       instance_names = utils.NiceSort(instance_names)
3367     else:
3368       # caller did specify names, so we must keep the ordering
3369       if self.do_locking:
3370         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3371       else:
3372         tgt_set = all_info.keys()
3373       missing = set(self.wanted).difference(tgt_set)
3374       if missing:
3375         raise errors.OpExecError("Some instances were removed before"
3376                                  " retrieving their data: %s" % missing)
3377       instance_names = self.wanted
3378
3379     instance_list = [all_info[iname] for iname in instance_names]
3380
3381     # begin data gathering
3382
3383     nodes = frozenset([inst.primary_node for inst in instance_list])
3384     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3385
3386     bad_nodes = []
3387     off_nodes = []
3388     if self.do_node_query:
3389       live_data = {}
3390       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3391       for name in nodes:
3392         result = node_data[name]
3393         if result.offline:
3394           # offline nodes will be in both lists
3395           off_nodes.append(name)
3396         if result.failed:
3397           bad_nodes.append(name)
3398         else:
3399           if result.data:
3400             live_data.update(result.data)
3401             # else no instance is alive
3402     else:
3403       live_data = dict([(name, {}) for name in instance_names])
3404
3405     # end data gathering
3406
3407     HVPREFIX = "hv/"
3408     BEPREFIX = "be/"
3409     output = []
3410     for instance in instance_list:
3411       iout = []
3412       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3413       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3414       for field in self.op.output_fields:
3415         st_match = self._FIELDS_STATIC.Matches(field)
3416         if field == "name":
3417           val = instance.name
3418         elif field == "os":
3419           val = instance.os
3420         elif field == "pnode":
3421           val = instance.primary_node
3422         elif field == "snodes":
3423           val = list(instance.secondary_nodes)
3424         elif field == "admin_state":
3425           val = instance.admin_up
3426         elif field == "oper_state":
3427           if instance.primary_node in bad_nodes:
3428             val = None
3429           else:
3430             val = bool(live_data.get(instance.name))
3431         elif field == "status":
3432           if instance.primary_node in off_nodes:
3433             val = "ERROR_nodeoffline"
3434           elif instance.primary_node in bad_nodes:
3435             val = "ERROR_nodedown"
3436           else:
3437             running = bool(live_data.get(instance.name))
3438             if running:
3439               if instance.admin_up:
3440                 val = "running"
3441               else:
3442                 val = "ERROR_up"
3443             else:
3444               if instance.admin_up:
3445                 val = "ERROR_down"
3446               else:
3447                 val = "ADMIN_down"
3448         elif field == "oper_ram":
3449           if instance.primary_node in bad_nodes:
3450             val = None
3451           elif instance.name in live_data:
3452             val = live_data[instance.name].get("memory", "?")
3453           else:
3454             val = "-"
3455         elif field == "vcpus":
3456           val = i_be[constants.BE_VCPUS]
3457         elif field == "disk_template":
3458           val = instance.disk_template
3459         elif field == "ip":
3460           if instance.nics:
3461             val = instance.nics[0].ip
3462           else:
3463             val = None
3464         elif field == "bridge":
3465           if instance.nics:
3466             val = instance.nics[0].bridge
3467           else:
3468             val = None
3469         elif field == "mac":
3470           if instance.nics:
3471             val = instance.nics[0].mac
3472           else:
3473             val = None
3474         elif field == "sda_size" or field == "sdb_size":
3475           idx = ord(field[2]) - ord('a')
3476           try:
3477             val = instance.FindDisk(idx).size
3478           except errors.OpPrereqError:
3479             val = None
3480         elif field == "disk_usage": # total disk usage per node
3481           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3482           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3483         elif field == "tags":
3484           val = list(instance.GetTags())
3485         elif field == "serial_no":
3486           val = instance.serial_no
3487         elif field == "network_port":
3488           val = instance.network_port
3489         elif field == "hypervisor":
3490           val = instance.hypervisor
3491         elif field == "hvparams":
3492           val = i_hv
3493         elif (field.startswith(HVPREFIX) and
3494               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3495           val = i_hv.get(field[len(HVPREFIX):], None)
3496         elif field == "beparams":
3497           val = i_be
3498         elif (field.startswith(BEPREFIX) and
3499               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3500           val = i_be.get(field[len(BEPREFIX):], None)
3501         elif st_match and st_match.groups():
3502           # matches a variable list
3503           st_groups = st_match.groups()
3504           if st_groups and st_groups[0] == "disk":
3505             if st_groups[1] == "count":
3506               val = len(instance.disks)
3507             elif st_groups[1] == "sizes":
3508               val = [disk.size for disk in instance.disks]
3509             elif st_groups[1] == "size":
3510               try:
3511                 val = instance.FindDisk(st_groups[2]).size
3512               except errors.OpPrereqError:
3513                 val = None
3514             else:
3515               assert False, "Unhandled disk parameter"
3516           elif st_groups[0] == "nic":
3517             if st_groups[1] == "count":
3518               val = len(instance.nics)
3519             elif st_groups[1] == "macs":
3520               val = [nic.mac for nic in instance.nics]
3521             elif st_groups[1] == "ips":
3522               val = [nic.ip for nic in instance.nics]
3523             elif st_groups[1] == "bridges":
3524               val = [nic.bridge for nic in instance.nics]
3525             else:
3526               # index-based item
3527               nic_idx = int(st_groups[2])
3528               if nic_idx >= len(instance.nics):
3529                 val = None
3530               else:
3531                 if st_groups[1] == "mac":
3532                   val = instance.nics[nic_idx].mac
3533                 elif st_groups[1] == "ip":
3534                   val = instance.nics[nic_idx].ip
3535                 elif st_groups[1] == "bridge":
3536                   val = instance.nics[nic_idx].bridge
3537                 else:
3538                   assert False, "Unhandled NIC parameter"
3539           else:
3540             assert False, ("Declared but unhandled variable parameter '%s'" %
3541                            field)
3542         else:
3543           assert False, "Declared but unhandled parameter '%s'" % field
3544         iout.append(val)
3545       output.append(iout)
3546
3547     return output
3548
3549
3550 class LUFailoverInstance(LogicalUnit):
3551   """Failover an instance.
3552
3553   """
3554   HPATH = "instance-failover"
3555   HTYPE = constants.HTYPE_INSTANCE
3556   _OP_REQP = ["instance_name", "ignore_consistency"]
3557   REQ_BGL = False
3558
3559   def ExpandNames(self):
3560     self._ExpandAndLockInstance()
3561     self.needed_locks[locking.LEVEL_NODE] = []
3562     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3563
3564   def DeclareLocks(self, level):
3565     if level == locking.LEVEL_NODE:
3566       self._LockInstancesNodes()
3567
3568   def BuildHooksEnv(self):
3569     """Build hooks env.
3570
3571     This runs on master, primary and secondary nodes of the instance.
3572
3573     """
3574     env = {
3575       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3576       }
3577     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3578     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3579     return env, nl, nl
3580
3581   def CheckPrereq(self):
3582     """Check prerequisites.
3583
3584     This checks that the instance is in the cluster.
3585
3586     """
3587     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3588     assert self.instance is not None, \
3589       "Cannot retrieve locked instance %s" % self.op.instance_name
3590
3591     bep = self.cfg.GetClusterInfo().FillBE(instance)
3592     if instance.disk_template not in constants.DTS_NET_MIRROR:
3593       raise errors.OpPrereqError("Instance's disk layout is not"
3594                                  " network mirrored, cannot failover.")
3595
3596     secondary_nodes = instance.secondary_nodes
3597     if not secondary_nodes:
3598       raise errors.ProgrammerError("no secondary node but using "
3599                                    "a mirrored disk template")
3600
3601     target_node = secondary_nodes[0]
3602     _CheckNodeOnline(self, target_node)
3603     _CheckNodeNotDrained(self, target_node)
3604
3605     if instance.admin_up:
3606       # check memory requirements on the secondary node
3607       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3608                            instance.name, bep[constants.BE_MEMORY],
3609                            instance.hypervisor)
3610     else:
3611       self.LogInfo("Not checking memory on the secondary node as"
3612                    " instance will not be started")
3613
3614     # check bridge existance
3615     brlist = [nic.bridge for nic in instance.nics]
3616     result = self.rpc.call_bridges_exist(target_node, brlist)
3617     result.Raise()
3618     if not result.data:
3619       raise errors.OpPrereqError("One or more target bridges %s does not"
3620                                  " exist on destination node '%s'" %
3621                                  (brlist, target_node))
3622
3623   def Exec(self, feedback_fn):
3624     """Failover an instance.
3625
3626     The failover is done by shutting it down on its present node and
3627     starting it on the secondary.
3628
3629     """
3630     instance = self.instance
3631
3632     source_node = instance.primary_node
3633     target_node = instance.secondary_nodes[0]
3634
3635     feedback_fn("* checking disk consistency between source and target")
3636     for dev in instance.disks:
3637       # for drbd, these are drbd over lvm
3638       if not _CheckDiskConsistency(self, dev, target_node, False):
3639         if instance.admin_up and not self.op.ignore_consistency:
3640           raise errors.OpExecError("Disk %s is degraded on target node,"
3641                                    " aborting failover." % dev.iv_name)
3642
3643     feedback_fn("* shutting down instance on source node")
3644     logging.info("Shutting down instance %s on node %s",
3645                  instance.name, source_node)
3646
3647     result = self.rpc.call_instance_shutdown(source_node, instance)
3648     msg = result.RemoteFailMsg()
3649     if msg:
3650       if self.op.ignore_consistency:
3651         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3652                              " Proceeding anyway. Please make sure node"
3653                              " %s is down. Error details: %s",
3654                              instance.name, source_node, source_node, msg)
3655       else:
3656         raise errors.OpExecError("Could not shutdown instance %s on"
3657                                  " node %s: %s" %
3658                                  (instance.name, source_node, msg))
3659
3660     feedback_fn("* deactivating the instance's disks on source node")
3661     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3662       raise errors.OpExecError("Can't shut down the instance's disks.")
3663
3664     instance.primary_node = target_node
3665     # distribute new instance config to the other nodes
3666     self.cfg.Update(instance)
3667
3668     # Only start the instance if it's marked as up
3669     if instance.admin_up:
3670       feedback_fn("* activating the instance's disks on target node")
3671       logging.info("Starting instance %s on node %s",
3672                    instance.name, target_node)
3673
3674       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3675                                                ignore_secondaries=True)
3676       if not disks_ok:
3677         _ShutdownInstanceDisks(self, instance)
3678         raise errors.OpExecError("Can't activate the instance's disks")
3679
3680       feedback_fn("* starting the instance on the target node")
3681       result = self.rpc.call_instance_start(target_node, instance, None, None)
3682       msg = result.RemoteFailMsg()
3683       if msg:
3684         _ShutdownInstanceDisks(self, instance)
3685         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3686                                  (instance.name, target_node, msg))
3687
3688
3689 class LUMigrateInstance(LogicalUnit):
3690   """Migrate an instance.
3691
3692   This is migration without shutting down, compared to the failover,
3693   which is done with shutdown.
3694
3695   """
3696   HPATH = "instance-migrate"
3697   HTYPE = constants.HTYPE_INSTANCE
3698   _OP_REQP = ["instance_name", "live", "cleanup"]
3699
3700   REQ_BGL = False
3701
3702   def ExpandNames(self):
3703     self._ExpandAndLockInstance()
3704     self.needed_locks[locking.LEVEL_NODE] = []
3705     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3706
3707   def DeclareLocks(self, level):
3708     if level == locking.LEVEL_NODE:
3709       self._LockInstancesNodes()
3710
3711   def BuildHooksEnv(self):
3712     """Build hooks env.
3713
3714     This runs on master, primary and secondary nodes of the instance.
3715
3716     """
3717     env = _BuildInstanceHookEnvByObject(self, self.instance)
3718     env["MIGRATE_LIVE"] = self.op.live
3719     env["MIGRATE_CLEANUP"] = self.op.cleanup
3720     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3721     return env, nl, nl
3722
3723   def CheckPrereq(self):
3724     """Check prerequisites.
3725
3726     This checks that the instance is in the cluster.
3727
3728     """
3729     instance = self.cfg.GetInstanceInfo(
3730       self.cfg.ExpandInstanceName(self.op.instance_name))
3731     if instance is None:
3732       raise errors.OpPrereqError("Instance '%s' not known" %
3733                                  self.op.instance_name)
3734
3735     if instance.disk_template != constants.DT_DRBD8:
3736       raise errors.OpPrereqError("Instance's disk layout is not"
3737                                  " drbd8, cannot migrate.")
3738
3739     secondary_nodes = instance.secondary_nodes
3740     if not secondary_nodes:
3741       raise errors.ConfigurationError("No secondary node but using"
3742                                       " drbd8 disk template")
3743
3744     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3745
3746     target_node = secondary_nodes[0]
3747     # check memory requirements on the secondary node
3748     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3749                          instance.name, i_be[constants.BE_MEMORY],
3750                          instance.hypervisor)
3751
3752     # check bridge existance
3753     brlist = [nic.bridge for nic in instance.nics]
3754     result = self.rpc.call_bridges_exist(target_node, brlist)
3755     if result.failed or not result.data:
3756       raise errors.OpPrereqError("One or more target bridges %s does not"
3757                                  " exist on destination node '%s'" %
3758                                  (brlist, target_node))
3759
3760     if not self.op.cleanup:
3761       _CheckNodeNotDrained(self, target_node)
3762       result = self.rpc.call_instance_migratable(instance.primary_node,
3763                                                  instance)
3764       msg = result.RemoteFailMsg()
3765       if msg:
3766         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3767                                    msg)
3768
3769     self.instance = instance
3770
3771   def _WaitUntilSync(self):
3772     """Poll with custom rpc for disk sync.
3773
3774     This uses our own step-based rpc call.
3775
3776     """
3777     self.feedback_fn("* wait until resync is done")
3778     all_done = False
3779     while not all_done:
3780       all_done = True
3781       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3782                                             self.nodes_ip,
3783                                             self.instance.disks)
3784       min_percent = 100
3785       for node, nres in result.items():
3786         msg = nres.RemoteFailMsg()
3787         if msg:
3788           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3789                                    (node, msg))
3790         node_done, node_percent = nres.payload
3791         all_done = all_done and node_done
3792         if node_percent is not None:
3793           min_percent = min(min_percent, node_percent)
3794       if not all_done:
3795         if min_percent < 100:
3796           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3797         time.sleep(2)
3798
3799   def _EnsureSecondary(self, node):
3800     """Demote a node to secondary.
3801
3802     """
3803     self.feedback_fn("* switching node %s to secondary mode" % node)
3804
3805     for dev in self.instance.disks:
3806       self.cfg.SetDiskID(dev, node)
3807
3808     result = self.rpc.call_blockdev_close(node, self.instance.name,
3809                                           self.instance.disks)
3810     msg = result.RemoteFailMsg()
3811     if msg:
3812       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3813                                " error %s" % (node, msg))
3814
3815   def _GoStandalone(self):
3816     """Disconnect from the network.
3817
3818     """
3819     self.feedback_fn("* changing into standalone mode")
3820     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3821                                                self.instance.disks)
3822     for node, nres in result.items():
3823       msg = nres.RemoteFailMsg()
3824       if msg:
3825         raise errors.OpExecError("Cannot disconnect disks node %s,"
3826                                  " error %s" % (node, msg))
3827
3828   def _GoReconnect(self, multimaster):
3829     """Reconnect to the network.
3830
3831     """
3832     if multimaster:
3833       msg = "dual-master"
3834     else:
3835       msg = "single-master"
3836     self.feedback_fn("* changing disks into %s mode" % msg)
3837     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3838                                            self.instance.disks,
3839                                            self.instance.name, multimaster)
3840     for node, nres in result.items():
3841       msg = nres.RemoteFailMsg()
3842       if msg:
3843         raise errors.OpExecError("Cannot change disks config on node %s,"
3844                                  " error: %s" % (node, msg))
3845
3846   def _ExecCleanup(self):
3847     """Try to cleanup after a failed migration.
3848
3849     The cleanup is done by:
3850       - check that the instance is running only on one node
3851         (and update the config if needed)
3852       - change disks on its secondary node to secondary
3853       - wait until disks are fully synchronized
3854       - disconnect from the network
3855       - change disks into single-master mode
3856       - wait again until disks are fully synchronized
3857
3858     """
3859     instance = self.instance
3860     target_node = self.target_node
3861     source_node = self.source_node
3862
3863     # check running on only one node
3864     self.feedback_fn("* checking where the instance actually runs"
3865                      " (if this hangs, the hypervisor might be in"
3866                      " a bad state)")
3867     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3868     for node, result in ins_l.items():
3869       result.Raise()
3870       if not isinstance(result.data, list):
3871         raise errors.OpExecError("Can't contact node '%s'" % node)
3872
3873     runningon_source = instance.name in ins_l[source_node].data
3874     runningon_target = instance.name in ins_l[target_node].data
3875
3876     if runningon_source and runningon_target:
3877       raise errors.OpExecError("Instance seems to be running on two nodes,"
3878                                " or the hypervisor is confused. You will have"
3879                                " to ensure manually that it runs only on one"
3880                                " and restart this operation.")
3881
3882     if not (runningon_source or runningon_target):
3883       raise errors.OpExecError("Instance does not seem to be running at all."
3884                                " In this case, it's safer to repair by"
3885                                " running 'gnt-instance stop' to ensure disk"
3886                                " shutdown, and then restarting it.")
3887
3888     if runningon_target:
3889       # the migration has actually succeeded, we need to update the config
3890       self.feedback_fn("* instance running on secondary node (%s),"
3891                        " updating config" % target_node)
3892       instance.primary_node = target_node
3893       self.cfg.Update(instance)
3894       demoted_node = source_node
3895     else:
3896       self.feedback_fn("* instance confirmed to be running on its"
3897                        " primary node (%s)" % source_node)
3898       demoted_node = target_node
3899
3900     self._EnsureSecondary(demoted_node)
3901     try:
3902       self._WaitUntilSync()
3903     except errors.OpExecError:
3904       # we ignore here errors, since if the device is standalone, it
3905       # won't be able to sync
3906       pass
3907     self._GoStandalone()
3908     self._GoReconnect(False)
3909     self._WaitUntilSync()
3910
3911     self.feedback_fn("* done")
3912
3913   def _RevertDiskStatus(self):
3914     """Try to revert the disk status after a failed migration.
3915
3916     """
3917     target_node = self.target_node
3918     try:
3919       self._EnsureSecondary(target_node)
3920       self._GoStandalone()
3921       self._GoReconnect(False)
3922       self._WaitUntilSync()
3923     except errors.OpExecError, err:
3924       self.LogWarning("Migration failed and I can't reconnect the"
3925                       " drives: error '%s'\n"
3926                       "Please look and recover the instance status" %
3927                       str(err))
3928
3929   def _AbortMigration(self):
3930     """Call the hypervisor code to abort a started migration.
3931
3932     """
3933     instance = self.instance
3934     target_node = self.target_node
3935     migration_info = self.migration_info
3936
3937     abort_result = self.rpc.call_finalize_migration(target_node,
3938                                                     instance,
3939                                                     migration_info,
3940                                                     False)
3941     abort_msg = abort_result.RemoteFailMsg()
3942     if abort_msg:
3943       logging.error("Aborting migration failed on target node %s: %s" %
3944                     (target_node, abort_msg))
3945       # Don't raise an exception here, as we stil have to try to revert the
3946       # disk status, even if this step failed.
3947
3948   def _ExecMigration(self):
3949     """Migrate an instance.
3950
3951     The migrate is done by:
3952       - change the disks into dual-master mode
3953       - wait until disks are fully synchronized again
3954       - migrate the instance
3955       - change disks on the new secondary node (the old primary) to secondary
3956       - wait until disks are fully synchronized
3957       - change disks into single-master mode
3958
3959     """
3960     instance = self.instance
3961     target_node = self.target_node
3962     source_node = self.source_node
3963
3964     self.feedback_fn("* checking disk consistency between source and target")
3965     for dev in instance.disks:
3966       if not _CheckDiskConsistency(self, dev, target_node, False):
3967         raise errors.OpExecError("Disk %s is degraded or not fully"
3968                                  " synchronized on target node,"
3969                                  " aborting migrate." % dev.iv_name)
3970
3971     # First get the migration information from the remote node
3972     result = self.rpc.call_migration_info(source_node, instance)
3973     msg = result.RemoteFailMsg()
3974     if msg:
3975       log_err = ("Failed fetching source migration information from %s: %s" %
3976                  (source_node, msg))
3977       logging.error(log_err)
3978       raise errors.OpExecError(log_err)
3979
3980     self.migration_info = migration_info = result.payload
3981
3982     # Then switch the disks to master/master mode
3983     self._EnsureSecondary(target_node)
3984     self._GoStandalone()
3985     self._GoReconnect(True)
3986     self._WaitUntilSync()
3987
3988     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3989     result = self.rpc.call_accept_instance(target_node,
3990                                            instance,
3991                                            migration_info,
3992                                            self.nodes_ip[target_node])
3993
3994     msg = result.RemoteFailMsg()
3995     if msg:
3996       logging.error("Instance pre-migration failed, trying to revert"
3997                     " disk status: %s", msg)
3998       self._AbortMigration()
3999       self._RevertDiskStatus()
4000       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4001                                (instance.name, msg))
4002
4003     self.feedback_fn("* migrating instance to %s" % target_node)
4004     time.sleep(10)
4005     result = self.rpc.call_instance_migrate(source_node, instance,
4006                                             self.nodes_ip[target_node],
4007                                             self.op.live)
4008     msg = result.RemoteFailMsg()
4009     if msg:
4010       logging.error("Instance migration failed, trying to revert"
4011                     " disk status: %s", msg)
4012       self._AbortMigration()
4013       self._RevertDiskStatus()
4014       raise errors.OpExecError("Could not migrate instance %s: %s" %
4015                                (instance.name, msg))
4016     time.sleep(10)
4017
4018     instance.primary_node = target_node
4019     # distribute new instance config to the other nodes
4020     self.cfg.Update(instance)
4021
4022     result = self.rpc.call_finalize_migration(target_node,
4023                                               instance,
4024                                               migration_info,
4025                                               True)
4026     msg = result.RemoteFailMsg()
4027     if msg:
4028       logging.error("Instance migration succeeded, but finalization failed:"
4029                     " %s" % msg)
4030       raise errors.OpExecError("Could not finalize instance migration: %s" %
4031                                msg)
4032
4033     self._EnsureSecondary(source_node)
4034     self._WaitUntilSync()
4035     self._GoStandalone()
4036     self._GoReconnect(False)
4037     self._WaitUntilSync()
4038
4039     self.feedback_fn("* done")
4040
4041   def Exec(self, feedback_fn):
4042     """Perform the migration.
4043
4044     """
4045     self.feedback_fn = feedback_fn
4046
4047     self.source_node = self.instance.primary_node
4048     self.target_node = self.instance.secondary_nodes[0]
4049     self.all_nodes = [self.source_node, self.target_node]
4050     self.nodes_ip = {
4051       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4052       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4053       }
4054     if self.op.cleanup:
4055       return self._ExecCleanup()
4056     else:
4057       return self._ExecMigration()
4058
4059
4060 def _CreateBlockDev(lu, node, instance, device, force_create,
4061                     info, force_open):
4062   """Create a tree of block devices on a given node.
4063
4064   If this device type has to be created on secondaries, create it and
4065   all its children.
4066
4067   If not, just recurse to children keeping the same 'force' value.
4068
4069   @param lu: the lu on whose behalf we execute
4070   @param node: the node on which to create the device
4071   @type instance: L{objects.Instance}
4072   @param instance: the instance which owns the device
4073   @type device: L{objects.Disk}
4074   @param device: the device to create
4075   @type force_create: boolean
4076   @param force_create: whether to force creation of this device; this
4077       will be change to True whenever we find a device which has
4078       CreateOnSecondary() attribute
4079   @param info: the extra 'metadata' we should attach to the device
4080       (this will be represented as a LVM tag)
4081   @type force_open: boolean
4082   @param force_open: this parameter will be passes to the
4083       L{backend.BlockdevCreate} function where it specifies
4084       whether we run on primary or not, and it affects both
4085       the child assembly and the device own Open() execution
4086
4087   """
4088   if device.CreateOnSecondary():
4089     force_create = True
4090
4091   if device.children:
4092     for child in device.children:
4093       _CreateBlockDev(lu, node, instance, child, force_create,
4094                       info, force_open)
4095
4096   if not force_create:
4097     return
4098
4099   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4100
4101
4102 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4103   """Create a single block device on a given node.
4104
4105   This will not recurse over children of the device, so they must be
4106   created in advance.
4107
4108   @param lu: the lu on whose behalf we execute
4109   @param node: the node on which to create the device
4110   @type instance: L{objects.Instance}
4111   @param instance: the instance which owns the device
4112   @type device: L{objects.Disk}
4113   @param device: the device to create
4114   @param info: the extra 'metadata' we should attach to the device
4115       (this will be represented as a LVM tag)
4116   @type force_open: boolean
4117   @param force_open: this parameter will be passes to the
4118       L{backend.BlockdevCreate} function where it specifies
4119       whether we run on primary or not, and it affects both
4120       the child assembly and the device own Open() execution
4121
4122   """
4123   lu.cfg.SetDiskID(device, node)
4124   result = lu.rpc.call_blockdev_create(node, device, device.size,
4125                                        instance.name, force_open, info)
4126   msg = result.RemoteFailMsg()
4127   if msg:
4128     raise errors.OpExecError("Can't create block device %s on"
4129                              " node %s for instance %s: %s" %
4130                              (device, node, instance.name, msg))
4131   if device.physical_id is None:
4132     device.physical_id = result.payload
4133
4134
4135 def _GenerateUniqueNames(lu, exts):
4136   """Generate a suitable LV name.
4137
4138   This will generate a logical volume name for the given instance.
4139
4140   """
4141   results = []
4142   for val in exts:
4143     new_id = lu.cfg.GenerateUniqueID()
4144     results.append("%s%s" % (new_id, val))
4145   return results
4146
4147
4148 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4149                          p_minor, s_minor):
4150   """Generate a drbd8 device complete with its children.
4151
4152   """
4153   port = lu.cfg.AllocatePort()
4154   vgname = lu.cfg.GetVGName()
4155   shared_secret = lu.cfg.GenerateDRBDSecret()
4156   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4157                           logical_id=(vgname, names[0]))
4158   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4159                           logical_id=(vgname, names[1]))
4160   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4161                           logical_id=(primary, secondary, port,
4162                                       p_minor, s_minor,
4163                                       shared_secret),
4164                           children=[dev_data, dev_meta],
4165                           iv_name=iv_name)
4166   return drbd_dev
4167
4168
4169 def _GenerateDiskTemplate(lu, template_name,
4170                           instance_name, primary_node,
4171                           secondary_nodes, disk_info,
4172                           file_storage_dir, file_driver,
4173                           base_index):
4174   """Generate the entire disk layout for a given template type.
4175
4176   """
4177   #TODO: compute space requirements
4178
4179   vgname = lu.cfg.GetVGName()
4180   disk_count = len(disk_info)
4181   disks = []
4182   if template_name == constants.DT_DISKLESS:
4183     pass
4184   elif template_name == constants.DT_PLAIN:
4185     if len(secondary_nodes) != 0:
4186       raise errors.ProgrammerError("Wrong template configuration")
4187
4188     names = _GenerateUniqueNames(lu, [".disk%d" % i
4189                                       for i in range(disk_count)])
4190     for idx, disk in enumerate(disk_info):
4191       disk_index = idx + base_index
4192       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4193                               logical_id=(vgname, names[idx]),
4194                               iv_name="disk/%d" % disk_index,
4195                               mode=disk["mode"])
4196       disks.append(disk_dev)
4197   elif template_name == constants.DT_DRBD8:
4198     if len(secondary_nodes) != 1:
4199       raise errors.ProgrammerError("Wrong template configuration")
4200     remote_node = secondary_nodes[0]
4201     minors = lu.cfg.AllocateDRBDMinor(
4202       [primary_node, remote_node] * len(disk_info), instance_name)
4203
4204     names = []
4205     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4206                                                for i in range(disk_count)]):
4207       names.append(lv_prefix + "_data")
4208       names.append(lv_prefix + "_meta")
4209     for idx, disk in enumerate(disk_info):
4210       disk_index = idx + base_index
4211       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4212                                       disk["size"], names[idx*2:idx*2+2],
4213                                       "disk/%d" % disk_index,
4214                                       minors[idx*2], minors[idx*2+1])
4215       disk_dev.mode = disk["mode"]
4216       disks.append(disk_dev)
4217   elif template_name == constants.DT_FILE:
4218     if len(secondary_nodes) != 0:
4219       raise errors.ProgrammerError("Wrong template configuration")
4220
4221     for idx, disk in enumerate(disk_info):
4222       disk_index = idx + base_index
4223       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4224                               iv_name="disk/%d" % disk_index,
4225                               logical_id=(file_driver,
4226                                           "%s/disk%d" % (file_storage_dir,
4227                                                          disk_index)),
4228                               mode=disk["mode"])
4229       disks.append(disk_dev)
4230   else:
4231     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4232   return disks
4233
4234
4235 def _GetInstanceInfoText(instance):
4236   """Compute that text that should be added to the disk's metadata.
4237
4238   """
4239   return "originstname+%s" % instance.name
4240
4241
4242 def _CreateDisks(lu, instance):
4243   """Create all disks for an instance.
4244
4245   This abstracts away some work from AddInstance.
4246
4247   @type lu: L{LogicalUnit}
4248   @param lu: the logical unit on whose behalf we execute
4249   @type instance: L{objects.Instance}
4250   @param instance: the instance whose disks we should create
4251   @rtype: boolean
4252   @return: the success of the creation
4253
4254   """
4255   info = _GetInstanceInfoText(instance)
4256   pnode = instance.primary_node
4257
4258   if instance.disk_template == constants.DT_FILE:
4259     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4260     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4261
4262     if result.failed or not result.data:
4263       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4264
4265     if not result.data[0]:
4266       raise errors.OpExecError("Failed to create directory '%s'" %
4267                                file_storage_dir)
4268
4269   # Note: this needs to be kept in sync with adding of disks in
4270   # LUSetInstanceParams
4271   for device in instance.disks:
4272     logging.info("Creating volume %s for instance %s",
4273                  device.iv_name, instance.name)
4274     #HARDCODE
4275     for node in instance.all_nodes:
4276       f_create = node == pnode
4277       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4278
4279
4280 def _RemoveDisks(lu, instance):
4281   """Remove all disks for an instance.
4282
4283   This abstracts away some work from `AddInstance()` and
4284   `RemoveInstance()`. Note that in case some of the devices couldn't
4285   be removed, the removal will continue with the other ones (compare
4286   with `_CreateDisks()`).
4287
4288   @type lu: L{LogicalUnit}
4289   @param lu: the logical unit on whose behalf we execute
4290   @type instance: L{objects.Instance}
4291   @param instance: the instance whose disks we should remove
4292   @rtype: boolean
4293   @return: the success of the removal
4294
4295   """
4296   logging.info("Removing block devices for instance %s", instance.name)
4297
4298   all_result = True
4299   for device in instance.disks:
4300     for node, disk in device.ComputeNodeTree(instance.primary_node):
4301       lu.cfg.SetDiskID(disk, node)
4302       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4303       if msg:
4304         lu.LogWarning("Could not remove block device %s on node %s,"
4305                       " continuing anyway: %s", device.iv_name, node, msg)
4306         all_result = False
4307
4308   if instance.disk_template == constants.DT_FILE:
4309     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4310     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4311                                                  file_storage_dir)
4312     if result.failed or not result.data:
4313       logging.error("Could not remove directory '%s'", file_storage_dir)
4314       all_result = False
4315
4316   return all_result
4317
4318
4319 def _ComputeDiskSize(disk_template, disks):
4320   """Compute disk size requirements in the volume group
4321
4322   """
4323   # Required free disk space as a function of disk and swap space
4324   req_size_dict = {
4325     constants.DT_DISKLESS: None,
4326     constants.DT_PLAIN: sum(d["size"] for d in disks),
4327     # 128 MB are added for drbd metadata for each disk
4328     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4329     constants.DT_FILE: None,
4330   }
4331
4332   if disk_template not in req_size_dict:
4333     raise errors.ProgrammerError("Disk template '%s' size requirement"
4334                                  " is unknown" %  disk_template)
4335
4336   return req_size_dict[disk_template]
4337
4338
4339 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4340   """Hypervisor parameter validation.
4341
4342   This function abstract the hypervisor parameter validation to be
4343   used in both instance create and instance modify.
4344
4345   @type lu: L{LogicalUnit}
4346   @param lu: the logical unit for which we check
4347   @type nodenames: list
4348   @param nodenames: the list of nodes on which we should check
4349   @type hvname: string
4350   @param hvname: the name of the hypervisor we should use
4351   @type hvparams: dict
4352   @param hvparams: the parameters which we need to check
4353   @raise errors.OpPrereqError: if the parameters are not valid
4354
4355   """
4356   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4357                                                   hvname,
4358                                                   hvparams)
4359   for node in nodenames:
4360     info = hvinfo[node]
4361     if info.offline:
4362       continue
4363     msg = info.RemoteFailMsg()
4364     if msg:
4365       raise errors.OpPrereqError("Hypervisor parameter validation"
4366                                  " failed on node %s: %s" % (node, msg))
4367
4368
4369 class LUCreateInstance(LogicalUnit):
4370   """Create an instance.
4371
4372   """
4373   HPATH = "instance-add"
4374   HTYPE = constants.HTYPE_INSTANCE
4375   _OP_REQP = ["instance_name", "disks", "disk_template",
4376               "mode", "start",
4377               "wait_for_sync", "ip_check", "nics",
4378               "hvparams", "beparams"]
4379   REQ_BGL = False
4380
4381   def _ExpandNode(self, node):
4382     """Expands and checks one node name.
4383
4384     """
4385     node_full = self.cfg.ExpandNodeName(node)
4386     if node_full is None:
4387       raise errors.OpPrereqError("Unknown node %s" % node)
4388     return node_full
4389
4390   def ExpandNames(self):
4391     """ExpandNames for CreateInstance.
4392
4393     Figure out the right locks for instance creation.
4394
4395     """
4396     self.needed_locks = {}
4397
4398     # set optional parameters to none if they don't exist
4399     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4400       if not hasattr(self.op, attr):
4401         setattr(self.op, attr, None)
4402
4403     # cheap checks, mostly valid constants given
4404
4405     # verify creation mode
4406     if self.op.mode not in (constants.INSTANCE_CREATE,
4407                             constants.INSTANCE_IMPORT):
4408       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4409                                  self.op.mode)
4410
4411     # disk template and mirror node verification
4412     if self.op.disk_template not in constants.DISK_TEMPLATES:
4413       raise errors.OpPrereqError("Invalid disk template name")
4414
4415     if self.op.hypervisor is None:
4416       self.op.hypervisor = self.cfg.GetHypervisorType()
4417
4418     cluster = self.cfg.GetClusterInfo()
4419     enabled_hvs = cluster.enabled_hypervisors
4420     if self.op.hypervisor not in enabled_hvs:
4421       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4422                                  " cluster (%s)" % (self.op.hypervisor,
4423                                   ",".join(enabled_hvs)))
4424
4425     # check hypervisor parameter syntax (locally)
4426     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4427     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4428                                   self.op.hvparams)
4429     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4430     hv_type.CheckParameterSyntax(filled_hvp)
4431     self.hv_full = filled_hvp
4432
4433     # fill and remember the beparams dict
4434     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4435     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4436                                     self.op.beparams)
4437
4438     #### instance parameters check
4439
4440     # instance name verification
4441     hostname1 = utils.HostInfo(self.op.instance_name)
4442     self.op.instance_name = instance_name = hostname1.name
4443
4444     # this is just a preventive check, but someone might still add this
4445     # instance in the meantime, and creation will fail at lock-add time
4446     if instance_name in self.cfg.GetInstanceList():
4447       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4448                                  instance_name)
4449
4450     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4451
4452     # NIC buildup
4453     self.nics = []
4454     for nic in self.op.nics:
4455       # ip validity checks
4456       ip = nic.get("ip", None)
4457       if ip is None or ip.lower() == "none":
4458         nic_ip = None
4459       elif ip.lower() == constants.VALUE_AUTO:
4460         nic_ip = hostname1.ip
4461       else:
4462         if not utils.IsValidIP(ip):
4463           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4464                                      " like a valid IP" % ip)
4465         nic_ip = ip
4466
4467       # MAC address verification
4468       mac = nic.get("mac", constants.VALUE_AUTO)
4469       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4470         if not utils.IsValidMac(mac.lower()):
4471           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4472                                      mac)
4473       # bridge verification
4474       bridge = nic.get("bridge", None)
4475       if bridge is None:
4476         bridge = self.cfg.GetDefBridge()
4477       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4478
4479     # disk checks/pre-build
4480     self.disks = []
4481     for disk in self.op.disks:
4482       mode = disk.get("mode", constants.DISK_RDWR)
4483       if mode not in constants.DISK_ACCESS_SET:
4484         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4485                                    mode)
4486       size = disk.get("size", None)
4487       if size is None:
4488         raise errors.OpPrereqError("Missing disk size")
4489       try:
4490         size = int(size)
4491       except ValueError:
4492         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4493       self.disks.append({"size": size, "mode": mode})
4494
4495     # used in CheckPrereq for ip ping check
4496     self.check_ip = hostname1.ip
4497
4498     # file storage checks
4499     if (self.op.file_driver and
4500         not self.op.file_driver in constants.FILE_DRIVER):
4501       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4502                                  self.op.file_driver)
4503
4504     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4505       raise errors.OpPrereqError("File storage directory path not absolute")
4506
4507     ### Node/iallocator related checks
4508     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4509       raise errors.OpPrereqError("One and only one of iallocator and primary"
4510                                  " node must be given")
4511
4512     if self.op.iallocator:
4513       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4514     else:
4515       self.op.pnode = self._ExpandNode(self.op.pnode)
4516       nodelist = [self.op.pnode]
4517       if self.op.snode is not None:
4518         self.op.snode = self._ExpandNode(self.op.snode)
4519         nodelist.append(self.op.snode)
4520       self.needed_locks[locking.LEVEL_NODE] = nodelist
4521
4522     # in case of import lock the source node too
4523     if self.op.mode == constants.INSTANCE_IMPORT:
4524       src_node = getattr(self.op, "src_node", None)
4525       src_path = getattr(self.op, "src_path", None)
4526
4527       if src_path is None:
4528         self.op.src_path = src_path = self.op.instance_name
4529
4530       if src_node is None:
4531         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4532         self.op.src_node = None
4533         if os.path.isabs(src_path):
4534           raise errors.OpPrereqError("Importing an instance from an absolute"
4535                                      " path requires a source node option.")
4536       else:
4537         self.op.src_node = src_node = self._ExpandNode(src_node)
4538         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4539           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4540         if not os.path.isabs(src_path):
4541           self.op.src_path = src_path = \
4542             os.path.join(constants.EXPORT_DIR, src_path)
4543
4544     else: # INSTANCE_CREATE
4545       if getattr(self.op, "os_type", None) is None:
4546         raise errors.OpPrereqError("No guest OS specified")
4547
4548   def _RunAllocator(self):
4549     """Run the allocator based on input opcode.
4550
4551     """
4552     nics = [n.ToDict() for n in self.nics]
4553     ial = IAllocator(self,
4554                      mode=constants.IALLOCATOR_MODE_ALLOC,
4555                      name=self.op.instance_name,
4556                      disk_template=self.op.disk_template,
4557                      tags=[],
4558                      os=self.op.os_type,
4559                      vcpus=self.be_full[constants.BE_VCPUS],
4560                      mem_size=self.be_full[constants.BE_MEMORY],
4561                      disks=self.disks,
4562                      nics=nics,
4563                      hypervisor=self.op.hypervisor,
4564                      )
4565
4566     ial.Run(self.op.iallocator)
4567
4568     if not ial.success:
4569       raise errors.OpPrereqError("Can't compute nodes using"
4570                                  " iallocator '%s': %s" % (self.op.iallocator,
4571                                                            ial.info))
4572     if len(ial.nodes) != ial.required_nodes:
4573       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4574                                  " of nodes (%s), required %s" %
4575                                  (self.op.iallocator, len(ial.nodes),
4576                                   ial.required_nodes))
4577     self.op.pnode = ial.nodes[0]
4578     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4579                  self.op.instance_name, self.op.iallocator,
4580                  ", ".join(ial.nodes))
4581     if ial.required_nodes == 2:
4582       self.op.snode = ial.nodes[1]
4583
4584   def BuildHooksEnv(self):
4585     """Build hooks env.
4586
4587     This runs on master, primary and secondary nodes of the instance.
4588
4589     """
4590     env = {
4591       "ADD_MODE": self.op.mode,
4592       }
4593     if self.op.mode == constants.INSTANCE_IMPORT:
4594       env["SRC_NODE"] = self.op.src_node
4595       env["SRC_PATH"] = self.op.src_path
4596       env["SRC_IMAGES"] = self.src_images
4597
4598     env.update(_BuildInstanceHookEnv(
4599       name=self.op.instance_name,
4600       primary_node=self.op.pnode,
4601       secondary_nodes=self.secondaries,
4602       status=self.op.start,
4603       os_type=self.op.os_type,
4604       memory=self.be_full[constants.BE_MEMORY],
4605       vcpus=self.be_full[constants.BE_VCPUS],
4606       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4607       disk_template=self.op.disk_template,
4608       disks=[(d["size"], d["mode"]) for d in self.disks],
4609       bep=self.be_full,
4610       hvp=self.hv_full,
4611       hypervisor=self.op.hypervisor,
4612     ))
4613
4614     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4615           self.secondaries)
4616     return env, nl, nl
4617
4618
4619   def CheckPrereq(self):
4620     """Check prerequisites.
4621
4622     """
4623     if (not self.cfg.GetVGName() and
4624         self.op.disk_template not in constants.DTS_NOT_LVM):
4625       raise errors.OpPrereqError("Cluster does not support lvm-based"
4626                                  " instances")
4627
4628     if self.op.mode == constants.INSTANCE_IMPORT:
4629       src_node = self.op.src_node
4630       src_path = self.op.src_path
4631
4632       if src_node is None:
4633         exp_list = self.rpc.call_export_list(
4634           self.acquired_locks[locking.LEVEL_NODE])
4635         found = False
4636         for node in exp_list:
4637           if not exp_list[node].failed and src_path in exp_list[node].data:
4638             found = True
4639             self.op.src_node = src_node = node
4640             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4641                                                        src_path)
4642             break
4643         if not found:
4644           raise errors.OpPrereqError("No export found for relative path %s" %
4645                                       src_path)
4646
4647       _CheckNodeOnline(self, src_node)
4648       result = self.rpc.call_export_info(src_node, src_path)
4649       result.Raise()
4650       if not result.data:
4651         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4652
4653       export_info = result.data
4654       if not export_info.has_section(constants.INISECT_EXP):
4655         raise errors.ProgrammerError("Corrupted export config")
4656
4657       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4658       if (int(ei_version) != constants.EXPORT_VERSION):
4659         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4660                                    (ei_version, constants.EXPORT_VERSION))
4661
4662       # Check that the new instance doesn't have less disks than the export
4663       instance_disks = len(self.disks)
4664       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4665       if instance_disks < export_disks:
4666         raise errors.OpPrereqError("Not enough disks to import."
4667                                    " (instance: %d, export: %d)" %
4668                                    (instance_disks, export_disks))
4669
4670       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4671       disk_images = []
4672       for idx in range(export_disks):
4673         option = 'disk%d_dump' % idx
4674         if export_info.has_option(constants.INISECT_INS, option):
4675           # FIXME: are the old os-es, disk sizes, etc. useful?
4676           export_name = export_info.get(constants.INISECT_INS, option)
4677           image = os.path.join(src_path, export_name)
4678           disk_images.append(image)
4679         else:
4680           disk_images.append(False)
4681
4682       self.src_images = disk_images
4683
4684       old_name = export_info.get(constants.INISECT_INS, 'name')
4685       # FIXME: int() here could throw a ValueError on broken exports
4686       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4687       if self.op.instance_name == old_name:
4688         for idx, nic in enumerate(self.nics):
4689           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4690             nic_mac_ini = 'nic%d_mac' % idx
4691             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4692
4693     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4694     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4695     if self.op.start and not self.op.ip_check:
4696       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4697                                  " adding an instance in start mode")
4698
4699     if self.op.ip_check:
4700       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4701         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4702                                    (self.check_ip, self.op.instance_name))
4703
4704     #### mac address generation
4705     # By generating here the mac address both the allocator and the hooks get
4706     # the real final mac address rather than the 'auto' or 'generate' value.
4707     # There is a race condition between the generation and the instance object
4708     # creation, which means that we know the mac is valid now, but we're not
4709     # sure it will be when we actually add the instance. If things go bad
4710     # adding the instance will abort because of a duplicate mac, and the
4711     # creation job will fail.
4712     for nic in self.nics:
4713       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4714         nic.mac = self.cfg.GenerateMAC()
4715
4716     #### allocator run
4717
4718     if self.op.iallocator is not None:
4719       self._RunAllocator()
4720
4721     #### node related checks
4722
4723     # check primary node
4724     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4725     assert self.pnode is not None, \
4726       "Cannot retrieve locked node %s" % self.op.pnode
4727     if pnode.offline:
4728       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4729                                  pnode.name)
4730     if pnode.drained:
4731       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4732                                  pnode.name)
4733
4734     self.secondaries = []
4735
4736     # mirror node verification
4737     if self.op.disk_template in constants.DTS_NET_MIRROR:
4738       if self.op.snode is None:
4739         raise errors.OpPrereqError("The networked disk templates need"
4740                                    " a mirror node")
4741       if self.op.snode == pnode.name:
4742         raise errors.OpPrereqError("The secondary node cannot be"
4743                                    " the primary node.")
4744       _CheckNodeOnline(self, self.op.snode)
4745       _CheckNodeNotDrained(self, self.op.snode)
4746       self.secondaries.append(self.op.snode)
4747
4748     nodenames = [pnode.name] + self.secondaries
4749
4750     req_size = _ComputeDiskSize(self.op.disk_template,
4751                                 self.disks)
4752
4753     # Check lv size requirements
4754     if req_size is not None:
4755       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4756                                          self.op.hypervisor)
4757       for node in nodenames:
4758         info = nodeinfo[node]
4759         info.Raise()
4760         info = info.data
4761         if not info:
4762           raise errors.OpPrereqError("Cannot get current information"
4763                                      " from node '%s'" % node)
4764         vg_free = info.get('vg_free', None)
4765         if not isinstance(vg_free, int):
4766           raise errors.OpPrereqError("Can't compute free disk space on"
4767                                      " node %s" % node)
4768         if req_size > info['vg_free']:
4769           raise errors.OpPrereqError("Not enough disk space on target node %s."
4770                                      " %d MB available, %d MB required" %
4771                                      (node, info['vg_free'], req_size))
4772
4773     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4774
4775     # os verification
4776     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4777     result.Raise()
4778     if not isinstance(result.data, objects.OS) or not result.data:
4779       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4780                                  " primary node"  % self.op.os_type)
4781
4782     # bridge check on primary node
4783     bridges = [n.bridge for n in self.nics]
4784     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4785     result.Raise()
4786     if not result.data:
4787       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4788                                  " exist on destination node '%s'" %
4789                                  (",".join(bridges), pnode.name))
4790
4791     # memory check on primary node
4792     if self.op.start:
4793       _CheckNodeFreeMemory(self, self.pnode.name,
4794                            "creating instance %s" % self.op.instance_name,
4795                            self.be_full[constants.BE_MEMORY],
4796                            self.op.hypervisor)
4797
4798   def Exec(self, feedback_fn):
4799     """Create and add the instance to the cluster.
4800
4801     """
4802     instance = self.op.instance_name
4803     pnode_name = self.pnode.name
4804
4805     ht_kind = self.op.hypervisor
4806     if ht_kind in constants.HTS_REQ_PORT:
4807       network_port = self.cfg.AllocatePort()
4808     else:
4809       network_port = None
4810
4811     ##if self.op.vnc_bind_address is None:
4812     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4813
4814     # this is needed because os.path.join does not accept None arguments
4815     if self.op.file_storage_dir is None:
4816       string_file_storage_dir = ""
4817     else:
4818       string_file_storage_dir = self.op.file_storage_dir
4819
4820     # build the full file storage dir path
4821     file_storage_dir = os.path.normpath(os.path.join(
4822                                         self.cfg.GetFileStorageDir(),
4823                                         string_file_storage_dir, instance))
4824
4825
4826     disks = _GenerateDiskTemplate(self,
4827                                   self.op.disk_template,
4828                                   instance, pnode_name,
4829                                   self.secondaries,
4830                                   self.disks,
4831                                   file_storage_dir,
4832                                   self.op.file_driver,
4833                                   0)
4834
4835     iobj = objects.Instance(name=instance, os=self.op.os_type,
4836                             primary_node=pnode_name,
4837                             nics=self.nics, disks=disks,
4838                             disk_template=self.op.disk_template,
4839                             admin_up=False,
4840                             network_port=network_port,
4841                             beparams=self.op.beparams,
4842                             hvparams=self.op.hvparams,
4843                             hypervisor=self.op.hypervisor,
4844                             )
4845
4846     feedback_fn("* creating instance disks...")
4847     try:
4848       _CreateDisks(self, iobj)
4849     except errors.OpExecError:
4850       self.LogWarning("Device creation failed, reverting...")
4851       try:
4852         _RemoveDisks(self, iobj)
4853       finally:
4854         self.cfg.ReleaseDRBDMinors(instance)
4855         raise
4856
4857     feedback_fn("adding instance %s to cluster config" % instance)
4858
4859     self.cfg.AddInstance(iobj)
4860     # Declare that we don't want to remove the instance lock anymore, as we've
4861     # added the instance to the config
4862     del self.remove_locks[locking.LEVEL_INSTANCE]
4863     # Unlock all the nodes
4864     if self.op.mode == constants.INSTANCE_IMPORT:
4865       nodes_keep = [self.op.src_node]
4866       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4867                        if node != self.op.src_node]
4868       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4869       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4870     else:
4871       self.context.glm.release(locking.LEVEL_NODE)
4872       del self.acquired_locks[locking.LEVEL_NODE]
4873
4874     if self.op.wait_for_sync:
4875       disk_abort = not _WaitForSync(self, iobj)
4876     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4877       # make sure the disks are not degraded (still sync-ing is ok)
4878       time.sleep(15)
4879       feedback_fn("* checking mirrors status")
4880       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4881     else:
4882       disk_abort = False
4883
4884     if disk_abort:
4885       _RemoveDisks(self, iobj)
4886       self.cfg.RemoveInstance(iobj.name)
4887       # Make sure the instance lock gets removed
4888       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4889       raise errors.OpExecError("There are some degraded disks for"
4890                                " this instance")
4891
4892     feedback_fn("creating os for instance %s on node %s" %
4893                 (instance, pnode_name))
4894
4895     if iobj.disk_template != constants.DT_DISKLESS:
4896       if self.op.mode == constants.INSTANCE_CREATE:
4897         feedback_fn("* running the instance OS create scripts...")
4898         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4899         msg = result.RemoteFailMsg()
4900         if msg:
4901           raise errors.OpExecError("Could not add os for instance %s"
4902                                    " on node %s: %s" %
4903                                    (instance, pnode_name, msg))
4904
4905       elif self.op.mode == constants.INSTANCE_IMPORT:
4906         feedback_fn("* running the instance OS import scripts...")
4907         src_node = self.op.src_node
4908         src_images = self.src_images
4909         cluster_name = self.cfg.GetClusterName()
4910         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4911                                                          src_node, src_images,
4912                                                          cluster_name)
4913         import_result.Raise()
4914         for idx, result in enumerate(import_result.data):
4915           if not result:
4916             self.LogWarning("Could not import the image %s for instance"
4917                             " %s, disk %d, on node %s" %
4918                             (src_images[idx], instance, idx, pnode_name))
4919       else:
4920         # also checked in the prereq part
4921         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4922                                      % self.op.mode)
4923
4924     if self.op.start:
4925       iobj.admin_up = True
4926       self.cfg.Update(iobj)
4927       logging.info("Starting instance %s on node %s", instance, pnode_name)
4928       feedback_fn("* starting instance...")
4929       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4930       msg = result.RemoteFailMsg()
4931       if msg:
4932         raise errors.OpExecError("Could not start instance: %s" % msg)
4933
4934
4935 class LUConnectConsole(NoHooksLU):
4936   """Connect to an instance's console.
4937
4938   This is somewhat special in that it returns the command line that
4939   you need to run on the master node in order to connect to the
4940   console.
4941
4942   """
4943   _OP_REQP = ["instance_name"]
4944   REQ_BGL = False
4945
4946   def ExpandNames(self):
4947     self._ExpandAndLockInstance()
4948
4949   def CheckPrereq(self):
4950     """Check prerequisites.
4951
4952     This checks that the instance is in the cluster.
4953
4954     """
4955     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4956     assert self.instance is not None, \
4957       "Cannot retrieve locked instance %s" % self.op.instance_name
4958     _CheckNodeOnline(self, self.instance.primary_node)
4959
4960   def Exec(self, feedback_fn):
4961     """Connect to the console of an instance
4962
4963     """
4964     instance = self.instance
4965     node = instance.primary_node
4966
4967     node_insts = self.rpc.call_instance_list([node],
4968                                              [instance.hypervisor])[node]
4969     node_insts.Raise()
4970
4971     if instance.name not in node_insts.data:
4972       raise errors.OpExecError("Instance %s is not running." % instance.name)
4973
4974     logging.debug("Connecting to console of %s on %s", instance.name, node)
4975
4976     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4977     cluster = self.cfg.GetClusterInfo()
4978     # beparams and hvparams are passed separately, to avoid editing the
4979     # instance and then saving the defaults in the instance itself.
4980     hvparams = cluster.FillHV(instance)
4981     beparams = cluster.FillBE(instance)
4982     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4983
4984     # build ssh cmdline
4985     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4986
4987
4988 class LUReplaceDisks(LogicalUnit):
4989   """Replace the disks of an instance.
4990
4991   """
4992   HPATH = "mirrors-replace"
4993   HTYPE = constants.HTYPE_INSTANCE
4994   _OP_REQP = ["instance_name", "mode", "disks"]
4995   REQ_BGL = False
4996
4997   def CheckArguments(self):
4998     if not hasattr(self.op, "remote_node"):
4999       self.op.remote_node = None
5000     if not hasattr(self.op, "iallocator"):
5001       self.op.iallocator = None
5002
5003     # check for valid parameter combination
5004     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5005     if self.op.mode == constants.REPLACE_DISK_CHG:
5006       if cnt == 2:
5007         raise errors.OpPrereqError("When changing the secondary either an"
5008                                    " iallocator script must be used or the"
5009                                    " new node given")
5010       elif cnt == 0:
5011         raise errors.OpPrereqError("Give either the iallocator or the new"
5012                                    " secondary, not both")
5013     else: # not replacing the secondary
5014       if cnt != 2:
5015         raise errors.OpPrereqError("The iallocator and new node options can"
5016                                    " be used only when changing the"
5017                                    " secondary node")
5018
5019   def ExpandNames(self):
5020     self._ExpandAndLockInstance()
5021
5022     if self.op.iallocator is not None:
5023       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5024     elif self.op.remote_node is not None:
5025       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5026       if remote_node is None:
5027         raise errors.OpPrereqError("Node '%s' not known" %
5028                                    self.op.remote_node)
5029       self.op.remote_node = remote_node
5030       # Warning: do not remove the locking of the new secondary here
5031       # unless DRBD8.AddChildren is changed to work in parallel;
5032       # currently it doesn't since parallel invocations of
5033       # FindUnusedMinor will conflict
5034       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5035       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5036     else:
5037       self.needed_locks[locking.LEVEL_NODE] = []
5038       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5039
5040   def DeclareLocks(self, level):
5041     # If we're not already locking all nodes in the set we have to declare the
5042     # instance's primary/secondary nodes.
5043     if (level == locking.LEVEL_NODE and
5044         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5045       self._LockInstancesNodes()
5046
5047   def _RunAllocator(self):
5048     """Compute a new secondary node using an IAllocator.
5049
5050     """
5051     ial = IAllocator(self,
5052                      mode=constants.IALLOCATOR_MODE_RELOC,
5053                      name=self.op.instance_name,
5054                      relocate_from=[self.sec_node])
5055
5056     ial.Run(self.op.iallocator)
5057
5058     if not ial.success:
5059       raise errors.OpPrereqError("Can't compute nodes using"
5060                                  " iallocator '%s': %s" % (self.op.iallocator,
5061                                                            ial.info))
5062     if len(ial.nodes) != ial.required_nodes:
5063       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5064                                  " of nodes (%s), required %s" %
5065                                  (len(ial.nodes), ial.required_nodes))
5066     self.op.remote_node = ial.nodes[0]
5067     self.LogInfo("Selected new secondary for the instance: %s",
5068                  self.op.remote_node)
5069
5070   def BuildHooksEnv(self):
5071     """Build hooks env.
5072
5073     This runs on the master, the primary and all the secondaries.
5074
5075     """
5076     env = {
5077       "MODE": self.op.mode,
5078       "NEW_SECONDARY": self.op.remote_node,
5079       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5080       }
5081     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5082     nl = [
5083       self.cfg.GetMasterNode(),
5084       self.instance.primary_node,
5085       ]
5086     if self.op.remote_node is not None:
5087       nl.append(self.op.remote_node)
5088     return env, nl, nl
5089
5090   def CheckPrereq(self):
5091     """Check prerequisites.
5092
5093     This checks that the instance is in the cluster.
5094
5095     """
5096     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5097     assert instance is not None, \
5098       "Cannot retrieve locked instance %s" % self.op.instance_name
5099     self.instance = instance
5100
5101     if instance.disk_template != constants.DT_DRBD8:
5102       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5103                                  " instances")
5104
5105     if len(instance.secondary_nodes) != 1:
5106       raise errors.OpPrereqError("The instance has a strange layout,"
5107                                  " expected one secondary but found %d" %
5108                                  len(instance.secondary_nodes))
5109
5110     self.sec_node = instance.secondary_nodes[0]
5111
5112     if self.op.iallocator is not None:
5113       self._RunAllocator()
5114
5115     remote_node = self.op.remote_node
5116     if remote_node is not None:
5117       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5118       assert self.remote_node_info is not None, \
5119         "Cannot retrieve locked node %s" % remote_node
5120     else:
5121       self.remote_node_info = None
5122     if remote_node == instance.primary_node:
5123       raise errors.OpPrereqError("The specified node is the primary node of"
5124                                  " the instance.")
5125     elif remote_node == self.sec_node:
5126       raise errors.OpPrereqError("The specified node is already the"
5127                                  " secondary node of the instance.")
5128
5129     if self.op.mode == constants.REPLACE_DISK_PRI:
5130       n1 = self.tgt_node = instance.primary_node
5131       n2 = self.oth_node = self.sec_node
5132     elif self.op.mode == constants.REPLACE_DISK_SEC:
5133       n1 = self.tgt_node = self.sec_node
5134       n2 = self.oth_node = instance.primary_node
5135     elif self.op.mode == constants.REPLACE_DISK_CHG:
5136       n1 = self.new_node = remote_node
5137       n2 = self.oth_node = instance.primary_node
5138       self.tgt_node = self.sec_node
5139       _CheckNodeNotDrained(self, remote_node)
5140     else:
5141       raise errors.ProgrammerError("Unhandled disk replace mode")
5142
5143     _CheckNodeOnline(self, n1)
5144     _CheckNodeOnline(self, n2)
5145
5146     if not self.op.disks:
5147       self.op.disks = range(len(instance.disks))
5148
5149     for disk_idx in self.op.disks:
5150       instance.FindDisk(disk_idx)
5151
5152   def _ExecD8DiskOnly(self, feedback_fn):
5153     """Replace a disk on the primary or secondary for dbrd8.
5154
5155     The algorithm for replace is quite complicated:
5156
5157       1. for each disk to be replaced:
5158
5159         1. create new LVs on the target node with unique names
5160         1. detach old LVs from the drbd device
5161         1. rename old LVs to name_replaced.<time_t>
5162         1. rename new LVs to old LVs
5163         1. attach the new LVs (with the old names now) to the drbd device
5164
5165       1. wait for sync across all devices
5166
5167       1. for each modified disk:
5168
5169         1. remove old LVs (which have the name name_replaces.<time_t>)
5170
5171     Failures are not very well handled.
5172
5173     """
5174     steps_total = 6
5175     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5176     instance = self.instance
5177     iv_names = {}
5178     vgname = self.cfg.GetVGName()
5179     # start of work
5180     cfg = self.cfg
5181     tgt_node = self.tgt_node
5182     oth_node = self.oth_node
5183
5184     # Step: check device activation
5185     self.proc.LogStep(1, steps_total, "check device existence")
5186     info("checking volume groups")
5187     my_vg = cfg.GetVGName()
5188     results = self.rpc.call_vg_list([oth_node, tgt_node])
5189     if not results:
5190       raise errors.OpExecError("Can't list volume groups on the nodes")
5191     for node in oth_node, tgt_node:
5192       res = results[node]
5193       if res.failed or not res.data or my_vg not in res.data:
5194         raise errors.OpExecError("Volume group '%s' not found on %s" %
5195                                  (my_vg, node))
5196     for idx, dev in enumerate(instance.disks):
5197       if idx not in self.op.disks:
5198         continue
5199       for node in tgt_node, oth_node:
5200         info("checking disk/%d on %s" % (idx, node))
5201         cfg.SetDiskID(dev, node)
5202         result = self.rpc.call_blockdev_find(node, dev)
5203         msg = result.RemoteFailMsg()
5204         if not msg and not result.payload:
5205           msg = "disk not found"
5206         if msg:
5207           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5208                                    (idx, node, msg))
5209
5210     # Step: check other node consistency
5211     self.proc.LogStep(2, steps_total, "check peer consistency")
5212     for idx, dev in enumerate(instance.disks):
5213       if idx not in self.op.disks:
5214         continue
5215       info("checking disk/%d consistency on %s" % (idx, oth_node))
5216       if not _CheckDiskConsistency(self, dev, oth_node,
5217                                    oth_node==instance.primary_node):
5218         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5219                                  " to replace disks on this node (%s)" %
5220                                  (oth_node, tgt_node))
5221
5222     # Step: create new storage
5223     self.proc.LogStep(3, steps_total, "allocate new storage")
5224     for idx, dev in enumerate(instance.disks):
5225       if idx not in self.op.disks:
5226         continue
5227       size = dev.size
5228       cfg.SetDiskID(dev, tgt_node)
5229       lv_names = [".disk%d_%s" % (idx, suf)
5230                   for suf in ["data", "meta"]]
5231       names = _GenerateUniqueNames(self, lv_names)
5232       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5233                              logical_id=(vgname, names[0]))
5234       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5235                              logical_id=(vgname, names[1]))
5236       new_lvs = [lv_data, lv_meta]
5237       old_lvs = dev.children
5238       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5239       info("creating new local storage on %s for %s" %
5240            (tgt_node, dev.iv_name))
5241       # we pass force_create=True to force the LVM creation
5242       for new_lv in new_lvs:
5243         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5244                         _GetInstanceInfoText(instance), False)
5245
5246     # Step: for each lv, detach+rename*2+attach
5247     self.proc.LogStep(4, steps_total, "change drbd configuration")
5248     for dev, old_lvs, new_lvs in iv_names.itervalues():
5249       info("detaching %s drbd from local storage" % dev.iv_name)
5250       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5251       result.Raise()
5252       if not result.data:
5253         raise errors.OpExecError("Can't detach drbd from local storage on node"
5254                                  " %s for device %s" % (tgt_node, dev.iv_name))
5255       #dev.children = []
5256       #cfg.Update(instance)
5257
5258       # ok, we created the new LVs, so now we know we have the needed
5259       # storage; as such, we proceed on the target node to rename
5260       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5261       # using the assumption that logical_id == physical_id (which in
5262       # turn is the unique_id on that node)
5263
5264       # FIXME(iustin): use a better name for the replaced LVs
5265       temp_suffix = int(time.time())
5266       ren_fn = lambda d, suff: (d.physical_id[0],
5267                                 d.physical_id[1] + "_replaced-%s" % suff)
5268       # build the rename list based on what LVs exist on the node
5269       rlist = []
5270       for to_ren in old_lvs:
5271         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5272         if not result.RemoteFailMsg() and result.payload:
5273           # device exists
5274           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5275
5276       info("renaming the old LVs on the target node")
5277       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5278       result.Raise()
5279       if not result.data:
5280         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5281       # now we rename the new LVs to the old LVs
5282       info("renaming the new LVs on the target node")
5283       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5284       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5285       result.Raise()
5286       if not result.data:
5287         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5288
5289       for old, new in zip(old_lvs, new_lvs):
5290         new.logical_id = old.logical_id
5291         cfg.SetDiskID(new, tgt_node)
5292
5293       for disk in old_lvs:
5294         disk.logical_id = ren_fn(disk, temp_suffix)
5295         cfg.SetDiskID(disk, tgt_node)
5296
5297       # now that the new lvs have the old name, we can add them to the device
5298       info("adding new mirror component on %s" % tgt_node)
5299       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5300       if result.failed or not result.data:
5301         for new_lv in new_lvs:
5302           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5303           if msg:
5304             warning("Can't rollback device %s: %s", dev, msg,
5305                     hint="cleanup manually the unused logical volumes")
5306         raise errors.OpExecError("Can't add local storage to drbd")
5307
5308       dev.children = new_lvs
5309       cfg.Update(instance)
5310
5311     # Step: wait for sync
5312
5313     # this can fail as the old devices are degraded and _WaitForSync
5314     # does a combined result over all disks, so we don't check its
5315     # return value
5316     self.proc.LogStep(5, steps_total, "sync devices")
5317     _WaitForSync(self, instance, unlock=True)
5318
5319     # so check manually all the devices
5320     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5321       cfg.SetDiskID(dev, instance.primary_node)
5322       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5323       msg = result.RemoteFailMsg()
5324       if not msg and not result.payload:
5325         msg = "disk not found"
5326       if msg:
5327         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5328                                  (name, msg))
5329       if result.payload[5]:
5330         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5331
5332     # Step: remove old storage
5333     self.proc.LogStep(6, steps_total, "removing old storage")
5334     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5335       info("remove logical volumes for %s" % name)
5336       for lv in old_lvs:
5337         cfg.SetDiskID(lv, tgt_node)
5338         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5339         if msg:
5340           warning("Can't remove old LV: %s" % msg,
5341                   hint="manually remove unused LVs")
5342           continue
5343
5344   def _ExecD8Secondary(self, feedback_fn):
5345     """Replace the secondary node for drbd8.
5346
5347     The algorithm for replace is quite complicated:
5348       - for all disks of the instance:
5349         - create new LVs on the new node with same names
5350         - shutdown the drbd device on the old secondary
5351         - disconnect the drbd network on the primary
5352         - create the drbd device on the new secondary
5353         - network attach the drbd on the primary, using an artifice:
5354           the drbd code for Attach() will connect to the network if it
5355           finds a device which is connected to the good local disks but
5356           not network enabled
5357       - wait for sync across all devices
5358       - remove all disks from the old secondary
5359
5360     Failures are not very well handled.
5361
5362     """
5363     steps_total = 6
5364     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5365     instance = self.instance
5366     iv_names = {}
5367     # start of work
5368     cfg = self.cfg
5369     old_node = self.tgt_node
5370     new_node = self.new_node
5371     pri_node = instance.primary_node
5372     nodes_ip = {
5373       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5374       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5375       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5376       }
5377
5378     # Step: check device activation
5379     self.proc.LogStep(1, steps_total, "check device existence")
5380     info("checking volume groups")
5381     my_vg = cfg.GetVGName()
5382     results = self.rpc.call_vg_list([pri_node, new_node])
5383     for node in pri_node, new_node:
5384       res = results[node]
5385       if res.failed or not res.data or my_vg not in res.data:
5386         raise errors.OpExecError("Volume group '%s' not found on %s" %
5387                                  (my_vg, node))
5388     for idx, dev in enumerate(instance.disks):
5389       if idx not in self.op.disks:
5390         continue
5391       info("checking disk/%d on %s" % (idx, pri_node))
5392       cfg.SetDiskID(dev, pri_node)
5393       result = self.rpc.call_blockdev_find(pri_node, dev)
5394       msg = result.RemoteFailMsg()
5395       if not msg and not result.payload:
5396         msg = "disk not found"
5397       if msg:
5398         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5399                                  (idx, pri_node, msg))
5400
5401     # Step: check other node consistency
5402     self.proc.LogStep(2, steps_total, "check peer consistency")
5403     for idx, dev in enumerate(instance.disks):
5404       if idx not in self.op.disks:
5405         continue
5406       info("checking disk/%d consistency on %s" % (idx, pri_node))
5407       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5408         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5409                                  " unsafe to replace the secondary" %
5410                                  pri_node)
5411
5412     # Step: create new storage
5413     self.proc.LogStep(3, steps_total, "allocate new storage")
5414     for idx, dev in enumerate(instance.disks):
5415       info("adding new local storage on %s for disk/%d" %
5416            (new_node, idx))
5417       # we pass force_create=True to force LVM creation
5418       for new_lv in dev.children:
5419         _CreateBlockDev(self, new_node, instance, new_lv, True,
5420                         _GetInstanceInfoText(instance), False)
5421
5422     # Step 4: dbrd minors and drbd setups changes
5423     # after this, we must manually remove the drbd minors on both the
5424     # error and the success paths
5425     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5426                                    instance.name)
5427     logging.debug("Allocated minors %s" % (minors,))
5428     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5429     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5430       size = dev.size
5431       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5432       # create new devices on new_node; note that we create two IDs:
5433       # one without port, so the drbd will be activated without
5434       # networking information on the new node at this stage, and one
5435       # with network, for the latter activation in step 4
5436       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5437       if pri_node == o_node1:
5438         p_minor = o_minor1
5439       else:
5440         p_minor = o_minor2
5441
5442       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5443       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5444
5445       iv_names[idx] = (dev, dev.children, new_net_id)
5446       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5447                     new_net_id)
5448       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5449                               logical_id=new_alone_id,
5450                               children=dev.children,
5451                               size=dev.size)
5452       try:
5453         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5454                               _GetInstanceInfoText(instance), False)
5455       except errors.GenericError:
5456         self.cfg.ReleaseDRBDMinors(instance.name)
5457         raise
5458
5459     for idx, dev in enumerate(instance.disks):
5460       # we have new devices, shutdown the drbd on the old secondary
5461       info("shutting down drbd for disk/%d on old node" % idx)
5462       cfg.SetDiskID(dev, old_node)
5463       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5464       if msg:
5465         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5466                 (idx, msg),
5467                 hint="Please cleanup this device manually as soon as possible")
5468
5469     info("detaching primary drbds from the network (=> standalone)")
5470     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5471                                                instance.disks)[pri_node]
5472
5473     msg = result.RemoteFailMsg()
5474     if msg:
5475       # detaches didn't succeed (unlikely)
5476       self.cfg.ReleaseDRBDMinors(instance.name)
5477       raise errors.OpExecError("Can't detach the disks from the network on"
5478                                " old node: %s" % (msg,))
5479
5480     # if we managed to detach at least one, we update all the disks of
5481     # the instance to point to the new secondary
5482     info("updating instance configuration")
5483     for dev, _, new_logical_id in iv_names.itervalues():
5484       dev.logical_id = new_logical_id
5485       cfg.SetDiskID(dev, pri_node)
5486     cfg.Update(instance)
5487
5488     # and now perform the drbd attach
5489     info("attaching primary drbds to new secondary (standalone => connected)")
5490     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5491                                            instance.disks, instance.name,
5492                                            False)
5493     for to_node, to_result in result.items():
5494       msg = to_result.RemoteFailMsg()
5495       if msg:
5496         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5497                 hint="please do a gnt-instance info to see the"
5498                 " status of disks")
5499
5500     # this can fail as the old devices are degraded and _WaitForSync
5501     # does a combined result over all disks, so we don't check its
5502     # return value
5503     self.proc.LogStep(5, steps_total, "sync devices")
5504     _WaitForSync(self, instance, unlock=True)
5505
5506     # so check manually all the devices
5507     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5508       cfg.SetDiskID(dev, pri_node)
5509       result = self.rpc.call_blockdev_find(pri_node, dev)
5510       msg = result.RemoteFailMsg()
5511       if not msg and not result.payload:
5512         msg = "disk not found"
5513       if msg:
5514         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5515                                  (idx, msg))
5516       if result.payload[5]:
5517         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5518
5519     self.proc.LogStep(6, steps_total, "removing old storage")
5520     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5521       info("remove logical volumes for disk/%d" % idx)
5522       for lv in old_lvs:
5523         cfg.SetDiskID(lv, old_node)
5524         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5525         if msg:
5526           warning("Can't remove LV on old secondary: %s", msg,
5527                   hint="Cleanup stale volumes by hand")
5528
5529   def Exec(self, feedback_fn):
5530     """Execute disk replacement.
5531
5532     This dispatches the disk replacement to the appropriate handler.
5533
5534     """
5535     instance = self.instance
5536
5537     # Activate the instance disks if we're replacing them on a down instance
5538     if not instance.admin_up:
5539       _StartInstanceDisks(self, instance, True)
5540
5541     if self.op.mode == constants.REPLACE_DISK_CHG:
5542       fn = self._ExecD8Secondary
5543     else:
5544       fn = self._ExecD8DiskOnly
5545
5546     ret = fn(feedback_fn)
5547
5548     # Deactivate the instance disks if we're replacing them on a down instance
5549     if not instance.admin_up:
5550       _SafeShutdownInstanceDisks(self, instance)
5551
5552     return ret
5553
5554
5555 class LUGrowDisk(LogicalUnit):
5556   """Grow a disk of an instance.
5557
5558   """
5559   HPATH = "disk-grow"
5560   HTYPE = constants.HTYPE_INSTANCE
5561   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5562   REQ_BGL = False
5563
5564   def ExpandNames(self):
5565     self._ExpandAndLockInstance()
5566     self.needed_locks[locking.LEVEL_NODE] = []
5567     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5568
5569   def DeclareLocks(self, level):
5570     if level == locking.LEVEL_NODE:
5571       self._LockInstancesNodes()
5572
5573   def BuildHooksEnv(self):
5574     """Build hooks env.
5575
5576     This runs on the master, the primary and all the secondaries.
5577
5578     """
5579     env = {
5580       "DISK": self.op.disk,
5581       "AMOUNT": self.op.amount,
5582       }
5583     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5584     nl = [
5585       self.cfg.GetMasterNode(),
5586       self.instance.primary_node,
5587       ]
5588     return env, nl, nl
5589
5590   def CheckPrereq(self):
5591     """Check prerequisites.
5592
5593     This checks that the instance is in the cluster.
5594
5595     """
5596     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5597     assert instance is not None, \
5598       "Cannot retrieve locked instance %s" % self.op.instance_name
5599     nodenames = list(instance.all_nodes)
5600     for node in nodenames:
5601       _CheckNodeOnline(self, node)
5602
5603
5604     self.instance = instance
5605
5606     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5607       raise errors.OpPrereqError("Instance's disk layout does not support"
5608                                  " growing.")
5609
5610     self.disk = instance.FindDisk(self.op.disk)
5611
5612     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5613                                        instance.hypervisor)
5614     for node in nodenames:
5615       info = nodeinfo[node]
5616       if info.failed or not info.data:
5617         raise errors.OpPrereqError("Cannot get current information"
5618                                    " from node '%s'" % node)
5619       vg_free = info.data.get('vg_free', None)
5620       if not isinstance(vg_free, int):
5621         raise errors.OpPrereqError("Can't compute free disk space on"
5622                                    " node %s" % node)
5623       if self.op.amount > vg_free:
5624         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5625                                    " %d MiB available, %d MiB required" %
5626                                    (node, vg_free, self.op.amount))
5627
5628   def Exec(self, feedback_fn):
5629     """Execute disk grow.
5630
5631     """
5632     instance = self.instance
5633     disk = self.disk
5634     for node in instance.all_nodes:
5635       self.cfg.SetDiskID(disk, node)
5636       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5637       msg = result.RemoteFailMsg()
5638       if msg:
5639         raise errors.OpExecError("Grow request failed to node %s: %s" %
5640                                  (node, msg))
5641     disk.RecordGrow(self.op.amount)
5642     self.cfg.Update(instance)
5643     if self.op.wait_for_sync:
5644       disk_abort = not _WaitForSync(self, instance)
5645       if disk_abort:
5646         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5647                              " status.\nPlease check the instance.")
5648
5649
5650 class LUQueryInstanceData(NoHooksLU):
5651   """Query runtime instance data.
5652
5653   """
5654   _OP_REQP = ["instances", "static"]
5655   REQ_BGL = False
5656
5657   def ExpandNames(self):
5658     self.needed_locks = {}
5659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5660
5661     if not isinstance(self.op.instances, list):
5662       raise errors.OpPrereqError("Invalid argument type 'instances'")
5663
5664     if self.op.instances:
5665       self.wanted_names = []
5666       for name in self.op.instances:
5667         full_name = self.cfg.ExpandInstanceName(name)
5668         if full_name is None:
5669           raise errors.OpPrereqError("Instance '%s' not known" % name)
5670         self.wanted_names.append(full_name)
5671       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5672     else:
5673       self.wanted_names = None
5674       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5675
5676     self.needed_locks[locking.LEVEL_NODE] = []
5677     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5678
5679   def DeclareLocks(self, level):
5680     if level == locking.LEVEL_NODE:
5681       self._LockInstancesNodes()
5682
5683   def CheckPrereq(self):
5684     """Check prerequisites.
5685
5686     This only checks the optional instance list against the existing names.
5687
5688     """
5689     if self.wanted_names is None:
5690       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5691
5692     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5693                              in self.wanted_names]
5694     return
5695
5696   def _ComputeDiskStatus(self, instance, snode, dev):
5697     """Compute block device status.
5698
5699     """
5700     static = self.op.static
5701     if not static:
5702       self.cfg.SetDiskID(dev, instance.primary_node)
5703       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5704       if dev_pstatus.offline:
5705         dev_pstatus = None
5706       else:
5707         msg = dev_pstatus.RemoteFailMsg()
5708         if msg:
5709           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5710                                    (instance.name, msg))
5711         dev_pstatus = dev_pstatus.payload
5712     else:
5713       dev_pstatus = None
5714
5715     if dev.dev_type in constants.LDS_DRBD:
5716       # we change the snode then (otherwise we use the one passed in)
5717       if dev.logical_id[0] == instance.primary_node:
5718         snode = dev.logical_id[1]
5719       else:
5720         snode = dev.logical_id[0]
5721
5722     if snode and not static:
5723       self.cfg.SetDiskID(dev, snode)
5724       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5725       if dev_sstatus.offline:
5726         dev_sstatus = None
5727       else:
5728         msg = dev_sstatus.RemoteFailMsg()
5729         if msg:
5730           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5731                                    (instance.name, msg))
5732         dev_sstatus = dev_sstatus.payload
5733     else:
5734       dev_sstatus = None
5735
5736     if dev.children:
5737       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5738                       for child in dev.children]
5739     else:
5740       dev_children = []
5741
5742     data = {
5743       "iv_name": dev.iv_name,
5744       "dev_type": dev.dev_type,
5745       "logical_id": dev.logical_id,
5746       "physical_id": dev.physical_id,
5747       "pstatus": dev_pstatus,
5748       "sstatus": dev_sstatus,
5749       "children": dev_children,
5750       "mode": dev.mode,
5751       "size": dev.size,
5752       }
5753
5754     return data
5755
5756   def Exec(self, feedback_fn):
5757     """Gather and return data"""
5758     result = {}
5759
5760     cluster = self.cfg.GetClusterInfo()
5761
5762     for instance in self.wanted_instances:
5763       if not self.op.static:
5764         remote_info = self.rpc.call_instance_info(instance.primary_node,
5765                                                   instance.name,
5766                                                   instance.hypervisor)
5767         remote_info.Raise()
5768         remote_info = remote_info.data
5769         if remote_info and "state" in remote_info:
5770           remote_state = "up"
5771         else:
5772           remote_state = "down"
5773       else:
5774         remote_state = None
5775       if instance.admin_up:
5776         config_state = "up"
5777       else:
5778         config_state = "down"
5779
5780       disks = [self._ComputeDiskStatus(instance, None, device)
5781                for device in instance.disks]
5782
5783       idict = {
5784         "name": instance.name,
5785         "config_state": config_state,
5786         "run_state": remote_state,
5787         "pnode": instance.primary_node,
5788         "snodes": instance.secondary_nodes,
5789         "os": instance.os,
5790         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5791         "disks": disks,
5792         "hypervisor": instance.hypervisor,
5793         "network_port": instance.network_port,
5794         "hv_instance": instance.hvparams,
5795         "hv_actual": cluster.FillHV(instance),
5796         "be_instance": instance.beparams,
5797         "be_actual": cluster.FillBE(instance),
5798         }
5799
5800       result[instance.name] = idict
5801
5802     return result
5803
5804
5805 class LUSetInstanceParams(LogicalUnit):
5806   """Modifies an instances's parameters.
5807
5808   """
5809   HPATH = "instance-modify"
5810   HTYPE = constants.HTYPE_INSTANCE
5811   _OP_REQP = ["instance_name"]
5812   REQ_BGL = False
5813
5814   def CheckArguments(self):
5815     if not hasattr(self.op, 'nics'):
5816       self.op.nics = []
5817     if not hasattr(self.op, 'disks'):
5818       self.op.disks = []
5819     if not hasattr(self.op, 'beparams'):
5820       self.op.beparams = {}
5821     if not hasattr(self.op, 'hvparams'):
5822       self.op.hvparams = {}
5823     self.op.force = getattr(self.op, "force", False)
5824     if not (self.op.nics or self.op.disks or
5825             self.op.hvparams or self.op.beparams):
5826       raise errors.OpPrereqError("No changes submitted")
5827
5828     # Disk validation
5829     disk_addremove = 0
5830     for disk_op, disk_dict in self.op.disks:
5831       if disk_op == constants.DDM_REMOVE:
5832         disk_addremove += 1
5833         continue
5834       elif disk_op == constants.DDM_ADD:
5835         disk_addremove += 1
5836       else:
5837         if not isinstance(disk_op, int):
5838           raise errors.OpPrereqError("Invalid disk index")
5839       if disk_op == constants.DDM_ADD:
5840         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5841         if mode not in constants.DISK_ACCESS_SET:
5842           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5843         size = disk_dict.get('size', None)
5844         if size is None:
5845           raise errors.OpPrereqError("Required disk parameter size missing")
5846         try:
5847           size = int(size)
5848         except ValueError, err:
5849           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5850                                      str(err))
5851         disk_dict['size'] = size
5852       else:
5853         # modification of disk
5854         if 'size' in disk_dict:
5855           raise errors.OpPrereqError("Disk size change not possible, use"
5856                                      " grow-disk")
5857
5858     if disk_addremove > 1:
5859       raise errors.OpPrereqError("Only one disk add or remove operation"
5860                                  " supported at a time")
5861
5862     # NIC validation
5863     nic_addremove = 0
5864     for nic_op, nic_dict in self.op.nics:
5865       if nic_op == constants.DDM_REMOVE:
5866         nic_addremove += 1
5867         continue
5868       elif nic_op == constants.DDM_ADD:
5869         nic_addremove += 1
5870       else:
5871         if not isinstance(nic_op, int):
5872           raise errors.OpPrereqError("Invalid nic index")
5873
5874       # nic_dict should be a dict
5875       nic_ip = nic_dict.get('ip', None)
5876       if nic_ip is not None:
5877         if nic_ip.lower() == constants.VALUE_NONE:
5878           nic_dict['ip'] = None
5879         else:
5880           if not utils.IsValidIP(nic_ip):
5881             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5882
5883       if nic_op == constants.DDM_ADD:
5884         nic_bridge = nic_dict.get('bridge', None)
5885         if nic_bridge is None:
5886           nic_dict['bridge'] = self.cfg.GetDefBridge()
5887         nic_mac = nic_dict.get('mac', None)
5888         if nic_mac is None:
5889           nic_dict['mac'] = constants.VALUE_AUTO
5890
5891       if 'mac' in nic_dict:
5892         nic_mac = nic_dict['mac']
5893         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5894           if not utils.IsValidMac(nic_mac):
5895             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5896         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5897           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5898                                      " modifying an existing nic")
5899
5900     if nic_addremove > 1:
5901       raise errors.OpPrereqError("Only one NIC add or remove operation"
5902                                  " supported at a time")
5903
5904   def ExpandNames(self):
5905     self._ExpandAndLockInstance()
5906     self.needed_locks[locking.LEVEL_NODE] = []
5907     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5908
5909   def DeclareLocks(self, level):
5910     if level == locking.LEVEL_NODE:
5911       self._LockInstancesNodes()
5912
5913   def BuildHooksEnv(self):
5914     """Build hooks env.
5915
5916     This runs on the master, primary and secondaries.
5917
5918     """
5919     args = dict()
5920     if constants.BE_MEMORY in self.be_new:
5921       args['memory'] = self.be_new[constants.BE_MEMORY]
5922     if constants.BE_VCPUS in self.be_new:
5923       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5924     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5925     # information at all.
5926     if self.op.nics:
5927       args['nics'] = []
5928       nic_override = dict(self.op.nics)
5929       for idx, nic in enumerate(self.instance.nics):
5930         if idx in nic_override:
5931           this_nic_override = nic_override[idx]
5932         else:
5933           this_nic_override = {}
5934         if 'ip' in this_nic_override:
5935           ip = this_nic_override['ip']
5936         else:
5937           ip = nic.ip
5938         if 'bridge' in this_nic_override:
5939           bridge = this_nic_override['bridge']
5940         else:
5941           bridge = nic.bridge
5942         if 'mac' in this_nic_override:
5943           mac = this_nic_override['mac']
5944         else:
5945           mac = nic.mac
5946         args['nics'].append((ip, bridge, mac))
5947       if constants.DDM_ADD in nic_override:
5948         ip = nic_override[constants.DDM_ADD].get('ip', None)
5949         bridge = nic_override[constants.DDM_ADD]['bridge']
5950         mac = nic_override[constants.DDM_ADD]['mac']
5951         args['nics'].append((ip, bridge, mac))
5952       elif constants.DDM_REMOVE in nic_override:
5953         del args['nics'][-1]
5954
5955     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5956     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5957     return env, nl, nl
5958
5959   def CheckPrereq(self):
5960     """Check prerequisites.
5961
5962     This only checks the instance list against the existing names.
5963
5964     """
5965     force = self.force = self.op.force
5966
5967     # checking the new params on the primary/secondary nodes
5968
5969     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5970     assert self.instance is not None, \
5971       "Cannot retrieve locked instance %s" % self.op.instance_name
5972     pnode = instance.primary_node
5973     nodelist = list(instance.all_nodes)
5974
5975     # hvparams processing
5976     if self.op.hvparams:
5977       i_hvdict = copy.deepcopy(instance.hvparams)
5978       for key, val in self.op.hvparams.iteritems():
5979         if val == constants.VALUE_DEFAULT:
5980           try:
5981             del i_hvdict[key]
5982           except KeyError:
5983             pass
5984         else:
5985           i_hvdict[key] = val
5986       cluster = self.cfg.GetClusterInfo()
5987       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5988       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5989                                 i_hvdict)
5990       # local check
5991       hypervisor.GetHypervisor(
5992         instance.hypervisor).CheckParameterSyntax(hv_new)
5993       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5994       self.hv_new = hv_new # the new actual values
5995       self.hv_inst = i_hvdict # the new dict (without defaults)
5996     else:
5997       self.hv_new = self.hv_inst = {}
5998
5999     # beparams processing
6000     if self.op.beparams:
6001       i_bedict = copy.deepcopy(instance.beparams)
6002       for key, val in self.op.beparams.iteritems():
6003         if val == constants.VALUE_DEFAULT:
6004           try:
6005             del i_bedict[key]
6006           except KeyError:
6007             pass
6008         else:
6009           i_bedict[key] = val
6010       cluster = self.cfg.GetClusterInfo()
6011       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6012       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6013                                 i_bedict)
6014       self.be_new = be_new # the new actual values
6015       self.be_inst = i_bedict # the new dict (without defaults)
6016     else:
6017       self.be_new = self.be_inst = {}
6018
6019     self.warn = []
6020
6021     if constants.BE_MEMORY in self.op.beparams and not self.force:
6022       mem_check_list = [pnode]
6023       if be_new[constants.BE_AUTO_BALANCE]:
6024         # either we changed auto_balance to yes or it was from before
6025         mem_check_list.extend(instance.secondary_nodes)
6026       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6027                                                   instance.hypervisor)
6028       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6029                                          instance.hypervisor)
6030       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6031         # Assume the primary node is unreachable and go ahead
6032         self.warn.append("Can't get info from primary node %s" % pnode)
6033       else:
6034         if not instance_info.failed and instance_info.data:
6035           current_mem = int(instance_info.data['memory'])
6036         else:
6037           # Assume instance not running
6038           # (there is a slight race condition here, but it's not very probable,
6039           # and we have no other way to check)
6040           current_mem = 0
6041         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6042                     nodeinfo[pnode].data['memory_free'])
6043         if miss_mem > 0:
6044           raise errors.OpPrereqError("This change will prevent the instance"
6045                                      " from starting, due to %d MB of memory"
6046                                      " missing on its primary node" % miss_mem)
6047
6048       if be_new[constants.BE_AUTO_BALANCE]:
6049         for node, nres in nodeinfo.iteritems():
6050           if node not in instance.secondary_nodes:
6051             continue
6052           if nres.failed or not isinstance(nres.data, dict):
6053             self.warn.append("Can't get info from secondary node %s" % node)
6054           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6055             self.warn.append("Not enough memory to failover instance to"
6056                              " secondary node %s" % node)
6057
6058     # NIC processing
6059     for nic_op, nic_dict in self.op.nics:
6060       if nic_op == constants.DDM_REMOVE:
6061         if not instance.nics:
6062           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6063         continue
6064       if nic_op != constants.DDM_ADD:
6065         # an existing nic
6066         if nic_op < 0 or nic_op >= len(instance.nics):
6067           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6068                                      " are 0 to %d" %
6069                                      (nic_op, len(instance.nics)))
6070       if 'bridge' in nic_dict:
6071         nic_bridge = nic_dict['bridge']
6072         if nic_bridge is None:
6073           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6074         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6075           msg = ("Bridge '%s' doesn't exist on one of"
6076                  " the instance nodes" % nic_bridge)
6077           if self.force:
6078             self.warn.append(msg)
6079           else:
6080             raise errors.OpPrereqError(msg)
6081       if 'mac' in nic_dict:
6082         nic_mac = nic_dict['mac']
6083         if nic_mac is None:
6084           raise errors.OpPrereqError('Cannot set the nic mac to None')
6085         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6086           # otherwise generate the mac
6087           nic_dict['mac'] = self.cfg.GenerateMAC()
6088         else:
6089           # or validate/reserve the current one
6090           if self.cfg.IsMacInUse(nic_mac):
6091             raise errors.OpPrereqError("MAC address %s already in use"
6092                                        " in cluster" % nic_mac)
6093
6094     # DISK processing
6095     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6096       raise errors.OpPrereqError("Disk operations not supported for"
6097                                  " diskless instances")
6098     for disk_op, disk_dict in self.op.disks:
6099       if disk_op == constants.DDM_REMOVE:
6100         if len(instance.disks) == 1:
6101           raise errors.OpPrereqError("Cannot remove the last disk of"
6102                                      " an instance")
6103         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6104         ins_l = ins_l[pnode]
6105         if ins_l.failed or not isinstance(ins_l.data, list):
6106           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6107         if instance.name in ins_l.data:
6108           raise errors.OpPrereqError("Instance is running, can't remove"
6109                                      " disks.")
6110
6111       if (disk_op == constants.DDM_ADD and
6112           len(instance.nics) >= constants.MAX_DISKS):
6113         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6114                                    " add more" % constants.MAX_DISKS)
6115       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6116         # an existing disk
6117         if disk_op < 0 or disk_op >= len(instance.disks):
6118           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6119                                      " are 0 to %d" %
6120                                      (disk_op, len(instance.disks)))
6121
6122     return
6123
6124   def Exec(self, feedback_fn):
6125     """Modifies an instance.
6126
6127     All parameters take effect only at the next restart of the instance.
6128
6129     """
6130     # Process here the warnings from CheckPrereq, as we don't have a
6131     # feedback_fn there.
6132     for warn in self.warn:
6133       feedback_fn("WARNING: %s" % warn)
6134
6135     result = []
6136     instance = self.instance
6137     # disk changes
6138     for disk_op, disk_dict in self.op.disks:
6139       if disk_op == constants.DDM_REMOVE:
6140         # remove the last disk
6141         device = instance.disks.pop()
6142         device_idx = len(instance.disks)
6143         for node, disk in device.ComputeNodeTree(instance.primary_node):
6144           self.cfg.SetDiskID(disk, node)
6145           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6146           if msg:
6147             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6148                             " continuing anyway", device_idx, node, msg)
6149         result.append(("disk/%d" % device_idx, "remove"))
6150       elif disk_op == constants.DDM_ADD:
6151         # add a new disk
6152         if instance.disk_template == constants.DT_FILE:
6153           file_driver, file_path = instance.disks[0].logical_id
6154           file_path = os.path.dirname(file_path)
6155         else:
6156           file_driver = file_path = None
6157         disk_idx_base = len(instance.disks)
6158         new_disk = _GenerateDiskTemplate(self,
6159                                          instance.disk_template,
6160                                          instance.name, instance.primary_node,
6161                                          instance.secondary_nodes,
6162                                          [disk_dict],
6163                                          file_path,
6164                                          file_driver,
6165                                          disk_idx_base)[0]
6166         instance.disks.append(new_disk)
6167         info = _GetInstanceInfoText(instance)
6168
6169         logging.info("Creating volume %s for instance %s",
6170                      new_disk.iv_name, instance.name)
6171         # Note: this needs to be kept in sync with _CreateDisks
6172         #HARDCODE
6173         for node in instance.all_nodes:
6174           f_create = node == instance.primary_node
6175           try:
6176             _CreateBlockDev(self, node, instance, new_disk,
6177                             f_create, info, f_create)
6178           except errors.OpExecError, err:
6179             self.LogWarning("Failed to create volume %s (%s) on"
6180                             " node %s: %s",
6181                             new_disk.iv_name, new_disk, node, err)
6182         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6183                        (new_disk.size, new_disk.mode)))
6184       else:
6185         # change a given disk
6186         instance.disks[disk_op].mode = disk_dict['mode']
6187         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6188     # NIC changes
6189     for nic_op, nic_dict in self.op.nics:
6190       if nic_op == constants.DDM_REMOVE:
6191         # remove the last nic
6192         del instance.nics[-1]
6193         result.append(("nic.%d" % len(instance.nics), "remove"))
6194       elif nic_op == constants.DDM_ADD:
6195         # mac and bridge should be set, by now
6196         mac = nic_dict['mac']
6197         bridge = nic_dict['bridge']
6198         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6199                               bridge=bridge)
6200         instance.nics.append(new_nic)
6201         result.append(("nic.%d" % (len(instance.nics) - 1),
6202                        "add:mac=%s,ip=%s,bridge=%s" %
6203                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6204       else:
6205         # change a given nic
6206         for key in 'mac', 'ip', 'bridge':
6207           if key in nic_dict:
6208             setattr(instance.nics[nic_op], key, nic_dict[key])
6209             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6210
6211     # hvparams changes
6212     if self.op.hvparams:
6213       instance.hvparams = self.hv_inst
6214       for key, val in self.op.hvparams.iteritems():
6215         result.append(("hv/%s" % key, val))
6216
6217     # beparams changes
6218     if self.op.beparams:
6219       instance.beparams = self.be_inst
6220       for key, val in self.op.beparams.iteritems():
6221         result.append(("be/%s" % key, val))
6222
6223     self.cfg.Update(instance)
6224
6225     return result
6226
6227
6228 class LUQueryExports(NoHooksLU):
6229   """Query the exports list
6230
6231   """
6232   _OP_REQP = ['nodes']
6233   REQ_BGL = False
6234
6235   def ExpandNames(self):
6236     self.needed_locks = {}
6237     self.share_locks[locking.LEVEL_NODE] = 1
6238     if not self.op.nodes:
6239       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6240     else:
6241       self.needed_locks[locking.LEVEL_NODE] = \
6242         _GetWantedNodes(self, self.op.nodes)
6243
6244   def CheckPrereq(self):
6245     """Check prerequisites.
6246
6247     """
6248     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6249
6250   def Exec(self, feedback_fn):
6251     """Compute the list of all the exported system images.
6252
6253     @rtype: dict
6254     @return: a dictionary with the structure node->(export-list)
6255         where export-list is a list of the instances exported on
6256         that node.
6257
6258     """
6259     rpcresult = self.rpc.call_export_list(self.nodes)
6260     result = {}
6261     for node in rpcresult:
6262       if rpcresult[node].failed:
6263         result[node] = False
6264       else:
6265         result[node] = rpcresult[node].data
6266
6267     return result
6268
6269
6270 class LUExportInstance(LogicalUnit):
6271   """Export an instance to an image in the cluster.
6272
6273   """
6274   HPATH = "instance-export"
6275   HTYPE = constants.HTYPE_INSTANCE
6276   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6277   REQ_BGL = False
6278
6279   def ExpandNames(self):
6280     self._ExpandAndLockInstance()
6281     # FIXME: lock only instance primary and destination node
6282     #
6283     # Sad but true, for now we have do lock all nodes, as we don't know where
6284     # the previous export might be, and and in this LU we search for it and
6285     # remove it from its current node. In the future we could fix this by:
6286     #  - making a tasklet to search (share-lock all), then create the new one,
6287     #    then one to remove, after
6288     #  - removing the removal operation altoghether
6289     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6290
6291   def DeclareLocks(self, level):
6292     """Last minute lock declaration."""
6293     # All nodes are locked anyway, so nothing to do here.
6294
6295   def BuildHooksEnv(self):
6296     """Build hooks env.
6297
6298     This will run on the master, primary node and target node.
6299
6300     """
6301     env = {
6302       "EXPORT_NODE": self.op.target_node,
6303       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6304       }
6305     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6306     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6307           self.op.target_node]
6308     return env, nl, nl
6309
6310   def CheckPrereq(self):
6311     """Check prerequisites.
6312
6313     This checks that the instance and node names are valid.
6314
6315     """
6316     instance_name = self.op.instance_name
6317     self.instance = self.cfg.GetInstanceInfo(instance_name)
6318     assert self.instance is not None, \
6319           "Cannot retrieve locked instance %s" % self.op.instance_name
6320     _CheckNodeOnline(self, self.instance.primary_node)
6321
6322     self.dst_node = self.cfg.GetNodeInfo(
6323       self.cfg.ExpandNodeName(self.op.target_node))
6324
6325     if self.dst_node is None:
6326       # This is wrong node name, not a non-locked node
6327       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6328     _CheckNodeOnline(self, self.dst_node.name)
6329     _CheckNodeNotDrained(self, self.dst_node.name)
6330
6331     # instance disk type verification
6332     for disk in self.instance.disks:
6333       if disk.dev_type == constants.LD_FILE:
6334         raise errors.OpPrereqError("Export not supported for instances with"
6335                                    " file-based disks")
6336
6337   def Exec(self, feedback_fn):
6338     """Export an instance to an image in the cluster.
6339
6340     """
6341     instance = self.instance
6342     dst_node = self.dst_node
6343     src_node = instance.primary_node
6344     if self.op.shutdown:
6345       # shutdown the instance, but not the disks
6346       result = self.rpc.call_instance_shutdown(src_node, instance)
6347       msg = result.RemoteFailMsg()
6348       if msg:
6349         raise errors.OpExecError("Could not shutdown instance %s on"
6350                                  " node %s: %s" %
6351                                  (instance.name, src_node, msg))
6352
6353     vgname = self.cfg.GetVGName()
6354
6355     snap_disks = []
6356
6357     # set the disks ID correctly since call_instance_start needs the
6358     # correct drbd minor to create the symlinks
6359     for disk in instance.disks:
6360       self.cfg.SetDiskID(disk, src_node)
6361
6362     try:
6363       for idx, disk in enumerate(instance.disks):
6364         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6365         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6366         if new_dev_name.failed or not new_dev_name.data:
6367           self.LogWarning("Could not snapshot disk/%d on node %s",
6368                           idx, src_node)
6369           snap_disks.append(False)
6370         else:
6371           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6372                                  logical_id=(vgname, new_dev_name.data),
6373                                  physical_id=(vgname, new_dev_name.data),
6374                                  iv_name=disk.iv_name)
6375           snap_disks.append(new_dev)
6376
6377     finally:
6378       if self.op.shutdown and instance.admin_up:
6379         result = self.rpc.call_instance_start(src_node, instance, None, None)
6380         msg = result.RemoteFailMsg()
6381         if msg:
6382           _ShutdownInstanceDisks(self, instance)
6383           raise errors.OpExecError("Could not start instance: %s" % msg)
6384
6385     # TODO: check for size
6386
6387     cluster_name = self.cfg.GetClusterName()
6388     for idx, dev in enumerate(snap_disks):
6389       if dev:
6390         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6391                                                instance, cluster_name, idx)
6392         if result.failed or not result.data:
6393           self.LogWarning("Could not export disk/%d from node %s to"
6394                           " node %s", idx, src_node, dst_node.name)
6395         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6396         if msg:
6397           self.LogWarning("Could not remove snapshot for disk/%d from node"
6398                           " %s: %s", idx, src_node, msg)
6399
6400     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6401     if result.failed or not result.data:
6402       self.LogWarning("Could not finalize export for instance %s on node %s",
6403                       instance.name, dst_node.name)
6404
6405     nodelist = self.cfg.GetNodeList()
6406     nodelist.remove(dst_node.name)
6407
6408     # on one-node clusters nodelist will be empty after the removal
6409     # if we proceed the backup would be removed because OpQueryExports
6410     # substitutes an empty list with the full cluster node list.
6411     if nodelist:
6412       exportlist = self.rpc.call_export_list(nodelist)
6413       for node in exportlist:
6414         if exportlist[node].failed:
6415           continue
6416         if instance.name in exportlist[node].data:
6417           if not self.rpc.call_export_remove(node, instance.name):
6418             self.LogWarning("Could not remove older export for instance %s"
6419                             " on node %s", instance.name, node)
6420
6421
6422 class LURemoveExport(NoHooksLU):
6423   """Remove exports related to the named instance.
6424
6425   """
6426   _OP_REQP = ["instance_name"]
6427   REQ_BGL = False
6428
6429   def ExpandNames(self):
6430     self.needed_locks = {}
6431     # We need all nodes to be locked in order for RemoveExport to work, but we
6432     # don't need to lock the instance itself, as nothing will happen to it (and
6433     # we can remove exports also for a removed instance)
6434     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6435
6436   def CheckPrereq(self):
6437     """Check prerequisites.
6438     """
6439     pass
6440
6441   def Exec(self, feedback_fn):
6442     """Remove any export.
6443
6444     """
6445     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6446     # If the instance was not found we'll try with the name that was passed in.
6447     # This will only work if it was an FQDN, though.
6448     fqdn_warn = False
6449     if not instance_name:
6450       fqdn_warn = True
6451       instance_name = self.op.instance_name
6452
6453     exportlist = self.rpc.call_export_list(self.acquired_locks[
6454       locking.LEVEL_NODE])
6455     found = False
6456     for node in exportlist:
6457       if exportlist[node].failed:
6458         self.LogWarning("Failed to query node %s, continuing" % node)
6459         continue
6460       if instance_name in exportlist[node].data:
6461         found = True
6462         result = self.rpc.call_export_remove(node, instance_name)
6463         if result.failed or not result.data:
6464           logging.error("Could not remove export for instance %s"
6465                         " on node %s", instance_name, node)
6466
6467     if fqdn_warn and not found:
6468       feedback_fn("Export not found. If trying to remove an export belonging"
6469                   " to a deleted instance please use its Fully Qualified"
6470                   " Domain Name.")
6471
6472
6473 class TagsLU(NoHooksLU):
6474   """Generic tags LU.
6475
6476   This is an abstract class which is the parent of all the other tags LUs.
6477
6478   """
6479
6480   def ExpandNames(self):
6481     self.needed_locks = {}
6482     if self.op.kind == constants.TAG_NODE:
6483       name = self.cfg.ExpandNodeName(self.op.name)
6484       if name is None:
6485         raise errors.OpPrereqError("Invalid node name (%s)" %
6486                                    (self.op.name,))
6487       self.op.name = name
6488       self.needed_locks[locking.LEVEL_NODE] = name
6489     elif self.op.kind == constants.TAG_INSTANCE:
6490       name = self.cfg.ExpandInstanceName(self.op.name)
6491       if name is None:
6492         raise errors.OpPrereqError("Invalid instance name (%s)" %
6493                                    (self.op.name,))
6494       self.op.name = name
6495       self.needed_locks[locking.LEVEL_INSTANCE] = name
6496
6497   def CheckPrereq(self):
6498     """Check prerequisites.
6499
6500     """
6501     if self.op.kind == constants.TAG_CLUSTER:
6502       self.target = self.cfg.GetClusterInfo()
6503     elif self.op.kind == constants.TAG_NODE:
6504       self.target = self.cfg.GetNodeInfo(self.op.name)
6505     elif self.op.kind == constants.TAG_INSTANCE:
6506       self.target = self.cfg.GetInstanceInfo(self.op.name)
6507     else:
6508       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6509                                  str(self.op.kind))
6510
6511
6512 class LUGetTags(TagsLU):
6513   """Returns the tags of a given object.
6514
6515   """
6516   _OP_REQP = ["kind", "name"]
6517   REQ_BGL = False
6518
6519   def Exec(self, feedback_fn):
6520     """Returns the tag list.
6521
6522     """
6523     return list(self.target.GetTags())
6524
6525
6526 class LUSearchTags(NoHooksLU):
6527   """Searches the tags for a given pattern.
6528
6529   """
6530   _OP_REQP = ["pattern"]
6531   REQ_BGL = False
6532
6533   def ExpandNames(self):
6534     self.needed_locks = {}
6535
6536   def CheckPrereq(self):
6537     """Check prerequisites.
6538
6539     This checks the pattern passed for validity by compiling it.
6540
6541     """
6542     try:
6543       self.re = re.compile(self.op.pattern)
6544     except re.error, err:
6545       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6546                                  (self.op.pattern, err))
6547
6548   def Exec(self, feedback_fn):
6549     """Returns the tag list.
6550
6551     """
6552     cfg = self.cfg
6553     tgts = [("/cluster", cfg.GetClusterInfo())]
6554     ilist = cfg.GetAllInstancesInfo().values()
6555     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6556     nlist = cfg.GetAllNodesInfo().values()
6557     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6558     results = []
6559     for path, target in tgts:
6560       for tag in target.GetTags():
6561         if self.re.search(tag):
6562           results.append((path, tag))
6563     return results
6564
6565
6566 class LUAddTags(TagsLU):
6567   """Sets a tag on a given object.
6568
6569   """
6570   _OP_REQP = ["kind", "name", "tags"]
6571   REQ_BGL = False
6572
6573   def CheckPrereq(self):
6574     """Check prerequisites.
6575
6576     This checks the type and length of the tag name and value.
6577
6578     """
6579     TagsLU.CheckPrereq(self)
6580     for tag in self.op.tags:
6581       objects.TaggableObject.ValidateTag(tag)
6582
6583   def Exec(self, feedback_fn):
6584     """Sets the tag.
6585
6586     """
6587     try:
6588       for tag in self.op.tags:
6589         self.target.AddTag(tag)
6590     except errors.TagError, err:
6591       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6592     try:
6593       self.cfg.Update(self.target)
6594     except errors.ConfigurationError:
6595       raise errors.OpRetryError("There has been a modification to the"
6596                                 " config file and the operation has been"
6597                                 " aborted. Please retry.")
6598
6599
6600 class LUDelTags(TagsLU):
6601   """Delete a list of tags from a given object.
6602
6603   """
6604   _OP_REQP = ["kind", "name", "tags"]
6605   REQ_BGL = False
6606
6607   def CheckPrereq(self):
6608     """Check prerequisites.
6609
6610     This checks that we have the given tag.
6611
6612     """
6613     TagsLU.CheckPrereq(self)
6614     for tag in self.op.tags:
6615       objects.TaggableObject.ValidateTag(tag)
6616     del_tags = frozenset(self.op.tags)
6617     cur_tags = self.target.GetTags()
6618     if not del_tags <= cur_tags:
6619       diff_tags = del_tags - cur_tags
6620       diff_names = ["'%s'" % tag for tag in diff_tags]
6621       diff_names.sort()
6622       raise errors.OpPrereqError("Tag(s) %s not found" %
6623                                  (",".join(diff_names)))
6624
6625   def Exec(self, feedback_fn):
6626     """Remove the tag from the object.
6627
6628     """
6629     for tag in self.op.tags:
6630       self.target.RemoveTag(tag)
6631     try:
6632       self.cfg.Update(self.target)
6633     except errors.ConfigurationError:
6634       raise errors.OpRetryError("There has been a modification to the"
6635                                 " config file and the operation has been"
6636                                 " aborted. Please retry.")
6637
6638
6639 class LUTestDelay(NoHooksLU):
6640   """Sleep for a specified amount of time.
6641
6642   This LU sleeps on the master and/or nodes for a specified amount of
6643   time.
6644
6645   """
6646   _OP_REQP = ["duration", "on_master", "on_nodes"]
6647   REQ_BGL = False
6648
6649   def ExpandNames(self):
6650     """Expand names and set required locks.
6651
6652     This expands the node list, if any.
6653
6654     """
6655     self.needed_locks = {}
6656     if self.op.on_nodes:
6657       # _GetWantedNodes can be used here, but is not always appropriate to use
6658       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6659       # more information.
6660       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6661       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6662
6663   def CheckPrereq(self):
6664     """Check prerequisites.
6665
6666     """
6667
6668   def Exec(self, feedback_fn):
6669     """Do the actual sleep.
6670
6671     """
6672     if self.op.on_master:
6673       if not utils.TestDelay(self.op.duration):
6674         raise errors.OpExecError("Error during master delay test")
6675     if self.op.on_nodes:
6676       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6677       if not result:
6678         raise errors.OpExecError("Complete failure from rpc call")
6679       for node, node_result in result.items():
6680         node_result.Raise()
6681         if not node_result.data:
6682           raise errors.OpExecError("Failure during rpc call to node %s,"
6683                                    " result: %s" % (node, node_result.data))
6684
6685
6686 class IAllocator(object):
6687   """IAllocator framework.
6688
6689   An IAllocator instance has three sets of attributes:
6690     - cfg that is needed to query the cluster
6691     - input data (all members of the _KEYS class attribute are required)
6692     - four buffer attributes (in|out_data|text), that represent the
6693       input (to the external script) in text and data structure format,
6694       and the output from it, again in two formats
6695     - the result variables from the script (success, info, nodes) for
6696       easy usage
6697
6698   """
6699   _ALLO_KEYS = [
6700     "mem_size", "disks", "disk_template",
6701     "os", "tags", "nics", "vcpus", "hypervisor",
6702     ]
6703   _RELO_KEYS = [
6704     "relocate_from",
6705     ]
6706
6707   def __init__(self, lu, mode, name, **kwargs):
6708     self.lu = lu
6709     # init buffer variables
6710     self.in_text = self.out_text = self.in_data = self.out_data = None
6711     # init all input fields so that pylint is happy
6712     self.mode = mode
6713     self.name = name
6714     self.mem_size = self.disks = self.disk_template = None
6715     self.os = self.tags = self.nics = self.vcpus = None
6716     self.hypervisor = None
6717     self.relocate_from = None
6718     # computed fields
6719     self.required_nodes = None
6720     # init result fields
6721     self.success = self.info = self.nodes = None
6722     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6723       keyset = self._ALLO_KEYS
6724     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6725       keyset = self._RELO_KEYS
6726     else:
6727       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6728                                    " IAllocator" % self.mode)
6729     for key in kwargs:
6730       if key not in keyset:
6731         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6732                                      " IAllocator" % key)
6733       setattr(self, key, kwargs[key])
6734     for key in keyset:
6735       if key not in kwargs:
6736         raise errors.ProgrammerError("Missing input parameter '%s' to"
6737                                      " IAllocator" % key)
6738     self._BuildInputData()
6739
6740   def _ComputeClusterData(self):
6741     """Compute the generic allocator input data.
6742
6743     This is the data that is independent of the actual operation.
6744
6745     """
6746     cfg = self.lu.cfg
6747     cluster_info = cfg.GetClusterInfo()
6748     # cluster data
6749     data = {
6750       "version": constants.IALLOCATOR_VERSION,
6751       "cluster_name": cfg.GetClusterName(),
6752       "cluster_tags": list(cluster_info.GetTags()),
6753       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6754       # we don't have job IDs
6755       }
6756     iinfo = cfg.GetAllInstancesInfo().values()
6757     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6758
6759     # node data
6760     node_results = {}
6761     node_list = cfg.GetNodeList()
6762
6763     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6764       hypervisor_name = self.hypervisor
6765     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6766       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6767
6768     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6769                                            hypervisor_name)
6770     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6771                        cluster_info.enabled_hypervisors)
6772     for nname, nresult in node_data.items():
6773       # first fill in static (config-based) values
6774       ninfo = cfg.GetNodeInfo(nname)
6775       pnr = {
6776         "tags": list(ninfo.GetTags()),
6777         "primary_ip": ninfo.primary_ip,
6778         "secondary_ip": ninfo.secondary_ip,
6779         "offline": ninfo.offline,
6780         "drained": ninfo.drained,
6781         "master_candidate": ninfo.master_candidate,
6782         }
6783
6784       if not ninfo.offline:
6785         nresult.Raise()
6786         if not isinstance(nresult.data, dict):
6787           raise errors.OpExecError("Can't get data for node %s" % nname)
6788         remote_info = nresult.data
6789         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6790                      'vg_size', 'vg_free', 'cpu_total']:
6791           if attr not in remote_info:
6792             raise errors.OpExecError("Node '%s' didn't return attribute"
6793                                      " '%s'" % (nname, attr))
6794           try:
6795             remote_info[attr] = int(remote_info[attr])
6796           except ValueError, err:
6797             raise errors.OpExecError("Node '%s' returned invalid value"
6798                                      " for '%s': %s" % (nname, attr, err))
6799         # compute memory used by primary instances
6800         i_p_mem = i_p_up_mem = 0
6801         for iinfo, beinfo in i_list:
6802           if iinfo.primary_node == nname:
6803             i_p_mem += beinfo[constants.BE_MEMORY]
6804             if iinfo.name not in node_iinfo[nname].data:
6805               i_used_mem = 0
6806             else:
6807               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6808             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6809             remote_info['memory_free'] -= max(0, i_mem_diff)
6810
6811             if iinfo.admin_up:
6812               i_p_up_mem += beinfo[constants.BE_MEMORY]
6813
6814         # compute memory used by instances
6815         pnr_dyn = {
6816           "total_memory": remote_info['memory_total'],
6817           "reserved_memory": remote_info['memory_dom0'],
6818           "free_memory": remote_info['memory_free'],
6819           "total_disk": remote_info['vg_size'],
6820           "free_disk": remote_info['vg_free'],
6821           "total_cpus": remote_info['cpu_total'],
6822           "i_pri_memory": i_p_mem,
6823           "i_pri_up_memory": i_p_up_mem,
6824           }
6825         pnr.update(pnr_dyn)
6826
6827       node_results[nname] = pnr
6828     data["nodes"] = node_results
6829
6830     # instance data
6831     instance_data = {}
6832     for iinfo, beinfo in i_list:
6833       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6834                   for n in iinfo.nics]
6835       pir = {
6836         "tags": list(iinfo.GetTags()),
6837         "admin_up": iinfo.admin_up,
6838         "vcpus": beinfo[constants.BE_VCPUS],
6839         "memory": beinfo[constants.BE_MEMORY],
6840         "os": iinfo.os,
6841         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6842         "nics": nic_data,
6843         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6844         "disk_template": iinfo.disk_template,
6845         "hypervisor": iinfo.hypervisor,
6846         }
6847       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6848                                                  pir["disks"])
6849       instance_data[iinfo.name] = pir
6850
6851     data["instances"] = instance_data
6852
6853     self.in_data = data
6854
6855   def _AddNewInstance(self):
6856     """Add new instance data to allocator structure.
6857
6858     This in combination with _AllocatorGetClusterData will create the
6859     correct structure needed as input for the allocator.
6860
6861     The checks for the completeness of the opcode must have already been
6862     done.
6863
6864     """
6865     data = self.in_data
6866
6867     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6868
6869     if self.disk_template in constants.DTS_NET_MIRROR:
6870       self.required_nodes = 2
6871     else:
6872       self.required_nodes = 1
6873     request = {
6874       "type": "allocate",
6875       "name": self.name,
6876       "disk_template": self.disk_template,
6877       "tags": self.tags,
6878       "os": self.os,
6879       "vcpus": self.vcpus,
6880       "memory": self.mem_size,
6881       "disks": self.disks,
6882       "disk_space_total": disk_space,
6883       "nics": self.nics,
6884       "required_nodes": self.required_nodes,
6885       }
6886     data["request"] = request
6887
6888   def _AddRelocateInstance(self):
6889     """Add relocate instance data to allocator structure.
6890
6891     This in combination with _IAllocatorGetClusterData will create the
6892     correct structure needed as input for the allocator.
6893
6894     The checks for the completeness of the opcode must have already been
6895     done.
6896
6897     """
6898     instance = self.lu.cfg.GetInstanceInfo(self.name)
6899     if instance is None:
6900       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6901                                    " IAllocator" % self.name)
6902
6903     if instance.disk_template not in constants.DTS_NET_MIRROR:
6904       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6905
6906     if len(instance.secondary_nodes) != 1:
6907       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6908
6909     self.required_nodes = 1
6910     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6911     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6912
6913     request = {
6914       "type": "relocate",
6915       "name": self.name,
6916       "disk_space_total": disk_space,
6917       "required_nodes": self.required_nodes,
6918       "relocate_from": self.relocate_from,
6919       }
6920     self.in_data["request"] = request
6921
6922   def _BuildInputData(self):
6923     """Build input data structures.
6924
6925     """
6926     self._ComputeClusterData()
6927
6928     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6929       self._AddNewInstance()
6930     else:
6931       self._AddRelocateInstance()
6932
6933     self.in_text = serializer.Dump(self.in_data)
6934
6935   def Run(self, name, validate=True, call_fn=None):
6936     """Run an instance allocator and return the results.
6937
6938     """
6939     if call_fn is None:
6940       call_fn = self.lu.rpc.call_iallocator_runner
6941     data = self.in_text
6942
6943     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6944     result.Raise()
6945
6946     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6947       raise errors.OpExecError("Invalid result from master iallocator runner")
6948
6949     rcode, stdout, stderr, fail = result.data
6950
6951     if rcode == constants.IARUN_NOTFOUND:
6952       raise errors.OpExecError("Can't find allocator '%s'" % name)
6953     elif rcode == constants.IARUN_FAILURE:
6954       raise errors.OpExecError("Instance allocator call failed: %s,"
6955                                " output: %s" % (fail, stdout+stderr))
6956     self.out_text = stdout
6957     if validate:
6958       self._ValidateResult()
6959
6960   def _ValidateResult(self):
6961     """Process the allocator results.
6962
6963     This will process and if successful save the result in
6964     self.out_data and the other parameters.
6965
6966     """
6967     try:
6968       rdict = serializer.Load(self.out_text)
6969     except Exception, err:
6970       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6971
6972     if not isinstance(rdict, dict):
6973       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6974
6975     for key in "success", "info", "nodes":
6976       if key not in rdict:
6977         raise errors.OpExecError("Can't parse iallocator results:"
6978                                  " missing key '%s'" % key)
6979       setattr(self, key, rdict[key])
6980
6981     if not isinstance(rdict["nodes"], list):
6982       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6983                                " is not a list")
6984     self.out_data = rdict
6985
6986
6987 class LUTestAllocator(NoHooksLU):
6988   """Run allocator tests.
6989
6990   This LU runs the allocator tests
6991
6992   """
6993   _OP_REQP = ["direction", "mode", "name"]
6994
6995   def CheckPrereq(self):
6996     """Check prerequisites.
6997
6998     This checks the opcode parameters depending on the director and mode test.
6999
7000     """
7001     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7002       for attr in ["name", "mem_size", "disks", "disk_template",
7003                    "os", "tags", "nics", "vcpus"]:
7004         if not hasattr(self.op, attr):
7005           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7006                                      attr)
7007       iname = self.cfg.ExpandInstanceName(self.op.name)
7008       if iname is not None:
7009         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7010                                    iname)
7011       if not isinstance(self.op.nics, list):
7012         raise errors.OpPrereqError("Invalid parameter 'nics'")
7013       for row in self.op.nics:
7014         if (not isinstance(row, dict) or
7015             "mac" not in row or
7016             "ip" not in row or
7017             "bridge" not in row):
7018           raise errors.OpPrereqError("Invalid contents of the"
7019                                      " 'nics' parameter")
7020       if not isinstance(self.op.disks, list):
7021         raise errors.OpPrereqError("Invalid parameter 'disks'")
7022       for row in self.op.disks:
7023         if (not isinstance(row, dict) or
7024             "size" not in row or
7025             not isinstance(row["size"], int) or
7026             "mode" not in row or
7027             row["mode"] not in ['r', 'w']):
7028           raise errors.OpPrereqError("Invalid contents of the"
7029                                      " 'disks' parameter")
7030       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7031         self.op.hypervisor = self.cfg.GetHypervisorType()
7032     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7033       if not hasattr(self.op, "name"):
7034         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7035       fname = self.cfg.ExpandInstanceName(self.op.name)
7036       if fname is None:
7037         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7038                                    self.op.name)
7039       self.op.name = fname
7040       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7041     else:
7042       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7043                                  self.op.mode)
7044
7045     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7046       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7047         raise errors.OpPrereqError("Missing allocator name")
7048     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7049       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7050                                  self.op.direction)
7051
7052   def Exec(self, feedback_fn):
7053     """Run the allocator test.
7054
7055     """
7056     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7057       ial = IAllocator(self,
7058                        mode=self.op.mode,
7059                        name=self.op.name,
7060                        mem_size=self.op.mem_size,
7061                        disks=self.op.disks,
7062                        disk_template=self.op.disk_template,
7063                        os=self.op.os,
7064                        tags=self.op.tags,
7065                        nics=self.op.nics,
7066                        vcpus=self.op.vcpus,
7067                        hypervisor=self.op.hypervisor,
7068                        )
7069     else:
7070       ial = IAllocator(self,
7071                        mode=self.op.mode,
7072                        name=self.op.name,
7073                        relocate_from=list(self.relocate_from),
7074                        )
7075
7076     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7077       result = ial.in_text
7078     else:
7079       ial.Run(self.op.allocator, validate=False)
7080       result = ial.out_text
7081     return result