Change failover instance when instance is stopped
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, bridge, mac) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517       env["INSTANCE_NIC%d_MAC" % idx] = mac
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541   """Builds instance related env variables for hooks from an object.
542
543   @type lu: L{LogicalUnit}
544   @param lu: the logical unit on whose behalf we execute
545   @type instance: L{objects.Instance}
546   @param instance: the instance for which we should build the
547       environment
548   @type override: dict
549   @param override: dictionary with key/values that will override
550       our values
551   @rtype: dict
552   @return: the hook environment dictionary
553
554   """
555   cluster = lu.cfg.GetClusterInfo()
556   bep = cluster.FillBE(instance)
557   hvp = cluster.FillHV(instance)
558   args = {
559     'name': instance.name,
560     'primary_node': instance.primary_node,
561     'secondary_nodes': instance.secondary_nodes,
562     'os_type': instance.os,
563     'status': instance.admin_up,
564     'memory': bep[constants.BE_MEMORY],
565     'vcpus': bep[constants.BE_VCPUS],
566     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567     'disk_template': instance.disk_template,
568     'disks': [(disk.size, disk.mode) for disk in instance.disks],
569     'bep': bep,
570     'hvp': hvp,
571     'hypervisor': instance.hypervisor,
572   }
573   if override:
574     args.update(override)
575   return _BuildInstanceHookEnv(**args)
576
577
578 def _AdjustCandidatePool(lu):
579   """Adjust the candidate pool after node operations.
580
581   """
582   mod_list = lu.cfg.MaintainCandidatePool()
583   if mod_list:
584     lu.LogInfo("Promoted nodes to master candidate role: %s",
585                ", ".join(node.name for node in mod_list))
586     for name in mod_list:
587       lu.context.ReaddNode(name)
588   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589   if mc_now > mc_max:
590     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591                (mc_now, mc_max))
592
593
594 def _CheckInstanceBridgesExist(lu, instance):
595   """Check that the brigdes needed by an instance exist.
596
597   """
598   # check bridges existance
599   brlist = [nic.bridge for nic in instance.nics]
600   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601   result.Raise()
602   if not result.data:
603     raise errors.OpPrereqError("One or more target bridges %s does not"
604                                " exist on destination node '%s'" %
605                                (brlist, instance.primary_node))
606
607
608 class LUDestroyCluster(NoHooksLU):
609   """Logical unit for destroying the cluster.
610
611   """
612   _OP_REQP = []
613
614   def CheckPrereq(self):
615     """Check prerequisites.
616
617     This checks whether the cluster is empty.
618
619     Any errors are signalled by raising errors.OpPrereqError.
620
621     """
622     master = self.cfg.GetMasterNode()
623
624     nodelist = self.cfg.GetNodeList()
625     if len(nodelist) != 1 or nodelist[0] != master:
626       raise errors.OpPrereqError("There are still %d node(s) in"
627                                  " this cluster." % (len(nodelist) - 1))
628     instancelist = self.cfg.GetInstanceList()
629     if instancelist:
630       raise errors.OpPrereqError("There are still %d instance(s) in"
631                                  " this cluster." % len(instancelist))
632
633   def Exec(self, feedback_fn):
634     """Destroys the cluster.
635
636     """
637     master = self.cfg.GetMasterNode()
638     result = self.rpc.call_node_stop_master(master, False)
639     result.Raise()
640     if not result.data:
641       raise errors.OpExecError("Could not disable the master role")
642     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643     utils.CreateBackup(priv_key)
644     utils.CreateBackup(pub_key)
645     return master
646
647
648 class LUVerifyCluster(LogicalUnit):
649   """Verifies the cluster status.
650
651   """
652   HPATH = "cluster-verify"
653   HTYPE = constants.HTYPE_CLUSTER
654   _OP_REQP = ["skip_checks"]
655   REQ_BGL = False
656
657   def ExpandNames(self):
658     self.needed_locks = {
659       locking.LEVEL_NODE: locking.ALL_SET,
660       locking.LEVEL_INSTANCE: locking.ALL_SET,
661     }
662     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663
664   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665                   node_result, feedback_fn, master_files,
666                   drbd_map, vg_name):
667     """Run multiple tests against a node.
668
669     Test list:
670
671       - compares ganeti version
672       - checks vg existance and size > 20G
673       - checks config file checksum
674       - checks ssh to other nodes
675
676     @type nodeinfo: L{objects.Node}
677     @param nodeinfo: the node to check
678     @param file_list: required list of files
679     @param local_cksum: dictionary of local files and their checksums
680     @param node_result: the results from the node
681     @param feedback_fn: function used to accumulate results
682     @param master_files: list of files that only masters should have
683     @param drbd_map: the useddrbd minors for this node, in
684         form of minor: (instance, must_exist) which correspond to instances
685         and their running status
686     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687
688     """
689     node = nodeinfo.name
690
691     # main result, node_result should be a non-empty dict
692     if not node_result or not isinstance(node_result, dict):
693       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694       return True
695
696     # compares ganeti version
697     local_version = constants.PROTOCOL_VERSION
698     remote_version = node_result.get('version', None)
699     if not (remote_version and isinstance(remote_version, (list, tuple)) and
700             len(remote_version) == 2):
701       feedback_fn("  - ERROR: connection to %s failed" % (node))
702       return True
703
704     if local_version != remote_version[0]:
705       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706                   " node %s %s" % (local_version, node, remote_version[0]))
707       return True
708
709     # node seems compatible, we can actually try to look into its results
710
711     bad = False
712
713     # full package version
714     if constants.RELEASE_VERSION != remote_version[1]:
715       feedback_fn("  - WARNING: software version mismatch: master %s,"
716                   " node %s %s" %
717                   (constants.RELEASE_VERSION, node, remote_version[1]))
718
719     # checks vg existence and size > 20G
720     if vg_name is not None:
721       vglist = node_result.get(constants.NV_VGLIST, None)
722       if not vglist:
723         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724                         (node,))
725         bad = True
726       else:
727         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728                                               constants.MIN_VG_SIZE)
729         if vgstatus:
730           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731           bad = True
732
733     # checks config file checksum
734
735     remote_cksum = node_result.get(constants.NV_FILELIST, None)
736     if not isinstance(remote_cksum, dict):
737       bad = True
738       feedback_fn("  - ERROR: node hasn't returned file checksum data")
739     else:
740       for file_name in file_list:
741         node_is_mc = nodeinfo.master_candidate
742         must_have_file = file_name not in master_files
743         if file_name not in remote_cksum:
744           if node_is_mc or must_have_file:
745             bad = True
746             feedback_fn("  - ERROR: file '%s' missing" % file_name)
747         elif remote_cksum[file_name] != local_cksum[file_name]:
748           if node_is_mc or must_have_file:
749             bad = True
750             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751           else:
752             # not candidate and this is not a must-have file
753             bad = True
754             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
755                         " '%s'" % file_name)
756         else:
757           # all good, except non-master/non-must have combination
758           if not node_is_mc and not must_have_file:
759             feedback_fn("  - ERROR: file '%s' should not exist on non master"
760                         " candidates" % file_name)
761
762     # checks ssh to any
763
764     if constants.NV_NODELIST not in node_result:
765       bad = True
766       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767     else:
768       if node_result[constants.NV_NODELIST]:
769         bad = True
770         for node in node_result[constants.NV_NODELIST]:
771           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772                           (node, node_result[constants.NV_NODELIST][node]))
773
774     if constants.NV_NODENETTEST not in node_result:
775       bad = True
776       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777     else:
778       if node_result[constants.NV_NODENETTEST]:
779         bad = True
780         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781         for node in nlist:
782           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783                           (node, node_result[constants.NV_NODENETTEST][node]))
784
785     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786     if isinstance(hyp_result, dict):
787       for hv_name, hv_result in hyp_result.iteritems():
788         if hv_result is not None:
789           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790                       (hv_name, hv_result))
791
792     # check used drbd list
793     if vg_name is not None:
794       used_minors = node_result.get(constants.NV_DRBDLIST, [])
795       if not isinstance(used_minors, (tuple, list)):
796         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797                     str(used_minors))
798       else:
799         for minor, (iname, must_exist) in drbd_map.items():
800           if minor not in used_minors and must_exist:
801             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802                         " not active" % (minor, iname))
803             bad = True
804         for minor in used_minors:
805           if minor not in drbd_map:
806             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807                         minor)
808             bad = True
809
810     return bad
811
812   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813                       node_instance, feedback_fn, n_offline):
814     """Verify an instance.
815
816     This function checks to see if the required block devices are
817     available on the instance's node.
818
819     """
820     bad = False
821
822     node_current = instanceconfig.primary_node
823
824     node_vol_should = {}
825     instanceconfig.MapLVsByNode(node_vol_should)
826
827     for node in node_vol_should:
828       if node in n_offline:
829         # ignore missing volumes on offline nodes
830         continue
831       for volume in node_vol_should[node]:
832         if node not in node_vol_is or volume not in node_vol_is[node]:
833           feedback_fn("  - ERROR: volume %s missing on node %s" %
834                           (volume, node))
835           bad = True
836
837     if instanceconfig.admin_up:
838       if ((node_current not in node_instance or
839           not instance in node_instance[node_current]) and
840           node_current not in n_offline):
841         feedback_fn("  - ERROR: instance %s not running on node %s" %
842                         (instance, node_current))
843         bad = True
844
845     for node in node_instance:
846       if (not node == node_current):
847         if instance in node_instance[node]:
848           feedback_fn("  - ERROR: instance %s should not run on node %s" %
849                           (instance, node))
850           bad = True
851
852     return bad
853
854   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855     """Verify if there are any unknown volumes in the cluster.
856
857     The .os, .swap and backup volumes are ignored. All other volumes are
858     reported as unknown.
859
860     """
861     bad = False
862
863     for node in node_vol_is:
864       for volume in node_vol_is[node]:
865         if node not in node_vol_should or volume not in node_vol_should[node]:
866           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867                       (volume, node))
868           bad = True
869     return bad
870
871   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872     """Verify the list of running instances.
873
874     This checks what instances are running but unknown to the cluster.
875
876     """
877     bad = False
878     for node in node_instance:
879       for runninginstance in node_instance[node]:
880         if runninginstance not in instancelist:
881           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882                           (runninginstance, node))
883           bad = True
884     return bad
885
886   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887     """Verify N+1 Memory Resilience.
888
889     Check that if one single node dies we can still start all the instances it
890     was primary for.
891
892     """
893     bad = False
894
895     for node, nodeinfo in node_info.iteritems():
896       # This code checks that every node which is now listed as secondary has
897       # enough memory to host all instances it is supposed to should a single
898       # other node in the cluster fail.
899       # FIXME: not ready for failover to an arbitrary node
900       # FIXME: does not support file-backed instances
901       # WARNING: we currently take into account down instances as well as up
902       # ones, considering that even if they're down someone might want to start
903       # them even in the event of a node failure.
904       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905         needed_mem = 0
906         for instance in instances:
907           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908           if bep[constants.BE_AUTO_BALANCE]:
909             needed_mem += bep[constants.BE_MEMORY]
910         if nodeinfo['mfree'] < needed_mem:
911           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912                       " failovers should node %s fail" % (node, prinode))
913           bad = True
914     return bad
915
916   def CheckPrereq(self):
917     """Check prerequisites.
918
919     Transform the list of checks we're going to skip into a set and check that
920     all its members are valid.
921
922     """
923     self.skip_set = frozenset(self.op.skip_checks)
924     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925       raise errors.OpPrereqError("Invalid checks to be skipped specified")
926
927   def BuildHooksEnv(self):
928     """Build hooks env.
929
930     Cluster-Verify hooks just rone in the post phase and their failure makes
931     the output be logged in the verify output and the verification to fail.
932
933     """
934     all_nodes = self.cfg.GetNodeList()
935     env = {
936       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937       }
938     for node in self.cfg.GetAllNodesInfo().values():
939       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940
941     return env, [], all_nodes
942
943   def Exec(self, feedback_fn):
944     """Verify integrity of cluster, performing various test on nodes.
945
946     """
947     bad = False
948     feedback_fn("* Verifying global settings")
949     for msg in self.cfg.VerifyConfig():
950       feedback_fn("  - ERROR: %s" % msg)
951
952     vg_name = self.cfg.GetVGName()
953     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954     nodelist = utils.NiceSort(self.cfg.GetNodeList())
955     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958                         for iname in instancelist)
959     i_non_redundant = [] # Non redundant instances
960     i_non_a_balanced = [] # Non auto-balanced instances
961     n_offline = [] # List of offline nodes
962     n_drained = [] # List of nodes being drained
963     node_volume = {}
964     node_instance = {}
965     node_info = {}
966     instance_cfg = {}
967
968     # FIXME: verify OS list
969     # do local checksums
970     master_files = [constants.CLUSTER_CONF_FILE]
971
972     file_names = ssconf.SimpleStore().GetFileList()
973     file_names.append(constants.SSL_CERT_FILE)
974     file_names.append(constants.RAPI_CERT_FILE)
975     file_names.extend(master_files)
976
977     local_checksums = utils.FingerprintFiles(file_names)
978
979     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980     node_verify_param = {
981       constants.NV_FILELIST: file_names,
982       constants.NV_NODELIST: [node.name for node in nodeinfo
983                               if not node.offline],
984       constants.NV_HYPERVISOR: hypervisors,
985       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986                                   node.secondary_ip) for node in nodeinfo
987                                  if not node.offline],
988       constants.NV_INSTANCELIST: hypervisors,
989       constants.NV_VERSION: None,
990       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991       }
992     if vg_name is not None:
993       node_verify_param[constants.NV_VGLIST] = None
994       node_verify_param[constants.NV_LVLIST] = vg_name
995       node_verify_param[constants.NV_DRBDLIST] = None
996     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997                                            self.cfg.GetClusterName())
998
999     cluster = self.cfg.GetClusterInfo()
1000     master_node = self.cfg.GetMasterNode()
1001     all_drbd_map = self.cfg.ComputeDRBDMap()
1002
1003     for node_i in nodeinfo:
1004       node = node_i.name
1005       nresult = all_nvinfo[node].data
1006
1007       if node_i.offline:
1008         feedback_fn("* Skipping offline node %s" % (node,))
1009         n_offline.append(node)
1010         continue
1011
1012       if node == master_node:
1013         ntype = "master"
1014       elif node_i.master_candidate:
1015         ntype = "master candidate"
1016       elif node_i.drained:
1017         ntype = "drained"
1018         n_drained.append(node)
1019       else:
1020         ntype = "regular"
1021       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022
1023       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025         bad = True
1026         continue
1027
1028       node_drbd = {}
1029       for minor, instance in all_drbd_map[node].items():
1030         if instance not in instanceinfo:
1031           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032                       instance)
1033           # ghost instance should not be running, but otherwise we
1034           # don't give double warnings (both ghost instance and
1035           # unallocated minor in use)
1036           node_drbd[minor] = (instance, False)
1037         else:
1038           instance = instanceinfo[instance]
1039           node_drbd[minor] = (instance.name, instance.admin_up)
1040       result = self._VerifyNode(node_i, file_names, local_checksums,
1041                                 nresult, feedback_fn, master_files,
1042                                 node_drbd, vg_name)
1043       bad = bad or result
1044
1045       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046       if vg_name is None:
1047         node_volume[node] = {}
1048       elif isinstance(lvdata, basestring):
1049         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050                     (node, utils.SafeEncode(lvdata)))
1051         bad = True
1052         node_volume[node] = {}
1053       elif not isinstance(lvdata, dict):
1054         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055         bad = True
1056         continue
1057       else:
1058         node_volume[node] = lvdata
1059
1060       # node_instance
1061       idata = nresult.get(constants.NV_INSTANCELIST, None)
1062       if not isinstance(idata, list):
1063         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064                     (node,))
1065         bad = True
1066         continue
1067
1068       node_instance[node] = idata
1069
1070       # node_info
1071       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072       if not isinstance(nodeinfo, dict):
1073         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074         bad = True
1075         continue
1076
1077       try:
1078         node_info[node] = {
1079           "mfree": int(nodeinfo['memory_free']),
1080           "pinst": [],
1081           "sinst": [],
1082           # dictionary holding all instances this node is secondary for,
1083           # grouped by their primary node. Each key is a cluster node, and each
1084           # value is a list of instances which have the key as primary and the
1085           # current node as secondary.  this is handy to calculate N+1 memory
1086           # availability if you can only failover from a primary to its
1087           # secondary.
1088           "sinst-by-pnode": {},
1089         }
1090         # FIXME: devise a free space model for file based instances as well
1091         if vg_name is not None:
1092           if (constants.NV_VGLIST not in nresult or
1093               vg_name not in nresult[constants.NV_VGLIST]):
1094             feedback_fn("  - ERROR: node %s didn't return data for the"
1095                         " volume group '%s' - it is either missing or broken" %
1096                         (node, vg_name))
1097             bad = True
1098             continue
1099           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100       except (ValueError, KeyError):
1101         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102                     " from node %s" % (node,))
1103         bad = True
1104         continue
1105
1106     node_vol_should = {}
1107
1108     for instance in instancelist:
1109       feedback_fn("* Verifying instance %s" % instance)
1110       inst_config = instanceinfo[instance]
1111       result =  self._VerifyInstance(instance, inst_config, node_volume,
1112                                      node_instance, feedback_fn, n_offline)
1113       bad = bad or result
1114       inst_nodes_offline = []
1115
1116       inst_config.MapLVsByNode(node_vol_should)
1117
1118       instance_cfg[instance] = inst_config
1119
1120       pnode = inst_config.primary_node
1121       if pnode in node_info:
1122         node_info[pnode]['pinst'].append(instance)
1123       elif pnode not in n_offline:
1124         feedback_fn("  - ERROR: instance %s, connection to primary node"
1125                     " %s failed" % (instance, pnode))
1126         bad = True
1127
1128       if pnode in n_offline:
1129         inst_nodes_offline.append(pnode)
1130
1131       # If the instance is non-redundant we cannot survive losing its primary
1132       # node, so we are not N+1 compliant. On the other hand we have no disk
1133       # templates with more than one secondary so that situation is not well
1134       # supported either.
1135       # FIXME: does not support file-backed instances
1136       if len(inst_config.secondary_nodes) == 0:
1137         i_non_redundant.append(instance)
1138       elif len(inst_config.secondary_nodes) > 1:
1139         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140                     % instance)
1141
1142       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143         i_non_a_balanced.append(instance)
1144
1145       for snode in inst_config.secondary_nodes:
1146         if snode in node_info:
1147           node_info[snode]['sinst'].append(instance)
1148           if pnode not in node_info[snode]['sinst-by-pnode']:
1149             node_info[snode]['sinst-by-pnode'][pnode] = []
1150           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151         elif snode not in n_offline:
1152           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153                       " %s failed" % (instance, snode))
1154           bad = True
1155         if snode in n_offline:
1156           inst_nodes_offline.append(snode)
1157
1158       if inst_nodes_offline:
1159         # warn that the instance lives on offline nodes, and set bad=True
1160         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161                     ", ".join(inst_nodes_offline))
1162         bad = True
1163
1164     feedback_fn("* Verifying orphan volumes")
1165     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166                                        feedback_fn)
1167     bad = bad or result
1168
1169     feedback_fn("* Verifying remaining instances")
1170     result = self._VerifyOrphanInstances(instancelist, node_instance,
1171                                          feedback_fn)
1172     bad = bad or result
1173
1174     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175       feedback_fn("* Verifying N+1 Memory redundancy")
1176       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177       bad = bad or result
1178
1179     feedback_fn("* Other Notes")
1180     if i_non_redundant:
1181       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182                   % len(i_non_redundant))
1183
1184     if i_non_a_balanced:
1185       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186                   % len(i_non_a_balanced))
1187
1188     if n_offline:
1189       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190
1191     if n_drained:
1192       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193
1194     return not bad
1195
1196   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197     """Analize the post-hooks' result
1198
1199     This method analyses the hook result, handles it, and sends some
1200     nicely-formatted feedback back to the user.
1201
1202     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204     @param hooks_results: the results of the multi-node hooks rpc call
1205     @param feedback_fn: function used send feedback back to the caller
1206     @param lu_result: previous Exec result
1207     @return: the new Exec result, based on the previous result
1208         and hook results
1209
1210     """
1211     # We only really run POST phase hooks, and are only interested in
1212     # their results
1213     if phase == constants.HOOKS_PHASE_POST:
1214       # Used to change hooks' output to proper indentation
1215       indent_re = re.compile('^', re.M)
1216       feedback_fn("* Hooks Results")
1217       if not hooks_results:
1218         feedback_fn("  - ERROR: general communication failure")
1219         lu_result = 1
1220       else:
1221         for node_name in hooks_results:
1222           show_node_header = True
1223           res = hooks_results[node_name]
1224           if res.failed or res.data is False or not isinstance(res.data, list):
1225             if res.offline:
1226               # no need to warn or set fail return value
1227               continue
1228             feedback_fn("    Communication failure in hooks execution")
1229             lu_result = 1
1230             continue
1231           for script, hkr, output in res.data:
1232             if hkr == constants.HKR_FAIL:
1233               # The node header is only shown once, if there are
1234               # failing hooks on that node
1235               if show_node_header:
1236                 feedback_fn("  Node %s:" % node_name)
1237                 show_node_header = False
1238               feedback_fn("    ERROR: Script %s failed, output:" % script)
1239               output = indent_re.sub('      ', output)
1240               feedback_fn("%s" % output)
1241               lu_result = 1
1242
1243       return lu_result
1244
1245
1246 class LUVerifyDisks(NoHooksLU):
1247   """Verifies the cluster disks status.
1248
1249   """
1250   _OP_REQP = []
1251   REQ_BGL = False
1252
1253   def ExpandNames(self):
1254     self.needed_locks = {
1255       locking.LEVEL_NODE: locking.ALL_SET,
1256       locking.LEVEL_INSTANCE: locking.ALL_SET,
1257     }
1258     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This has no prerequisites.
1264
1265     """
1266     pass
1267
1268   def Exec(self, feedback_fn):
1269     """Verify integrity of cluster disks.
1270
1271     """
1272     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1273
1274     vg_name = self.cfg.GetVGName()
1275     nodes = utils.NiceSort(self.cfg.GetNodeList())
1276     instances = [self.cfg.GetInstanceInfo(name)
1277                  for name in self.cfg.GetInstanceList()]
1278
1279     nv_dict = {}
1280     for inst in instances:
1281       inst_lvs = {}
1282       if (not inst.admin_up or
1283           inst.disk_template not in constants.DTS_NET_MIRROR):
1284         continue
1285       inst.MapLVsByNode(inst_lvs)
1286       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287       for node, vol_list in inst_lvs.iteritems():
1288         for vol in vol_list:
1289           nv_dict[(node, vol)] = inst
1290
1291     if not nv_dict:
1292       return result
1293
1294     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295
1296     to_act = set()
1297     for node in nodes:
1298       # node_volume
1299       lvs = node_lvs[node]
1300       if lvs.failed:
1301         if not lvs.offline:
1302           self.LogWarning("Connection to node %s failed: %s" %
1303                           (node, lvs.data))
1304         continue
1305       lvs = lvs.data
1306       if isinstance(lvs, basestring):
1307         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308         res_nlvm[node] = lvs
1309         continue
1310       elif not isinstance(lvs, dict):
1311         logging.warning("Connection to node %s failed or invalid data"
1312                         " returned", node)
1313         res_nodes.append(node)
1314         continue
1315
1316       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317         inst = nv_dict.pop((node, lv_name), None)
1318         if (not lv_online and inst is not None
1319             and inst.name not in res_instances):
1320           res_instances.append(inst.name)
1321
1322     # any leftover items in nv_dict are missing LVs, let's arrange the
1323     # data better
1324     for key, inst in nv_dict.iteritems():
1325       if inst.name not in res_missing:
1326         res_missing[inst.name] = []
1327       res_missing[inst.name].append(key)
1328
1329     return result
1330
1331
1332 class LURenameCluster(LogicalUnit):
1333   """Rename the cluster.
1334
1335   """
1336   HPATH = "cluster-rename"
1337   HTYPE = constants.HTYPE_CLUSTER
1338   _OP_REQP = ["name"]
1339
1340   def BuildHooksEnv(self):
1341     """Build hooks env.
1342
1343     """
1344     env = {
1345       "OP_TARGET": self.cfg.GetClusterName(),
1346       "NEW_NAME": self.op.name,
1347       }
1348     mn = self.cfg.GetMasterNode()
1349     return env, [mn], [mn]
1350
1351   def CheckPrereq(self):
1352     """Verify that the passed name is a valid one.
1353
1354     """
1355     hostname = utils.HostInfo(self.op.name)
1356
1357     new_name = hostname.name
1358     self.ip = new_ip = hostname.ip
1359     old_name = self.cfg.GetClusterName()
1360     old_ip = self.cfg.GetMasterIP()
1361     if new_name == old_name and new_ip == old_ip:
1362       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363                                  " cluster has changed")
1364     if new_ip != old_ip:
1365       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367                                    " reachable on the network. Aborting." %
1368                                    new_ip)
1369
1370     self.op.name = new_name
1371
1372   def Exec(self, feedback_fn):
1373     """Rename the cluster.
1374
1375     """
1376     clustername = self.op.name
1377     ip = self.ip
1378
1379     # shutdown the master IP
1380     master = self.cfg.GetMasterNode()
1381     result = self.rpc.call_node_stop_master(master, False)
1382     if result.failed or not result.data:
1383       raise errors.OpExecError("Could not disable the master role")
1384
1385     try:
1386       cluster = self.cfg.GetClusterInfo()
1387       cluster.cluster_name = clustername
1388       cluster.master_ip = ip
1389       self.cfg.Update(cluster)
1390
1391       # update the known hosts file
1392       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393       node_list = self.cfg.GetNodeList()
1394       try:
1395         node_list.remove(master)
1396       except ValueError:
1397         pass
1398       result = self.rpc.call_upload_file(node_list,
1399                                          constants.SSH_KNOWN_HOSTS_FILE)
1400       for to_node, to_result in result.iteritems():
1401         if to_result.failed or not to_result.data:
1402           logging.error("Copy of file %s to node %s failed",
1403                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1404
1405     finally:
1406       result = self.rpc.call_node_start_master(master, False)
1407       if result.failed or not result.data:
1408         self.LogWarning("Could not re-enable the master role on"
1409                         " the master, please restart manually.")
1410
1411
1412 def _RecursiveCheckIfLVMBased(disk):
1413   """Check if the given disk or its children are lvm-based.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the disk to check
1417   @rtype: booleean
1418   @return: boolean indicating whether a LD_LV dev_type was found or not
1419
1420   """
1421   if disk.children:
1422     for chdisk in disk.children:
1423       if _RecursiveCheckIfLVMBased(chdisk):
1424         return True
1425   return disk.dev_type == constants.LD_LV
1426
1427
1428 class LUSetClusterParams(LogicalUnit):
1429   """Change the parameters of the cluster.
1430
1431   """
1432   HPATH = "cluster-modify"
1433   HTYPE = constants.HTYPE_CLUSTER
1434   _OP_REQP = []
1435   REQ_BGL = False
1436
1437   def CheckArguments(self):
1438     """Check parameters
1439
1440     """
1441     if not hasattr(self.op, "candidate_pool_size"):
1442       self.op.candidate_pool_size = None
1443     if self.op.candidate_pool_size is not None:
1444       try:
1445         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446       except (ValueError, TypeError), err:
1447         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1448                                    str(err))
1449       if self.op.candidate_pool_size < 1:
1450         raise errors.OpPrereqError("At least one master candidate needed")
1451
1452   def ExpandNames(self):
1453     # FIXME: in the future maybe other cluster params won't require checking on
1454     # all nodes to be modified.
1455     self.needed_locks = {
1456       locking.LEVEL_NODE: locking.ALL_SET,
1457     }
1458     self.share_locks[locking.LEVEL_NODE] = 1
1459
1460   def BuildHooksEnv(self):
1461     """Build hooks env.
1462
1463     """
1464     env = {
1465       "OP_TARGET": self.cfg.GetClusterName(),
1466       "NEW_VG_NAME": self.op.vg_name,
1467       }
1468     mn = self.cfg.GetMasterNode()
1469     return env, [mn], [mn]
1470
1471   def CheckPrereq(self):
1472     """Check prerequisites.
1473
1474     This checks whether the given params don't conflict and
1475     if the given volume group is valid.
1476
1477     """
1478     if self.op.vg_name is not None and not self.op.vg_name:
1479       instances = self.cfg.GetAllInstancesInfo().values()
1480       for inst in instances:
1481         for disk in inst.disks:
1482           if _RecursiveCheckIfLVMBased(disk):
1483             raise errors.OpPrereqError("Cannot disable lvm storage while"
1484                                        " lvm-based instances exist")
1485
1486     node_list = self.acquired_locks[locking.LEVEL_NODE]
1487
1488     # if vg_name not None, checks given volume group on all nodes
1489     if self.op.vg_name:
1490       vglist = self.rpc.call_vg_list(node_list)
1491       for node in node_list:
1492         if vglist[node].failed:
1493           # ignoring down node
1494           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1495           continue
1496         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1497                                               self.op.vg_name,
1498                                               constants.MIN_VG_SIZE)
1499         if vgstatus:
1500           raise errors.OpPrereqError("Error on node '%s': %s" %
1501                                      (node, vgstatus))
1502
1503     self.cluster = cluster = self.cfg.GetClusterInfo()
1504     # validate beparams changes
1505     if self.op.beparams:
1506       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507       self.new_beparams = cluster.FillDict(
1508         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1509
1510     # hypervisor list/parameters
1511     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512     if self.op.hvparams:
1513       if not isinstance(self.op.hvparams, dict):
1514         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515       for hv_name, hv_dict in self.op.hvparams.items():
1516         if hv_name not in self.new_hvparams:
1517           self.new_hvparams[hv_name] = hv_dict
1518         else:
1519           self.new_hvparams[hv_name].update(hv_dict)
1520
1521     if self.op.enabled_hypervisors is not None:
1522       self.hv_list = self.op.enabled_hypervisors
1523     else:
1524       self.hv_list = cluster.enabled_hypervisors
1525
1526     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527       # either the enabled list has changed, or the parameters have, validate
1528       for hv_name, hv_params in self.new_hvparams.items():
1529         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530             (self.op.enabled_hypervisors and
1531              hv_name in self.op.enabled_hypervisors)):
1532           # either this is a new hypervisor, or its parameters have changed
1533           hv_class = hypervisor.GetHypervisor(hv_name)
1534           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535           hv_class.CheckParameterSyntax(hv_params)
1536           _CheckHVParams(self, node_list, hv_name, hv_params)
1537
1538   def Exec(self, feedback_fn):
1539     """Change the parameters of the cluster.
1540
1541     """
1542     if self.op.vg_name is not None:
1543       new_volume = self.op.vg_name
1544       if not new_volume:
1545         new_volume = None
1546       if new_volume != self.cfg.GetVGName():
1547         self.cfg.SetVGName(new_volume)
1548       else:
1549         feedback_fn("Cluster LVM configuration already in desired"
1550                     " state, not changing")
1551     if self.op.hvparams:
1552       self.cluster.hvparams = self.new_hvparams
1553     if self.op.enabled_hypervisors is not None:
1554       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555     if self.op.beparams:
1556       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557     if self.op.candidate_pool_size is not None:
1558       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1559
1560     self.cfg.Update(self.cluster)
1561
1562     # we want to update nodes after the cluster so that if any errors
1563     # happen, we have recorded and saved the cluster info
1564     if self.op.candidate_pool_size is not None:
1565       _AdjustCandidatePool(self)
1566
1567
1568 class LURedistributeConfig(NoHooksLU):
1569   """Force the redistribution of cluster configuration.
1570
1571   This is a very simple LU.
1572
1573   """
1574   _OP_REQP = []
1575   REQ_BGL = False
1576
1577   def ExpandNames(self):
1578     self.needed_locks = {
1579       locking.LEVEL_NODE: locking.ALL_SET,
1580     }
1581     self.share_locks[locking.LEVEL_NODE] = 1
1582
1583   def CheckPrereq(self):
1584     """Check prerequisites.
1585
1586     """
1587
1588   def Exec(self, feedback_fn):
1589     """Redistribute the configuration.
1590
1591     """
1592     self.cfg.Update(self.cfg.GetClusterInfo())
1593
1594
1595 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1596   """Sleep and poll for an instance's disk to sync.
1597
1598   """
1599   if not instance.disks:
1600     return True
1601
1602   if not oneshot:
1603     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1604
1605   node = instance.primary_node
1606
1607   for dev in instance.disks:
1608     lu.cfg.SetDiskID(dev, node)
1609
1610   retries = 0
1611   while True:
1612     max_time = 0
1613     done = True
1614     cumul_degraded = False
1615     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1616     if rstats.failed or not rstats.data:
1617       lu.LogWarning("Can't get any data from node %s", node)
1618       retries += 1
1619       if retries >= 10:
1620         raise errors.RemoteError("Can't contact node %s for mirror data,"
1621                                  " aborting." % node)
1622       time.sleep(6)
1623       continue
1624     rstats = rstats.data
1625     retries = 0
1626     for i, mstat in enumerate(rstats):
1627       if mstat is None:
1628         lu.LogWarning("Can't compute data for node %s/%s",
1629                            node, instance.disks[i].iv_name)
1630         continue
1631       # we ignore the ldisk parameter
1632       perc_done, est_time, is_degraded, _ = mstat
1633       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1634       if perc_done is not None:
1635         done = False
1636         if est_time is not None:
1637           rem_time = "%d estimated seconds remaining" % est_time
1638           max_time = est_time
1639         else:
1640           rem_time = "no time estimate"
1641         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1642                         (instance.disks[i].iv_name, perc_done, rem_time))
1643     if done or oneshot:
1644       break
1645
1646     time.sleep(min(60, max_time))
1647
1648   if done:
1649     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1650   return not cumul_degraded
1651
1652
1653 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1654   """Check that mirrors are not degraded.
1655
1656   The ldisk parameter, if True, will change the test from the
1657   is_degraded attribute (which represents overall non-ok status for
1658   the device(s)) to the ldisk (representing the local storage status).
1659
1660   """
1661   lu.cfg.SetDiskID(dev, node)
1662   if ldisk:
1663     idx = 6
1664   else:
1665     idx = 5
1666
1667   result = True
1668   if on_primary or dev.AssembleOnSecondary():
1669     rstats = lu.rpc.call_blockdev_find(node, dev)
1670     msg = rstats.RemoteFailMsg()
1671     if msg:
1672       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1673       result = False
1674     elif not rstats.payload:
1675       lu.LogWarning("Can't find disk on node %s", node)
1676       result = False
1677     else:
1678       result = result and (not rstats.payload[idx])
1679   if dev.children:
1680     for child in dev.children:
1681       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1682
1683   return result
1684
1685
1686 class LUDiagnoseOS(NoHooksLU):
1687   """Logical unit for OS diagnose/query.
1688
1689   """
1690   _OP_REQP = ["output_fields", "names"]
1691   REQ_BGL = False
1692   _FIELDS_STATIC = utils.FieldSet()
1693   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1694
1695   def ExpandNames(self):
1696     if self.op.names:
1697       raise errors.OpPrereqError("Selective OS query not supported")
1698
1699     _CheckOutputFields(static=self._FIELDS_STATIC,
1700                        dynamic=self._FIELDS_DYNAMIC,
1701                        selected=self.op.output_fields)
1702
1703     # Lock all nodes, in shared mode
1704     # Temporary removal of locks, should be reverted later
1705     # TODO: reintroduce locks when they are lighter-weight
1706     self.needed_locks = {}
1707     #self.share_locks[locking.LEVEL_NODE] = 1
1708     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1709
1710   def CheckPrereq(self):
1711     """Check prerequisites.
1712
1713     """
1714
1715   @staticmethod
1716   def _DiagnoseByOS(node_list, rlist):
1717     """Remaps a per-node return list into an a per-os per-node dictionary
1718
1719     @param node_list: a list with the names of all nodes
1720     @param rlist: a map with node names as keys and OS objects as values
1721
1722     @rtype: dict
1723     @return: a dictionary with osnames as keys and as value another map, with
1724         nodes as keys and list of OS objects as values, eg::
1725
1726           {"debian-etch": {"node1": [<object>,...],
1727                            "node2": [<object>,]}
1728           }
1729
1730     """
1731     all_os = {}
1732     # we build here the list of nodes that didn't fail the RPC (at RPC
1733     # level), so that nodes with a non-responding node daemon don't
1734     # make all OSes invalid
1735     good_nodes = [node_name for node_name in rlist
1736                   if not rlist[node_name].failed]
1737     for node_name, nr in rlist.iteritems():
1738       if nr.failed or not nr.data:
1739         continue
1740       for os_obj in nr.data:
1741         if os_obj.name not in all_os:
1742           # build a list of nodes for this os containing empty lists
1743           # for each node in node_list
1744           all_os[os_obj.name] = {}
1745           for nname in good_nodes:
1746             all_os[os_obj.name][nname] = []
1747         all_os[os_obj.name][node_name].append(os_obj)
1748     return all_os
1749
1750   def Exec(self, feedback_fn):
1751     """Compute the list of OSes.
1752
1753     """
1754     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1755     node_data = self.rpc.call_os_diagnose(valid_nodes)
1756     if node_data == False:
1757       raise errors.OpExecError("Can't gather the list of OSes")
1758     pol = self._DiagnoseByOS(valid_nodes, node_data)
1759     output = []
1760     for os_name, os_data in pol.iteritems():
1761       row = []
1762       for field in self.op.output_fields:
1763         if field == "name":
1764           val = os_name
1765         elif field == "valid":
1766           val = utils.all([osl and osl[0] for osl in os_data.values()])
1767         elif field == "node_status":
1768           val = {}
1769           for node_name, nos_list in os_data.iteritems():
1770             val[node_name] = [(v.status, v.path) for v in nos_list]
1771         else:
1772           raise errors.ParameterError(field)
1773         row.append(val)
1774       output.append(row)
1775
1776     return output
1777
1778
1779 class LURemoveNode(LogicalUnit):
1780   """Logical unit for removing a node.
1781
1782   """
1783   HPATH = "node-remove"
1784   HTYPE = constants.HTYPE_NODE
1785   _OP_REQP = ["node_name"]
1786
1787   def BuildHooksEnv(self):
1788     """Build hooks env.
1789
1790     This doesn't run on the target node in the pre phase as a failed
1791     node would then be impossible to remove.
1792
1793     """
1794     env = {
1795       "OP_TARGET": self.op.node_name,
1796       "NODE_NAME": self.op.node_name,
1797       }
1798     all_nodes = self.cfg.GetNodeList()
1799     all_nodes.remove(self.op.node_name)
1800     return env, all_nodes, all_nodes
1801
1802   def CheckPrereq(self):
1803     """Check prerequisites.
1804
1805     This checks:
1806      - the node exists in the configuration
1807      - it does not have primary or secondary instances
1808      - it's not the master
1809
1810     Any errors are signalled by raising errors.OpPrereqError.
1811
1812     """
1813     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1814     if node is None:
1815       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1816
1817     instance_list = self.cfg.GetInstanceList()
1818
1819     masternode = self.cfg.GetMasterNode()
1820     if node.name == masternode:
1821       raise errors.OpPrereqError("Node is the master node,"
1822                                  " you need to failover first.")
1823
1824     for instance_name in instance_list:
1825       instance = self.cfg.GetInstanceInfo(instance_name)
1826       if node.name in instance.all_nodes:
1827         raise errors.OpPrereqError("Instance %s is still running on the node,"
1828                                    " please remove first." % instance_name)
1829     self.op.node_name = node.name
1830     self.node = node
1831
1832   def Exec(self, feedback_fn):
1833     """Removes the node from the cluster.
1834
1835     """
1836     node = self.node
1837     logging.info("Stopping the node daemon and removing configs from node %s",
1838                  node.name)
1839
1840     self.context.RemoveNode(node.name)
1841
1842     self.rpc.call_node_leave_cluster(node.name)
1843
1844     # Promote nodes to master candidate as needed
1845     _AdjustCandidatePool(self)
1846
1847
1848 class LUQueryNodes(NoHooksLU):
1849   """Logical unit for querying nodes.
1850
1851   """
1852   _OP_REQP = ["output_fields", "names", "use_locking"]
1853   REQ_BGL = False
1854   _FIELDS_DYNAMIC = utils.FieldSet(
1855     "dtotal", "dfree",
1856     "mtotal", "mnode", "mfree",
1857     "bootid",
1858     "ctotal", "cnodes", "csockets",
1859     )
1860
1861   _FIELDS_STATIC = utils.FieldSet(
1862     "name", "pinst_cnt", "sinst_cnt",
1863     "pinst_list", "sinst_list",
1864     "pip", "sip", "tags",
1865     "serial_no",
1866     "master_candidate",
1867     "master",
1868     "offline",
1869     "drained",
1870     )
1871
1872   def ExpandNames(self):
1873     _CheckOutputFields(static=self._FIELDS_STATIC,
1874                        dynamic=self._FIELDS_DYNAMIC,
1875                        selected=self.op.output_fields)
1876
1877     self.needed_locks = {}
1878     self.share_locks[locking.LEVEL_NODE] = 1
1879
1880     if self.op.names:
1881       self.wanted = _GetWantedNodes(self, self.op.names)
1882     else:
1883       self.wanted = locking.ALL_SET
1884
1885     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1886     self.do_locking = self.do_node_query and self.op.use_locking
1887     if self.do_locking:
1888       # if we don't request only static fields, we need to lock the nodes
1889       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1890
1891
1892   def CheckPrereq(self):
1893     """Check prerequisites.
1894
1895     """
1896     # The validation of the node list is done in the _GetWantedNodes,
1897     # if non empty, and if empty, there's no validation to do
1898     pass
1899
1900   def Exec(self, feedback_fn):
1901     """Computes the list of nodes and their attributes.
1902
1903     """
1904     all_info = self.cfg.GetAllNodesInfo()
1905     if self.do_locking:
1906       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1907     elif self.wanted != locking.ALL_SET:
1908       nodenames = self.wanted
1909       missing = set(nodenames).difference(all_info.keys())
1910       if missing:
1911         raise errors.OpExecError(
1912           "Some nodes were removed before retrieving their data: %s" % missing)
1913     else:
1914       nodenames = all_info.keys()
1915
1916     nodenames = utils.NiceSort(nodenames)
1917     nodelist = [all_info[name] for name in nodenames]
1918
1919     # begin data gathering
1920
1921     if self.do_node_query:
1922       live_data = {}
1923       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1924                                           self.cfg.GetHypervisorType())
1925       for name in nodenames:
1926         nodeinfo = node_data[name]
1927         if not nodeinfo.failed and nodeinfo.data:
1928           nodeinfo = nodeinfo.data
1929           fn = utils.TryConvert
1930           live_data[name] = {
1931             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1932             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1933             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1934             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1935             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1936             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1937             "bootid": nodeinfo.get('bootid', None),
1938             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1939             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1940             }
1941         else:
1942           live_data[name] = {}
1943     else:
1944       live_data = dict.fromkeys(nodenames, {})
1945
1946     node_to_primary = dict([(name, set()) for name in nodenames])
1947     node_to_secondary = dict([(name, set()) for name in nodenames])
1948
1949     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1950                              "sinst_cnt", "sinst_list"))
1951     if inst_fields & frozenset(self.op.output_fields):
1952       instancelist = self.cfg.GetInstanceList()
1953
1954       for instance_name in instancelist:
1955         inst = self.cfg.GetInstanceInfo(instance_name)
1956         if inst.primary_node in node_to_primary:
1957           node_to_primary[inst.primary_node].add(inst.name)
1958         for secnode in inst.secondary_nodes:
1959           if secnode in node_to_secondary:
1960             node_to_secondary[secnode].add(inst.name)
1961
1962     master_node = self.cfg.GetMasterNode()
1963
1964     # end data gathering
1965
1966     output = []
1967     for node in nodelist:
1968       node_output = []
1969       for field in self.op.output_fields:
1970         if field == "name":
1971           val = node.name
1972         elif field == "pinst_list":
1973           val = list(node_to_primary[node.name])
1974         elif field == "sinst_list":
1975           val = list(node_to_secondary[node.name])
1976         elif field == "pinst_cnt":
1977           val = len(node_to_primary[node.name])
1978         elif field == "sinst_cnt":
1979           val = len(node_to_secondary[node.name])
1980         elif field == "pip":
1981           val = node.primary_ip
1982         elif field == "sip":
1983           val = node.secondary_ip
1984         elif field == "tags":
1985           val = list(node.GetTags())
1986         elif field == "serial_no":
1987           val = node.serial_no
1988         elif field == "master_candidate":
1989           val = node.master_candidate
1990         elif field == "master":
1991           val = node.name == master_node
1992         elif field == "offline":
1993           val = node.offline
1994         elif field == "drained":
1995           val = node.drained
1996         elif self._FIELDS_DYNAMIC.Matches(field):
1997           val = live_data[node.name].get(field, None)
1998         else:
1999           raise errors.ParameterError(field)
2000         node_output.append(val)
2001       output.append(node_output)
2002
2003     return output
2004
2005
2006 class LUQueryNodeVolumes(NoHooksLU):
2007   """Logical unit for getting volumes on node(s).
2008
2009   """
2010   _OP_REQP = ["nodes", "output_fields"]
2011   REQ_BGL = False
2012   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2013   _FIELDS_STATIC = utils.FieldSet("node")
2014
2015   def ExpandNames(self):
2016     _CheckOutputFields(static=self._FIELDS_STATIC,
2017                        dynamic=self._FIELDS_DYNAMIC,
2018                        selected=self.op.output_fields)
2019
2020     self.needed_locks = {}
2021     self.share_locks[locking.LEVEL_NODE] = 1
2022     if not self.op.nodes:
2023       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2024     else:
2025       self.needed_locks[locking.LEVEL_NODE] = \
2026         _GetWantedNodes(self, self.op.nodes)
2027
2028   def CheckPrereq(self):
2029     """Check prerequisites.
2030
2031     This checks that the fields required are valid output fields.
2032
2033     """
2034     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2035
2036   def Exec(self, feedback_fn):
2037     """Computes the list of nodes and their attributes.
2038
2039     """
2040     nodenames = self.nodes
2041     volumes = self.rpc.call_node_volumes(nodenames)
2042
2043     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2044              in self.cfg.GetInstanceList()]
2045
2046     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2047
2048     output = []
2049     for node in nodenames:
2050       if node not in volumes or volumes[node].failed or not volumes[node].data:
2051         continue
2052
2053       node_vols = volumes[node].data[:]
2054       node_vols.sort(key=lambda vol: vol['dev'])
2055
2056       for vol in node_vols:
2057         node_output = []
2058         for field in self.op.output_fields:
2059           if field == "node":
2060             val = node
2061           elif field == "phys":
2062             val = vol['dev']
2063           elif field == "vg":
2064             val = vol['vg']
2065           elif field == "name":
2066             val = vol['name']
2067           elif field == "size":
2068             val = int(float(vol['size']))
2069           elif field == "instance":
2070             for inst in ilist:
2071               if node not in lv_by_node[inst]:
2072                 continue
2073               if vol['name'] in lv_by_node[inst][node]:
2074                 val = inst.name
2075                 break
2076             else:
2077               val = '-'
2078           else:
2079             raise errors.ParameterError(field)
2080           node_output.append(str(val))
2081
2082         output.append(node_output)
2083
2084     return output
2085
2086
2087 class LUAddNode(LogicalUnit):
2088   """Logical unit for adding node to the cluster.
2089
2090   """
2091   HPATH = "node-add"
2092   HTYPE = constants.HTYPE_NODE
2093   _OP_REQP = ["node_name"]
2094
2095   def BuildHooksEnv(self):
2096     """Build hooks env.
2097
2098     This will run on all nodes before, and on all nodes + the new node after.
2099
2100     """
2101     env = {
2102       "OP_TARGET": self.op.node_name,
2103       "NODE_NAME": self.op.node_name,
2104       "NODE_PIP": self.op.primary_ip,
2105       "NODE_SIP": self.op.secondary_ip,
2106       }
2107     nodes_0 = self.cfg.GetNodeList()
2108     nodes_1 = nodes_0 + [self.op.node_name, ]
2109     return env, nodes_0, nodes_1
2110
2111   def CheckPrereq(self):
2112     """Check prerequisites.
2113
2114     This checks:
2115      - the new node is not already in the config
2116      - it is resolvable
2117      - its parameters (single/dual homed) matches the cluster
2118
2119     Any errors are signalled by raising errors.OpPrereqError.
2120
2121     """
2122     node_name = self.op.node_name
2123     cfg = self.cfg
2124
2125     dns_data = utils.HostInfo(node_name)
2126
2127     node = dns_data.name
2128     primary_ip = self.op.primary_ip = dns_data.ip
2129     secondary_ip = getattr(self.op, "secondary_ip", None)
2130     if secondary_ip is None:
2131       secondary_ip = primary_ip
2132     if not utils.IsValidIP(secondary_ip):
2133       raise errors.OpPrereqError("Invalid secondary IP given")
2134     self.op.secondary_ip = secondary_ip
2135
2136     node_list = cfg.GetNodeList()
2137     if not self.op.readd and node in node_list:
2138       raise errors.OpPrereqError("Node %s is already in the configuration" %
2139                                  node)
2140     elif self.op.readd and node not in node_list:
2141       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2142
2143     for existing_node_name in node_list:
2144       existing_node = cfg.GetNodeInfo(existing_node_name)
2145
2146       if self.op.readd and node == existing_node_name:
2147         if (existing_node.primary_ip != primary_ip or
2148             existing_node.secondary_ip != secondary_ip):
2149           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2150                                      " address configuration as before")
2151         continue
2152
2153       if (existing_node.primary_ip == primary_ip or
2154           existing_node.secondary_ip == primary_ip or
2155           existing_node.primary_ip == secondary_ip or
2156           existing_node.secondary_ip == secondary_ip):
2157         raise errors.OpPrereqError("New node ip address(es) conflict with"
2158                                    " existing node %s" % existing_node.name)
2159
2160     # check that the type of the node (single versus dual homed) is the
2161     # same as for the master
2162     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2163     master_singlehomed = myself.secondary_ip == myself.primary_ip
2164     newbie_singlehomed = secondary_ip == primary_ip
2165     if master_singlehomed != newbie_singlehomed:
2166       if master_singlehomed:
2167         raise errors.OpPrereqError("The master has no private ip but the"
2168                                    " new node has one")
2169       else:
2170         raise errors.OpPrereqError("The master has a private ip but the"
2171                                    " new node doesn't have one")
2172
2173     # checks reachablity
2174     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2175       raise errors.OpPrereqError("Node not reachable by ping")
2176
2177     if not newbie_singlehomed:
2178       # check reachability from my secondary ip to newbie's secondary ip
2179       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2180                            source=myself.secondary_ip):
2181         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2182                                    " based ping to noded port")
2183
2184     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2185     mc_now, _ = self.cfg.GetMasterCandidateStats()
2186     master_candidate = mc_now < cp_size
2187
2188     self.new_node = objects.Node(name=node,
2189                                  primary_ip=primary_ip,
2190                                  secondary_ip=secondary_ip,
2191                                  master_candidate=master_candidate,
2192                                  offline=False, drained=False)
2193
2194   def Exec(self, feedback_fn):
2195     """Adds the new node to the cluster.
2196
2197     """
2198     new_node = self.new_node
2199     node = new_node.name
2200
2201     # check connectivity
2202     result = self.rpc.call_version([node])[node]
2203     result.Raise()
2204     if result.data:
2205       if constants.PROTOCOL_VERSION == result.data:
2206         logging.info("Communication to node %s fine, sw version %s match",
2207                      node, result.data)
2208       else:
2209         raise errors.OpExecError("Version mismatch master version %s,"
2210                                  " node version %s" %
2211                                  (constants.PROTOCOL_VERSION, result.data))
2212     else:
2213       raise errors.OpExecError("Cannot get version from the new node")
2214
2215     # setup ssh on node
2216     logging.info("Copy ssh key to node %s", node)
2217     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2218     keyarray = []
2219     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2220                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2221                 priv_key, pub_key]
2222
2223     for i in keyfiles:
2224       f = open(i, 'r')
2225       try:
2226         keyarray.append(f.read())
2227       finally:
2228         f.close()
2229
2230     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2231                                     keyarray[2],
2232                                     keyarray[3], keyarray[4], keyarray[5])
2233
2234     msg = result.RemoteFailMsg()
2235     if msg:
2236       raise errors.OpExecError("Cannot transfer ssh keys to the"
2237                                " new node: %s" % msg)
2238
2239     # Add node to our /etc/hosts, and add key to known_hosts
2240     utils.AddHostToEtcHosts(new_node.name)
2241
2242     if new_node.secondary_ip != new_node.primary_ip:
2243       result = self.rpc.call_node_has_ip_address(new_node.name,
2244                                                  new_node.secondary_ip)
2245       if result.failed or not result.data:
2246         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2247                                  " you gave (%s). Please fix and re-run this"
2248                                  " command." % new_node.secondary_ip)
2249
2250     node_verify_list = [self.cfg.GetMasterNode()]
2251     node_verify_param = {
2252       'nodelist': [node],
2253       # TODO: do a node-net-test as well?
2254     }
2255
2256     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2257                                        self.cfg.GetClusterName())
2258     for verifier in node_verify_list:
2259       if result[verifier].failed or not result[verifier].data:
2260         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2261                                  " for remote verification" % verifier)
2262       if result[verifier].data['nodelist']:
2263         for failed in result[verifier].data['nodelist']:
2264           feedback_fn("ssh/hostname verification failed %s -> %s" %
2265                       (verifier, result[verifier].data['nodelist'][failed]))
2266         raise errors.OpExecError("ssh/hostname verification failed.")
2267
2268     # Distribute updated /etc/hosts and known_hosts to all nodes,
2269     # including the node just added
2270     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2271     dist_nodes = self.cfg.GetNodeList()
2272     if not self.op.readd:
2273       dist_nodes.append(node)
2274     if myself.name in dist_nodes:
2275       dist_nodes.remove(myself.name)
2276
2277     logging.debug("Copying hosts and known_hosts to all nodes")
2278     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2279       result = self.rpc.call_upload_file(dist_nodes, fname)
2280       for to_node, to_result in result.iteritems():
2281         if to_result.failed or not to_result.data:
2282           logging.error("Copy of file %s to node %s failed", fname, to_node)
2283
2284     to_copy = []
2285     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2286     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2287       to_copy.append(constants.VNC_PASSWORD_FILE)
2288
2289     for fname in to_copy:
2290       result = self.rpc.call_upload_file([node], fname)
2291       if result[node].failed or not result[node]:
2292         logging.error("Could not copy file %s to node %s", fname, node)
2293
2294     if self.op.readd:
2295       self.context.ReaddNode(new_node)
2296     else:
2297       self.context.AddNode(new_node)
2298
2299
2300 class LUSetNodeParams(LogicalUnit):
2301   """Modifies the parameters of a node.
2302
2303   """
2304   HPATH = "node-modify"
2305   HTYPE = constants.HTYPE_NODE
2306   _OP_REQP = ["node_name"]
2307   REQ_BGL = False
2308
2309   def CheckArguments(self):
2310     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2311     if node_name is None:
2312       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2313     self.op.node_name = node_name
2314     _CheckBooleanOpField(self.op, 'master_candidate')
2315     _CheckBooleanOpField(self.op, 'offline')
2316     _CheckBooleanOpField(self.op, 'drained')
2317     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2318     if all_mods.count(None) == 3:
2319       raise errors.OpPrereqError("Please pass at least one modification")
2320     if all_mods.count(True) > 1:
2321       raise errors.OpPrereqError("Can't set the node into more than one"
2322                                  " state at the same time")
2323
2324   def ExpandNames(self):
2325     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2326
2327   def BuildHooksEnv(self):
2328     """Build hooks env.
2329
2330     This runs on the master node.
2331
2332     """
2333     env = {
2334       "OP_TARGET": self.op.node_name,
2335       "MASTER_CANDIDATE": str(self.op.master_candidate),
2336       "OFFLINE": str(self.op.offline),
2337       "DRAINED": str(self.op.drained),
2338       }
2339     nl = [self.cfg.GetMasterNode(),
2340           self.op.node_name]
2341     return env, nl, nl
2342
2343   def CheckPrereq(self):
2344     """Check prerequisites.
2345
2346     This only checks the instance list against the existing names.
2347
2348     """
2349     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2350
2351     if ((self.op.master_candidate == False or self.op.offline == True or
2352          self.op.drained == True) and node.master_candidate):
2353       # we will demote the node from master_candidate
2354       if self.op.node_name == self.cfg.GetMasterNode():
2355         raise errors.OpPrereqError("The master node has to be a"
2356                                    " master candidate, online and not drained")
2357       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2358       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2359       if num_candidates <= cp_size:
2360         msg = ("Not enough master candidates (desired"
2361                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2362         if self.op.force:
2363           self.LogWarning(msg)
2364         else:
2365           raise errors.OpPrereqError(msg)
2366
2367     if (self.op.master_candidate == True and
2368         ((node.offline and not self.op.offline == False) or
2369          (node.drained and not self.op.drained == False))):
2370       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2371                                  " to master_candidate" % node.name)
2372
2373     return
2374
2375   def Exec(self, feedback_fn):
2376     """Modifies a node.
2377
2378     """
2379     node = self.node
2380
2381     result = []
2382     changed_mc = False
2383
2384     if self.op.offline is not None:
2385       node.offline = self.op.offline
2386       result.append(("offline", str(self.op.offline)))
2387       if self.op.offline == True:
2388         if node.master_candidate:
2389           node.master_candidate = False
2390           changed_mc = True
2391           result.append(("master_candidate", "auto-demotion due to offline"))
2392         if node.drained:
2393           node.drained = False
2394           result.append(("drained", "clear drained status due to offline"))
2395
2396     if self.op.master_candidate is not None:
2397       node.master_candidate = self.op.master_candidate
2398       changed_mc = True
2399       result.append(("master_candidate", str(self.op.master_candidate)))
2400       if self.op.master_candidate == False:
2401         rrc = self.rpc.call_node_demote_from_mc(node.name)
2402         msg = rrc.RemoteFailMsg()
2403         if msg:
2404           self.LogWarning("Node failed to demote itself: %s" % msg)
2405
2406     if self.op.drained is not None:
2407       node.drained = self.op.drained
2408       result.append(("drained", str(self.op.drained)))
2409       if self.op.drained == True:
2410         if node.master_candidate:
2411           node.master_candidate = False
2412           changed_mc = True
2413           result.append(("master_candidate", "auto-demotion due to drain"))
2414         if node.offline:
2415           node.offline = False
2416           result.append(("offline", "clear offline status due to drain"))
2417
2418     # this will trigger configuration file update, if needed
2419     self.cfg.Update(node)
2420     # this will trigger job queue propagation or cleanup
2421     if changed_mc:
2422       self.context.ReaddNode(node)
2423
2424     return result
2425
2426
2427 class LUQueryClusterInfo(NoHooksLU):
2428   """Query cluster configuration.
2429
2430   """
2431   _OP_REQP = []
2432   REQ_BGL = False
2433
2434   def ExpandNames(self):
2435     self.needed_locks = {}
2436
2437   def CheckPrereq(self):
2438     """No prerequsites needed for this LU.
2439
2440     """
2441     pass
2442
2443   def Exec(self, feedback_fn):
2444     """Return cluster config.
2445
2446     """
2447     cluster = self.cfg.GetClusterInfo()
2448     result = {
2449       "software_version": constants.RELEASE_VERSION,
2450       "protocol_version": constants.PROTOCOL_VERSION,
2451       "config_version": constants.CONFIG_VERSION,
2452       "os_api_version": constants.OS_API_VERSION,
2453       "export_version": constants.EXPORT_VERSION,
2454       "architecture": (platform.architecture()[0], platform.machine()),
2455       "name": cluster.cluster_name,
2456       "master": cluster.master_node,
2457       "default_hypervisor": cluster.default_hypervisor,
2458       "enabled_hypervisors": cluster.enabled_hypervisors,
2459       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2460                         for hypervisor in cluster.enabled_hypervisors]),
2461       "beparams": cluster.beparams,
2462       "candidate_pool_size": cluster.candidate_pool_size,
2463       "default_bridge": cluster.default_bridge,
2464       "master_netdev": cluster.master_netdev,
2465       "volume_group_name": cluster.volume_group_name,
2466       "file_storage_dir": cluster.file_storage_dir,
2467       }
2468
2469     return result
2470
2471
2472 class LUQueryConfigValues(NoHooksLU):
2473   """Return configuration values.
2474
2475   """
2476   _OP_REQP = []
2477   REQ_BGL = False
2478   _FIELDS_DYNAMIC = utils.FieldSet()
2479   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2480
2481   def ExpandNames(self):
2482     self.needed_locks = {}
2483
2484     _CheckOutputFields(static=self._FIELDS_STATIC,
2485                        dynamic=self._FIELDS_DYNAMIC,
2486                        selected=self.op.output_fields)
2487
2488   def CheckPrereq(self):
2489     """No prerequisites.
2490
2491     """
2492     pass
2493
2494   def Exec(self, feedback_fn):
2495     """Dump a representation of the cluster config to the standard output.
2496
2497     """
2498     values = []
2499     for field in self.op.output_fields:
2500       if field == "cluster_name":
2501         entry = self.cfg.GetClusterName()
2502       elif field == "master_node":
2503         entry = self.cfg.GetMasterNode()
2504       elif field == "drain_flag":
2505         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2506       else:
2507         raise errors.ParameterError(field)
2508       values.append(entry)
2509     return values
2510
2511
2512 class LUActivateInstanceDisks(NoHooksLU):
2513   """Bring up an instance's disks.
2514
2515   """
2516   _OP_REQP = ["instance_name"]
2517   REQ_BGL = False
2518
2519   def ExpandNames(self):
2520     self._ExpandAndLockInstance()
2521     self.needed_locks[locking.LEVEL_NODE] = []
2522     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2523
2524   def DeclareLocks(self, level):
2525     if level == locking.LEVEL_NODE:
2526       self._LockInstancesNodes()
2527
2528   def CheckPrereq(self):
2529     """Check prerequisites.
2530
2531     This checks that the instance is in the cluster.
2532
2533     """
2534     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2535     assert self.instance is not None, \
2536       "Cannot retrieve locked instance %s" % self.op.instance_name
2537     _CheckNodeOnline(self, self.instance.primary_node)
2538
2539   def Exec(self, feedback_fn):
2540     """Activate the disks.
2541
2542     """
2543     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2544     if not disks_ok:
2545       raise errors.OpExecError("Cannot activate block devices")
2546
2547     return disks_info
2548
2549
2550 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2551   """Prepare the block devices for an instance.
2552
2553   This sets up the block devices on all nodes.
2554
2555   @type lu: L{LogicalUnit}
2556   @param lu: the logical unit on whose behalf we execute
2557   @type instance: L{objects.Instance}
2558   @param instance: the instance for whose disks we assemble
2559   @type ignore_secondaries: boolean
2560   @param ignore_secondaries: if true, errors on secondary nodes
2561       won't result in an error return from the function
2562   @return: False if the operation failed, otherwise a list of
2563       (host, instance_visible_name, node_visible_name)
2564       with the mapping from node devices to instance devices
2565
2566   """
2567   device_info = []
2568   disks_ok = True
2569   iname = instance.name
2570   # With the two passes mechanism we try to reduce the window of
2571   # opportunity for the race condition of switching DRBD to primary
2572   # before handshaking occured, but we do not eliminate it
2573
2574   # The proper fix would be to wait (with some limits) until the
2575   # connection has been made and drbd transitions from WFConnection
2576   # into any other network-connected state (Connected, SyncTarget,
2577   # SyncSource, etc.)
2578
2579   # 1st pass, assemble on all nodes in secondary mode
2580   for inst_disk in instance.disks:
2581     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2582       lu.cfg.SetDiskID(node_disk, node)
2583       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2584       msg = result.RemoteFailMsg()
2585       if msg:
2586         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2587                            " (is_primary=False, pass=1): %s",
2588                            inst_disk.iv_name, node, msg)
2589         if not ignore_secondaries:
2590           disks_ok = False
2591
2592   # FIXME: race condition on drbd migration to primary
2593
2594   # 2nd pass, do only the primary node
2595   for inst_disk in instance.disks:
2596     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2597       if node != instance.primary_node:
2598         continue
2599       lu.cfg.SetDiskID(node_disk, node)
2600       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2601       msg = result.RemoteFailMsg()
2602       if msg:
2603         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2604                            " (is_primary=True, pass=2): %s",
2605                            inst_disk.iv_name, node, msg)
2606         disks_ok = False
2607     device_info.append((instance.primary_node, inst_disk.iv_name,
2608                         result.payload))
2609
2610   # leave the disks configured for the primary node
2611   # this is a workaround that would be fixed better by
2612   # improving the logical/physical id handling
2613   for disk in instance.disks:
2614     lu.cfg.SetDiskID(disk, instance.primary_node)
2615
2616   return disks_ok, device_info
2617
2618
2619 def _StartInstanceDisks(lu, instance, force):
2620   """Start the disks of an instance.
2621
2622   """
2623   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2624                                            ignore_secondaries=force)
2625   if not disks_ok:
2626     _ShutdownInstanceDisks(lu, instance)
2627     if force is not None and not force:
2628       lu.proc.LogWarning("", hint="If the message above refers to a"
2629                          " secondary node,"
2630                          " you can retry the operation using '--force'.")
2631     raise errors.OpExecError("Disk consistency error")
2632
2633
2634 class LUDeactivateInstanceDisks(NoHooksLU):
2635   """Shutdown an instance's disks.
2636
2637   """
2638   _OP_REQP = ["instance_name"]
2639   REQ_BGL = False
2640
2641   def ExpandNames(self):
2642     self._ExpandAndLockInstance()
2643     self.needed_locks[locking.LEVEL_NODE] = []
2644     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2645
2646   def DeclareLocks(self, level):
2647     if level == locking.LEVEL_NODE:
2648       self._LockInstancesNodes()
2649
2650   def CheckPrereq(self):
2651     """Check prerequisites.
2652
2653     This checks that the instance is in the cluster.
2654
2655     """
2656     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2657     assert self.instance is not None, \
2658       "Cannot retrieve locked instance %s" % self.op.instance_name
2659
2660   def Exec(self, feedback_fn):
2661     """Deactivate the disks
2662
2663     """
2664     instance = self.instance
2665     _SafeShutdownInstanceDisks(self, instance)
2666
2667
2668 def _SafeShutdownInstanceDisks(lu, instance):
2669   """Shutdown block devices of an instance.
2670
2671   This function checks if an instance is running, before calling
2672   _ShutdownInstanceDisks.
2673
2674   """
2675   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2676                                       [instance.hypervisor])
2677   ins_l = ins_l[instance.primary_node]
2678   if ins_l.failed or not isinstance(ins_l.data, list):
2679     raise errors.OpExecError("Can't contact node '%s'" %
2680                              instance.primary_node)
2681
2682   if instance.name in ins_l.data:
2683     raise errors.OpExecError("Instance is running, can't shutdown"
2684                              " block devices.")
2685
2686   _ShutdownInstanceDisks(lu, instance)
2687
2688
2689 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2690   """Shutdown block devices of an instance.
2691
2692   This does the shutdown on all nodes of the instance.
2693
2694   If the ignore_primary is false, errors on the primary node are
2695   ignored.
2696
2697   """
2698   all_result = True
2699   for disk in instance.disks:
2700     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2701       lu.cfg.SetDiskID(top_disk, node)
2702       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2703       msg = result.RemoteFailMsg()
2704       if msg:
2705         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2706                       disk.iv_name, node, msg)
2707         if not ignore_primary or node != instance.primary_node:
2708           all_result = False
2709   return all_result
2710
2711
2712 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2713   """Checks if a node has enough free memory.
2714
2715   This function check if a given node has the needed amount of free
2716   memory. In case the node has less memory or we cannot get the
2717   information from the node, this function raise an OpPrereqError
2718   exception.
2719
2720   @type lu: C{LogicalUnit}
2721   @param lu: a logical unit from which we get configuration data
2722   @type node: C{str}
2723   @param node: the node to check
2724   @type reason: C{str}
2725   @param reason: string to use in the error message
2726   @type requested: C{int}
2727   @param requested: the amount of memory in MiB to check for
2728   @type hypervisor_name: C{str}
2729   @param hypervisor_name: the hypervisor to ask for memory stats
2730   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2731       we cannot check the node
2732
2733   """
2734   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2735   nodeinfo[node].Raise()
2736   free_mem = nodeinfo[node].data.get('memory_free')
2737   if not isinstance(free_mem, int):
2738     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2739                              " was '%s'" % (node, free_mem))
2740   if requested > free_mem:
2741     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2742                              " needed %s MiB, available %s MiB" %
2743                              (node, reason, requested, free_mem))
2744
2745
2746 class LUStartupInstance(LogicalUnit):
2747   """Starts an instance.
2748
2749   """
2750   HPATH = "instance-start"
2751   HTYPE = constants.HTYPE_INSTANCE
2752   _OP_REQP = ["instance_name", "force"]
2753   REQ_BGL = False
2754
2755   def ExpandNames(self):
2756     self._ExpandAndLockInstance()
2757
2758   def BuildHooksEnv(self):
2759     """Build hooks env.
2760
2761     This runs on master, primary and secondary nodes of the instance.
2762
2763     """
2764     env = {
2765       "FORCE": self.op.force,
2766       }
2767     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2768     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2769     return env, nl, nl
2770
2771   def CheckPrereq(self):
2772     """Check prerequisites.
2773
2774     This checks that the instance is in the cluster.
2775
2776     """
2777     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2778     assert self.instance is not None, \
2779       "Cannot retrieve locked instance %s" % self.op.instance_name
2780
2781     # extra beparams
2782     self.beparams = getattr(self.op, "beparams", {})
2783     if self.beparams:
2784       if not isinstance(self.beparams, dict):
2785         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2786                                    " dict" % (type(self.beparams), ))
2787       # fill the beparams dict
2788       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2789       self.op.beparams = self.beparams
2790
2791     # extra hvparams
2792     self.hvparams = getattr(self.op, "hvparams", {})
2793     if self.hvparams:
2794       if not isinstance(self.hvparams, dict):
2795         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2796                                    " dict" % (type(self.hvparams), ))
2797
2798       # check hypervisor parameter syntax (locally)
2799       cluster = self.cfg.GetClusterInfo()
2800       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2801       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2802                                     instance.hvparams)
2803       filled_hvp.update(self.hvparams)
2804       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2805       hv_type.CheckParameterSyntax(filled_hvp)
2806       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2807       self.op.hvparams = self.hvparams
2808
2809     _CheckNodeOnline(self, instance.primary_node)
2810
2811     bep = self.cfg.GetClusterInfo().FillBE(instance)
2812     # check bridges existance
2813     _CheckInstanceBridgesExist(self, instance)
2814
2815     remote_info = self.rpc.call_instance_info(instance.primary_node,
2816                                               instance.name,
2817                                               instance.hypervisor)
2818     remote_info.Raise()
2819     if not remote_info.data:
2820       _CheckNodeFreeMemory(self, instance.primary_node,
2821                            "starting instance %s" % instance.name,
2822                            bep[constants.BE_MEMORY], instance.hypervisor)
2823
2824   def Exec(self, feedback_fn):
2825     """Start the instance.
2826
2827     """
2828     instance = self.instance
2829     force = self.op.force
2830
2831     self.cfg.MarkInstanceUp(instance.name)
2832
2833     node_current = instance.primary_node
2834
2835     _StartInstanceDisks(self, instance, force)
2836
2837     result = self.rpc.call_instance_start(node_current, instance,
2838                                           self.hvparams, self.beparams)
2839     msg = result.RemoteFailMsg()
2840     if msg:
2841       _ShutdownInstanceDisks(self, instance)
2842       raise errors.OpExecError("Could not start instance: %s" % msg)
2843
2844
2845 class LURebootInstance(LogicalUnit):
2846   """Reboot an instance.
2847
2848   """
2849   HPATH = "instance-reboot"
2850   HTYPE = constants.HTYPE_INSTANCE
2851   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2852   REQ_BGL = False
2853
2854   def ExpandNames(self):
2855     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2856                                    constants.INSTANCE_REBOOT_HARD,
2857                                    constants.INSTANCE_REBOOT_FULL]:
2858       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2859                                   (constants.INSTANCE_REBOOT_SOFT,
2860                                    constants.INSTANCE_REBOOT_HARD,
2861                                    constants.INSTANCE_REBOOT_FULL))
2862     self._ExpandAndLockInstance()
2863
2864   def BuildHooksEnv(self):
2865     """Build hooks env.
2866
2867     This runs on master, primary and secondary nodes of the instance.
2868
2869     """
2870     env = {
2871       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2872       "REBOOT_TYPE": self.op.reboot_type,
2873       }
2874     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2875     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2876     return env, nl, nl
2877
2878   def CheckPrereq(self):
2879     """Check prerequisites.
2880
2881     This checks that the instance is in the cluster.
2882
2883     """
2884     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2885     assert self.instance is not None, \
2886       "Cannot retrieve locked instance %s" % self.op.instance_name
2887
2888     _CheckNodeOnline(self, instance.primary_node)
2889
2890     # check bridges existance
2891     _CheckInstanceBridgesExist(self, instance)
2892
2893   def Exec(self, feedback_fn):
2894     """Reboot the instance.
2895
2896     """
2897     instance = self.instance
2898     ignore_secondaries = self.op.ignore_secondaries
2899     reboot_type = self.op.reboot_type
2900
2901     node_current = instance.primary_node
2902
2903     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2904                        constants.INSTANCE_REBOOT_HARD]:
2905       for disk in instance.disks:
2906         self.cfg.SetDiskID(disk, node_current)
2907       result = self.rpc.call_instance_reboot(node_current, instance,
2908                                              reboot_type)
2909       msg = result.RemoteFailMsg()
2910       if msg:
2911         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2912     else:
2913       result = self.rpc.call_instance_shutdown(node_current, instance)
2914       msg = result.RemoteFailMsg()
2915       if msg:
2916         raise errors.OpExecError("Could not shutdown instance for"
2917                                  " full reboot: %s" % msg)
2918       _ShutdownInstanceDisks(self, instance)
2919       _StartInstanceDisks(self, instance, ignore_secondaries)
2920       result = self.rpc.call_instance_start(node_current, instance, None, None)
2921       msg = result.RemoteFailMsg()
2922       if msg:
2923         _ShutdownInstanceDisks(self, instance)
2924         raise errors.OpExecError("Could not start instance for"
2925                                  " full reboot: %s" % msg)
2926
2927     self.cfg.MarkInstanceUp(instance.name)
2928
2929
2930 class LUShutdownInstance(LogicalUnit):
2931   """Shutdown an instance.
2932
2933   """
2934   HPATH = "instance-stop"
2935   HTYPE = constants.HTYPE_INSTANCE
2936   _OP_REQP = ["instance_name"]
2937   REQ_BGL = False
2938
2939   def ExpandNames(self):
2940     self._ExpandAndLockInstance()
2941
2942   def BuildHooksEnv(self):
2943     """Build hooks env.
2944
2945     This runs on master, primary and secondary nodes of the instance.
2946
2947     """
2948     env = _BuildInstanceHookEnvByObject(self, self.instance)
2949     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2950     return env, nl, nl
2951
2952   def CheckPrereq(self):
2953     """Check prerequisites.
2954
2955     This checks that the instance is in the cluster.
2956
2957     """
2958     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2959     assert self.instance is not None, \
2960       "Cannot retrieve locked instance %s" % self.op.instance_name
2961     _CheckNodeOnline(self, self.instance.primary_node)
2962
2963   def Exec(self, feedback_fn):
2964     """Shutdown the instance.
2965
2966     """
2967     instance = self.instance
2968     node_current = instance.primary_node
2969     self.cfg.MarkInstanceDown(instance.name)
2970     result = self.rpc.call_instance_shutdown(node_current, instance)
2971     msg = result.RemoteFailMsg()
2972     if msg:
2973       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2974
2975     _ShutdownInstanceDisks(self, instance)
2976
2977
2978 class LUReinstallInstance(LogicalUnit):
2979   """Reinstall an instance.
2980
2981   """
2982   HPATH = "instance-reinstall"
2983   HTYPE = constants.HTYPE_INSTANCE
2984   _OP_REQP = ["instance_name"]
2985   REQ_BGL = False
2986
2987   def ExpandNames(self):
2988     self._ExpandAndLockInstance()
2989
2990   def BuildHooksEnv(self):
2991     """Build hooks env.
2992
2993     This runs on master, primary and secondary nodes of the instance.
2994
2995     """
2996     env = _BuildInstanceHookEnvByObject(self, self.instance)
2997     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2998     return env, nl, nl
2999
3000   def CheckPrereq(self):
3001     """Check prerequisites.
3002
3003     This checks that the instance is in the cluster and is not running.
3004
3005     """
3006     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3007     assert instance is not None, \
3008       "Cannot retrieve locked instance %s" % self.op.instance_name
3009     _CheckNodeOnline(self, instance.primary_node)
3010
3011     if instance.disk_template == constants.DT_DISKLESS:
3012       raise errors.OpPrereqError("Instance '%s' has no disks" %
3013                                  self.op.instance_name)
3014     if instance.admin_up:
3015       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3016                                  self.op.instance_name)
3017     remote_info = self.rpc.call_instance_info(instance.primary_node,
3018                                               instance.name,
3019                                               instance.hypervisor)
3020     remote_info.Raise()
3021     if remote_info.data:
3022       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3023                                  (self.op.instance_name,
3024                                   instance.primary_node))
3025
3026     self.op.os_type = getattr(self.op, "os_type", None)
3027     if self.op.os_type is not None:
3028       # OS verification
3029       pnode = self.cfg.GetNodeInfo(
3030         self.cfg.ExpandNodeName(instance.primary_node))
3031       if pnode is None:
3032         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3033                                    self.op.pnode)
3034       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3035       result.Raise()
3036       if not isinstance(result.data, objects.OS):
3037         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3038                                    " primary node"  % self.op.os_type)
3039
3040     self.instance = instance
3041
3042   def Exec(self, feedback_fn):
3043     """Reinstall the instance.
3044
3045     """
3046     inst = self.instance
3047
3048     if self.op.os_type is not None:
3049       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3050       inst.os = self.op.os_type
3051       self.cfg.Update(inst)
3052
3053     _StartInstanceDisks(self, inst, None)
3054     try:
3055       feedback_fn("Running the instance OS create scripts...")
3056       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3057       msg = result.RemoteFailMsg()
3058       if msg:
3059         raise errors.OpExecError("Could not install OS for instance %s"
3060                                  " on node %s: %s" %
3061                                  (inst.name, inst.primary_node, msg))
3062     finally:
3063       _ShutdownInstanceDisks(self, inst)
3064
3065
3066 class LURenameInstance(LogicalUnit):
3067   """Rename an instance.
3068
3069   """
3070   HPATH = "instance-rename"
3071   HTYPE = constants.HTYPE_INSTANCE
3072   _OP_REQP = ["instance_name", "new_name"]
3073
3074   def BuildHooksEnv(self):
3075     """Build hooks env.
3076
3077     This runs on master, primary and secondary nodes of the instance.
3078
3079     """
3080     env = _BuildInstanceHookEnvByObject(self, self.instance)
3081     env["INSTANCE_NEW_NAME"] = self.op.new_name
3082     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3083     return env, nl, nl
3084
3085   def CheckPrereq(self):
3086     """Check prerequisites.
3087
3088     This checks that the instance is in the cluster and is not running.
3089
3090     """
3091     instance = self.cfg.GetInstanceInfo(
3092       self.cfg.ExpandInstanceName(self.op.instance_name))
3093     if instance is None:
3094       raise errors.OpPrereqError("Instance '%s' not known" %
3095                                  self.op.instance_name)
3096     _CheckNodeOnline(self, instance.primary_node)
3097
3098     if instance.admin_up:
3099       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3100                                  self.op.instance_name)
3101     remote_info = self.rpc.call_instance_info(instance.primary_node,
3102                                               instance.name,
3103                                               instance.hypervisor)
3104     remote_info.Raise()
3105     if remote_info.data:
3106       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3107                                  (self.op.instance_name,
3108                                   instance.primary_node))
3109     self.instance = instance
3110
3111     # new name verification
3112     name_info = utils.HostInfo(self.op.new_name)
3113
3114     self.op.new_name = new_name = name_info.name
3115     instance_list = self.cfg.GetInstanceList()
3116     if new_name in instance_list:
3117       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3118                                  new_name)
3119
3120     if not getattr(self.op, "ignore_ip", False):
3121       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3122         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3123                                    (name_info.ip, new_name))
3124
3125
3126   def Exec(self, feedback_fn):
3127     """Reinstall the instance.
3128
3129     """
3130     inst = self.instance
3131     old_name = inst.name
3132
3133     if inst.disk_template == constants.DT_FILE:
3134       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3135
3136     self.cfg.RenameInstance(inst.name, self.op.new_name)
3137     # Change the instance lock. This is definitely safe while we hold the BGL
3138     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3139     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3140
3141     # re-read the instance from the configuration after rename
3142     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3143
3144     if inst.disk_template == constants.DT_FILE:
3145       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3146       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3147                                                      old_file_storage_dir,
3148                                                      new_file_storage_dir)
3149       result.Raise()
3150       if not result.data:
3151         raise errors.OpExecError("Could not connect to node '%s' to rename"
3152                                  " directory '%s' to '%s' (but the instance"
3153                                  " has been renamed in Ganeti)" % (
3154                                  inst.primary_node, old_file_storage_dir,
3155                                  new_file_storage_dir))
3156
3157       if not result.data[0]:
3158         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3159                                  " (but the instance has been renamed in"
3160                                  " Ganeti)" % (old_file_storage_dir,
3161                                                new_file_storage_dir))
3162
3163     _StartInstanceDisks(self, inst, None)
3164     try:
3165       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3166                                                  old_name)
3167       msg = result.RemoteFailMsg()
3168       if msg:
3169         msg = ("Could not run OS rename script for instance %s on node %s"
3170                " (but the instance has been renamed in Ganeti): %s" %
3171                (inst.name, inst.primary_node, msg))
3172         self.proc.LogWarning(msg)
3173     finally:
3174       _ShutdownInstanceDisks(self, inst)
3175
3176
3177 class LURemoveInstance(LogicalUnit):
3178   """Remove an instance.
3179
3180   """
3181   HPATH = "instance-remove"
3182   HTYPE = constants.HTYPE_INSTANCE
3183   _OP_REQP = ["instance_name", "ignore_failures"]
3184   REQ_BGL = False
3185
3186   def ExpandNames(self):
3187     self._ExpandAndLockInstance()
3188     self.needed_locks[locking.LEVEL_NODE] = []
3189     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3190
3191   def DeclareLocks(self, level):
3192     if level == locking.LEVEL_NODE:
3193       self._LockInstancesNodes()
3194
3195   def BuildHooksEnv(self):
3196     """Build hooks env.
3197
3198     This runs on master, primary and secondary nodes of the instance.
3199
3200     """
3201     env = _BuildInstanceHookEnvByObject(self, self.instance)
3202     nl = [self.cfg.GetMasterNode()]
3203     return env, nl, nl
3204
3205   def CheckPrereq(self):
3206     """Check prerequisites.
3207
3208     This checks that the instance is in the cluster.
3209
3210     """
3211     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3212     assert self.instance is not None, \
3213       "Cannot retrieve locked instance %s" % self.op.instance_name
3214
3215   def Exec(self, feedback_fn):
3216     """Remove the instance.
3217
3218     """
3219     instance = self.instance
3220     logging.info("Shutting down instance %s on node %s",
3221                  instance.name, instance.primary_node)
3222
3223     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3224     msg = result.RemoteFailMsg()
3225     if msg:
3226       if self.op.ignore_failures:
3227         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3228       else:
3229         raise errors.OpExecError("Could not shutdown instance %s on"
3230                                  " node %s: %s" %
3231                                  (instance.name, instance.primary_node, msg))
3232
3233     logging.info("Removing block devices for instance %s", instance.name)
3234
3235     if not _RemoveDisks(self, instance):
3236       if self.op.ignore_failures:
3237         feedback_fn("Warning: can't remove instance's disks")
3238       else:
3239         raise errors.OpExecError("Can't remove instance's disks")
3240
3241     logging.info("Removing instance %s out of cluster config", instance.name)
3242
3243     self.cfg.RemoveInstance(instance.name)
3244     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3245
3246
3247 class LUQueryInstances(NoHooksLU):
3248   """Logical unit for querying instances.
3249
3250   """
3251   _OP_REQP = ["output_fields", "names", "use_locking"]
3252   REQ_BGL = False
3253   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3254                                     "admin_state",
3255                                     "disk_template", "ip", "mac", "bridge",
3256                                     "sda_size", "sdb_size", "vcpus", "tags",
3257                                     "network_port", "beparams",
3258                                     r"(disk)\.(size)/([0-9]+)",
3259                                     r"(disk)\.(sizes)", "disk_usage",
3260                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3261                                     r"(nic)\.(macs|ips|bridges)",
3262                                     r"(disk|nic)\.(count)",
3263                                     "serial_no", "hypervisor", "hvparams",] +
3264                                   ["hv/%s" % name
3265                                    for name in constants.HVS_PARAMETERS] +
3266                                   ["be/%s" % name
3267                                    for name in constants.BES_PARAMETERS])
3268   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3269
3270
3271   def ExpandNames(self):
3272     _CheckOutputFields(static=self._FIELDS_STATIC,
3273                        dynamic=self._FIELDS_DYNAMIC,
3274                        selected=self.op.output_fields)
3275
3276     self.needed_locks = {}
3277     self.share_locks[locking.LEVEL_INSTANCE] = 1
3278     self.share_locks[locking.LEVEL_NODE] = 1
3279
3280     if self.op.names:
3281       self.wanted = _GetWantedInstances(self, self.op.names)
3282     else:
3283       self.wanted = locking.ALL_SET
3284
3285     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3286     self.do_locking = self.do_node_query and self.op.use_locking
3287     if self.do_locking:
3288       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3289       self.needed_locks[locking.LEVEL_NODE] = []
3290       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3291
3292   def DeclareLocks(self, level):
3293     if level == locking.LEVEL_NODE and self.do_locking:
3294       self._LockInstancesNodes()
3295
3296   def CheckPrereq(self):
3297     """Check prerequisites.
3298
3299     """
3300     pass
3301
3302   def Exec(self, feedback_fn):
3303     """Computes the list of nodes and their attributes.
3304
3305     """
3306     all_info = self.cfg.GetAllInstancesInfo()
3307     if self.wanted == locking.ALL_SET:
3308       # caller didn't specify instance names, so ordering is not important
3309       if self.do_locking:
3310         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3311       else:
3312         instance_names = all_info.keys()
3313       instance_names = utils.NiceSort(instance_names)
3314     else:
3315       # caller did specify names, so we must keep the ordering
3316       if self.do_locking:
3317         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3318       else:
3319         tgt_set = all_info.keys()
3320       missing = set(self.wanted).difference(tgt_set)
3321       if missing:
3322         raise errors.OpExecError("Some instances were removed before"
3323                                  " retrieving their data: %s" % missing)
3324       instance_names = self.wanted
3325
3326     instance_list = [all_info[iname] for iname in instance_names]
3327
3328     # begin data gathering
3329
3330     nodes = frozenset([inst.primary_node for inst in instance_list])
3331     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3332
3333     bad_nodes = []
3334     off_nodes = []
3335     if self.do_node_query:
3336       live_data = {}
3337       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3338       for name in nodes:
3339         result = node_data[name]
3340         if result.offline:
3341           # offline nodes will be in both lists
3342           off_nodes.append(name)
3343         if result.failed:
3344           bad_nodes.append(name)
3345         else:
3346           if result.data:
3347             live_data.update(result.data)
3348             # else no instance is alive
3349     else:
3350       live_data = dict([(name, {}) for name in instance_names])
3351
3352     # end data gathering
3353
3354     HVPREFIX = "hv/"
3355     BEPREFIX = "be/"
3356     output = []
3357     for instance in instance_list:
3358       iout = []
3359       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3360       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3361       for field in self.op.output_fields:
3362         st_match = self._FIELDS_STATIC.Matches(field)
3363         if field == "name":
3364           val = instance.name
3365         elif field == "os":
3366           val = instance.os
3367         elif field == "pnode":
3368           val = instance.primary_node
3369         elif field == "snodes":
3370           val = list(instance.secondary_nodes)
3371         elif field == "admin_state":
3372           val = instance.admin_up
3373         elif field == "oper_state":
3374           if instance.primary_node in bad_nodes:
3375             val = None
3376           else:
3377             val = bool(live_data.get(instance.name))
3378         elif field == "status":
3379           if instance.primary_node in off_nodes:
3380             val = "ERROR_nodeoffline"
3381           elif instance.primary_node in bad_nodes:
3382             val = "ERROR_nodedown"
3383           else:
3384             running = bool(live_data.get(instance.name))
3385             if running:
3386               if instance.admin_up:
3387                 val = "running"
3388               else:
3389                 val = "ERROR_up"
3390             else:
3391               if instance.admin_up:
3392                 val = "ERROR_down"
3393               else:
3394                 val = "ADMIN_down"
3395         elif field == "oper_ram":
3396           if instance.primary_node in bad_nodes:
3397             val = None
3398           elif instance.name in live_data:
3399             val = live_data[instance.name].get("memory", "?")
3400           else:
3401             val = "-"
3402         elif field == "disk_template":
3403           val = instance.disk_template
3404         elif field == "ip":
3405           val = instance.nics[0].ip
3406         elif field == "bridge":
3407           val = instance.nics[0].bridge
3408         elif field == "mac":
3409           val = instance.nics[0].mac
3410         elif field == "sda_size" or field == "sdb_size":
3411           idx = ord(field[2]) - ord('a')
3412           try:
3413             val = instance.FindDisk(idx).size
3414           except errors.OpPrereqError:
3415             val = None
3416         elif field == "disk_usage": # total disk usage per node
3417           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3418           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3419         elif field == "tags":
3420           val = list(instance.GetTags())
3421         elif field == "serial_no":
3422           val = instance.serial_no
3423         elif field == "network_port":
3424           val = instance.network_port
3425         elif field == "hypervisor":
3426           val = instance.hypervisor
3427         elif field == "hvparams":
3428           val = i_hv
3429         elif (field.startswith(HVPREFIX) and
3430               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3431           val = i_hv.get(field[len(HVPREFIX):], None)
3432         elif field == "beparams":
3433           val = i_be
3434         elif (field.startswith(BEPREFIX) and
3435               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3436           val = i_be.get(field[len(BEPREFIX):], None)
3437         elif st_match and st_match.groups():
3438           # matches a variable list
3439           st_groups = st_match.groups()
3440           if st_groups and st_groups[0] == "disk":
3441             if st_groups[1] == "count":
3442               val = len(instance.disks)
3443             elif st_groups[1] == "sizes":
3444               val = [disk.size for disk in instance.disks]
3445             elif st_groups[1] == "size":
3446               try:
3447                 val = instance.FindDisk(st_groups[2]).size
3448               except errors.OpPrereqError:
3449                 val = None
3450             else:
3451               assert False, "Unhandled disk parameter"
3452           elif st_groups[0] == "nic":
3453             if st_groups[1] == "count":
3454               val = len(instance.nics)
3455             elif st_groups[1] == "macs":
3456               val = [nic.mac for nic in instance.nics]
3457             elif st_groups[1] == "ips":
3458               val = [nic.ip for nic in instance.nics]
3459             elif st_groups[1] == "bridges":
3460               val = [nic.bridge for nic in instance.nics]
3461             else:
3462               # index-based item
3463               nic_idx = int(st_groups[2])
3464               if nic_idx >= len(instance.nics):
3465                 val = None
3466               else:
3467                 if st_groups[1] == "mac":
3468                   val = instance.nics[nic_idx].mac
3469                 elif st_groups[1] == "ip":
3470                   val = instance.nics[nic_idx].ip
3471                 elif st_groups[1] == "bridge":
3472                   val = instance.nics[nic_idx].bridge
3473                 else:
3474                   assert False, "Unhandled NIC parameter"
3475           else:
3476             assert False, "Unhandled variable parameter"
3477         else:
3478           raise errors.ParameterError(field)
3479         iout.append(val)
3480       output.append(iout)
3481
3482     return output
3483
3484
3485 class LUFailoverInstance(LogicalUnit):
3486   """Failover an instance.
3487
3488   """
3489   HPATH = "instance-failover"
3490   HTYPE = constants.HTYPE_INSTANCE
3491   _OP_REQP = ["instance_name", "ignore_consistency"]
3492   REQ_BGL = False
3493
3494   def ExpandNames(self):
3495     self._ExpandAndLockInstance()
3496     self.needed_locks[locking.LEVEL_NODE] = []
3497     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3498
3499   def DeclareLocks(self, level):
3500     if level == locking.LEVEL_NODE:
3501       self._LockInstancesNodes()
3502
3503   def BuildHooksEnv(self):
3504     """Build hooks env.
3505
3506     This runs on master, primary and secondary nodes of the instance.
3507
3508     """
3509     env = {
3510       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3511       }
3512     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3513     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3514     return env, nl, nl
3515
3516   def CheckPrereq(self):
3517     """Check prerequisites.
3518
3519     This checks that the instance is in the cluster.
3520
3521     """
3522     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3523     assert self.instance is not None, \
3524       "Cannot retrieve locked instance %s" % self.op.instance_name
3525
3526     bep = self.cfg.GetClusterInfo().FillBE(instance)
3527     if instance.disk_template not in constants.DTS_NET_MIRROR:
3528       raise errors.OpPrereqError("Instance's disk layout is not"
3529                                  " network mirrored, cannot failover.")
3530
3531     secondary_nodes = instance.secondary_nodes
3532     if not secondary_nodes:
3533       raise errors.ProgrammerError("no secondary node but using "
3534                                    "a mirrored disk template")
3535
3536     target_node = secondary_nodes[0]
3537     _CheckNodeOnline(self, target_node)
3538     _CheckNodeNotDrained(self, target_node)
3539
3540     if instance.admin_up:
3541       # check memory requirements on the secondary node
3542       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3543                            instance.name, bep[constants.BE_MEMORY],
3544                            instance.hypervisor)
3545     else:
3546       self.LogInfo("Not checking memory on the secondary node as"
3547                    " instance will not be started")
3548
3549     # check bridge existance
3550     brlist = [nic.bridge for nic in instance.nics]
3551     result = self.rpc.call_bridges_exist(target_node, brlist)
3552     result.Raise()
3553     if not result.data:
3554       raise errors.OpPrereqError("One or more target bridges %s does not"
3555                                  " exist on destination node '%s'" %
3556                                  (brlist, target_node))
3557
3558   def Exec(self, feedback_fn):
3559     """Failover an instance.
3560
3561     The failover is done by shutting it down on its present node and
3562     starting it on the secondary.
3563
3564     """
3565     instance = self.instance
3566
3567     source_node = instance.primary_node
3568     target_node = instance.secondary_nodes[0]
3569
3570     feedback_fn("* checking disk consistency between source and target")
3571     for dev in instance.disks:
3572       # for drbd, these are drbd over lvm
3573       if not _CheckDiskConsistency(self, dev, target_node, False):
3574         if instance.admin_up and not self.op.ignore_consistency:
3575           raise errors.OpExecError("Disk %s is degraded on target node,"
3576                                    " aborting failover." % dev.iv_name)
3577
3578     feedback_fn("* shutting down instance on source node")
3579     logging.info("Shutting down instance %s on node %s",
3580                  instance.name, source_node)
3581
3582     result = self.rpc.call_instance_shutdown(source_node, instance)
3583     msg = result.RemoteFailMsg()
3584     if msg:
3585       if self.op.ignore_consistency:
3586         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3587                              " Proceeding anyway. Please make sure node"
3588                              " %s is down. Error details: %s",
3589                              instance.name, source_node, source_node, msg)
3590       else:
3591         raise errors.OpExecError("Could not shutdown instance %s on"
3592                                  " node %s: %s" %
3593                                  (instance.name, source_node, msg))
3594
3595     feedback_fn("* deactivating the instance's disks on source node")
3596     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3597       raise errors.OpExecError("Can't shut down the instance's disks.")
3598
3599     instance.primary_node = target_node
3600     # distribute new instance config to the other nodes
3601     self.cfg.Update(instance)
3602
3603     # Only start the instance if it's marked as up
3604     if instance.admin_up:
3605       feedback_fn("* activating the instance's disks on target node")
3606       logging.info("Starting instance %s on node %s",
3607                    instance.name, target_node)
3608
3609       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3610                                                ignore_secondaries=True)
3611       if not disks_ok:
3612         _ShutdownInstanceDisks(self, instance)
3613         raise errors.OpExecError("Can't activate the instance's disks")
3614
3615       feedback_fn("* starting the instance on the target node")
3616       result = self.rpc.call_instance_start(target_node, instance, None, None)
3617       msg = result.RemoteFailMsg()
3618       if msg:
3619         _ShutdownInstanceDisks(self, instance)
3620         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3621                                  (instance.name, target_node, msg))
3622
3623
3624 class LUMigrateInstance(LogicalUnit):
3625   """Migrate an instance.
3626
3627   This is migration without shutting down, compared to the failover,
3628   which is done with shutdown.
3629
3630   """
3631   HPATH = "instance-migrate"
3632   HTYPE = constants.HTYPE_INSTANCE
3633   _OP_REQP = ["instance_name", "live", "cleanup"]
3634
3635   REQ_BGL = False
3636
3637   def ExpandNames(self):
3638     self._ExpandAndLockInstance()
3639     self.needed_locks[locking.LEVEL_NODE] = []
3640     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3641
3642   def DeclareLocks(self, level):
3643     if level == locking.LEVEL_NODE:
3644       self._LockInstancesNodes()
3645
3646   def BuildHooksEnv(self):
3647     """Build hooks env.
3648
3649     This runs on master, primary and secondary nodes of the instance.
3650
3651     """
3652     env = _BuildInstanceHookEnvByObject(self, self.instance)
3653     env["MIGRATE_LIVE"] = self.op.live
3654     env["MIGRATE_CLEANUP"] = self.op.cleanup
3655     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3656     return env, nl, nl
3657
3658   def CheckPrereq(self):
3659     """Check prerequisites.
3660
3661     This checks that the instance is in the cluster.
3662
3663     """
3664     instance = self.cfg.GetInstanceInfo(
3665       self.cfg.ExpandInstanceName(self.op.instance_name))
3666     if instance is None:
3667       raise errors.OpPrereqError("Instance '%s' not known" %
3668                                  self.op.instance_name)
3669
3670     if instance.disk_template != constants.DT_DRBD8:
3671       raise errors.OpPrereqError("Instance's disk layout is not"
3672                                  " drbd8, cannot migrate.")
3673
3674     secondary_nodes = instance.secondary_nodes
3675     if not secondary_nodes:
3676       raise errors.ConfigurationError("No secondary node but using"
3677                                       " drbd8 disk template")
3678
3679     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3680
3681     target_node = secondary_nodes[0]
3682     # check memory requirements on the secondary node
3683     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3684                          instance.name, i_be[constants.BE_MEMORY],
3685                          instance.hypervisor)
3686
3687     # check bridge existance
3688     brlist = [nic.bridge for nic in instance.nics]
3689     result = self.rpc.call_bridges_exist(target_node, brlist)
3690     if result.failed or not result.data:
3691       raise errors.OpPrereqError("One or more target bridges %s does not"
3692                                  " exist on destination node '%s'" %
3693                                  (brlist, target_node))
3694
3695     if not self.op.cleanup:
3696       _CheckNodeNotDrained(self, target_node)
3697       result = self.rpc.call_instance_migratable(instance.primary_node,
3698                                                  instance)
3699       msg = result.RemoteFailMsg()
3700       if msg:
3701         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3702                                    msg)
3703
3704     self.instance = instance
3705
3706   def _WaitUntilSync(self):
3707     """Poll with custom rpc for disk sync.
3708
3709     This uses our own step-based rpc call.
3710
3711     """
3712     self.feedback_fn("* wait until resync is done")
3713     all_done = False
3714     while not all_done:
3715       all_done = True
3716       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3717                                             self.nodes_ip,
3718                                             self.instance.disks)
3719       min_percent = 100
3720       for node, nres in result.items():
3721         msg = nres.RemoteFailMsg()
3722         if msg:
3723           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3724                                    (node, msg))
3725         node_done, node_percent = nres.payload
3726         all_done = all_done and node_done
3727         if node_percent is not None:
3728           min_percent = min(min_percent, node_percent)
3729       if not all_done:
3730         if min_percent < 100:
3731           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3732         time.sleep(2)
3733
3734   def _EnsureSecondary(self, node):
3735     """Demote a node to secondary.
3736
3737     """
3738     self.feedback_fn("* switching node %s to secondary mode" % node)
3739
3740     for dev in self.instance.disks:
3741       self.cfg.SetDiskID(dev, node)
3742
3743     result = self.rpc.call_blockdev_close(node, self.instance.name,
3744                                           self.instance.disks)
3745     msg = result.RemoteFailMsg()
3746     if msg:
3747       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3748                                " error %s" % (node, msg))
3749
3750   def _GoStandalone(self):
3751     """Disconnect from the network.
3752
3753     """
3754     self.feedback_fn("* changing into standalone mode")
3755     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3756                                                self.instance.disks)
3757     for node, nres in result.items():
3758       msg = nres.RemoteFailMsg()
3759       if msg:
3760         raise errors.OpExecError("Cannot disconnect disks node %s,"
3761                                  " error %s" % (node, msg))
3762
3763   def _GoReconnect(self, multimaster):
3764     """Reconnect to the network.
3765
3766     """
3767     if multimaster:
3768       msg = "dual-master"
3769     else:
3770       msg = "single-master"
3771     self.feedback_fn("* changing disks into %s mode" % msg)
3772     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3773                                            self.instance.disks,
3774                                            self.instance.name, multimaster)
3775     for node, nres in result.items():
3776       msg = nres.RemoteFailMsg()
3777       if msg:
3778         raise errors.OpExecError("Cannot change disks config on node %s,"
3779                                  " error: %s" % (node, msg))
3780
3781   def _ExecCleanup(self):
3782     """Try to cleanup after a failed migration.
3783
3784     The cleanup is done by:
3785       - check that the instance is running only on one node
3786         (and update the config if needed)
3787       - change disks on its secondary node to secondary
3788       - wait until disks are fully synchronized
3789       - disconnect from the network
3790       - change disks into single-master mode
3791       - wait again until disks are fully synchronized
3792
3793     """
3794     instance = self.instance
3795     target_node = self.target_node
3796     source_node = self.source_node
3797
3798     # check running on only one node
3799     self.feedback_fn("* checking where the instance actually runs"
3800                      " (if this hangs, the hypervisor might be in"
3801                      " a bad state)")
3802     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3803     for node, result in ins_l.items():
3804       result.Raise()
3805       if not isinstance(result.data, list):
3806         raise errors.OpExecError("Can't contact node '%s'" % node)
3807
3808     runningon_source = instance.name in ins_l[source_node].data
3809     runningon_target = instance.name in ins_l[target_node].data
3810
3811     if runningon_source and runningon_target:
3812       raise errors.OpExecError("Instance seems to be running on two nodes,"
3813                                " or the hypervisor is confused. You will have"
3814                                " to ensure manually that it runs only on one"
3815                                " and restart this operation.")
3816
3817     if not (runningon_source or runningon_target):
3818       raise errors.OpExecError("Instance does not seem to be running at all."
3819                                " In this case, it's safer to repair by"
3820                                " running 'gnt-instance stop' to ensure disk"
3821                                " shutdown, and then restarting it.")
3822
3823     if runningon_target:
3824       # the migration has actually succeeded, we need to update the config
3825       self.feedback_fn("* instance running on secondary node (%s),"
3826                        " updating config" % target_node)
3827       instance.primary_node = target_node
3828       self.cfg.Update(instance)
3829       demoted_node = source_node
3830     else:
3831       self.feedback_fn("* instance confirmed to be running on its"
3832                        " primary node (%s)" % source_node)
3833       demoted_node = target_node
3834
3835     self._EnsureSecondary(demoted_node)
3836     try:
3837       self._WaitUntilSync()
3838     except errors.OpExecError:
3839       # we ignore here errors, since if the device is standalone, it
3840       # won't be able to sync
3841       pass
3842     self._GoStandalone()
3843     self._GoReconnect(False)
3844     self._WaitUntilSync()
3845
3846     self.feedback_fn("* done")
3847
3848   def _RevertDiskStatus(self):
3849     """Try to revert the disk status after a failed migration.
3850
3851     """
3852     target_node = self.target_node
3853     try:
3854       self._EnsureSecondary(target_node)
3855       self._GoStandalone()
3856       self._GoReconnect(False)
3857       self._WaitUntilSync()
3858     except errors.OpExecError, err:
3859       self.LogWarning("Migration failed and I can't reconnect the"
3860                       " drives: error '%s'\n"
3861                       "Please look and recover the instance status" %
3862                       str(err))
3863
3864   def _AbortMigration(self):
3865     """Call the hypervisor code to abort a started migration.
3866
3867     """
3868     instance = self.instance
3869     target_node = self.target_node
3870     migration_info = self.migration_info
3871
3872     abort_result = self.rpc.call_finalize_migration(target_node,
3873                                                     instance,
3874                                                     migration_info,
3875                                                     False)
3876     abort_msg = abort_result.RemoteFailMsg()
3877     if abort_msg:
3878       logging.error("Aborting migration failed on target node %s: %s" %
3879                     (target_node, abort_msg))
3880       # Don't raise an exception here, as we stil have to try to revert the
3881       # disk status, even if this step failed.
3882
3883   def _ExecMigration(self):
3884     """Migrate an instance.
3885
3886     The migrate is done by:
3887       - change the disks into dual-master mode
3888       - wait until disks are fully synchronized again
3889       - migrate the instance
3890       - change disks on the new secondary node (the old primary) to secondary
3891       - wait until disks are fully synchronized
3892       - change disks into single-master mode
3893
3894     """
3895     instance = self.instance
3896     target_node = self.target_node
3897     source_node = self.source_node
3898
3899     self.feedback_fn("* checking disk consistency between source and target")
3900     for dev in instance.disks:
3901       if not _CheckDiskConsistency(self, dev, target_node, False):
3902         raise errors.OpExecError("Disk %s is degraded or not fully"
3903                                  " synchronized on target node,"
3904                                  " aborting migrate." % dev.iv_name)
3905
3906     # First get the migration information from the remote node
3907     result = self.rpc.call_migration_info(source_node, instance)
3908     msg = result.RemoteFailMsg()
3909     if msg:
3910       log_err = ("Failed fetching source migration information from %s: %s" %
3911                  (source_node, msg))
3912       logging.error(log_err)
3913       raise errors.OpExecError(log_err)
3914
3915     self.migration_info = migration_info = result.payload
3916
3917     # Then switch the disks to master/master mode
3918     self._EnsureSecondary(target_node)
3919     self._GoStandalone()
3920     self._GoReconnect(True)
3921     self._WaitUntilSync()
3922
3923     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3924     result = self.rpc.call_accept_instance(target_node,
3925                                            instance,
3926                                            migration_info,
3927                                            self.nodes_ip[target_node])
3928
3929     msg = result.RemoteFailMsg()
3930     if msg:
3931       logging.error("Instance pre-migration failed, trying to revert"
3932                     " disk status: %s", msg)
3933       self._AbortMigration()
3934       self._RevertDiskStatus()
3935       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3936                                (instance.name, msg))
3937
3938     self.feedback_fn("* migrating instance to %s" % target_node)
3939     time.sleep(10)
3940     result = self.rpc.call_instance_migrate(source_node, instance,
3941                                             self.nodes_ip[target_node],
3942                                             self.op.live)
3943     msg = result.RemoteFailMsg()
3944     if msg:
3945       logging.error("Instance migration failed, trying to revert"
3946                     " disk status: %s", msg)
3947       self._AbortMigration()
3948       self._RevertDiskStatus()
3949       raise errors.OpExecError("Could not migrate instance %s: %s" %
3950                                (instance.name, msg))
3951     time.sleep(10)
3952
3953     instance.primary_node = target_node
3954     # distribute new instance config to the other nodes
3955     self.cfg.Update(instance)
3956
3957     result = self.rpc.call_finalize_migration(target_node,
3958                                               instance,
3959                                               migration_info,
3960                                               True)
3961     msg = result.RemoteFailMsg()
3962     if msg:
3963       logging.error("Instance migration succeeded, but finalization failed:"
3964                     " %s" % msg)
3965       raise errors.OpExecError("Could not finalize instance migration: %s" %
3966                                msg)
3967
3968     self._EnsureSecondary(source_node)
3969     self._WaitUntilSync()
3970     self._GoStandalone()
3971     self._GoReconnect(False)
3972     self._WaitUntilSync()
3973
3974     self.feedback_fn("* done")
3975
3976   def Exec(self, feedback_fn):
3977     """Perform the migration.
3978
3979     """
3980     self.feedback_fn = feedback_fn
3981
3982     self.source_node = self.instance.primary_node
3983     self.target_node = self.instance.secondary_nodes[0]
3984     self.all_nodes = [self.source_node, self.target_node]
3985     self.nodes_ip = {
3986       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3987       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3988       }
3989     if self.op.cleanup:
3990       return self._ExecCleanup()
3991     else:
3992       return self._ExecMigration()
3993
3994
3995 def _CreateBlockDev(lu, node, instance, device, force_create,
3996                     info, force_open):
3997   """Create a tree of block devices on a given node.
3998
3999   If this device type has to be created on secondaries, create it and
4000   all its children.
4001
4002   If not, just recurse to children keeping the same 'force' value.
4003
4004   @param lu: the lu on whose behalf we execute
4005   @param node: the node on which to create the device
4006   @type instance: L{objects.Instance}
4007   @param instance: the instance which owns the device
4008   @type device: L{objects.Disk}
4009   @param device: the device to create
4010   @type force_create: boolean
4011   @param force_create: whether to force creation of this device; this
4012       will be change to True whenever we find a device which has
4013       CreateOnSecondary() attribute
4014   @param info: the extra 'metadata' we should attach to the device
4015       (this will be represented as a LVM tag)
4016   @type force_open: boolean
4017   @param force_open: this parameter will be passes to the
4018       L{backend.BlockdevCreate} function where it specifies
4019       whether we run on primary or not, and it affects both
4020       the child assembly and the device own Open() execution
4021
4022   """
4023   if device.CreateOnSecondary():
4024     force_create = True
4025
4026   if device.children:
4027     for child in device.children:
4028       _CreateBlockDev(lu, node, instance, child, force_create,
4029                       info, force_open)
4030
4031   if not force_create:
4032     return
4033
4034   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4035
4036
4037 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4038   """Create a single block device on a given node.
4039
4040   This will not recurse over children of the device, so they must be
4041   created in advance.
4042
4043   @param lu: the lu on whose behalf we execute
4044   @param node: the node on which to create the device
4045   @type instance: L{objects.Instance}
4046   @param instance: the instance which owns the device
4047   @type device: L{objects.Disk}
4048   @param device: the device to create
4049   @param info: the extra 'metadata' we should attach to the device
4050       (this will be represented as a LVM tag)
4051   @type force_open: boolean
4052   @param force_open: this parameter will be passes to the
4053       L{backend.BlockdevCreate} function where it specifies
4054       whether we run on primary or not, and it affects both
4055       the child assembly and the device own Open() execution
4056
4057   """
4058   lu.cfg.SetDiskID(device, node)
4059   result = lu.rpc.call_blockdev_create(node, device, device.size,
4060                                        instance.name, force_open, info)
4061   msg = result.RemoteFailMsg()
4062   if msg:
4063     raise errors.OpExecError("Can't create block device %s on"
4064                              " node %s for instance %s: %s" %
4065                              (device, node, instance.name, msg))
4066   if device.physical_id is None:
4067     device.physical_id = result.payload
4068
4069
4070 def _GenerateUniqueNames(lu, exts):
4071   """Generate a suitable LV name.
4072
4073   This will generate a logical volume name for the given instance.
4074
4075   """
4076   results = []
4077   for val in exts:
4078     new_id = lu.cfg.GenerateUniqueID()
4079     results.append("%s%s" % (new_id, val))
4080   return results
4081
4082
4083 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4084                          p_minor, s_minor):
4085   """Generate a drbd8 device complete with its children.
4086
4087   """
4088   port = lu.cfg.AllocatePort()
4089   vgname = lu.cfg.GetVGName()
4090   shared_secret = lu.cfg.GenerateDRBDSecret()
4091   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4092                           logical_id=(vgname, names[0]))
4093   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4094                           logical_id=(vgname, names[1]))
4095   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4096                           logical_id=(primary, secondary, port,
4097                                       p_minor, s_minor,
4098                                       shared_secret),
4099                           children=[dev_data, dev_meta],
4100                           iv_name=iv_name)
4101   return drbd_dev
4102
4103
4104 def _GenerateDiskTemplate(lu, template_name,
4105                           instance_name, primary_node,
4106                           secondary_nodes, disk_info,
4107                           file_storage_dir, file_driver,
4108                           base_index):
4109   """Generate the entire disk layout for a given template type.
4110
4111   """
4112   #TODO: compute space requirements
4113
4114   vgname = lu.cfg.GetVGName()
4115   disk_count = len(disk_info)
4116   disks = []
4117   if template_name == constants.DT_DISKLESS:
4118     pass
4119   elif template_name == constants.DT_PLAIN:
4120     if len(secondary_nodes) != 0:
4121       raise errors.ProgrammerError("Wrong template configuration")
4122
4123     names = _GenerateUniqueNames(lu, [".disk%d" % i
4124                                       for i in range(disk_count)])
4125     for idx, disk in enumerate(disk_info):
4126       disk_index = idx + base_index
4127       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4128                               logical_id=(vgname, names[idx]),
4129                               iv_name="disk/%d" % disk_index,
4130                               mode=disk["mode"])
4131       disks.append(disk_dev)
4132   elif template_name == constants.DT_DRBD8:
4133     if len(secondary_nodes) != 1:
4134       raise errors.ProgrammerError("Wrong template configuration")
4135     remote_node = secondary_nodes[0]
4136     minors = lu.cfg.AllocateDRBDMinor(
4137       [primary_node, remote_node] * len(disk_info), instance_name)
4138
4139     names = []
4140     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4141                                                for i in range(disk_count)]):
4142       names.append(lv_prefix + "_data")
4143       names.append(lv_prefix + "_meta")
4144     for idx, disk in enumerate(disk_info):
4145       disk_index = idx + base_index
4146       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4147                                       disk["size"], names[idx*2:idx*2+2],
4148                                       "disk/%d" % disk_index,
4149                                       minors[idx*2], minors[idx*2+1])
4150       disk_dev.mode = disk["mode"]
4151       disks.append(disk_dev)
4152   elif template_name == constants.DT_FILE:
4153     if len(secondary_nodes) != 0:
4154       raise errors.ProgrammerError("Wrong template configuration")
4155
4156     for idx, disk in enumerate(disk_info):
4157       disk_index = idx + base_index
4158       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4159                               iv_name="disk/%d" % disk_index,
4160                               logical_id=(file_driver,
4161                                           "%s/disk%d" % (file_storage_dir,
4162                                                          disk_index)),
4163                               mode=disk["mode"])
4164       disks.append(disk_dev)
4165   else:
4166     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4167   return disks
4168
4169
4170 def _GetInstanceInfoText(instance):
4171   """Compute that text that should be added to the disk's metadata.
4172
4173   """
4174   return "originstname+%s" % instance.name
4175
4176
4177 def _CreateDisks(lu, instance):
4178   """Create all disks for an instance.
4179
4180   This abstracts away some work from AddInstance.
4181
4182   @type lu: L{LogicalUnit}
4183   @param lu: the logical unit on whose behalf we execute
4184   @type instance: L{objects.Instance}
4185   @param instance: the instance whose disks we should create
4186   @rtype: boolean
4187   @return: the success of the creation
4188
4189   """
4190   info = _GetInstanceInfoText(instance)
4191   pnode = instance.primary_node
4192
4193   if instance.disk_template == constants.DT_FILE:
4194     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4195     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4196
4197     if result.failed or not result.data:
4198       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4199
4200     if not result.data[0]:
4201       raise errors.OpExecError("Failed to create directory '%s'" %
4202                                file_storage_dir)
4203
4204   # Note: this needs to be kept in sync with adding of disks in
4205   # LUSetInstanceParams
4206   for device in instance.disks:
4207     logging.info("Creating volume %s for instance %s",
4208                  device.iv_name, instance.name)
4209     #HARDCODE
4210     for node in instance.all_nodes:
4211       f_create = node == pnode
4212       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4213
4214
4215 def _RemoveDisks(lu, instance):
4216   """Remove all disks for an instance.
4217
4218   This abstracts away some work from `AddInstance()` and
4219   `RemoveInstance()`. Note that in case some of the devices couldn't
4220   be removed, the removal will continue with the other ones (compare
4221   with `_CreateDisks()`).
4222
4223   @type lu: L{LogicalUnit}
4224   @param lu: the logical unit on whose behalf we execute
4225   @type instance: L{objects.Instance}
4226   @param instance: the instance whose disks we should remove
4227   @rtype: boolean
4228   @return: the success of the removal
4229
4230   """
4231   logging.info("Removing block devices for instance %s", instance.name)
4232
4233   all_result = True
4234   for device in instance.disks:
4235     for node, disk in device.ComputeNodeTree(instance.primary_node):
4236       lu.cfg.SetDiskID(disk, node)
4237       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4238       if msg:
4239         lu.LogWarning("Could not remove block device %s on node %s,"
4240                       " continuing anyway: %s", device.iv_name, node, msg)
4241         all_result = False
4242
4243   if instance.disk_template == constants.DT_FILE:
4244     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4245     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4246                                                  file_storage_dir)
4247     if result.failed or not result.data:
4248       logging.error("Could not remove directory '%s'", file_storage_dir)
4249       all_result = False
4250
4251   return all_result
4252
4253
4254 def _ComputeDiskSize(disk_template, disks):
4255   """Compute disk size requirements in the volume group
4256
4257   """
4258   # Required free disk space as a function of disk and swap space
4259   req_size_dict = {
4260     constants.DT_DISKLESS: None,
4261     constants.DT_PLAIN: sum(d["size"] for d in disks),
4262     # 128 MB are added for drbd metadata for each disk
4263     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4264     constants.DT_FILE: None,
4265   }
4266
4267   if disk_template not in req_size_dict:
4268     raise errors.ProgrammerError("Disk template '%s' size requirement"
4269                                  " is unknown" %  disk_template)
4270
4271   return req_size_dict[disk_template]
4272
4273
4274 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4275   """Hypervisor parameter validation.
4276
4277   This function abstract the hypervisor parameter validation to be
4278   used in both instance create and instance modify.
4279
4280   @type lu: L{LogicalUnit}
4281   @param lu: the logical unit for which we check
4282   @type nodenames: list
4283   @param nodenames: the list of nodes on which we should check
4284   @type hvname: string
4285   @param hvname: the name of the hypervisor we should use
4286   @type hvparams: dict
4287   @param hvparams: the parameters which we need to check
4288   @raise errors.OpPrereqError: if the parameters are not valid
4289
4290   """
4291   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4292                                                   hvname,
4293                                                   hvparams)
4294   for node in nodenames:
4295     info = hvinfo[node]
4296     if info.offline:
4297       continue
4298     msg = info.RemoteFailMsg()
4299     if msg:
4300       raise errors.OpPrereqError("Hypervisor parameter validation"
4301                                  " failed on node %s: %s" % (node, msg))
4302
4303
4304 class LUCreateInstance(LogicalUnit):
4305   """Create an instance.
4306
4307   """
4308   HPATH = "instance-add"
4309   HTYPE = constants.HTYPE_INSTANCE
4310   _OP_REQP = ["instance_name", "disks", "disk_template",
4311               "mode", "start",
4312               "wait_for_sync", "ip_check", "nics",
4313               "hvparams", "beparams"]
4314   REQ_BGL = False
4315
4316   def _ExpandNode(self, node):
4317     """Expands and checks one node name.
4318
4319     """
4320     node_full = self.cfg.ExpandNodeName(node)
4321     if node_full is None:
4322       raise errors.OpPrereqError("Unknown node %s" % node)
4323     return node_full
4324
4325   def ExpandNames(self):
4326     """ExpandNames for CreateInstance.
4327
4328     Figure out the right locks for instance creation.
4329
4330     """
4331     self.needed_locks = {}
4332
4333     # set optional parameters to none if they don't exist
4334     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4335       if not hasattr(self.op, attr):
4336         setattr(self.op, attr, None)
4337
4338     # cheap checks, mostly valid constants given
4339
4340     # verify creation mode
4341     if self.op.mode not in (constants.INSTANCE_CREATE,
4342                             constants.INSTANCE_IMPORT):
4343       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4344                                  self.op.mode)
4345
4346     # disk template and mirror node verification
4347     if self.op.disk_template not in constants.DISK_TEMPLATES:
4348       raise errors.OpPrereqError("Invalid disk template name")
4349
4350     if self.op.hypervisor is None:
4351       self.op.hypervisor = self.cfg.GetHypervisorType()
4352
4353     cluster = self.cfg.GetClusterInfo()
4354     enabled_hvs = cluster.enabled_hypervisors
4355     if self.op.hypervisor not in enabled_hvs:
4356       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4357                                  " cluster (%s)" % (self.op.hypervisor,
4358                                   ",".join(enabled_hvs)))
4359
4360     # check hypervisor parameter syntax (locally)
4361     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4362     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4363                                   self.op.hvparams)
4364     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4365     hv_type.CheckParameterSyntax(filled_hvp)
4366     self.hv_full = filled_hvp
4367
4368     # fill and remember the beparams dict
4369     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4370     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4371                                     self.op.beparams)
4372
4373     #### instance parameters check
4374
4375     # instance name verification
4376     hostname1 = utils.HostInfo(self.op.instance_name)
4377     self.op.instance_name = instance_name = hostname1.name
4378
4379     # this is just a preventive check, but someone might still add this
4380     # instance in the meantime, and creation will fail at lock-add time
4381     if instance_name in self.cfg.GetInstanceList():
4382       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4383                                  instance_name)
4384
4385     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4386
4387     # NIC buildup
4388     self.nics = []
4389     for nic in self.op.nics:
4390       # ip validity checks
4391       ip = nic.get("ip", None)
4392       if ip is None or ip.lower() == "none":
4393         nic_ip = None
4394       elif ip.lower() == constants.VALUE_AUTO:
4395         nic_ip = hostname1.ip
4396       else:
4397         if not utils.IsValidIP(ip):
4398           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4399                                      " like a valid IP" % ip)
4400         nic_ip = ip
4401
4402       # MAC address verification
4403       mac = nic.get("mac", constants.VALUE_AUTO)
4404       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4405         if not utils.IsValidMac(mac.lower()):
4406           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4407                                      mac)
4408       # bridge verification
4409       bridge = nic.get("bridge", None)
4410       if bridge is None:
4411         bridge = self.cfg.GetDefBridge()
4412       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4413
4414     # disk checks/pre-build
4415     self.disks = []
4416     for disk in self.op.disks:
4417       mode = disk.get("mode", constants.DISK_RDWR)
4418       if mode not in constants.DISK_ACCESS_SET:
4419         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4420                                    mode)
4421       size = disk.get("size", None)
4422       if size is None:
4423         raise errors.OpPrereqError("Missing disk size")
4424       try:
4425         size = int(size)
4426       except ValueError:
4427         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4428       self.disks.append({"size": size, "mode": mode})
4429
4430     # used in CheckPrereq for ip ping check
4431     self.check_ip = hostname1.ip
4432
4433     # file storage checks
4434     if (self.op.file_driver and
4435         not self.op.file_driver in constants.FILE_DRIVER):
4436       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4437                                  self.op.file_driver)
4438
4439     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4440       raise errors.OpPrereqError("File storage directory path not absolute")
4441
4442     ### Node/iallocator related checks
4443     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4444       raise errors.OpPrereqError("One and only one of iallocator and primary"
4445                                  " node must be given")
4446
4447     if self.op.iallocator:
4448       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4449     else:
4450       self.op.pnode = self._ExpandNode(self.op.pnode)
4451       nodelist = [self.op.pnode]
4452       if self.op.snode is not None:
4453         self.op.snode = self._ExpandNode(self.op.snode)
4454         nodelist.append(self.op.snode)
4455       self.needed_locks[locking.LEVEL_NODE] = nodelist
4456
4457     # in case of import lock the source node too
4458     if self.op.mode == constants.INSTANCE_IMPORT:
4459       src_node = getattr(self.op, "src_node", None)
4460       src_path = getattr(self.op, "src_path", None)
4461
4462       if src_path is None:
4463         self.op.src_path = src_path = self.op.instance_name
4464
4465       if src_node is None:
4466         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4467         self.op.src_node = None
4468         if os.path.isabs(src_path):
4469           raise errors.OpPrereqError("Importing an instance from an absolute"
4470                                      " path requires a source node option.")
4471       else:
4472         self.op.src_node = src_node = self._ExpandNode(src_node)
4473         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4474           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4475         if not os.path.isabs(src_path):
4476           self.op.src_path = src_path = \
4477             os.path.join(constants.EXPORT_DIR, src_path)
4478
4479     else: # INSTANCE_CREATE
4480       if getattr(self.op, "os_type", None) is None:
4481         raise errors.OpPrereqError("No guest OS specified")
4482
4483   def _RunAllocator(self):
4484     """Run the allocator based on input opcode.
4485
4486     """
4487     nics = [n.ToDict() for n in self.nics]
4488     ial = IAllocator(self,
4489                      mode=constants.IALLOCATOR_MODE_ALLOC,
4490                      name=self.op.instance_name,
4491                      disk_template=self.op.disk_template,
4492                      tags=[],
4493                      os=self.op.os_type,
4494                      vcpus=self.be_full[constants.BE_VCPUS],
4495                      mem_size=self.be_full[constants.BE_MEMORY],
4496                      disks=self.disks,
4497                      nics=nics,
4498                      hypervisor=self.op.hypervisor,
4499                      )
4500
4501     ial.Run(self.op.iallocator)
4502
4503     if not ial.success:
4504       raise errors.OpPrereqError("Can't compute nodes using"
4505                                  " iallocator '%s': %s" % (self.op.iallocator,
4506                                                            ial.info))
4507     if len(ial.nodes) != ial.required_nodes:
4508       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4509                                  " of nodes (%s), required %s" %
4510                                  (self.op.iallocator, len(ial.nodes),
4511                                   ial.required_nodes))
4512     self.op.pnode = ial.nodes[0]
4513     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4514                  self.op.instance_name, self.op.iallocator,
4515                  ", ".join(ial.nodes))
4516     if ial.required_nodes == 2:
4517       self.op.snode = ial.nodes[1]
4518
4519   def BuildHooksEnv(self):
4520     """Build hooks env.
4521
4522     This runs on master, primary and secondary nodes of the instance.
4523
4524     """
4525     env = {
4526       "ADD_MODE": self.op.mode,
4527       }
4528     if self.op.mode == constants.INSTANCE_IMPORT:
4529       env["SRC_NODE"] = self.op.src_node
4530       env["SRC_PATH"] = self.op.src_path
4531       env["SRC_IMAGES"] = self.src_images
4532
4533     env.update(_BuildInstanceHookEnv(
4534       name=self.op.instance_name,
4535       primary_node=self.op.pnode,
4536       secondary_nodes=self.secondaries,
4537       status=self.op.start,
4538       os_type=self.op.os_type,
4539       memory=self.be_full[constants.BE_MEMORY],
4540       vcpus=self.be_full[constants.BE_VCPUS],
4541       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4542       disk_template=self.op.disk_template,
4543       disks=[(d["size"], d["mode"]) for d in self.disks],
4544       bep=self.be_full,
4545       hvp=self.hv_full,
4546       hypervisor=self.op.hypervisor,
4547     ))
4548
4549     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4550           self.secondaries)
4551     return env, nl, nl
4552
4553
4554   def CheckPrereq(self):
4555     """Check prerequisites.
4556
4557     """
4558     if (not self.cfg.GetVGName() and
4559         self.op.disk_template not in constants.DTS_NOT_LVM):
4560       raise errors.OpPrereqError("Cluster does not support lvm-based"
4561                                  " instances")
4562
4563     if self.op.mode == constants.INSTANCE_IMPORT:
4564       src_node = self.op.src_node
4565       src_path = self.op.src_path
4566
4567       if src_node is None:
4568         exp_list = self.rpc.call_export_list(
4569           self.acquired_locks[locking.LEVEL_NODE])
4570         found = False
4571         for node in exp_list:
4572           if not exp_list[node].failed and src_path in exp_list[node].data:
4573             found = True
4574             self.op.src_node = src_node = node
4575             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4576                                                        src_path)
4577             break
4578         if not found:
4579           raise errors.OpPrereqError("No export found for relative path %s" %
4580                                       src_path)
4581
4582       _CheckNodeOnline(self, src_node)
4583       result = self.rpc.call_export_info(src_node, src_path)
4584       result.Raise()
4585       if not result.data:
4586         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4587
4588       export_info = result.data
4589       if not export_info.has_section(constants.INISECT_EXP):
4590         raise errors.ProgrammerError("Corrupted export config")
4591
4592       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4593       if (int(ei_version) != constants.EXPORT_VERSION):
4594         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4595                                    (ei_version, constants.EXPORT_VERSION))
4596
4597       # Check that the new instance doesn't have less disks than the export
4598       instance_disks = len(self.disks)
4599       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4600       if instance_disks < export_disks:
4601         raise errors.OpPrereqError("Not enough disks to import."
4602                                    " (instance: %d, export: %d)" %
4603                                    (instance_disks, export_disks))
4604
4605       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4606       disk_images = []
4607       for idx in range(export_disks):
4608         option = 'disk%d_dump' % idx
4609         if export_info.has_option(constants.INISECT_INS, option):
4610           # FIXME: are the old os-es, disk sizes, etc. useful?
4611           export_name = export_info.get(constants.INISECT_INS, option)
4612           image = os.path.join(src_path, export_name)
4613           disk_images.append(image)
4614         else:
4615           disk_images.append(False)
4616
4617       self.src_images = disk_images
4618
4619       old_name = export_info.get(constants.INISECT_INS, 'name')
4620       # FIXME: int() here could throw a ValueError on broken exports
4621       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4622       if self.op.instance_name == old_name:
4623         for idx, nic in enumerate(self.nics):
4624           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4625             nic_mac_ini = 'nic%d_mac' % idx
4626             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4627
4628     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4629     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4630     if self.op.start and not self.op.ip_check:
4631       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4632                                  " adding an instance in start mode")
4633
4634     if self.op.ip_check:
4635       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4636         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4637                                    (self.check_ip, self.op.instance_name))
4638
4639     #### mac address generation
4640     # By generating here the mac address both the allocator and the hooks get
4641     # the real final mac address rather than the 'auto' or 'generate' value.
4642     # There is a race condition between the generation and the instance object
4643     # creation, which means that we know the mac is valid now, but we're not
4644     # sure it will be when we actually add the instance. If things go bad
4645     # adding the instance will abort because of a duplicate mac, and the
4646     # creation job will fail.
4647     for nic in self.nics:
4648       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4649         nic.mac = self.cfg.GenerateMAC()
4650
4651     #### allocator run
4652
4653     if self.op.iallocator is not None:
4654       self._RunAllocator()
4655
4656     #### node related checks
4657
4658     # check primary node
4659     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4660     assert self.pnode is not None, \
4661       "Cannot retrieve locked node %s" % self.op.pnode
4662     if pnode.offline:
4663       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4664                                  pnode.name)
4665     if pnode.drained:
4666       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4667                                  pnode.name)
4668
4669     self.secondaries = []
4670
4671     # mirror node verification
4672     if self.op.disk_template in constants.DTS_NET_MIRROR:
4673       if self.op.snode is None:
4674         raise errors.OpPrereqError("The networked disk templates need"
4675                                    " a mirror node")
4676       if self.op.snode == pnode.name:
4677         raise errors.OpPrereqError("The secondary node cannot be"
4678                                    " the primary node.")
4679       _CheckNodeOnline(self, self.op.snode)
4680       _CheckNodeNotDrained(self, self.op.snode)
4681       self.secondaries.append(self.op.snode)
4682
4683     nodenames = [pnode.name] + self.secondaries
4684
4685     req_size = _ComputeDiskSize(self.op.disk_template,
4686                                 self.disks)
4687
4688     # Check lv size requirements
4689     if req_size is not None:
4690       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4691                                          self.op.hypervisor)
4692       for node in nodenames:
4693         info = nodeinfo[node]
4694         info.Raise()
4695         info = info.data
4696         if not info:
4697           raise errors.OpPrereqError("Cannot get current information"
4698                                      " from node '%s'" % node)
4699         vg_free = info.get('vg_free', None)
4700         if not isinstance(vg_free, int):
4701           raise errors.OpPrereqError("Can't compute free disk space on"
4702                                      " node %s" % node)
4703         if req_size > info['vg_free']:
4704           raise errors.OpPrereqError("Not enough disk space on target node %s."
4705                                      " %d MB available, %d MB required" %
4706                                      (node, info['vg_free'], req_size))
4707
4708     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4709
4710     # os verification
4711     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4712     result.Raise()
4713     if not isinstance(result.data, objects.OS):
4714       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4715                                  " primary node"  % self.op.os_type)
4716
4717     # bridge check on primary node
4718     bridges = [n.bridge for n in self.nics]
4719     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4720     result.Raise()
4721     if not result.data:
4722       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4723                                  " exist on destination node '%s'" %
4724                                  (",".join(bridges), pnode.name))
4725
4726     # memory check on primary node
4727     if self.op.start:
4728       _CheckNodeFreeMemory(self, self.pnode.name,
4729                            "creating instance %s" % self.op.instance_name,
4730                            self.be_full[constants.BE_MEMORY],
4731                            self.op.hypervisor)
4732
4733   def Exec(self, feedback_fn):
4734     """Create and add the instance to the cluster.
4735
4736     """
4737     instance = self.op.instance_name
4738     pnode_name = self.pnode.name
4739
4740     ht_kind = self.op.hypervisor
4741     if ht_kind in constants.HTS_REQ_PORT:
4742       network_port = self.cfg.AllocatePort()
4743     else:
4744       network_port = None
4745
4746     ##if self.op.vnc_bind_address is None:
4747     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4748
4749     # this is needed because os.path.join does not accept None arguments
4750     if self.op.file_storage_dir is None:
4751       string_file_storage_dir = ""
4752     else:
4753       string_file_storage_dir = self.op.file_storage_dir
4754
4755     # build the full file storage dir path
4756     file_storage_dir = os.path.normpath(os.path.join(
4757                                         self.cfg.GetFileStorageDir(),
4758                                         string_file_storage_dir, instance))
4759
4760
4761     disks = _GenerateDiskTemplate(self,
4762                                   self.op.disk_template,
4763                                   instance, pnode_name,
4764                                   self.secondaries,
4765                                   self.disks,
4766                                   file_storage_dir,
4767                                   self.op.file_driver,
4768                                   0)
4769
4770     iobj = objects.Instance(name=instance, os=self.op.os_type,
4771                             primary_node=pnode_name,
4772                             nics=self.nics, disks=disks,
4773                             disk_template=self.op.disk_template,
4774                             admin_up=False,
4775                             network_port=network_port,
4776                             beparams=self.op.beparams,
4777                             hvparams=self.op.hvparams,
4778                             hypervisor=self.op.hypervisor,
4779                             )
4780
4781     feedback_fn("* creating instance disks...")
4782     try:
4783       _CreateDisks(self, iobj)
4784     except errors.OpExecError:
4785       self.LogWarning("Device creation failed, reverting...")
4786       try:
4787         _RemoveDisks(self, iobj)
4788       finally:
4789         self.cfg.ReleaseDRBDMinors(instance)
4790         raise
4791
4792     feedback_fn("adding instance %s to cluster config" % instance)
4793
4794     self.cfg.AddInstance(iobj)
4795     # Declare that we don't want to remove the instance lock anymore, as we've
4796     # added the instance to the config
4797     del self.remove_locks[locking.LEVEL_INSTANCE]
4798     # Unlock all the nodes
4799     if self.op.mode == constants.INSTANCE_IMPORT:
4800       nodes_keep = [self.op.src_node]
4801       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4802                        if node != self.op.src_node]
4803       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4804       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4805     else:
4806       self.context.glm.release(locking.LEVEL_NODE)
4807       del self.acquired_locks[locking.LEVEL_NODE]
4808
4809     if self.op.wait_for_sync:
4810       disk_abort = not _WaitForSync(self, iobj)
4811     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4812       # make sure the disks are not degraded (still sync-ing is ok)
4813       time.sleep(15)
4814       feedback_fn("* checking mirrors status")
4815       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4816     else:
4817       disk_abort = False
4818
4819     if disk_abort:
4820       _RemoveDisks(self, iobj)
4821       self.cfg.RemoveInstance(iobj.name)
4822       # Make sure the instance lock gets removed
4823       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4824       raise errors.OpExecError("There are some degraded disks for"
4825                                " this instance")
4826
4827     feedback_fn("creating os for instance %s on node %s" %
4828                 (instance, pnode_name))
4829
4830     if iobj.disk_template != constants.DT_DISKLESS:
4831       if self.op.mode == constants.INSTANCE_CREATE:
4832         feedback_fn("* running the instance OS create scripts...")
4833         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4834         msg = result.RemoteFailMsg()
4835         if msg:
4836           raise errors.OpExecError("Could not add os for instance %s"
4837                                    " on node %s: %s" %
4838                                    (instance, pnode_name, msg))
4839
4840       elif self.op.mode == constants.INSTANCE_IMPORT:
4841         feedback_fn("* running the instance OS import scripts...")
4842         src_node = self.op.src_node
4843         src_images = self.src_images
4844         cluster_name = self.cfg.GetClusterName()
4845         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4846                                                          src_node, src_images,
4847                                                          cluster_name)
4848         import_result.Raise()
4849         for idx, result in enumerate(import_result.data):
4850           if not result:
4851             self.LogWarning("Could not import the image %s for instance"
4852                             " %s, disk %d, on node %s" %
4853                             (src_images[idx], instance, idx, pnode_name))
4854       else:
4855         # also checked in the prereq part
4856         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4857                                      % self.op.mode)
4858
4859     if self.op.start:
4860       iobj.admin_up = True
4861       self.cfg.Update(iobj)
4862       logging.info("Starting instance %s on node %s", instance, pnode_name)
4863       feedback_fn("* starting instance...")
4864       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4865       msg = result.RemoteFailMsg()
4866       if msg:
4867         raise errors.OpExecError("Could not start instance: %s" % msg)
4868
4869
4870 class LUConnectConsole(NoHooksLU):
4871   """Connect to an instance's console.
4872
4873   This is somewhat special in that it returns the command line that
4874   you need to run on the master node in order to connect to the
4875   console.
4876
4877   """
4878   _OP_REQP = ["instance_name"]
4879   REQ_BGL = False
4880
4881   def ExpandNames(self):
4882     self._ExpandAndLockInstance()
4883
4884   def CheckPrereq(self):
4885     """Check prerequisites.
4886
4887     This checks that the instance is in the cluster.
4888
4889     """
4890     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4891     assert self.instance is not None, \
4892       "Cannot retrieve locked instance %s" % self.op.instance_name
4893     _CheckNodeOnline(self, self.instance.primary_node)
4894
4895   def Exec(self, feedback_fn):
4896     """Connect to the console of an instance
4897
4898     """
4899     instance = self.instance
4900     node = instance.primary_node
4901
4902     node_insts = self.rpc.call_instance_list([node],
4903                                              [instance.hypervisor])[node]
4904     node_insts.Raise()
4905
4906     if instance.name not in node_insts.data:
4907       raise errors.OpExecError("Instance %s is not running." % instance.name)
4908
4909     logging.debug("Connecting to console of %s on %s", instance.name, node)
4910
4911     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4912     cluster = self.cfg.GetClusterInfo()
4913     # beparams and hvparams are passed separately, to avoid editing the
4914     # instance and then saving the defaults in the instance itself.
4915     hvparams = cluster.FillHV(instance)
4916     beparams = cluster.FillBE(instance)
4917     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4918
4919     # build ssh cmdline
4920     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4921
4922
4923 class LUReplaceDisks(LogicalUnit):
4924   """Replace the disks of an instance.
4925
4926   """
4927   HPATH = "mirrors-replace"
4928   HTYPE = constants.HTYPE_INSTANCE
4929   _OP_REQP = ["instance_name", "mode", "disks"]
4930   REQ_BGL = False
4931
4932   def CheckArguments(self):
4933     if not hasattr(self.op, "remote_node"):
4934       self.op.remote_node = None
4935     if not hasattr(self.op, "iallocator"):
4936       self.op.iallocator = None
4937
4938     # check for valid parameter combination
4939     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4940     if self.op.mode == constants.REPLACE_DISK_CHG:
4941       if cnt == 2:
4942         raise errors.OpPrereqError("When changing the secondary either an"
4943                                    " iallocator script must be used or the"
4944                                    " new node given")
4945       elif cnt == 0:
4946         raise errors.OpPrereqError("Give either the iallocator or the new"
4947                                    " secondary, not both")
4948     else: # not replacing the secondary
4949       if cnt != 2:
4950         raise errors.OpPrereqError("The iallocator and new node options can"
4951                                    " be used only when changing the"
4952                                    " secondary node")
4953
4954   def ExpandNames(self):
4955     self._ExpandAndLockInstance()
4956
4957     if self.op.iallocator is not None:
4958       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4959     elif self.op.remote_node is not None:
4960       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4961       if remote_node is None:
4962         raise errors.OpPrereqError("Node '%s' not known" %
4963                                    self.op.remote_node)
4964       self.op.remote_node = remote_node
4965       # Warning: do not remove the locking of the new secondary here
4966       # unless DRBD8.AddChildren is changed to work in parallel;
4967       # currently it doesn't since parallel invocations of
4968       # FindUnusedMinor will conflict
4969       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4970       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4971     else:
4972       self.needed_locks[locking.LEVEL_NODE] = []
4973       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4974
4975   def DeclareLocks(self, level):
4976     # If we're not already locking all nodes in the set we have to declare the
4977     # instance's primary/secondary nodes.
4978     if (level == locking.LEVEL_NODE and
4979         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4980       self._LockInstancesNodes()
4981
4982   def _RunAllocator(self):
4983     """Compute a new secondary node using an IAllocator.
4984
4985     """
4986     ial = IAllocator(self,
4987                      mode=constants.IALLOCATOR_MODE_RELOC,
4988                      name=self.op.instance_name,
4989                      relocate_from=[self.sec_node])
4990
4991     ial.Run(self.op.iallocator)
4992
4993     if not ial.success:
4994       raise errors.OpPrereqError("Can't compute nodes using"
4995                                  " iallocator '%s': %s" % (self.op.iallocator,
4996                                                            ial.info))
4997     if len(ial.nodes) != ial.required_nodes:
4998       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4999                                  " of nodes (%s), required %s" %
5000                                  (len(ial.nodes), ial.required_nodes))
5001     self.op.remote_node = ial.nodes[0]
5002     self.LogInfo("Selected new secondary for the instance: %s",
5003                  self.op.remote_node)
5004
5005   def BuildHooksEnv(self):
5006     """Build hooks env.
5007
5008     This runs on the master, the primary and all the secondaries.
5009
5010     """
5011     env = {
5012       "MODE": self.op.mode,
5013       "NEW_SECONDARY": self.op.remote_node,
5014       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5015       }
5016     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5017     nl = [
5018       self.cfg.GetMasterNode(),
5019       self.instance.primary_node,
5020       ]
5021     if self.op.remote_node is not None:
5022       nl.append(self.op.remote_node)
5023     return env, nl, nl
5024
5025   def CheckPrereq(self):
5026     """Check prerequisites.
5027
5028     This checks that the instance is in the cluster.
5029
5030     """
5031     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5032     assert instance is not None, \
5033       "Cannot retrieve locked instance %s" % self.op.instance_name
5034     self.instance = instance
5035
5036     if instance.disk_template != constants.DT_DRBD8:
5037       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5038                                  " instances")
5039
5040     if len(instance.secondary_nodes) != 1:
5041       raise errors.OpPrereqError("The instance has a strange layout,"
5042                                  " expected one secondary but found %d" %
5043                                  len(instance.secondary_nodes))
5044
5045     self.sec_node = instance.secondary_nodes[0]
5046
5047     if self.op.iallocator is not None:
5048       self._RunAllocator()
5049
5050     remote_node = self.op.remote_node
5051     if remote_node is not None:
5052       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5053       assert self.remote_node_info is not None, \
5054         "Cannot retrieve locked node %s" % remote_node
5055     else:
5056       self.remote_node_info = None
5057     if remote_node == instance.primary_node:
5058       raise errors.OpPrereqError("The specified node is the primary node of"
5059                                  " the instance.")
5060     elif remote_node == self.sec_node:
5061       raise errors.OpPrereqError("The specified node is already the"
5062                                  " secondary node of the instance.")
5063
5064     if self.op.mode == constants.REPLACE_DISK_PRI:
5065       n1 = self.tgt_node = instance.primary_node
5066       n2 = self.oth_node = self.sec_node
5067     elif self.op.mode == constants.REPLACE_DISK_SEC:
5068       n1 = self.tgt_node = self.sec_node
5069       n2 = self.oth_node = instance.primary_node
5070     elif self.op.mode == constants.REPLACE_DISK_CHG:
5071       n1 = self.new_node = remote_node
5072       n2 = self.oth_node = instance.primary_node
5073       self.tgt_node = self.sec_node
5074       _CheckNodeNotDrained(self, remote_node)
5075     else:
5076       raise errors.ProgrammerError("Unhandled disk replace mode")
5077
5078     _CheckNodeOnline(self, n1)
5079     _CheckNodeOnline(self, n2)
5080
5081     if not self.op.disks:
5082       self.op.disks = range(len(instance.disks))
5083
5084     for disk_idx in self.op.disks:
5085       instance.FindDisk(disk_idx)
5086
5087   def _ExecD8DiskOnly(self, feedback_fn):
5088     """Replace a disk on the primary or secondary for dbrd8.
5089
5090     The algorithm for replace is quite complicated:
5091
5092       1. for each disk to be replaced:
5093
5094         1. create new LVs on the target node with unique names
5095         1. detach old LVs from the drbd device
5096         1. rename old LVs to name_replaced.<time_t>
5097         1. rename new LVs to old LVs
5098         1. attach the new LVs (with the old names now) to the drbd device
5099
5100       1. wait for sync across all devices
5101
5102       1. for each modified disk:
5103
5104         1. remove old LVs (which have the name name_replaces.<time_t>)
5105
5106     Failures are not very well handled.
5107
5108     """
5109     steps_total = 6
5110     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5111     instance = self.instance
5112     iv_names = {}
5113     vgname = self.cfg.GetVGName()
5114     # start of work
5115     cfg = self.cfg
5116     tgt_node = self.tgt_node
5117     oth_node = self.oth_node
5118
5119     # Step: check device activation
5120     self.proc.LogStep(1, steps_total, "check device existence")
5121     info("checking volume groups")
5122     my_vg = cfg.GetVGName()
5123     results = self.rpc.call_vg_list([oth_node, tgt_node])
5124     if not results:
5125       raise errors.OpExecError("Can't list volume groups on the nodes")
5126     for node in oth_node, tgt_node:
5127       res = results[node]
5128       if res.failed or not res.data or my_vg not in res.data:
5129         raise errors.OpExecError("Volume group '%s' not found on %s" %
5130                                  (my_vg, node))
5131     for idx, dev in enumerate(instance.disks):
5132       if idx not in self.op.disks:
5133         continue
5134       for node in tgt_node, oth_node:
5135         info("checking disk/%d on %s" % (idx, node))
5136         cfg.SetDiskID(dev, node)
5137         result = self.rpc.call_blockdev_find(node, dev)
5138         msg = result.RemoteFailMsg()
5139         if not msg and not result.payload:
5140           msg = "disk not found"
5141         if msg:
5142           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5143                                    (idx, node, msg))
5144
5145     # Step: check other node consistency
5146     self.proc.LogStep(2, steps_total, "check peer consistency")
5147     for idx, dev in enumerate(instance.disks):
5148       if idx not in self.op.disks:
5149         continue
5150       info("checking disk/%d consistency on %s" % (idx, oth_node))
5151       if not _CheckDiskConsistency(self, dev, oth_node,
5152                                    oth_node==instance.primary_node):
5153         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5154                                  " to replace disks on this node (%s)" %
5155                                  (oth_node, tgt_node))
5156
5157     # Step: create new storage
5158     self.proc.LogStep(3, steps_total, "allocate new storage")
5159     for idx, dev in enumerate(instance.disks):
5160       if idx not in self.op.disks:
5161         continue
5162       size = dev.size
5163       cfg.SetDiskID(dev, tgt_node)
5164       lv_names = [".disk%d_%s" % (idx, suf)
5165                   for suf in ["data", "meta"]]
5166       names = _GenerateUniqueNames(self, lv_names)
5167       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5168                              logical_id=(vgname, names[0]))
5169       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5170                              logical_id=(vgname, names[1]))
5171       new_lvs = [lv_data, lv_meta]
5172       old_lvs = dev.children
5173       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5174       info("creating new local storage on %s for %s" %
5175            (tgt_node, dev.iv_name))
5176       # we pass force_create=True to force the LVM creation
5177       for new_lv in new_lvs:
5178         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5179                         _GetInstanceInfoText(instance), False)
5180
5181     # Step: for each lv, detach+rename*2+attach
5182     self.proc.LogStep(4, steps_total, "change drbd configuration")
5183     for dev, old_lvs, new_lvs in iv_names.itervalues():
5184       info("detaching %s drbd from local storage" % dev.iv_name)
5185       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5186       result.Raise()
5187       if not result.data:
5188         raise errors.OpExecError("Can't detach drbd from local storage on node"
5189                                  " %s for device %s" % (tgt_node, dev.iv_name))
5190       #dev.children = []
5191       #cfg.Update(instance)
5192
5193       # ok, we created the new LVs, so now we know we have the needed
5194       # storage; as such, we proceed on the target node to rename
5195       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5196       # using the assumption that logical_id == physical_id (which in
5197       # turn is the unique_id on that node)
5198
5199       # FIXME(iustin): use a better name for the replaced LVs
5200       temp_suffix = int(time.time())
5201       ren_fn = lambda d, suff: (d.physical_id[0],
5202                                 d.physical_id[1] + "_replaced-%s" % suff)
5203       # build the rename list based on what LVs exist on the node
5204       rlist = []
5205       for to_ren in old_lvs:
5206         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5207         if not result.RemoteFailMsg() and result.payload:
5208           # device exists
5209           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5210
5211       info("renaming the old LVs on the target node")
5212       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5213       result.Raise()
5214       if not result.data:
5215         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5216       # now we rename the new LVs to the old LVs
5217       info("renaming the new LVs on the target node")
5218       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5219       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5220       result.Raise()
5221       if not result.data:
5222         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5223
5224       for old, new in zip(old_lvs, new_lvs):
5225         new.logical_id = old.logical_id
5226         cfg.SetDiskID(new, tgt_node)
5227
5228       for disk in old_lvs:
5229         disk.logical_id = ren_fn(disk, temp_suffix)
5230         cfg.SetDiskID(disk, tgt_node)
5231
5232       # now that the new lvs have the old name, we can add them to the device
5233       info("adding new mirror component on %s" % tgt_node)
5234       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5235       if result.failed or not result.data:
5236         for new_lv in new_lvs:
5237           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5238           if msg:
5239             warning("Can't rollback device %s: %s", dev, msg,
5240                     hint="cleanup manually the unused logical volumes")
5241         raise errors.OpExecError("Can't add local storage to drbd")
5242
5243       dev.children = new_lvs
5244       cfg.Update(instance)
5245
5246     # Step: wait for sync
5247
5248     # this can fail as the old devices are degraded and _WaitForSync
5249     # does a combined result over all disks, so we don't check its
5250     # return value
5251     self.proc.LogStep(5, steps_total, "sync devices")
5252     _WaitForSync(self, instance, unlock=True)
5253
5254     # so check manually all the devices
5255     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5256       cfg.SetDiskID(dev, instance.primary_node)
5257       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5258       msg = result.RemoteFailMsg()
5259       if not msg and not result.payload:
5260         msg = "disk not found"
5261       if msg:
5262         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5263                                  (name, msg))
5264       if result.payload[5]:
5265         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5266
5267     # Step: remove old storage
5268     self.proc.LogStep(6, steps_total, "removing old storage")
5269     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5270       info("remove logical volumes for %s" % name)
5271       for lv in old_lvs:
5272         cfg.SetDiskID(lv, tgt_node)
5273         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5274         if msg:
5275           warning("Can't remove old LV: %s" % msg,
5276                   hint="manually remove unused LVs")
5277           continue
5278
5279   def _ExecD8Secondary(self, feedback_fn):
5280     """Replace the secondary node for drbd8.
5281
5282     The algorithm for replace is quite complicated:
5283       - for all disks of the instance:
5284         - create new LVs on the new node with same names
5285         - shutdown the drbd device on the old secondary
5286         - disconnect the drbd network on the primary
5287         - create the drbd device on the new secondary
5288         - network attach the drbd on the primary, using an artifice:
5289           the drbd code for Attach() will connect to the network if it
5290           finds a device which is connected to the good local disks but
5291           not network enabled
5292       - wait for sync across all devices
5293       - remove all disks from the old secondary
5294
5295     Failures are not very well handled.
5296
5297     """
5298     steps_total = 6
5299     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5300     instance = self.instance
5301     iv_names = {}
5302     # start of work
5303     cfg = self.cfg
5304     old_node = self.tgt_node
5305     new_node = self.new_node
5306     pri_node = instance.primary_node
5307     nodes_ip = {
5308       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5309       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5310       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5311       }
5312
5313     # Step: check device activation
5314     self.proc.LogStep(1, steps_total, "check device existence")
5315     info("checking volume groups")
5316     my_vg = cfg.GetVGName()
5317     results = self.rpc.call_vg_list([pri_node, new_node])
5318     for node in pri_node, new_node:
5319       res = results[node]
5320       if res.failed or not res.data or my_vg not in res.data:
5321         raise errors.OpExecError("Volume group '%s' not found on %s" %
5322                                  (my_vg, node))
5323     for idx, dev in enumerate(instance.disks):
5324       if idx not in self.op.disks:
5325         continue
5326       info("checking disk/%d on %s" % (idx, pri_node))
5327       cfg.SetDiskID(dev, pri_node)
5328       result = self.rpc.call_blockdev_find(pri_node, dev)
5329       msg = result.RemoteFailMsg()
5330       if not msg and not result.payload:
5331         msg = "disk not found"
5332       if msg:
5333         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5334                                  (idx, pri_node, msg))
5335
5336     # Step: check other node consistency
5337     self.proc.LogStep(2, steps_total, "check peer consistency")
5338     for idx, dev in enumerate(instance.disks):
5339       if idx not in self.op.disks:
5340         continue
5341       info("checking disk/%d consistency on %s" % (idx, pri_node))
5342       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5343         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5344                                  " unsafe to replace the secondary" %
5345                                  pri_node)
5346
5347     # Step: create new storage
5348     self.proc.LogStep(3, steps_total, "allocate new storage")
5349     for idx, dev in enumerate(instance.disks):
5350       info("adding new local storage on %s for disk/%d" %
5351            (new_node, idx))
5352       # we pass force_create=True to force LVM creation
5353       for new_lv in dev.children:
5354         _CreateBlockDev(self, new_node, instance, new_lv, True,
5355                         _GetInstanceInfoText(instance), False)
5356
5357     # Step 4: dbrd minors and drbd setups changes
5358     # after this, we must manually remove the drbd minors on both the
5359     # error and the success paths
5360     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5361                                    instance.name)
5362     logging.debug("Allocated minors %s" % (minors,))
5363     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5364     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5365       size = dev.size
5366       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5367       # create new devices on new_node; note that we create two IDs:
5368       # one without port, so the drbd will be activated without
5369       # networking information on the new node at this stage, and one
5370       # with network, for the latter activation in step 4
5371       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5372       if pri_node == o_node1:
5373         p_minor = o_minor1
5374       else:
5375         p_minor = o_minor2
5376
5377       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5378       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5379
5380       iv_names[idx] = (dev, dev.children, new_net_id)
5381       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5382                     new_net_id)
5383       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5384                               logical_id=new_alone_id,
5385                               children=dev.children)
5386       try:
5387         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5388                               _GetInstanceInfoText(instance), False)
5389       except errors.GenericError:
5390         self.cfg.ReleaseDRBDMinors(instance.name)
5391         raise
5392
5393     for idx, dev in enumerate(instance.disks):
5394       # we have new devices, shutdown the drbd on the old secondary
5395       info("shutting down drbd for disk/%d on old node" % idx)
5396       cfg.SetDiskID(dev, old_node)
5397       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5398       if msg:
5399         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5400                 (idx, msg),
5401                 hint="Please cleanup this device manually as soon as possible")
5402
5403     info("detaching primary drbds from the network (=> standalone)")
5404     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5405                                                instance.disks)[pri_node]
5406
5407     msg = result.RemoteFailMsg()
5408     if msg:
5409       # detaches didn't succeed (unlikely)
5410       self.cfg.ReleaseDRBDMinors(instance.name)
5411       raise errors.OpExecError("Can't detach the disks from the network on"
5412                                " old node: %s" % (msg,))
5413
5414     # if we managed to detach at least one, we update all the disks of
5415     # the instance to point to the new secondary
5416     info("updating instance configuration")
5417     for dev, _, new_logical_id in iv_names.itervalues():
5418       dev.logical_id = new_logical_id
5419       cfg.SetDiskID(dev, pri_node)
5420     cfg.Update(instance)
5421
5422     # and now perform the drbd attach
5423     info("attaching primary drbds to new secondary (standalone => connected)")
5424     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5425                                            instance.disks, instance.name,
5426                                            False)
5427     for to_node, to_result in result.items():
5428       msg = to_result.RemoteFailMsg()
5429       if msg:
5430         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5431                 hint="please do a gnt-instance info to see the"
5432                 " status of disks")
5433
5434     # this can fail as the old devices are degraded and _WaitForSync
5435     # does a combined result over all disks, so we don't check its
5436     # return value
5437     self.proc.LogStep(5, steps_total, "sync devices")
5438     _WaitForSync(self, instance, unlock=True)
5439
5440     # so check manually all the devices
5441     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5442       cfg.SetDiskID(dev, pri_node)
5443       result = self.rpc.call_blockdev_find(pri_node, dev)
5444       msg = result.RemoteFailMsg()
5445       if not msg and not result.payload:
5446         msg = "disk not found"
5447       if msg:
5448         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5449                                  (idx, msg))
5450       if result.payload[5]:
5451         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5452
5453     self.proc.LogStep(6, steps_total, "removing old storage")
5454     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5455       info("remove logical volumes for disk/%d" % idx)
5456       for lv in old_lvs:
5457         cfg.SetDiskID(lv, old_node)
5458         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5459         if msg:
5460           warning("Can't remove LV on old secondary: %s", msg,
5461                   hint="Cleanup stale volumes by hand")
5462
5463   def Exec(self, feedback_fn):
5464     """Execute disk replacement.
5465
5466     This dispatches the disk replacement to the appropriate handler.
5467
5468     """
5469     instance = self.instance
5470
5471     # Activate the instance disks if we're replacing them on a down instance
5472     if not instance.admin_up:
5473       _StartInstanceDisks(self, instance, True)
5474
5475     if self.op.mode == constants.REPLACE_DISK_CHG:
5476       fn = self._ExecD8Secondary
5477     else:
5478       fn = self._ExecD8DiskOnly
5479
5480     ret = fn(feedback_fn)
5481
5482     # Deactivate the instance disks if we're replacing them on a down instance
5483     if not instance.admin_up:
5484       _SafeShutdownInstanceDisks(self, instance)
5485
5486     return ret
5487
5488
5489 class LUGrowDisk(LogicalUnit):
5490   """Grow a disk of an instance.
5491
5492   """
5493   HPATH = "disk-grow"
5494   HTYPE = constants.HTYPE_INSTANCE
5495   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5496   REQ_BGL = False
5497
5498   def ExpandNames(self):
5499     self._ExpandAndLockInstance()
5500     self.needed_locks[locking.LEVEL_NODE] = []
5501     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5502
5503   def DeclareLocks(self, level):
5504     if level == locking.LEVEL_NODE:
5505       self._LockInstancesNodes()
5506
5507   def BuildHooksEnv(self):
5508     """Build hooks env.
5509
5510     This runs on the master, the primary and all the secondaries.
5511
5512     """
5513     env = {
5514       "DISK": self.op.disk,
5515       "AMOUNT": self.op.amount,
5516       }
5517     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5518     nl = [
5519       self.cfg.GetMasterNode(),
5520       self.instance.primary_node,
5521       ]
5522     return env, nl, nl
5523
5524   def CheckPrereq(self):
5525     """Check prerequisites.
5526
5527     This checks that the instance is in the cluster.
5528
5529     """
5530     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5531     assert instance is not None, \
5532       "Cannot retrieve locked instance %s" % self.op.instance_name
5533     nodenames = list(instance.all_nodes)
5534     for node in nodenames:
5535       _CheckNodeOnline(self, node)
5536
5537
5538     self.instance = instance
5539
5540     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5541       raise errors.OpPrereqError("Instance's disk layout does not support"
5542                                  " growing.")
5543
5544     self.disk = instance.FindDisk(self.op.disk)
5545
5546     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5547                                        instance.hypervisor)
5548     for node in nodenames:
5549       info = nodeinfo[node]
5550       if info.failed or not info.data:
5551         raise errors.OpPrereqError("Cannot get current information"
5552                                    " from node '%s'" % node)
5553       vg_free = info.data.get('vg_free', None)
5554       if not isinstance(vg_free, int):
5555         raise errors.OpPrereqError("Can't compute free disk space on"
5556                                    " node %s" % node)
5557       if self.op.amount > vg_free:
5558         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5559                                    " %d MiB available, %d MiB required" %
5560                                    (node, vg_free, self.op.amount))
5561
5562   def Exec(self, feedback_fn):
5563     """Execute disk grow.
5564
5565     """
5566     instance = self.instance
5567     disk = self.disk
5568     for node in instance.all_nodes:
5569       self.cfg.SetDiskID(disk, node)
5570       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5571       msg = result.RemoteFailMsg()
5572       if msg:
5573         raise errors.OpExecError("Grow request failed to node %s: %s" %
5574                                  (node, msg))
5575     disk.RecordGrow(self.op.amount)
5576     self.cfg.Update(instance)
5577     if self.op.wait_for_sync:
5578       disk_abort = not _WaitForSync(self, instance)
5579       if disk_abort:
5580         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5581                              " status.\nPlease check the instance.")
5582
5583
5584 class LUQueryInstanceData(NoHooksLU):
5585   """Query runtime instance data.
5586
5587   """
5588   _OP_REQP = ["instances", "static"]
5589   REQ_BGL = False
5590
5591   def ExpandNames(self):
5592     self.needed_locks = {}
5593     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5594
5595     if not isinstance(self.op.instances, list):
5596       raise errors.OpPrereqError("Invalid argument type 'instances'")
5597
5598     if self.op.instances:
5599       self.wanted_names = []
5600       for name in self.op.instances:
5601         full_name = self.cfg.ExpandInstanceName(name)
5602         if full_name is None:
5603           raise errors.OpPrereqError("Instance '%s' not known" % name)
5604         self.wanted_names.append(full_name)
5605       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5606     else:
5607       self.wanted_names = None
5608       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5609
5610     self.needed_locks[locking.LEVEL_NODE] = []
5611     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5612
5613   def DeclareLocks(self, level):
5614     if level == locking.LEVEL_NODE:
5615       self._LockInstancesNodes()
5616
5617   def CheckPrereq(self):
5618     """Check prerequisites.
5619
5620     This only checks the optional instance list against the existing names.
5621
5622     """
5623     if self.wanted_names is None:
5624       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5625
5626     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5627                              in self.wanted_names]
5628     return
5629
5630   def _ComputeDiskStatus(self, instance, snode, dev):
5631     """Compute block device status.
5632
5633     """
5634     static = self.op.static
5635     if not static:
5636       self.cfg.SetDiskID(dev, instance.primary_node)
5637       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5638       if dev_pstatus.offline:
5639         dev_pstatus = None
5640       else:
5641         msg = dev_pstatus.RemoteFailMsg()
5642         if msg:
5643           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5644                                    (instance.name, msg))
5645         dev_pstatus = dev_pstatus.payload
5646     else:
5647       dev_pstatus = None
5648
5649     if dev.dev_type in constants.LDS_DRBD:
5650       # we change the snode then (otherwise we use the one passed in)
5651       if dev.logical_id[0] == instance.primary_node:
5652         snode = dev.logical_id[1]
5653       else:
5654         snode = dev.logical_id[0]
5655
5656     if snode and not static:
5657       self.cfg.SetDiskID(dev, snode)
5658       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5659       if dev_sstatus.offline:
5660         dev_sstatus = None
5661       else:
5662         msg = dev_sstatus.RemoteFailMsg()
5663         if msg:
5664           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5665                                    (instance.name, msg))
5666         dev_sstatus = dev_sstatus.payload
5667     else:
5668       dev_sstatus = None
5669
5670     if dev.children:
5671       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5672                       for child in dev.children]
5673     else:
5674       dev_children = []
5675
5676     data = {
5677       "iv_name": dev.iv_name,
5678       "dev_type": dev.dev_type,
5679       "logical_id": dev.logical_id,
5680       "physical_id": dev.physical_id,
5681       "pstatus": dev_pstatus,
5682       "sstatus": dev_sstatus,
5683       "children": dev_children,
5684       "mode": dev.mode,
5685       }
5686
5687     return data
5688
5689   def Exec(self, feedback_fn):
5690     """Gather and return data"""
5691     result = {}
5692
5693     cluster = self.cfg.GetClusterInfo()
5694
5695     for instance in self.wanted_instances:
5696       if not self.op.static:
5697         remote_info = self.rpc.call_instance_info(instance.primary_node,
5698                                                   instance.name,
5699                                                   instance.hypervisor)
5700         remote_info.Raise()
5701         remote_info = remote_info.data
5702         if remote_info and "state" in remote_info:
5703           remote_state = "up"
5704         else:
5705           remote_state = "down"
5706       else:
5707         remote_state = None
5708       if instance.admin_up:
5709         config_state = "up"
5710       else:
5711         config_state = "down"
5712
5713       disks = [self._ComputeDiskStatus(instance, None, device)
5714                for device in instance.disks]
5715
5716       idict = {
5717         "name": instance.name,
5718         "config_state": config_state,
5719         "run_state": remote_state,
5720         "pnode": instance.primary_node,
5721         "snodes": instance.secondary_nodes,
5722         "os": instance.os,
5723         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5724         "disks": disks,
5725         "hypervisor": instance.hypervisor,
5726         "network_port": instance.network_port,
5727         "hv_instance": instance.hvparams,
5728         "hv_actual": cluster.FillHV(instance),
5729         "be_instance": instance.beparams,
5730         "be_actual": cluster.FillBE(instance),
5731         }
5732
5733       result[instance.name] = idict
5734
5735     return result
5736
5737
5738 class LUSetInstanceParams(LogicalUnit):
5739   """Modifies an instances's parameters.
5740
5741   """
5742   HPATH = "instance-modify"
5743   HTYPE = constants.HTYPE_INSTANCE
5744   _OP_REQP = ["instance_name"]
5745   REQ_BGL = False
5746
5747   def CheckArguments(self):
5748     if not hasattr(self.op, 'nics'):
5749       self.op.nics = []
5750     if not hasattr(self.op, 'disks'):
5751       self.op.disks = []
5752     if not hasattr(self.op, 'beparams'):
5753       self.op.beparams = {}
5754     if not hasattr(self.op, 'hvparams'):
5755       self.op.hvparams = {}
5756     self.op.force = getattr(self.op, "force", False)
5757     if not (self.op.nics or self.op.disks or
5758             self.op.hvparams or self.op.beparams):
5759       raise errors.OpPrereqError("No changes submitted")
5760
5761     # Disk validation
5762     disk_addremove = 0
5763     for disk_op, disk_dict in self.op.disks:
5764       if disk_op == constants.DDM_REMOVE:
5765         disk_addremove += 1
5766         continue
5767       elif disk_op == constants.DDM_ADD:
5768         disk_addremove += 1
5769       else:
5770         if not isinstance(disk_op, int):
5771           raise errors.OpPrereqError("Invalid disk index")
5772       if disk_op == constants.DDM_ADD:
5773         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5774         if mode not in constants.DISK_ACCESS_SET:
5775           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5776         size = disk_dict.get('size', None)
5777         if size is None:
5778           raise errors.OpPrereqError("Required disk parameter size missing")
5779         try:
5780           size = int(size)
5781         except ValueError, err:
5782           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5783                                      str(err))
5784         disk_dict['size'] = size
5785       else:
5786         # modification of disk
5787         if 'size' in disk_dict:
5788           raise errors.OpPrereqError("Disk size change not possible, use"
5789                                      " grow-disk")
5790
5791     if disk_addremove > 1:
5792       raise errors.OpPrereqError("Only one disk add or remove operation"
5793                                  " supported at a time")
5794
5795     # NIC validation
5796     nic_addremove = 0
5797     for nic_op, nic_dict in self.op.nics:
5798       if nic_op == constants.DDM_REMOVE:
5799         nic_addremove += 1
5800         continue
5801       elif nic_op == constants.DDM_ADD:
5802         nic_addremove += 1
5803       else:
5804         if not isinstance(nic_op, int):
5805           raise errors.OpPrereqError("Invalid nic index")
5806
5807       # nic_dict should be a dict
5808       nic_ip = nic_dict.get('ip', None)
5809       if nic_ip is not None:
5810         if nic_ip.lower() == constants.VALUE_NONE:
5811           nic_dict['ip'] = None
5812         else:
5813           if not utils.IsValidIP(nic_ip):
5814             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5815
5816       if nic_op == constants.DDM_ADD:
5817         nic_bridge = nic_dict.get('bridge', None)
5818         if nic_bridge is None:
5819           nic_dict['bridge'] = self.cfg.GetDefBridge()
5820         nic_mac = nic_dict.get('mac', None)
5821         if nic_mac is None:
5822           nic_dict['mac'] = constants.VALUE_AUTO
5823
5824       if 'mac' in nic_dict:
5825         nic_mac = nic_dict['mac']
5826         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5827           if not utils.IsValidMac(nic_mac):
5828             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5829         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5830           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5831                                      " modifying an existing nic")
5832
5833     if nic_addremove > 1:
5834       raise errors.OpPrereqError("Only one NIC add or remove operation"
5835                                  " supported at a time")
5836
5837   def ExpandNames(self):
5838     self._ExpandAndLockInstance()
5839     self.needed_locks[locking.LEVEL_NODE] = []
5840     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5841
5842   def DeclareLocks(self, level):
5843     if level == locking.LEVEL_NODE:
5844       self._LockInstancesNodes()
5845
5846   def BuildHooksEnv(self):
5847     """Build hooks env.
5848
5849     This runs on the master, primary and secondaries.
5850
5851     """
5852     args = dict()
5853     if constants.BE_MEMORY in self.be_new:
5854       args['memory'] = self.be_new[constants.BE_MEMORY]
5855     if constants.BE_VCPUS in self.be_new:
5856       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5857     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5858     # information at all.
5859     if self.op.nics:
5860       args['nics'] = []
5861       nic_override = dict(self.op.nics)
5862       for idx, nic in enumerate(self.instance.nics):
5863         if idx in nic_override:
5864           this_nic_override = nic_override[idx]
5865         else:
5866           this_nic_override = {}
5867         if 'ip' in this_nic_override:
5868           ip = this_nic_override['ip']
5869         else:
5870           ip = nic.ip
5871         if 'bridge' in this_nic_override:
5872           bridge = this_nic_override['bridge']
5873         else:
5874           bridge = nic.bridge
5875         if 'mac' in this_nic_override:
5876           mac = this_nic_override['mac']
5877         else:
5878           mac = nic.mac
5879         args['nics'].append((ip, bridge, mac))
5880       if constants.DDM_ADD in nic_override:
5881         ip = nic_override[constants.DDM_ADD].get('ip', None)
5882         bridge = nic_override[constants.DDM_ADD]['bridge']
5883         mac = nic_override[constants.DDM_ADD]['mac']
5884         args['nics'].append((ip, bridge, mac))
5885       elif constants.DDM_REMOVE in nic_override:
5886         del args['nics'][-1]
5887
5888     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5889     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5890     return env, nl, nl
5891
5892   def CheckPrereq(self):
5893     """Check prerequisites.
5894
5895     This only checks the instance list against the existing names.
5896
5897     """
5898     force = self.force = self.op.force
5899
5900     # checking the new params on the primary/secondary nodes
5901
5902     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5903     assert self.instance is not None, \
5904       "Cannot retrieve locked instance %s" % self.op.instance_name
5905     pnode = instance.primary_node
5906     nodelist = list(instance.all_nodes)
5907
5908     # hvparams processing
5909     if self.op.hvparams:
5910       i_hvdict = copy.deepcopy(instance.hvparams)
5911       for key, val in self.op.hvparams.iteritems():
5912         if val == constants.VALUE_DEFAULT:
5913           try:
5914             del i_hvdict[key]
5915           except KeyError:
5916             pass
5917         else:
5918           i_hvdict[key] = val
5919       cluster = self.cfg.GetClusterInfo()
5920       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5921       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5922                                 i_hvdict)
5923       # local check
5924       hypervisor.GetHypervisor(
5925         instance.hypervisor).CheckParameterSyntax(hv_new)
5926       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5927       self.hv_new = hv_new # the new actual values
5928       self.hv_inst = i_hvdict # the new dict (without defaults)
5929     else:
5930       self.hv_new = self.hv_inst = {}
5931
5932     # beparams processing
5933     if self.op.beparams:
5934       i_bedict = copy.deepcopy(instance.beparams)
5935       for key, val in self.op.beparams.iteritems():
5936         if val == constants.VALUE_DEFAULT:
5937           try:
5938             del i_bedict[key]
5939           except KeyError:
5940             pass
5941         else:
5942           i_bedict[key] = val
5943       cluster = self.cfg.GetClusterInfo()
5944       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5945       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5946                                 i_bedict)
5947       self.be_new = be_new # the new actual values
5948       self.be_inst = i_bedict # the new dict (without defaults)
5949     else:
5950       self.be_new = self.be_inst = {}
5951
5952     self.warn = []
5953
5954     if constants.BE_MEMORY in self.op.beparams and not self.force:
5955       mem_check_list = [pnode]
5956       if be_new[constants.BE_AUTO_BALANCE]:
5957         # either we changed auto_balance to yes or it was from before
5958         mem_check_list.extend(instance.secondary_nodes)
5959       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5960                                                   instance.hypervisor)
5961       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5962                                          instance.hypervisor)
5963       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5964         # Assume the primary node is unreachable and go ahead
5965         self.warn.append("Can't get info from primary node %s" % pnode)
5966       else:
5967         if not instance_info.failed and instance_info.data:
5968           current_mem = int(instance_info.data['memory'])
5969         else:
5970           # Assume instance not running
5971           # (there is a slight race condition here, but it's not very probable,
5972           # and we have no other way to check)
5973           current_mem = 0
5974         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5975                     nodeinfo[pnode].data['memory_free'])
5976         if miss_mem > 0:
5977           raise errors.OpPrereqError("This change will prevent the instance"
5978                                      " from starting, due to %d MB of memory"
5979                                      " missing on its primary node" % miss_mem)
5980
5981       if be_new[constants.BE_AUTO_BALANCE]:
5982         for node, nres in nodeinfo.iteritems():
5983           if node not in instance.secondary_nodes:
5984             continue
5985           if nres.failed or not isinstance(nres.data, dict):
5986             self.warn.append("Can't get info from secondary node %s" % node)
5987           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5988             self.warn.append("Not enough memory to failover instance to"
5989                              " secondary node %s" % node)
5990
5991     # NIC processing
5992     for nic_op, nic_dict in self.op.nics:
5993       if nic_op == constants.DDM_REMOVE:
5994         if not instance.nics:
5995           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5996         continue
5997       if nic_op != constants.DDM_ADD:
5998         # an existing nic
5999         if nic_op < 0 or nic_op >= len(instance.nics):
6000           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6001                                      " are 0 to %d" %
6002                                      (nic_op, len(instance.nics)))
6003       if 'bridge' in nic_dict:
6004         nic_bridge = nic_dict['bridge']
6005         if nic_bridge is None:
6006           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6007         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6008           msg = ("Bridge '%s' doesn't exist on one of"
6009                  " the instance nodes" % nic_bridge)
6010           if self.force:
6011             self.warn.append(msg)
6012           else:
6013             raise errors.OpPrereqError(msg)
6014       if 'mac' in nic_dict:
6015         nic_mac = nic_dict['mac']
6016         if nic_mac is None:
6017           raise errors.OpPrereqError('Cannot set the nic mac to None')
6018         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6019           # otherwise generate the mac
6020           nic_dict['mac'] = self.cfg.GenerateMAC()
6021         else:
6022           # or validate/reserve the current one
6023           if self.cfg.IsMacInUse(nic_mac):
6024             raise errors.OpPrereqError("MAC address %s already in use"
6025                                        " in cluster" % nic_mac)
6026
6027     # DISK processing
6028     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6029       raise errors.OpPrereqError("Disk operations not supported for"
6030                                  " diskless instances")
6031     for disk_op, disk_dict in self.op.disks:
6032       if disk_op == constants.DDM_REMOVE:
6033         if len(instance.disks) == 1:
6034           raise errors.OpPrereqError("Cannot remove the last disk of"
6035                                      " an instance")
6036         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6037         ins_l = ins_l[pnode]
6038         if ins_l.failed or not isinstance(ins_l.data, list):
6039           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6040         if instance.name in ins_l.data:
6041           raise errors.OpPrereqError("Instance is running, can't remove"
6042                                      " disks.")
6043
6044       if (disk_op == constants.DDM_ADD and
6045           len(instance.nics) >= constants.MAX_DISKS):
6046         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6047                                    " add more" % constants.MAX_DISKS)
6048       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6049         # an existing disk
6050         if disk_op < 0 or disk_op >= len(instance.disks):
6051           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6052                                      " are 0 to %d" %
6053                                      (disk_op, len(instance.disks)))
6054
6055     return
6056
6057   def Exec(self, feedback_fn):
6058     """Modifies an instance.
6059
6060     All parameters take effect only at the next restart of the instance.
6061
6062     """
6063     # Process here the warnings from CheckPrereq, as we don't have a
6064     # feedback_fn there.
6065     for warn in self.warn:
6066       feedback_fn("WARNING: %s" % warn)
6067
6068     result = []
6069     instance = self.instance
6070     # disk changes
6071     for disk_op, disk_dict in self.op.disks:
6072       if disk_op == constants.DDM_REMOVE:
6073         # remove the last disk
6074         device = instance.disks.pop()
6075         device_idx = len(instance.disks)
6076         for node, disk in device.ComputeNodeTree(instance.primary_node):
6077           self.cfg.SetDiskID(disk, node)
6078           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6079           if msg:
6080             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6081                             " continuing anyway", device_idx, node, msg)
6082         result.append(("disk/%d" % device_idx, "remove"))
6083       elif disk_op == constants.DDM_ADD:
6084         # add a new disk
6085         if instance.disk_template == constants.DT_FILE:
6086           file_driver, file_path = instance.disks[0].logical_id
6087           file_path = os.path.dirname(file_path)
6088         else:
6089           file_driver = file_path = None
6090         disk_idx_base = len(instance.disks)
6091         new_disk = _GenerateDiskTemplate(self,
6092                                          instance.disk_template,
6093                                          instance.name, instance.primary_node,
6094                                          instance.secondary_nodes,
6095                                          [disk_dict],
6096                                          file_path,
6097                                          file_driver,
6098                                          disk_idx_base)[0]
6099         instance.disks.append(new_disk)
6100         info = _GetInstanceInfoText(instance)
6101
6102         logging.info("Creating volume %s for instance %s",
6103                      new_disk.iv_name, instance.name)
6104         # Note: this needs to be kept in sync with _CreateDisks
6105         #HARDCODE
6106         for node in instance.all_nodes:
6107           f_create = node == instance.primary_node
6108           try:
6109             _CreateBlockDev(self, node, instance, new_disk,
6110                             f_create, info, f_create)
6111           except errors.OpExecError, err:
6112             self.LogWarning("Failed to create volume %s (%s) on"
6113                             " node %s: %s",
6114                             new_disk.iv_name, new_disk, node, err)
6115         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6116                        (new_disk.size, new_disk.mode)))
6117       else:
6118         # change a given disk
6119         instance.disks[disk_op].mode = disk_dict['mode']
6120         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6121     # NIC changes
6122     for nic_op, nic_dict in self.op.nics:
6123       if nic_op == constants.DDM_REMOVE:
6124         # remove the last nic
6125         del instance.nics[-1]
6126         result.append(("nic.%d" % len(instance.nics), "remove"))
6127       elif nic_op == constants.DDM_ADD:
6128         # mac and bridge should be set, by now
6129         mac = nic_dict['mac']
6130         bridge = nic_dict['bridge']
6131         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6132                               bridge=bridge)
6133         instance.nics.append(new_nic)
6134         result.append(("nic.%d" % (len(instance.nics) - 1),
6135                        "add:mac=%s,ip=%s,bridge=%s" %
6136                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6137       else:
6138         # change a given nic
6139         for key in 'mac', 'ip', 'bridge':
6140           if key in nic_dict:
6141             setattr(instance.nics[nic_op], key, nic_dict[key])
6142             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6143
6144     # hvparams changes
6145     if self.op.hvparams:
6146       instance.hvparams = self.hv_inst
6147       for key, val in self.op.hvparams.iteritems():
6148         result.append(("hv/%s" % key, val))
6149
6150     # beparams changes
6151     if self.op.beparams:
6152       instance.beparams = self.be_inst
6153       for key, val in self.op.beparams.iteritems():
6154         result.append(("be/%s" % key, val))
6155
6156     self.cfg.Update(instance)
6157
6158     return result
6159
6160
6161 class LUQueryExports(NoHooksLU):
6162   """Query the exports list
6163
6164   """
6165   _OP_REQP = ['nodes']
6166   REQ_BGL = False
6167
6168   def ExpandNames(self):
6169     self.needed_locks = {}
6170     self.share_locks[locking.LEVEL_NODE] = 1
6171     if not self.op.nodes:
6172       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6173     else:
6174       self.needed_locks[locking.LEVEL_NODE] = \
6175         _GetWantedNodes(self, self.op.nodes)
6176
6177   def CheckPrereq(self):
6178     """Check prerequisites.
6179
6180     """
6181     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6182
6183   def Exec(self, feedback_fn):
6184     """Compute the list of all the exported system images.
6185
6186     @rtype: dict
6187     @return: a dictionary with the structure node->(export-list)
6188         where export-list is a list of the instances exported on
6189         that node.
6190
6191     """
6192     rpcresult = self.rpc.call_export_list(self.nodes)
6193     result = {}
6194     for node in rpcresult:
6195       if rpcresult[node].failed:
6196         result[node] = False
6197       else:
6198         result[node] = rpcresult[node].data
6199
6200     return result
6201
6202
6203 class LUExportInstance(LogicalUnit):
6204   """Export an instance to an image in the cluster.
6205
6206   """
6207   HPATH = "instance-export"
6208   HTYPE = constants.HTYPE_INSTANCE
6209   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6210   REQ_BGL = False
6211
6212   def ExpandNames(self):
6213     self._ExpandAndLockInstance()
6214     # FIXME: lock only instance primary and destination node
6215     #
6216     # Sad but true, for now we have do lock all nodes, as we don't know where
6217     # the previous export might be, and and in this LU we search for it and
6218     # remove it from its current node. In the future we could fix this by:
6219     #  - making a tasklet to search (share-lock all), then create the new one,
6220     #    then one to remove, after
6221     #  - removing the removal operation altoghether
6222     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6223
6224   def DeclareLocks(self, level):
6225     """Last minute lock declaration."""
6226     # All nodes are locked anyway, so nothing to do here.
6227
6228   def BuildHooksEnv(self):
6229     """Build hooks env.
6230
6231     This will run on the master, primary node and target node.
6232
6233     """
6234     env = {
6235       "EXPORT_NODE": self.op.target_node,
6236       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6237       }
6238     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6239     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6240           self.op.target_node]
6241     return env, nl, nl
6242
6243   def CheckPrereq(self):
6244     """Check prerequisites.
6245
6246     This checks that the instance and node names are valid.
6247
6248     """
6249     instance_name = self.op.instance_name
6250     self.instance = self.cfg.GetInstanceInfo(instance_name)
6251     assert self.instance is not None, \
6252           "Cannot retrieve locked instance %s" % self.op.instance_name
6253     _CheckNodeOnline(self, self.instance.primary_node)
6254
6255     self.dst_node = self.cfg.GetNodeInfo(
6256       self.cfg.ExpandNodeName(self.op.target_node))
6257
6258     if self.dst_node is None:
6259       # This is wrong node name, not a non-locked node
6260       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6261     _CheckNodeOnline(self, self.dst_node.name)
6262     _CheckNodeNotDrained(self, self.dst_node.name)
6263
6264     # instance disk type verification
6265     for disk in self.instance.disks:
6266       if disk.dev_type == constants.LD_FILE:
6267         raise errors.OpPrereqError("Export not supported for instances with"
6268                                    " file-based disks")
6269
6270   def Exec(self, feedback_fn):
6271     """Export an instance to an image in the cluster.
6272
6273     """
6274     instance = self.instance
6275     dst_node = self.dst_node
6276     src_node = instance.primary_node
6277     if self.op.shutdown:
6278       # shutdown the instance, but not the disks
6279       result = self.rpc.call_instance_shutdown(src_node, instance)
6280       msg = result.RemoteFailMsg()
6281       if msg:
6282         raise errors.OpExecError("Could not shutdown instance %s on"
6283                                  " node %s: %s" %
6284                                  (instance.name, src_node, msg))
6285
6286     vgname = self.cfg.GetVGName()
6287
6288     snap_disks = []
6289
6290     # set the disks ID correctly since call_instance_start needs the
6291     # correct drbd minor to create the symlinks
6292     for disk in instance.disks:
6293       self.cfg.SetDiskID(disk, src_node)
6294
6295     try:
6296       for disk in instance.disks:
6297         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6298         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6299         if new_dev_name.failed or not new_dev_name.data:
6300           self.LogWarning("Could not snapshot block device %s on node %s",
6301                           disk.logical_id[1], src_node)
6302           snap_disks.append(False)
6303         else:
6304           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6305                                  logical_id=(vgname, new_dev_name.data),
6306                                  physical_id=(vgname, new_dev_name.data),
6307                                  iv_name=disk.iv_name)
6308           snap_disks.append(new_dev)
6309
6310     finally:
6311       if self.op.shutdown and instance.admin_up:
6312         result = self.rpc.call_instance_start(src_node, instance, None, None)
6313         msg = result.RemoteFailMsg()
6314         if msg:
6315           _ShutdownInstanceDisks(self, instance)
6316           raise errors.OpExecError("Could not start instance: %s" % msg)
6317
6318     # TODO: check for size
6319
6320     cluster_name = self.cfg.GetClusterName()
6321     for idx, dev in enumerate(snap_disks):
6322       if dev:
6323         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6324                                                instance, cluster_name, idx)
6325         if result.failed or not result.data:
6326           self.LogWarning("Could not export block device %s from node %s to"
6327                           " node %s", dev.logical_id[1], src_node,
6328                           dst_node.name)
6329         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6330         if msg:
6331           self.LogWarning("Could not remove snapshot block device %s from node"
6332                           " %s: %s", dev.logical_id[1], src_node, msg)
6333
6334     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6335     if result.failed or not result.data:
6336       self.LogWarning("Could not finalize export for instance %s on node %s",
6337                       instance.name, dst_node.name)
6338
6339     nodelist = self.cfg.GetNodeList()
6340     nodelist.remove(dst_node.name)
6341
6342     # on one-node clusters nodelist will be empty after the removal
6343     # if we proceed the backup would be removed because OpQueryExports
6344     # substitutes an empty list with the full cluster node list.
6345     if nodelist:
6346       exportlist = self.rpc.call_export_list(nodelist)
6347       for node in exportlist:
6348         if exportlist[node].failed:
6349           continue
6350         if instance.name in exportlist[node].data:
6351           if not self.rpc.call_export_remove(node, instance.name):
6352             self.LogWarning("Could not remove older export for instance %s"
6353                             " on node %s", instance.name, node)
6354
6355
6356 class LURemoveExport(NoHooksLU):
6357   """Remove exports related to the named instance.
6358
6359   """
6360   _OP_REQP = ["instance_name"]
6361   REQ_BGL = False
6362
6363   def ExpandNames(self):
6364     self.needed_locks = {}
6365     # We need all nodes to be locked in order for RemoveExport to work, but we
6366     # don't need to lock the instance itself, as nothing will happen to it (and
6367     # we can remove exports also for a removed instance)
6368     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6369
6370   def CheckPrereq(self):
6371     """Check prerequisites.
6372     """
6373     pass
6374
6375   def Exec(self, feedback_fn):
6376     """Remove any export.
6377
6378     """
6379     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6380     # If the instance was not found we'll try with the name that was passed in.
6381     # This will only work if it was an FQDN, though.
6382     fqdn_warn = False
6383     if not instance_name:
6384       fqdn_warn = True
6385       instance_name = self.op.instance_name
6386
6387     exportlist = self.rpc.call_export_list(self.acquired_locks[
6388       locking.LEVEL_NODE])
6389     found = False
6390     for node in exportlist:
6391       if exportlist[node].failed:
6392         self.LogWarning("Failed to query node %s, continuing" % node)
6393         continue
6394       if instance_name in exportlist[node].data:
6395         found = True
6396         result = self.rpc.call_export_remove(node, instance_name)
6397         if result.failed or not result.data:
6398           logging.error("Could not remove export for instance %s"
6399                         " on node %s", instance_name, node)
6400
6401     if fqdn_warn and not found:
6402       feedback_fn("Export not found. If trying to remove an export belonging"
6403                   " to a deleted instance please use its Fully Qualified"
6404                   " Domain Name.")
6405
6406
6407 class TagsLU(NoHooksLU):
6408   """Generic tags LU.
6409
6410   This is an abstract class which is the parent of all the other tags LUs.
6411
6412   """
6413
6414   def ExpandNames(self):
6415     self.needed_locks = {}
6416     if self.op.kind == constants.TAG_NODE:
6417       name = self.cfg.ExpandNodeName(self.op.name)
6418       if name is None:
6419         raise errors.OpPrereqError("Invalid node name (%s)" %
6420                                    (self.op.name,))
6421       self.op.name = name
6422       self.needed_locks[locking.LEVEL_NODE] = name
6423     elif self.op.kind == constants.TAG_INSTANCE:
6424       name = self.cfg.ExpandInstanceName(self.op.name)
6425       if name is None:
6426         raise errors.OpPrereqError("Invalid instance name (%s)" %
6427                                    (self.op.name,))
6428       self.op.name = name
6429       self.needed_locks[locking.LEVEL_INSTANCE] = name
6430
6431   def CheckPrereq(self):
6432     """Check prerequisites.
6433
6434     """
6435     if self.op.kind == constants.TAG_CLUSTER:
6436       self.target = self.cfg.GetClusterInfo()
6437     elif self.op.kind == constants.TAG_NODE:
6438       self.target = self.cfg.GetNodeInfo(self.op.name)
6439     elif self.op.kind == constants.TAG_INSTANCE:
6440       self.target = self.cfg.GetInstanceInfo(self.op.name)
6441     else:
6442       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6443                                  str(self.op.kind))
6444
6445
6446 class LUGetTags(TagsLU):
6447   """Returns the tags of a given object.
6448
6449   """
6450   _OP_REQP = ["kind", "name"]
6451   REQ_BGL = False
6452
6453   def Exec(self, feedback_fn):
6454     """Returns the tag list.
6455
6456     """
6457     return list(self.target.GetTags())
6458
6459
6460 class LUSearchTags(NoHooksLU):
6461   """Searches the tags for a given pattern.
6462
6463   """
6464   _OP_REQP = ["pattern"]
6465   REQ_BGL = False
6466
6467   def ExpandNames(self):
6468     self.needed_locks = {}
6469
6470   def CheckPrereq(self):
6471     """Check prerequisites.
6472
6473     This checks the pattern passed for validity by compiling it.
6474
6475     """
6476     try:
6477       self.re = re.compile(self.op.pattern)
6478     except re.error, err:
6479       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6480                                  (self.op.pattern, err))
6481
6482   def Exec(self, feedback_fn):
6483     """Returns the tag list.
6484
6485     """
6486     cfg = self.cfg
6487     tgts = [("/cluster", cfg.GetClusterInfo())]
6488     ilist = cfg.GetAllInstancesInfo().values()
6489     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6490     nlist = cfg.GetAllNodesInfo().values()
6491     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6492     results = []
6493     for path, target in tgts:
6494       for tag in target.GetTags():
6495         if self.re.search(tag):
6496           results.append((path, tag))
6497     return results
6498
6499
6500 class LUAddTags(TagsLU):
6501   """Sets a tag on a given object.
6502
6503   """
6504   _OP_REQP = ["kind", "name", "tags"]
6505   REQ_BGL = False
6506
6507   def CheckPrereq(self):
6508     """Check prerequisites.
6509
6510     This checks the type and length of the tag name and value.
6511
6512     """
6513     TagsLU.CheckPrereq(self)
6514     for tag in self.op.tags:
6515       objects.TaggableObject.ValidateTag(tag)
6516
6517   def Exec(self, feedback_fn):
6518     """Sets the tag.
6519
6520     """
6521     try:
6522       for tag in self.op.tags:
6523         self.target.AddTag(tag)
6524     except errors.TagError, err:
6525       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6526     try:
6527       self.cfg.Update(self.target)
6528     except errors.ConfigurationError:
6529       raise errors.OpRetryError("There has been a modification to the"
6530                                 " config file and the operation has been"
6531                                 " aborted. Please retry.")
6532
6533
6534 class LUDelTags(TagsLU):
6535   """Delete a list of tags from a given object.
6536
6537   """
6538   _OP_REQP = ["kind", "name", "tags"]
6539   REQ_BGL = False
6540
6541   def CheckPrereq(self):
6542     """Check prerequisites.
6543
6544     This checks that we have the given tag.
6545
6546     """
6547     TagsLU.CheckPrereq(self)
6548     for tag in self.op.tags:
6549       objects.TaggableObject.ValidateTag(tag)
6550     del_tags = frozenset(self.op.tags)
6551     cur_tags = self.target.GetTags()
6552     if not del_tags <= cur_tags:
6553       diff_tags = del_tags - cur_tags
6554       diff_names = ["'%s'" % tag for tag in diff_tags]
6555       diff_names.sort()
6556       raise errors.OpPrereqError("Tag(s) %s not found" %
6557                                  (",".join(diff_names)))
6558
6559   def Exec(self, feedback_fn):
6560     """Remove the tag from the object.
6561
6562     """
6563     for tag in self.op.tags:
6564       self.target.RemoveTag(tag)
6565     try:
6566       self.cfg.Update(self.target)
6567     except errors.ConfigurationError:
6568       raise errors.OpRetryError("There has been a modification to the"
6569                                 " config file and the operation has been"
6570                                 " aborted. Please retry.")
6571
6572
6573 class LUTestDelay(NoHooksLU):
6574   """Sleep for a specified amount of time.
6575
6576   This LU sleeps on the master and/or nodes for a specified amount of
6577   time.
6578
6579   """
6580   _OP_REQP = ["duration", "on_master", "on_nodes"]
6581   REQ_BGL = False
6582
6583   def ExpandNames(self):
6584     """Expand names and set required locks.
6585
6586     This expands the node list, if any.
6587
6588     """
6589     self.needed_locks = {}
6590     if self.op.on_nodes:
6591       # _GetWantedNodes can be used here, but is not always appropriate to use
6592       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6593       # more information.
6594       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6595       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6596
6597   def CheckPrereq(self):
6598     """Check prerequisites.
6599
6600     """
6601
6602   def Exec(self, feedback_fn):
6603     """Do the actual sleep.
6604
6605     """
6606     if self.op.on_master:
6607       if not utils.TestDelay(self.op.duration):
6608         raise errors.OpExecError("Error during master delay test")
6609     if self.op.on_nodes:
6610       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6611       if not result:
6612         raise errors.OpExecError("Complete failure from rpc call")
6613       for node, node_result in result.items():
6614         node_result.Raise()
6615         if not node_result.data:
6616           raise errors.OpExecError("Failure during rpc call to node %s,"
6617                                    " result: %s" % (node, node_result.data))
6618
6619
6620 class IAllocator(object):
6621   """IAllocator framework.
6622
6623   An IAllocator instance has three sets of attributes:
6624     - cfg that is needed to query the cluster
6625     - input data (all members of the _KEYS class attribute are required)
6626     - four buffer attributes (in|out_data|text), that represent the
6627       input (to the external script) in text and data structure format,
6628       and the output from it, again in two formats
6629     - the result variables from the script (success, info, nodes) for
6630       easy usage
6631
6632   """
6633   _ALLO_KEYS = [
6634     "mem_size", "disks", "disk_template",
6635     "os", "tags", "nics", "vcpus", "hypervisor",
6636     ]
6637   _RELO_KEYS = [
6638     "relocate_from",
6639     ]
6640
6641   def __init__(self, lu, mode, name, **kwargs):
6642     self.lu = lu
6643     # init buffer variables
6644     self.in_text = self.out_text = self.in_data = self.out_data = None
6645     # init all input fields so that pylint is happy
6646     self.mode = mode
6647     self.name = name
6648     self.mem_size = self.disks = self.disk_template = None
6649     self.os = self.tags = self.nics = self.vcpus = None
6650     self.hypervisor = None
6651     self.relocate_from = None
6652     # computed fields
6653     self.required_nodes = None
6654     # init result fields
6655     self.success = self.info = self.nodes = None
6656     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6657       keyset = self._ALLO_KEYS
6658     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6659       keyset = self._RELO_KEYS
6660     else:
6661       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6662                                    " IAllocator" % self.mode)
6663     for key in kwargs:
6664       if key not in keyset:
6665         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6666                                      " IAllocator" % key)
6667       setattr(self, key, kwargs[key])
6668     for key in keyset:
6669       if key not in kwargs:
6670         raise errors.ProgrammerError("Missing input parameter '%s' to"
6671                                      " IAllocator" % key)
6672     self._BuildInputData()
6673
6674   def _ComputeClusterData(self):
6675     """Compute the generic allocator input data.
6676
6677     This is the data that is independent of the actual operation.
6678
6679     """
6680     cfg = self.lu.cfg
6681     cluster_info = cfg.GetClusterInfo()
6682     # cluster data
6683     data = {
6684       "version": constants.IALLOCATOR_VERSION,
6685       "cluster_name": cfg.GetClusterName(),
6686       "cluster_tags": list(cluster_info.GetTags()),
6687       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6688       # we don't have job IDs
6689       }
6690     iinfo = cfg.GetAllInstancesInfo().values()
6691     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6692
6693     # node data
6694     node_results = {}
6695     node_list = cfg.GetNodeList()
6696
6697     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6698       hypervisor_name = self.hypervisor
6699     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6700       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6701
6702     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6703                                            hypervisor_name)
6704     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6705                        cluster_info.enabled_hypervisors)
6706     for nname, nresult in node_data.items():
6707       # first fill in static (config-based) values
6708       ninfo = cfg.GetNodeInfo(nname)
6709       pnr = {
6710         "tags": list(ninfo.GetTags()),
6711         "primary_ip": ninfo.primary_ip,
6712         "secondary_ip": ninfo.secondary_ip,
6713         "offline": ninfo.offline,
6714         "drained": ninfo.drained,
6715         "master_candidate": ninfo.master_candidate,
6716         }
6717
6718       if not ninfo.offline:
6719         nresult.Raise()
6720         if not isinstance(nresult.data, dict):
6721           raise errors.OpExecError("Can't get data for node %s" % nname)
6722         remote_info = nresult.data
6723         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6724                      'vg_size', 'vg_free', 'cpu_total']:
6725           if attr not in remote_info:
6726             raise errors.OpExecError("Node '%s' didn't return attribute"
6727                                      " '%s'" % (nname, attr))
6728           try:
6729             remote_info[attr] = int(remote_info[attr])
6730           except ValueError, err:
6731             raise errors.OpExecError("Node '%s' returned invalid value"
6732                                      " for '%s': %s" % (nname, attr, err))
6733         # compute memory used by primary instances
6734         i_p_mem = i_p_up_mem = 0
6735         for iinfo, beinfo in i_list:
6736           if iinfo.primary_node == nname:
6737             i_p_mem += beinfo[constants.BE_MEMORY]
6738             if iinfo.name not in node_iinfo[nname].data:
6739               i_used_mem = 0
6740             else:
6741               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6742             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6743             remote_info['memory_free'] -= max(0, i_mem_diff)
6744
6745             if iinfo.admin_up:
6746               i_p_up_mem += beinfo[constants.BE_MEMORY]
6747
6748         # compute memory used by instances
6749         pnr_dyn = {
6750           "total_memory": remote_info['memory_total'],
6751           "reserved_memory": remote_info['memory_dom0'],
6752           "free_memory": remote_info['memory_free'],
6753           "total_disk": remote_info['vg_size'],
6754           "free_disk": remote_info['vg_free'],
6755           "total_cpus": remote_info['cpu_total'],
6756           "i_pri_memory": i_p_mem,
6757           "i_pri_up_memory": i_p_up_mem,
6758           }
6759         pnr.update(pnr_dyn)
6760
6761       node_results[nname] = pnr
6762     data["nodes"] = node_results
6763
6764     # instance data
6765     instance_data = {}
6766     for iinfo, beinfo in i_list:
6767       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6768                   for n in iinfo.nics]
6769       pir = {
6770         "tags": list(iinfo.GetTags()),
6771         "admin_up": iinfo.admin_up,
6772         "vcpus": beinfo[constants.BE_VCPUS],
6773         "memory": beinfo[constants.BE_MEMORY],
6774         "os": iinfo.os,
6775         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6776         "nics": nic_data,
6777         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6778         "disk_template": iinfo.disk_template,
6779         "hypervisor": iinfo.hypervisor,
6780         }
6781       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6782                                                  pir["disks"])
6783       instance_data[iinfo.name] = pir
6784
6785     data["instances"] = instance_data
6786
6787     self.in_data = data
6788
6789   def _AddNewInstance(self):
6790     """Add new instance data to allocator structure.
6791
6792     This in combination with _AllocatorGetClusterData will create the
6793     correct structure needed as input for the allocator.
6794
6795     The checks for the completeness of the opcode must have already been
6796     done.
6797
6798     """
6799     data = self.in_data
6800
6801     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6802
6803     if self.disk_template in constants.DTS_NET_MIRROR:
6804       self.required_nodes = 2
6805     else:
6806       self.required_nodes = 1
6807     request = {
6808       "type": "allocate",
6809       "name": self.name,
6810       "disk_template": self.disk_template,
6811       "tags": self.tags,
6812       "os": self.os,
6813       "vcpus": self.vcpus,
6814       "memory": self.mem_size,
6815       "disks": self.disks,
6816       "disk_space_total": disk_space,
6817       "nics": self.nics,
6818       "required_nodes": self.required_nodes,
6819       }
6820     data["request"] = request
6821
6822   def _AddRelocateInstance(self):
6823     """Add relocate instance data to allocator structure.
6824
6825     This in combination with _IAllocatorGetClusterData will create the
6826     correct structure needed as input for the allocator.
6827
6828     The checks for the completeness of the opcode must have already been
6829     done.
6830
6831     """
6832     instance = self.lu.cfg.GetInstanceInfo(self.name)
6833     if instance is None:
6834       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6835                                    " IAllocator" % self.name)
6836
6837     if instance.disk_template not in constants.DTS_NET_MIRROR:
6838       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6839
6840     if len(instance.secondary_nodes) != 1:
6841       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6842
6843     self.required_nodes = 1
6844     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6845     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6846
6847     request = {
6848       "type": "relocate",
6849       "name": self.name,
6850       "disk_space_total": disk_space,
6851       "required_nodes": self.required_nodes,
6852       "relocate_from": self.relocate_from,
6853       }
6854     self.in_data["request"] = request
6855
6856   def _BuildInputData(self):
6857     """Build input data structures.
6858
6859     """
6860     self._ComputeClusterData()
6861
6862     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6863       self._AddNewInstance()
6864     else:
6865       self._AddRelocateInstance()
6866
6867     self.in_text = serializer.Dump(self.in_data)
6868
6869   def Run(self, name, validate=True, call_fn=None):
6870     """Run an instance allocator and return the results.
6871
6872     """
6873     if call_fn is None:
6874       call_fn = self.lu.rpc.call_iallocator_runner
6875     data = self.in_text
6876
6877     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6878     result.Raise()
6879
6880     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6881       raise errors.OpExecError("Invalid result from master iallocator runner")
6882
6883     rcode, stdout, stderr, fail = result.data
6884
6885     if rcode == constants.IARUN_NOTFOUND:
6886       raise errors.OpExecError("Can't find allocator '%s'" % name)
6887     elif rcode == constants.IARUN_FAILURE:
6888       raise errors.OpExecError("Instance allocator call failed: %s,"
6889                                " output: %s" % (fail, stdout+stderr))
6890     self.out_text = stdout
6891     if validate:
6892       self._ValidateResult()
6893
6894   def _ValidateResult(self):
6895     """Process the allocator results.
6896
6897     This will process and if successful save the result in
6898     self.out_data and the other parameters.
6899
6900     """
6901     try:
6902       rdict = serializer.Load(self.out_text)
6903     except Exception, err:
6904       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6905
6906     if not isinstance(rdict, dict):
6907       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6908
6909     for key in "success", "info", "nodes":
6910       if key not in rdict:
6911         raise errors.OpExecError("Can't parse iallocator results:"
6912                                  " missing key '%s'" % key)
6913       setattr(self, key, rdict[key])
6914
6915     if not isinstance(rdict["nodes"], list):
6916       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6917                                " is not a list")
6918     self.out_data = rdict
6919
6920
6921 class LUTestAllocator(NoHooksLU):
6922   """Run allocator tests.
6923
6924   This LU runs the allocator tests
6925
6926   """
6927   _OP_REQP = ["direction", "mode", "name"]
6928
6929   def CheckPrereq(self):
6930     """Check prerequisites.
6931
6932     This checks the opcode parameters depending on the director and mode test.
6933
6934     """
6935     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6936       for attr in ["name", "mem_size", "disks", "disk_template",
6937                    "os", "tags", "nics", "vcpus"]:
6938         if not hasattr(self.op, attr):
6939           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6940                                      attr)
6941       iname = self.cfg.ExpandInstanceName(self.op.name)
6942       if iname is not None:
6943         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6944                                    iname)
6945       if not isinstance(self.op.nics, list):
6946         raise errors.OpPrereqError("Invalid parameter 'nics'")
6947       for row in self.op.nics:
6948         if (not isinstance(row, dict) or
6949             "mac" not in row or
6950             "ip" not in row or
6951             "bridge" not in row):
6952           raise errors.OpPrereqError("Invalid contents of the"
6953                                      " 'nics' parameter")
6954       if not isinstance(self.op.disks, list):
6955         raise errors.OpPrereqError("Invalid parameter 'disks'")
6956       for row in self.op.disks:
6957         if (not isinstance(row, dict) or
6958             "size" not in row or
6959             not isinstance(row["size"], int) or
6960             "mode" not in row or
6961             row["mode"] not in ['r', 'w']):
6962           raise errors.OpPrereqError("Invalid contents of the"
6963                                      " 'disks' parameter")
6964       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6965         self.op.hypervisor = self.cfg.GetHypervisorType()
6966     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6967       if not hasattr(self.op, "name"):
6968         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6969       fname = self.cfg.ExpandInstanceName(self.op.name)
6970       if fname is None:
6971         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6972                                    self.op.name)
6973       self.op.name = fname
6974       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6975     else:
6976       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6977                                  self.op.mode)
6978
6979     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6980       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6981         raise errors.OpPrereqError("Missing allocator name")
6982     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6983       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6984                                  self.op.direction)
6985
6986   def Exec(self, feedback_fn):
6987     """Run the allocator test.
6988
6989     """
6990     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6991       ial = IAllocator(self,
6992                        mode=self.op.mode,
6993                        name=self.op.name,
6994                        mem_size=self.op.mem_size,
6995                        disks=self.op.disks,
6996                        disk_template=self.op.disk_template,
6997                        os=self.op.os,
6998                        tags=self.op.tags,
6999                        nics=self.op.nics,
7000                        vcpus=self.op.vcpus,
7001                        hypervisor=self.op.hypervisor,
7002                        )
7003     else:
7004       ial = IAllocator(self,
7005                        mode=self.op.mode,
7006                        name=self.op.name,
7007                        relocate_from=list(self.relocate_from),
7008                        )
7009
7010     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7011       result = ial.in_text
7012     else:
7013       ial.Run(self.op.allocator, validate=False)
7014       result = ial.out_text
7015     return result